diff options
-rw-r--r-- | tools/backgroundwriter.c | 256 |
1 files changed, 128 insertions, 128 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c index 0e07be64..8016b3d1 100644 --- a/tools/backgroundwriter.c +++ b/tools/backgroundwriter.c @@ -4,7 +4,7 @@ * See the main source file 'xineliboutput.c' for copyright information and * how to reach the author. * - * $Id: backgroundwriter.c,v 1.20 2010-03-12 22:42:02 phintuka Exp $ + * $Id: backgroundwriter.c,v 1.21 2010-03-12 22:43:18 phintuka Exp $ * */ @@ -48,8 +48,8 @@ cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) #if defined(TCP_CORK) int iCork = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) { - if(errno != ENOTSOCK) + if (setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) { + if (errno != ENOTSOCK) LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed"); m_IsSocket = false; errno = 0; @@ -58,8 +58,8 @@ cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) } #elif defined(TCP_NOPUSH) int iCork = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &iCork, sizeof(int))) { - if(errno != ENOTSOCK) + if (setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &iCork, sizeof(int))) { + if (errno != ENOTSOCK) LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed"); m_IsSocket = false; errno = 0; @@ -94,14 +94,14 @@ void cBackgroundWriterI::Cork(void) if (m_IsSocket) { #if defined(TCP_CORK) int i = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) { + if (setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) { LOGERR("cBackgroundWriter: setsockopt(TCP_NODELAY) failed"); errno = 0; } #elif defined(TCP_NOPUSH) int On = 1, Off = 0; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &Off, sizeof(int)) || - setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &On, sizeof(int))) { + if (setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &Off, sizeof(int)) || + setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &On, sizeof(int))) { LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed"); errno = 0; } @@ -114,17 +114,17 @@ bool cBackgroundWriterI::Flush(int TimeoutMs) uint64_t WaitEnd = cTimeMs::Now(); // wait for ring buffer to drain - if(TimeoutMs > 0) { + if (TimeoutMs > 0) { WaitEnd += (uint64_t)TimeoutMs; - while(cTimeMs::Now() < WaitEnd && - Running() && - m_RingBuffer.Available() > 0) + while (cTimeMs::Now() < WaitEnd && + Running() && + m_RingBuffer.Available() > 0) cCondWait::SleepMs(3); } int Available = m_RingBuffer.Available(); - if(m_IsSocket && Available <= 0) { + if (m_IsSocket && Available <= 0) { // flush corked data too Cork(); } @@ -156,99 +156,99 @@ void cTcpWriter::Action(void) if (!Poller.Poll(100)) continue; - if (CorkReq && m_RingBuffer.Available() <= 0) { - // Force TCP packet to avoid delaying control messages - Cork(); - CorkReq = false; - } + if (CorkReq && m_RingBuffer.Available() <= 0) { + // Force TCP packet to avoid delaying control messages + Cork(); + CorkReq = false; + } - uint64_t StartPos; - int Count = 0; - int n; - uchar *Data = m_RingBuffer.Get(Count); + uint64_t StartPos; + int Count = 0; + int n; + uchar *Data = m_RingBuffer.Get(Count); if (!Data || Count <= 0) continue; - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - StartPos = m_DiscardEnd; - Unlock(); + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + StartPos = m_DiscardEnd; + Unlock(); // Next frame ? if (NextHeaderPos == GetPos) { - // Discard data ? - if(StartPos > GetPos) { - // we're at frame boundary - // drop only data packets, not control messages - uint8_t *pkt = TCP_PAYLOAD(Data); - if (DATA_IS_PES(pkt) || DATA_IS_TS(pkt)) { - Count = min(Count, (int)(StartPos - GetPos)); - - // size of next (complete) packet. - // drop only one packet at time. - stream_tcp_header_t *header = (stream_tcp_header_t*)Data; - int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); - if (Count >= pkt_len) { - // drop only complete packets. - // some packets are not dropped (packets overlapping end of ringbuffer) - Count = pkt_len; - - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; - - CorkReq = true; // force sending last frame - - continue; - } - } + // Discard data ? + if (StartPos > GetPos) { + // we're at frame boundary + // drop only data packets, not control messages + uint8_t *pkt = TCP_PAYLOAD(Data); + if (DATA_IS_PES(pkt) || DATA_IS_TS(pkt)) { + Count = min(Count, (int)(StartPos - GetPos)); + + // size of next (complete) packet. + // drop only one packet at time. + stream_tcp_header_t *header = (stream_tcp_header_t*)Data; + int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); + if (Count >= pkt_len) { + // drop only complete packets. + // some packets are not dropped (packets overlapping end of ringbuffer) + Count = pkt_len; + + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; + + CorkReq = true; // force sending last frame + + continue; } + } + } // Next frame - if(Count < (int)sizeof(stream_tcp_header_t)) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + if (Count < (int)sizeof(stream_tcp_header_t)) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); - // limit single write to size of next (complete) packet. - // (we need to track packet boundaries) - stream_tcp_header_t *header = (stream_tcp_header_t*)Data; - int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); - if (Count > pkt_len) - Count = pkt_len; - // next packet start position in stream - NextHeaderPos = GetPos + pkt_len; + // limit single write to size of next (complete) packet. + // (we need to track packet boundaries) + stream_tcp_header_t *header = (stream_tcp_header_t*)Data; + int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); + if (Count > pkt_len) + Count = pkt_len; + // next packet start position in stream + NextHeaderPos = GetPos + pkt_len; - // check for control message - uint8_t *pkt = TCP_PAYLOAD(Data); - if (!DATA_IS_PES(pkt) && !DATA_IS_TS(pkt)) - CorkReq = true; + // check for control message + uint8_t *pkt = TCP_PAYLOAD(Data); + if (!DATA_IS_PES(pkt) && !DATA_IS_TS(pkt)) + CorkReq = true; - } else { + } else { // end of prev frame - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } - errno = 0; - n = write(m_fd, Data, Count); + errno = 0; + n = write(m_fd, Data, Count); if (n <= 0) { - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; + if (n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; } if (errno == EINTR || errno == EWOULDBLOCK) - continue; + continue; - LOGERR("cBackgroundWriter: TCP write error"); - break; - } + LOGERR("cBackgroundWriter: TCP write error"); + break; + } - GetPos += n; - m_RingBuffer.Del(n); - } + GetPos += n; + m_RingBuffer.Del(n); + } m_RingBuffer.Clear(); } @@ -271,9 +271,9 @@ int cTcpWriter::Put(const uchar *Header, int HeaderCount, // Serialize Put access to keep Data and Header together LOCK_THREAD; - if(m_RingBuffer.Free() < HeaderCount+DataCount) { + if (m_RingBuffer.Free() < HeaderCount+DataCount) { //LOGMSG("cXinelibServer: TCP buffer overflow !"); - if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { + if (m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); m_RingBuffer.Clear(); Cancel(-1); @@ -283,7 +283,7 @@ int cTcpWriter::Put(const uchar *Header, int HeaderCount, } int n = m_RingBuffer.Put(Header, HeaderCount) + m_RingBuffer.Put(Data, DataCount); - if(n == HeaderCount+DataCount) { + if (n == HeaderCount + DataCount) { m_BufferOverflows = 0; m_PutPos += n; return n; @@ -312,78 +312,78 @@ cRawWriter::cRawWriter(int fd, int Size) : void cRawWriter::Action(void) { uint64_t NextHeaderPos = 0ULL; - uint64_t GetPos = 0ULL; - cPoller Poller(m_fd, true); + uint64_t GetPos = 0ULL; + cPoller Poller(m_fd, true); while (Running()) { if (!Poller.Poll(100)) continue; - uint64_t StartPos; - int Count = 0; - int n; - uchar *Data = m_RingBuffer.Get(Count); + uint64_t StartPos; + int Count = 0; + int n; + uchar *Data = m_RingBuffer.Get(Count); if (!Data || Count <= 0) continue; - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - StartPos = m_DiscardEnd; - Unlock(); + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + StartPos = m_DiscardEnd; + Unlock(); // Next frame ? if (NextHeaderPos == GetPos) { - // Discard data ? - if(StartPos > GetPos) { - // we're at frame boundary - Count = min(Count, (int)(StartPos - GetPos)); + // Discard data ? + if (StartPos > GetPos) { + // we're at frame boundary + Count = min(Count, (int)(StartPos - GetPos)); - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; - continue; - } + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; + continue; + } // Next frame - if(Count < 6) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + if (Count < 6) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); - int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count); + int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count); - if(Count < packlen) - ;//LOGMSG("Count = %d < %d", Count, - // header->len + sizeof(stream_tcp_header_t)); - else - Count = packlen; - NextHeaderPos = GetPos + packlen; + if (Count < packlen) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = packlen; + NextHeaderPos = GetPos + packlen; - } else { + } else { // end of prev frame - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } - errno = 0; - n = write(m_fd, Data, Count); + errno = 0; + n = write(m_fd, Data, Count); if (n <= 0) { - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; + if (n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; } if (errno == EINTR || errno == EWOULDBLOCK) - continue; + continue; - LOGERR("cBackgroundWriter: TCP write error"); - break; - } + LOGERR("cBackgroundWriter: TCP write error"); + break; + } - GetPos += n; - m_RingBuffer.Del(n); - } + GetPos += n; + m_RingBuffer.Del(n); + } m_RingBuffer.Clear(); } @@ -396,8 +396,8 @@ int cRawWriter::Put(uint64_t StreamPos, // Serialize Put access to keep Data and Header together LOCK_THREAD; - if(m_RingBuffer.Free() < DataCount) { - if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { + if (m_RingBuffer.Free() < DataCount) { + if (m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); m_RingBuffer.Clear(); Cancel(-1); @@ -406,7 +406,7 @@ int cRawWriter::Put(uint64_t StreamPos, return -DataCount; } int n = m_RingBuffer.Put(Data, DataCount); - if(n == DataCount) { + if (n == DataCount) { m_BufferOverflows = 0; m_PutPos += n; return n; |