diff options
author | cvs2svn <admin@example.com> | 2009-10-21 00:02:02 +0000 |
---|---|---|
committer | cvs2svn <admin@example.com> | 2009-10-21 00:02:02 +0000 |
commit | 97a97ca3358eb48de3eb7a222e487e800566569f (patch) | |
tree | 97c920d0225a1c9773a3bce2207f261d7d230123 /tools/backgroundwriter.c | |
parent | a61961358c5a2ec92340b3f8e056bab55438f103 (diff) | |
download | xineliboutput-CVS.tar.gz xineliboutput-CVS.tar.bz2 |
This commit was manufactured by cvs2svn to create branch 'CVS'.CVS
Diffstat (limited to 'tools/backgroundwriter.c')
-rw-r--r-- | tools/backgroundwriter.c | 500 |
1 files changed, 0 insertions, 500 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c deleted file mode 100644 index 6b7c8ff4..00000000 --- a/tools/backgroundwriter.c +++ /dev/null @@ -1,500 +0,0 @@ -/* - * backgroundwriter.h: Buffered socket/file writing thread - * - * See the main source file 'xineliboutput.c' for copyright information and - * how to reach the author. - * - * $Id: backgroundwriter.c,v 1.18 2009-07-24 18:11:20 phintuka Exp $ - * - */ - -#define __STDC_FORMAT_MACROS -#define __STDC_CONSTANT_MACROS -#include <inttypes.h> - -#include <stdint.h> -#include <unistd.h> -#include <netinet/tcp.h> // CORK, NODELAY - -#include <vdr/tools.h> -#include <vdr/config.h> // VDRVERSNUM - -#include "../logdefs.h" -#include "../xine_input_vdr_net.h" // stream_tcp_header_t -#include "ts.h" -#include "pes.h" - -#include "backgroundwriter.h" - - -#define MAX_OVERFLOWS_BEFORE_DISCONNECT 1000 // ~ 1 second - - -// -// cBackgroundWriterI -// - -cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) - : m_RingBuffer(Size, Margin) -{ - m_fd = fd; - m_RingBuffer.SetTimeouts(0, 100); - - m_PutPos = 0; - m_DiscardStart = 0; - m_DiscardEnd = 0; - - m_BufferOverflows = 0; - -#if defined(TCP_CORK) - int iCork = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) { - if(errno != ENOTSOCK) - LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed"); - m_IsSocket = false; - errno = 0; - } else { - m_IsSocket = true; - } -#elif defined(TCP_NOPUSH) - int iCork = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &iCork, sizeof(int))) { - if(errno != ENOTSOCK) - LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed"); - m_IsSocket = false; - errno = 0; - } else { - m_IsSocket = true; - } -#endif - - LOGDBG("cBackgroundWriterI initialized (buffer %d kb)", Size/1024); -} - -cBackgroundWriterI::~cBackgroundWriterI() -{ - Cancel(3); -} - -int cBackgroundWriterI::Free(void) -{ - return m_RingBuffer.Free(); -} - -void cBackgroundWriterI::Clear(void) -{ - // Can't just drop buffer contents or PES frames will be broken. - // Serialize with Put - LOCK_THREAD; - m_DiscardEnd = m_PutPos; -} - -void cBackgroundWriterI::Cork(void) -{ - if (m_IsSocket) { -#if defined(TCP_CORK) - int i = 1; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) { - LOGERR("cBackgroundWriter: setsockopt(TCP_NODELAY) failed"); - errno = 0; - } -#elif defined(TCP_NOPUSH) - int On = 1, Off = 0; - if(setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &Off, sizeof(int)) || - setsockopt(m_fd, IPPROTO_TCP, TCP_NOPUSH, &On, sizeof(int))) { - LOGERR("cBackgroundWriter: setsockopt(TCP_NOPUSH) failed"); - errno = 0; - } -#endif - } -} - -bool cBackgroundWriterI::Flush(int TimeoutMs) -{ - uint64_t WaitEnd = cTimeMs::Now(); - - // wait for ring buffer to drain - if(TimeoutMs > 0) { - WaitEnd += (uint64_t)TimeoutMs; - - while(cTimeMs::Now() < WaitEnd && - Running() && - m_RingBuffer.Available() > 0) - cCondWait::SleepMs(3); - } - - int Available = m_RingBuffer.Available(); - if(m_IsSocket && Available <= 0) { - // flush corked data too - Cork(); - } - - return Available <= 0; -} - - -// -// cTcpWriter -// - -cTcpWriter::cTcpWriter(int fd, int Size) : - cBackgroundWriterI(fd, Size, sizeof(stream_tcp_header_t)) -{ - LOGDBG("cTcpWriter initialized (buffer %d kb)", Size/1024); - Start(); -} - -void cTcpWriter::Action(void) -{ - uint64_t NextHeaderPos = 0; - uint64_t GetPos = 0; - cPoller Poller (m_fd, true); - bool CorkReq = false; - - while (Running()) { - - if(Poller.Poll(100)) { - - if (CorkReq && m_RingBuffer.Available() <= 0) { - // Force TCP packet to avoid delaying control messages - Cork(); - CorkReq = false; - } - - uint64_t StartPos; - int Count = 0; - int n; - uchar *Data = m_RingBuffer.Get(Count); - - if(Data && Count > 0) { - - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - StartPos = m_DiscardEnd; - Unlock(); - - // Discard data ? - if(StartPos > GetPos) { - if(NextHeaderPos == GetPos) { - // we're at frame boundary - // drop only data packets, not control messages - uint8_t *pkt = TCP_PAYLOAD(Data); - if (DATA_IS_PES(pkt) || DATA_IS_TS(pkt)) { - Count = min(Count, (int)(StartPos - GetPos)); - - // size of next (complete) packet. - // drop only one packet at time. - stream_tcp_header_t *header = (stream_tcp_header_t*)Data; - int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); - if (Count >= pkt_len) { - // drop only complete packets. - // some packets are not dropped (packets overlapping end of ringbuffer) - Count = pkt_len; - - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; - - CorkReq = true; // force sending last frame - - continue; - } - } - } - } - - // Next frame ? - if(GetPos == NextHeaderPos) { - if(Count < (int)sizeof(stream_tcp_header_t)) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); - - // limit single write to size of next (complete) packet. - // (we need to track packet boundaries) - stream_tcp_header_t *header = (stream_tcp_header_t*)Data; - int pkt_len = ntohl(header->len) + sizeof(stream_tcp_header_t); - if (Count > pkt_len) - Count = pkt_len; - // next packet start position in stream - NextHeaderPos = GetPos + pkt_len; - - // check for control message - uint8_t *pkt = TCP_PAYLOAD(Data); - if (!DATA_IS_PES(pkt) && !DATA_IS_TS(pkt)) - CorkReq = true; - - } else { - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } - - errno = 0; - n = write(m_fd, Data, Count); - - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; - - } else if(n < 0) { - - if (errno == EINTR || errno == EWOULDBLOCK) { - TRACE("cBackgroundWriter: EINTR while writing to file handle " - <<m_fd<<" - retrying"); - continue; - - } else { - LOGERR("cBackgroundWriter: TCP write error"); - break; - } - } - - GetPos += n; - m_RingBuffer.Del(n); - } - } - } - - m_RingBuffer.Clear(); -} - -int cTcpWriter::Put(uint64_t StreamPos, - const uchar *Data, int DataCount) -{ - stream_tcp_header_t header; - header.pos = htonull(StreamPos); - header.len = htonl(DataCount); - return Put((uchar*)&header, sizeof(header), Data, DataCount); -} - -int cTcpWriter::Put(const uchar *Header, int HeaderCount, - const uchar *Data, int DataCount) -{ - if (Running()) { - - // Serialize Put access to keep Data and Header together - LOCK_THREAD; - - if(m_RingBuffer.Free() < HeaderCount+DataCount) { - //LOGMSG("cXinelibServer: TCP buffer overflow !"); - if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { - LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); - m_RingBuffer.Clear(); - Cancel(-1); - return 0; - } - return -HeaderCount-DataCount; - } - int n = m_RingBuffer.Put(Header, HeaderCount) + - m_RingBuffer.Put(Data, DataCount); - if(n == HeaderCount+DataCount) { - m_BufferOverflows = 0; - m_PutPos += n; - return n; - } - - LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); - m_RingBuffer.Clear(); - Cancel(-1); - } - - return 0; -} - - -// -// cRawWriter -// - -cRawWriter::cRawWriter(int fd, int Size) : - cBackgroundWriterI(fd, Size, 6) -{ - LOGDBG("cRawWriter initialized (buffer %d kb)", Size/1024); - Start(); -} - -void cRawWriter::Action(void) -{ - uint64_t NextHeaderPos = 0ULL; - uint64_t GetPos = 0ULL; - cPoller Poller(m_fd, true); - - while (Running()) { - - if(Poller.Poll(100)) { - - uint64_t StartPos; - int Count = 0; - int n; - uchar *Data = m_RingBuffer.Get(Count); - - if(Data && Count > 0) { - - Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) - StartPos = m_DiscardEnd; - Unlock(); - - // Discard data ? - if(StartPos > GetPos) { - if(NextHeaderPos == GetPos) { - // we're at frame boundary - Count = min(Count, (int)(StartPos - GetPos)); - - m_RingBuffer.Del(Count); - GetPos += Count; - NextHeaderPos = GetPos; - continue; - } - } - - // Next frame ? - if(GetPos == NextHeaderPos) { - if(Count < 6) - LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); - - int packlen = DATA_IS_TS(Data) ? TS_SIZE : pes_packet_len(Data, Count); - - if(Count < packlen) - ;//LOGMSG("Count = %d < %d", Count, - // header->len + sizeof(stream_tcp_header_t)); - else - Count = packlen; - NextHeaderPos = GetPos + packlen; - } else { - Count = min(Count, (int)(NextHeaderPos-GetPos)); - } - - errno = 0; - n = write(m_fd, Data, Count); - - if(n == 0) { - LOGERR("cBackgroundWriter: Client disconnected data stream ?"); - break; - - } else if(n < 0) { - - if (errno == EINTR || errno == EWOULDBLOCK) { - TRACE("cBackgroundWriter: EINTR while writing to file handle " - <<m_fd<<" - retrying"); - continue; - - } else { - LOGERR("cBackgroundWriter: TCP write error"); - break; - } - } - - GetPos += n; - m_RingBuffer.Del(n); - } - } - } - - m_RingBuffer.Clear(); -} - -int cRawWriter::Put(uint64_t StreamPos, - const uchar *Data, int DataCount) -{ - if (Running()) { - - // Serialize Put access to keep Data and Header together - LOCK_THREAD; - - if(m_RingBuffer.Free() < DataCount) { - if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) { - LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); - m_RingBuffer.Clear(); - Cancel(-1); - return 0; - } - return -DataCount; - } - int n = m_RingBuffer.Put(Data, DataCount); - if(n == DataCount) { - m_BufferOverflows = 0; - m_PutPos += n; - return n; - } - - LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); - m_RingBuffer.Clear(); - Cancel(-1); - } - - return 0; -} - - -// -// cTsWriter -// - Demux PES stream to PS -// - -cTsWriter::cTsWriter(int fd, int Size) : - cBackgroundWriterI(fd, Size, 6) -{ - LOGDBG("cTsWriter initialized (buffer %d kb)", Size/1024); - Start(); -} - - -void cTsWriter::Action(void) -{ -} - -int cTsWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) -{ - return 0; -} - - -// -// cRtspMuxWriter -// - RTSP multiplexed control+data -// - Each encapsulated PES frame is written atomically to socket buffer -// - Atomic control data can be written directly to socket -// from another thread to bypass buffer -// - -cRtspMuxWriter::cRtspMuxWriter(int fd, int Size) : - cBackgroundWriterI(fd, Size, 6) -{ - LOGDBG("cRtspMuxWriter initialized (buffer %d kb)", Size/1024); - Start(); -} - -void cRtspMuxWriter::Action(void) -{ -} - -int cRtspMuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) -{ - return 0; -} - - -// -// cRtspRemuxWriter -// - RTSP multiplexed control+data -// - Demux PES stream to independent ES streams -// - encapsulate ES to RTP/AVP compatible frames -// - Mux RTP/AVP ES streams to pipelined RTCP control connection -// - Each encapsulated frame is written atomically to socket buffer -// - Atomic control data can be written directly to socket -// from another thread to bypass buffer -// - -cRtspRemuxWriter::cRtspRemuxWriter(int fd, int Size) : - cBackgroundWriterI(fd, Size, 6) -{ - LOGDBG("cRtspRemuxWriter initialized (buffer %d kb)", Size/1024); - Start(); -} - -void cRtspRemuxWriter::Action(void) -{ -} - -int cRtspRemuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) -{ - return 0; -} - - |