diff options
author | phintuka <phintuka> | 2009-07-21 10:31:43 +0000 |
---|---|---|
committer | phintuka <phintuka> | 2009-07-21 10:31:43 +0000 |
commit | 77318b0a6b74df085b4a4efbb2aba924b64d705c (patch) | |
tree | 14b0e1e0b36a12a1ed620a14dce3f7f5a37943dd /tools | |
parent | 9a04f8df1a7b5c8f32b4a7540d31113f629e2165 (diff) | |
download | xineliboutput-77318b0a6b74df085b4a4efbb2aba924b64d705c.tar.gz xineliboutput-77318b0a6b74df085b4a4efbb2aba924b64d705c.tar.bz2 |
Cosmetics
Diffstat (limited to 'tools')
-rw-r--r-- | tools/backgroundwriter.c | 298 |
1 files changed, 149 insertions, 149 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c index aa321dd7..7d5e1034 100644 --- a/tools/backgroundwriter.c +++ b/tools/backgroundwriter.c @@ -4,7 +4,7 @@ * See the main source file 'xineliboutput.c' for copyright information and * how to reach the author. * - * $Id: backgroundwriter.c,v 1.9 2009-07-21 09:48:35 phintuka Exp $ + * $Id: backgroundwriter.c,v 1.10 2009-07-21 10:31:43 phintuka Exp $ * */ @@ -34,9 +34,9 @@ // cBackgroundWriterI // -cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) +cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) : m_RingBuffer(Size, Margin) -{ +{ m_fd = fd; m_RingBuffer.SetTimeouts(0, 100); m_Active = true; @@ -72,22 +72,22 @@ cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) LOGDBG("cBackgroundWriterI initialized (buffer %d kb)", Size/1024); } -cBackgroundWriterI::~cBackgroundWriterI() +cBackgroundWriterI::~cBackgroundWriterI() { m_Active = false; Cancel(3); } -int cBackgroundWriterI::Free(void) +int cBackgroundWriterI::Free(void) { return m_RingBuffer.Free(); } -void cBackgroundWriterI::Clear(void) +void cBackgroundWriterI::Clear(void) { // Can't just drop buffer contents or PES frames will be broken. // Serialize with Put - LOCK_THREAD; + LOCK_THREAD; m_DiscardEnd = m_PutPos; } @@ -100,8 +100,8 @@ bool cBackgroundWriterI::Flush(int TimeoutMs) WaitEnd += (uint64_t)TimeoutMs; while(cTimeMs::Now() < WaitEnd && - m_Active && - m_RingBuffer.Available() > 0) + m_Active && + m_RingBuffer.Available() > 0) cCondWait::SleepMs(3); } @@ -134,7 +134,7 @@ bool cBackgroundWriterI::Flush(int TimeoutMs) cTcpWriter::cTcpWriter(int fd, int Size) : cBackgroundWriterI(fd, Size, sizeof(stream_tcp_header_t)) -{ +{ LOGDBG("cTcpWriter initialized (buffer %d kb)", Size/1024); Start(); } @@ -155,84 +155,84 @@ void cTcpWriter::Action(void) if(Data && Count > 0) { #ifndef DISABLE_DISCARD - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - if(m_DiscardEnd > GetPos) { + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + if(m_DiscardEnd > GetPos) { # ifdef LOG_DISCARDS - LOGMSG("TCP: queue: discard request: queue %d bytes, " - "next point %d bytes forward (Count=%d)", - m_RingBuffer.Available(), - NextHeaderPos - GetPos, - Count); + LOGMSG("TCP: queue: discard request: queue %d bytes, " + "next point %d bytes forward (Count=%d)", + m_RingBuffer.Available(), + NextHeaderPos - GetPos, + Count); # endif - if(NextHeaderPos == GetPos) { - // we're at frame boundary + if(NextHeaderPos == GetPos) { + // we're at frame boundary # ifdef LOG_DISCARDS - uint8_t *pkt = TCP_PAYLOAD(Data); - if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { - LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); - } + uint8_t *pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); + } # endif - Count = min(Count, (int)(m_DiscardEnd - GetPos)); + Count = min(Count, (int)(m_DiscardEnd - GetPos)); # ifdef LOG_DISCARDS - LOGMSG("Flushing %d bytes", Count); + LOGMSG("Flushing %d bytes", Count); #endif - Unlock(); + Unlock(); - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; # ifdef LOG_DISCARDS - LOGMSG("Queue now %d bytes", m_RingBuffer.Available()); - pkt = TCP_PAYLOAD(Data); - if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { - LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); + LOGMSG("Queue now %d bytes", m_RingBuffer.Available()); + pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); # endif - continue; - } - } - Unlock(); + continue; + } + } + Unlock(); #endif #ifndef DISABLE_DISCARD - if(GetPos == NextHeaderPos) { - if(Count < (int)sizeof(stream_tcp_header_t)) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); - - stream_tcp_header_t *header = (stream_tcp_header_t*)Data; - if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t))) - ;//LOGMSG("Count = %d < %d", Count, - // header->len + sizeof(stream_tcp_header_t)); - else - Count = ntohl(header->len) + sizeof(stream_tcp_header_t); - NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t); - } else { - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } + if(GetPos == NextHeaderPos) { + if(Count < (int)sizeof(stream_tcp_header_t)) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + + stream_tcp_header_t *header = (stream_tcp_header_t*)Data; + if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t))) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = ntohl(header->len) + sizeof(stream_tcp_header_t); + NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t); + } else { + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } #endif - errno = 0; - n = write(m_fd, Data, Count); - - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; - - } else if(n < 0) { - - if (errno == EINTR || errno == EWOULDBLOCK) { - TRACE("cBackgroundWriter: EINTR while writing to file handle " - <<m_fd<<" - retrying"); - continue; - - } else { - LOGERR("cBackgroundWriter: TCP write error"); - break; - } - } - - GetPos += n; - m_RingBuffer.Del(n); + errno = 0; + n = write(m_fd, Data, Count); + + if(n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; + + } else if(n < 0) { + + if (errno == EINTR || errno == EWOULDBLOCK) { + TRACE("cBackgroundWriter: EINTR while writing to file handle " + <<m_fd<<" - retrying"); + continue; + + } else { + LOGERR("cBackgroundWriter: TCP write error"); + break; + } + } + + GetPos += n; + m_RingBuffer.Del(n); } } } @@ -241,8 +241,8 @@ void cTcpWriter::Action(void) m_Active = false; } -int cTcpWriter::Put(uint64_t StreamPos, - const uchar *Data, int DataCount) +int cTcpWriter::Put(uint64_t StreamPos, + const uchar *Data, int DataCount) { stream_tcp_header_t header; header.pos = htonull(StreamPos); @@ -250,21 +250,21 @@ int cTcpWriter::Put(uint64_t StreamPos, return Put((uchar*)&header, sizeof(header), Data, DataCount); } -int cTcpWriter::Put(const uchar *Header, int HeaderCount, - const uchar *Data, int DataCount) +int cTcpWriter::Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount) { if(m_Active) { // Serialize Put access to keep Data and Header together - LOCK_THREAD; - + LOCK_THREAD; + if(m_RingBuffer.Free() < HeaderCount+DataCount) { //LOGMSG("cXinelibServer: TCP buffer overflow !"); if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { - LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); - m_RingBuffer.Clear(); - m_Active = false; - return 0; + LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); + m_RingBuffer.Clear(); + m_Active = false; + return 0; } return -HeaderCount-DataCount; } @@ -275,7 +275,7 @@ int cTcpWriter::Put(const uchar *Header, int HeaderCount, m_PutPos += n; return n; } - + LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); m_RingBuffer.Clear(); m_Active = false; @@ -315,65 +315,65 @@ void cRawWriter::Action(void) if(Data && Count > 0) { #ifndef DISABLE_DISCARD - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - if(m_DiscardEnd > GetPos) { - - if(NextHeaderPos == GetPos) { - // we're at frame boundary - Count = min(Count, (int)(m_DiscardEnd - GetPos)); - Unlock(); - - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; - continue; - } - } - Unlock(); + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + if(m_DiscardEnd > GetPos) { + + if(NextHeaderPos == GetPos) { + // we're at frame boundary + Count = min(Count, (int)(m_DiscardEnd - GetPos)); + Unlock(); + + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; + continue; + } + } + Unlock(); #endif #ifndef DISABLE_DISCARD - if(GetPos == NextHeaderPos) { - if(Count < 6) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + if(GetPos == NextHeaderPos) { + if(Count < 6) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); #if VDRVERSNUM >= 10701 - int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count); + int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count); #else - int packlen = pes_packet_len(Data, Count); + int packlen = pes_packet_len(Data, Count); #endif - if(Count < packlen) - ;//LOGMSG("Count = %d < %d", Count, - // header->len + sizeof(stream_tcp_header_t)); - else - Count = packlen; - NextHeaderPos = GetPos + packlen; - } else { - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } + if(Count < packlen) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = packlen; + NextHeaderPos = GetPos + packlen; + } else { + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } #endif - errno = 0; - n = write(m_fd, Data, Count); - - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; - - } else if(n < 0) { - - if (errno == EINTR || errno == EWOULDBLOCK) { - TRACE("cBackgroundWriter: EINTR while writing to file handle " - <<m_fd<<" - retrying"); - continue; - - } else { - LOGERR("cBackgroundWriter: TCP write error"); - break; - } - } - - GetPos += n; - m_RingBuffer.Del(n); + errno = 0; + n = write(m_fd, Data, Count); + + if(n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; + + } else if(n < 0) { + + if (errno == EINTR || errno == EWOULDBLOCK) { + TRACE("cBackgroundWriter: EINTR while writing to file handle " + <<m_fd<<" - retrying"); + continue; + + } else { + LOGERR("cBackgroundWriter: TCP write error"); + break; + } + } + + GetPos += n; + m_RingBuffer.Del(n); } } } @@ -382,20 +382,20 @@ void cRawWriter::Action(void) m_Active = false; } -int cRawWriter::Put(uint64_t StreamPos, - const uchar *Data, int DataCount) +int cRawWriter::Put(uint64_t StreamPos, + const uchar *Data, int DataCount) { if(m_Active) { // Serialize Put access to keep Data and Header together - LOCK_THREAD; - + LOCK_THREAD; + if(m_RingBuffer.Free() < DataCount) { if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { - LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); - m_RingBuffer.Clear(); - m_Active = false; - return 0; + LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); + m_RingBuffer.Clear(); + m_Active = false; + return 0; } return -DataCount; } @@ -405,7 +405,7 @@ int cRawWriter::Put(uint64_t StreamPos, m_PutPos += n; return n; } - + LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); m_RingBuffer.Clear(); m_Active = false; @@ -422,7 +422,7 @@ int cRawWriter::Put(uint64_t StreamPos, cTsWriter::cTsWriter(int fd, int Size) : cBackgroundWriterI(fd, Size, 6) -{ +{ LOGDBG("cTsWriter initialized (buffer %d kb)", Size/1024); Start(); } @@ -442,13 +442,13 @@ int cTsWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) // cRtspMuxWriter // - RTSP multiplexed control+data // - Each encapsulated PES frame is written atomically to socket buffer -// - Atomic control data can be written directly to socket +// - Atomic control data can be written directly to socket // from another thread to bypass buffer // cRtspMuxWriter::cRtspMuxWriter(int fd, int Size) : cBackgroundWriterI(fd, Size, 6) -{ +{ LOGDBG("cRtspMuxWriter initialized (buffer %d kb)", Size/1024); Start(); } @@ -470,13 +470,13 @@ int cRtspMuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) // - encapsulate ES to RTP/AVP compatible frames // - Mux RTP/AVP ES streams to pipelined RTCP control connection // - Each encapsulated frame is written atomically to socket buffer -// - Atomic control data can be written directly to socket +// - Atomic control data can be written directly to socket // from another thread to bypass buffer // cRtspRemuxWriter::cRtspRemuxWriter(int fd, int Size) : cBackgroundWriterI(fd, Size, 6) -{ +{ LOGDBG("cRtspRemuxWriter initialized (buffer %d kb)", Size/1024); Start(); } |