summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/backgroundwriter.c256
1 files changed, 128 insertions, 128 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c
index 0e07be64..8016b3d1 100644
--- a/tools/backgroundwriter.c
+++ b/tools/backgroundwriter.c
@@ -4,7 +4,7 @@
* See the main source file 'xineliboutput.c' for copyright information and
* how to reach the author.
*
- * $Id: backgroundwriter.c,v 1.20 2010-03-12 22:42:02 phintuka Exp $
+ * $Id: backgroundwriter.c,v 1.21 2010-03-12 22:43:18 phintuka Exp $
*
*/
@@ -48,8 +48,8 @@ cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin)
#if defined(TCP_CORK)
int iCork = 1;
- if(setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) {
- if(errno != ENOTSOCK)
+ if (setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) {
+ if (errno != ENOTSOCK)
LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed");
m_IsSocket = false;
errno = 0;
@@ -58,8 +58,8 @@ cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin)
}
#elif defined(TCP_NOPUSH)
int iCork = 1;
- if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &iCork, sizeof(int))) {
- if(errno != ENOTSOCK)
+ if (setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &iCork, sizeof(int))) {
+ if (errno != ENOTSOCK)
LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed");
m_IsSocket = false;
errno = 0;
@@ -94,14 +94,14 @@ void cBackgroundWriterI::Cork(void)
if (m_IsSocket) {
#if defined(TCP_CORK)
int i = 1;
- if(setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) {
+ if (setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) {
LOGERR("cBackgroundWriter: setsockopt(TCP_NODELAY) failed");
errno = 0;
}
#elif defined(TCP_NOPUSH)
int On = 1, Off = 0;
- if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &Off, sizeof(int)) ||
- setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &On, sizeof(int))) {
+ if (setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &Off, sizeof(int)) ||
+ setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &On, sizeof(int))) {
LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed");
errno = 0;
}
@@ -114,17 +114,17 @@ bool cBackgroundWriterI::Flush(int TimeoutMs)
uint64_t WaitEnd = cTimeMs::Now();
// wait for ring buffer to drain
- if(TimeoutMs > 0) {
+ if (TimeoutMs > 0) {
WaitEnd += (uint64_t)TimeoutMs;
- while(cTimeMs::Now() < WaitEnd &&
- Running() &&
- m_RingBuffer.Available() > 0)
+ while (cTimeMs::Now() < WaitEnd &&
+ Running() &&
+ m_RingBuffer.Available() > 0)
cCondWait::SleepMs(3);
}
int Available = m_RingBuffer.Available();
- if(m_IsSocket && Available <= 0) {
+ if (m_IsSocket && Available <= 0) {
// flush corked data too
Cork();
}
@@ -156,99 +156,99 @@ void cTcpWriter::Action(void)
if (!Poller.Poll(100))
continue;
- if (CorkReq && m_RingBuffer.Available() <= 0) {
- // Force TCP packet to avoid delaying control messages
- Cork();
- CorkReq = false;
- }
+ if (CorkReq && m_RingBuffer.Available() <= 0) {
+ // Force TCP packet to avoid delaying control messages
+ Cork();
+ CorkReq = false;
+ }
- uint64_t StartPos;
- int Count = 0;
- int n;
- uchar *Data = m_RingBuffer.Get(Count);
+ uint64_t StartPos;
+ int Count = 0;
+ int n;
+ uchar *Data = m_RingBuffer.Get(Count);
if (!Data || Count <= 0)
continue;
- Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
- StartPos = m_DiscardEnd;
- Unlock();
+ Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
+ StartPos = m_DiscardEnd;
+ Unlock();
// Next frame ?
if (NextHeaderPos == GetPos) {
- // Discard data ?
- if(StartPos > GetPos) {
- // we're at frame boundary
- // drop only data packets, not control messages
- uint8_t *pkt = TCP_PAYLOAD(Data);
- if (DATA_IS_PES(pkt) || DATA_IS_TS(pkt)) {
- Count = min(Count, (int)(StartPos - GetPos));
-
- // size of next (complete) packet.
- // drop only one packet at time.
- stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
- int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t);
- if (Count >= pkt_len) {
- // drop only complete packets.
- // some packets are not dropped (packets overlapping end of ringbuffer)
- Count = pkt_len;
-
- m_RingBuffer.Del(Count);
- GetPos += Count;
- NextHeaderPos = GetPos;
-
- CorkReq = true; // force sending last frame
-
- continue;
- }
- }
+ // Discard data ?
+ if (StartPos > GetPos) {
+ // we're at frame boundary
+ // drop only data packets, not control messages
+ uint8_t *pkt = TCP_PAYLOAD(Data);
+ if (DATA_IS_PES(pkt) || DATA_IS_TS(pkt)) {
+ Count = min(Count, (int)(StartPos - GetPos));
+
+ // size of next (complete) packet.
+ // drop only one packet at time.
+ stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
+ int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t);
+ if (Count >= pkt_len) {
+ // drop only complete packets.
+ // some packets are not dropped (packets overlapping end of ringbuffer)
+ Count = pkt_len;
+
+ m_RingBuffer.Del(Count);
+ GetPos += Count;
+ NextHeaderPos = GetPos;
+
+ CorkReq = true; // force sending last frame
+
+ continue;
}
+ }
+ }
// Next frame
- if(Count < (int)sizeof(stream_tcp_header_t))
- LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
+ if (Count < (int)sizeof(stream_tcp_header_t))
+ LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
- // limit single write to size of next (complete) packet.
- // (we need to track packet boundaries)
- stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
- int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t);
- if (Count > pkt_len)
- Count = pkt_len;
- // next packet start position in stream
- NextHeaderPos = GetPos + pkt_len;
+ // limit single write to size of next (complete) packet.
+ // (we need to track packet boundaries)
+ stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
+ int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t);
+ if (Count > pkt_len)
+ Count = pkt_len;
+ // next packet start position in stream
+ NextHeaderPos = GetPos + pkt_len;
- // check for control message
- uint8_t *pkt = TCP_PAYLOAD(Data);
- if (!DATA_IS_PES(pkt) && !DATA_IS_TS(pkt))
- CorkReq = true;
+ // check for control message
+ uint8_t *pkt = TCP_PAYLOAD(Data);
+ if (!DATA_IS_PES(pkt) && !DATA_IS_TS(pkt))
+ CorkReq = true;
- } else {
+ } else {
// end of prev frame
- Count = min(Count, (int)(NextHeaderPos-GetPos));
- }
+ Count = min(Count, (int)(NextHeaderPos-GetPos));
+ }
- errno = 0;
- n = write(m_fd, Data, Count);
+ errno = 0;
+ n = write(m_fd, Data, Count);
if (n <= 0) {
- if(n == 0) {
- LOGERR("cBackgroundWriter: Client disconnected data stream ?");
- break;
+ if (n == 0) {
+ LOGERR("cBackgroundWriter: Client disconnected data stream ?");
+ break;
}
if (errno == EINTR || errno == EWOULDBLOCK)
- continue;
+ continue;
- LOGERR("cBackgroundWriter: TCP write error");
- break;
- }
+ LOGERR("cBackgroundWriter: TCP write error");
+ break;
+ }
- GetPos += n;
- m_RingBuffer.Del(n);
- }
+ GetPos += n;
+ m_RingBuffer.Del(n);
+ }
m_RingBuffer.Clear();
}
@@ -271,9 +271,9 @@ int cTcpWriter::Put(const uchar *Header, int HeaderCount,
// Serialize Put access to keep Data and Header together
LOCK_THREAD;
- if(m_RingBuffer.Free() < HeaderCount+DataCount) {
+ if (m_RingBuffer.Free() < HeaderCount+DataCount) {
//LOGMSG("cXinelibServer: TCP buffer overflow !");
- if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
+ if (m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client");
m_RingBuffer.Clear();
Cancel(-1);
@@ -283,7 +283,7 @@ int cTcpWriter::Put(const uchar *Header, int HeaderCount,
}
int n = m_RingBuffer.Put(Header, HeaderCount) +
m_RingBuffer.Put(Data, DataCount);
- if(n == HeaderCount+DataCount) {
+ if (n == HeaderCount + DataCount) {
m_BufferOverflows = 0;
m_PutPos += n;
return n;
@@ -312,78 +312,78 @@ cRawWriter::cRawWriter(int fd, int Size) :
void cRawWriter::Action(void)
{
uint64_t NextHeaderPos = 0ULL;
- uint64_t GetPos = 0ULL;
- cPoller Poller(m_fd, true);
+ uint64_t GetPos = 0ULL;
+ cPoller Poller(m_fd, true);
while (Running()) {
if (!Poller.Poll(100))
continue;
- uint64_t StartPos;
- int Count = 0;
- int n;
- uchar *Data = m_RingBuffer.Get(Count);
+ uint64_t StartPos;
+ int Count = 0;
+ int n;
+ uchar *Data = m_RingBuffer.Get(Count);
if (!Data || Count <= 0)
continue;
- Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
- StartPos = m_DiscardEnd;
- Unlock();
+ Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
+ StartPos = m_DiscardEnd;
+ Unlock();
// Next frame ?
if (NextHeaderPos == GetPos) {
- // Discard data ?
- if(StartPos > GetPos) {
- // we're at frame boundary
- Count = min(Count, (int)(StartPos - GetPos));
+ // Discard data ?
+ if (StartPos > GetPos) {
+ // we're at frame boundary
+ Count = min(Count, (int)(StartPos - GetPos));
- m_RingBuffer.Del(Count);
- GetPos += Count;
- NextHeaderPos = GetPos;
- continue;
- }
+ m_RingBuffer.Del(Count);
+ GetPos += Count;
+ NextHeaderPos = GetPos;
+ continue;
+ }
// Next frame
- if(Count < 6)
- LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
+ if (Count < 6)
+ LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
- int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count);
+ int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count);
- if(Count < packlen)
- ;//LOGMSG("Count = %d < %d", Count,
- // header->len + sizeof(stream_tcp_header_t));
- else
- Count = packlen;
- NextHeaderPos = GetPos + packlen;
+ if (Count < packlen)
+ ;//LOGMSG("Count = %d < %d", Count,
+ // header->len + sizeof(stream_tcp_header_t));
+ else
+ Count = packlen;
+ NextHeaderPos = GetPos + packlen;
- } else {
+ } else {
// end of prev frame
- Count = min(Count, (int)(NextHeaderPos-GetPos));
- }
+ Count = min(Count, (int)(NextHeaderPos-GetPos));
+ }
- errno = 0;
- n = write(m_fd, Data, Count);
+ errno = 0;
+ n = write(m_fd, Data, Count);
if (n <= 0) {
- if(n == 0) {
- LOGERR("cBackgroundWriter: Client disconnected data stream ?");
- break;
+ if (n == 0) {
+ LOGERR("cBackgroundWriter: Client disconnected data stream ?");
+ break;
}
if (errno == EINTR || errno == EWOULDBLOCK)
- continue;
+ continue;
- LOGERR("cBackgroundWriter: TCP write error");
- break;
- }
+ LOGERR("cBackgroundWriter: TCP write error");
+ break;
+ }
- GetPos += n;
- m_RingBuffer.Del(n);
- }
+ GetPos += n;
+ m_RingBuffer.Del(n);
+ }
m_RingBuffer.Clear();
}
@@ -396,8 +396,8 @@ int cRawWriter::Put(uint64_t StreamPos,
// Serialize Put access to keep Data and Header together
LOCK_THREAD;
- if(m_RingBuffer.Free() < DataCount) {
- if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
+ if (m_RingBuffer.Free() < DataCount) {
+ if (m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client");
m_RingBuffer.Clear();
Cancel(-1);
@@ -406,7 +406,7 @@ int cRawWriter::Put(uint64_t StreamPos,
return -DataCount;
}
int n = m_RingBuffer.Put(Data, DataCount);
- if(n == DataCount) {
+ if (n == DataCount) {
m_BufferOverflows = 0;
m_PutPos += n;
return n;