From c1297db43441118349fc9acf8191d69668ed8b91 Mon Sep 17 00:00:00 2001 From: phintuka Date: Sun, 7 Jan 2007 05:36:30 +0000 Subject: Splitted to interface and several implementation classes --- tools/backgroundwriter.c | 331 +++++++++++++++++++++++++++++++++++++++-------- tools/backgroundwriter.h | 139 +++++++++++++++++--- 2 files changed, 397 insertions(+), 73 deletions(-) diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c index 0602ed7c..1f56054a 100644 --- a/tools/backgroundwriter.c +++ b/tools/backgroundwriter.c @@ -4,12 +4,13 @@ * See the main source file 'xineliboutput.c' for copyright information and * how to reach the author. * - * $Id: backgroundwriter.c,v 1.3 2006-12-15 16:35:31 phintuka Exp $ + * $Id: backgroundwriter.c,v 1.4 2007-01-07 05:36:30 phintuka Exp $ * */ #include #include +#include // CORK, NODELAY #include @@ -21,15 +22,19 @@ //#define DISABLE_DISCARD //#define LOG_DISCARDS -#define MAX_OVERFLOWS_BEFORE_DISCONNECT 500 // ~ 1 second +#define MAX_OVERFLOWS_BEFORE_DISCONNECT 1000 // ~ 1 second -cBackgroundWriter::cBackgroundWriter(int fd, int Size, bool Raw) - : m_RingBuffer(Size, sizeof(stream_tcp_header_t)) + +// +// cBackgroundWriterI +// + +cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin) + : m_RingBuffer(Size, Margin) { - m_fd = fd; + m_fd = fd; m_RingBuffer.SetTimeouts(0, 100); m_Active = true; - m_Raw = Raw; m_PutPos = 0; m_DiscardStart = 0; @@ -37,24 +42,77 @@ cBackgroundWriter::cBackgroundWriter(int fd, int Size, bool Raw) m_BufferOverflows = 0; - LOGDBG("cBackgroundWriter%s initialized (buffer %d kb)", - Raw?"(RAW)":"", Size/1024); + int iCork = 1; + if(setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) { + if(errno != ENOTSOCK) + LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed"); + m_IsSocket = false; + errno = 0; + } else { + m_IsSocket = true; + } - Start(); + LOGDBG("cBackgroundWriterI initialized (buffer %d kb)", Size/1024); } -cBackgroundWriter::~cBackgroundWriter() +cBackgroundWriterI::~cBackgroundWriterI() { m_Active = false; Cancel(3); } -int cBackgroundWriter::Free(void) +int cBackgroundWriterI::Free(void) { return m_RingBuffer.Free(); } -void cBackgroundWriter::Action(void) +void cBackgroundWriterI::Clear(void) +{ + // Can't just drop buffer contents or PES frames will be broken. + // Serialize with Put + LOCK_THREAD; + m_DiscardEnd = m_PutPos; +} + +bool cBackgroundWriterI::Flush(int TimeoutMs) +{ + uint64_t WaitEnd = cTimeMs::Now(); + + // wait for ring buffer to drain + if(TimeoutMs > 0) { + WaitEnd += (uint64_t)TimeoutMs; + + while(cTimeMs::Now() < WaitEnd && + m_Active && + m_RingBuffer.Available() > 0) + cCondWait::SleepMs(3); + } + + if(m_IsSocket && m_RingBuffer.Available() <= 0) { + // flush corked data too + int i = 1; + if(setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) { + LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed"); + errno = 0; + } + } + + return m_RingBuffer.Available() <= 0; +} + + +// +// cTcpWriter +// + +cTcpWriter::cTcpWriter(int fd, int Size) : + cBackgroundWriterI(fd, Size, sizeof(stream_tcp_header_t)) +{ + LOGDBG("cTcpWriter initialized (buffer %d kb)", Size/1024); + Start(); +} + +void cTcpWriter::Action(void) { uint64_t NextHeaderPos = 0ULL; uint64_t GetPos = 0ULL; @@ -127,12 +185,7 @@ void cBackgroundWriter::Action(void) #endif errno = 0; - if(!m_Raw) - n = write(m_fd, Data, Count); - else - n = write(m_fd, - Data + sizeof(stream_tcp_header_t), - Count - sizeof(stream_tcp_header_t)); + n = write(m_fd, Data, Count); if(n == 0) { LOGERR("cBackgroundWriter: Client disconnected data stream ?"); @@ -151,9 +204,6 @@ void cBackgroundWriter::Action(void) } } - if(m_Raw) - n += sizeof(stream_tcp_header_t); - GetPos += n; m_RingBuffer.Del(n); } @@ -164,35 +214,8 @@ void cBackgroundWriter::Action(void) m_Active = false; } -void cBackgroundWriter::Clear(void) -{ - // Can't just drop buffer contents or PES frames will be broken. - - // Serialize with Put - LOCK_THREAD; -#ifdef LOG_DISCARDS - LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos); -#endif - m_DiscardEnd = m_PutPos; -} - -bool cBackgroundWriter::Flush(int TimeoutMs) -{ - uint64_t WaitEnd = cTimeMs::Now(); - - if(TimeoutMs > 0) - WaitEnd += (uint64_t)TimeoutMs; - - while(cTimeMs::Now() < WaitEnd && - m_Active && - m_RingBuffer.Available() > 0) - cCondWait::SleepMs(3); - - return m_RingBuffer.Available() <= 0; -} - -int cBackgroundWriter::Put(uint64_t StreamPos, - const uchar *Data, int DataCount) +int cTcpWriter::Put(uint64_t StreamPos, + const uchar *Data, int DataCount) { stream_tcp_header_t header; header.pos = htonull(StreamPos); @@ -200,8 +223,8 @@ int cBackgroundWriter::Put(uint64_t StreamPos, return Put((uchar*)&header, sizeof(header), Data, DataCount); } -int cBackgroundWriter::Put(const uchar *Header, int HeaderCount, - const uchar *Data, int DataCount) +int cTcpWriter::Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount) { if(m_Active) { @@ -233,3 +256,207 @@ int cBackgroundWriter::Put(const uchar *Header, int HeaderCount, return 0; } + + +// +// cRawWriter +// + +#include "pes.h" + +cRawWriter::cRawWriter(int fd, int Size) : + cBackgroundWriterI(fd, Size, 6) +{ + LOGDBG("cRawWriter initialized (buffer %d kb)", Size/1024); + Start(); +} + +void cRawWriter::Action(void) +{ + uint64_t NextHeaderPos = 0ULL; + uint64_t GetPos = 0ULL; + cPoller Poller(m_fd, true); + + while(m_Active) { + + if(Poller.Poll(100)) { + + int Count = 0, n; + uchar *Data = m_RingBuffer.Get(Count); + + if(Data && Count > 0) { + +#ifndef DISABLE_DISCARD + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + if(m_DiscardEnd > GetPos) { + + if(NextHeaderPos == GetPos) { + // we're at frame boundary + Count = min(Count, (int)(m_DiscardEnd - GetPos)); + Unlock(); + + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; + continue; + } + } + Unlock(); +#endif + +#ifndef DISABLE_DISCARD + if(GetPos == NextHeaderPos) { + if(Count < 6) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + bool dummy; + int packlen = pes_packet_len(Data, Count, dummy); + if(Count < packlen) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = packlen; + NextHeaderPos = GetPos + packlen; + } else { + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } +#endif + + errno = 0; + n = write(m_fd, Data, Count); + + if(n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; + + } else if(n < 0) { + + if (errno == EINTR || errno == EWOULDBLOCK) { + TRACE("cBackgroundWriter: EINTR while writing to file handle " + < MAX_OVERFLOWS_BEFORE_DISCONNECT) { + LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client"); + m_RingBuffer.Clear(); + m_Active = false; + return 0; + } + return -DataCount; + } + int n = m_RingBuffer.Put(Data, DataCount); + if(n == DataCount) { + m_BufferOverflows = 0; + m_PutPos += n; + return n; + } + + LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); + m_RingBuffer.Clear(); + m_Active = false; + } + + return 0; +} + + +// +// cTsWriter +// - Demux PES stream to PS +// + +cTsWriter::cTsWriter(int fd, int Size) : + cBackgroundWriterI(fd, Size, 6) +{ + LOGDBG("cTsWriter initialized (buffer %d kb)", Size/1024); + Start(); +} + + +void cTsWriter::Action(void) +{ +} + +int cTsWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) +{ + return 0; +} + + +// +// cRtspMuxWriter +// - RTSP multiplexed control+data +// - Each encapsulated PES frame is written atomically to socket buffer +// - Atomic control data can be written directly to socket +// from another thread to bypass buffer +// + +cRtspMuxWriter::cRtspMuxWriter(int fd, int Size) : + cBackgroundWriterI(fd, Size, 6) +{ + LOGDBG("cRtspMuxWriter initialized (buffer %d kb)", Size/1024); + Start(); +} + +void cRtspMuxWriter::Action(void) +{ +} + +int cRtspMuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) +{ + return 0; +} + + +// +// cRtspRemuxWriter +// - RTSP multiplexed control+data +// - Demux PES stream to independent ES streams +// - encapsulate ES to RTP/AVP compatible frames +// - Mux RTP/AVP ES streams to pipelined RTCP control connection +// - Each encapsulated frame is written atomically to socket buffer +// - Atomic control data can be written directly to socket +// from another thread to bypass buffer +// + +cRtspRemuxWriter::cRtspRemuxWriter(int fd, int Size) : + cBackgroundWriterI(fd, Size, 6) +{ + LOGDBG("cRtspRemuxWriter initialized (buffer %d kb)", Size/1024); + Start(); +} + +void cRtspRemuxWriter::Action(void) +{ +} + +int cRtspRemuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount) +{ + return 0; +} + + diff --git a/tools/backgroundwriter.h b/tools/backgroundwriter.h index ea69de67..7119047e 100644 --- a/tools/backgroundwriter.h +++ b/tools/backgroundwriter.h @@ -4,7 +4,7 @@ * See the main source file 'xineliboutput.c' for copyright information and * how to reach the author. * - * $Id: backgroundwriter.h,v 1.3 2006-12-15 16:35:31 phintuka Exp $ + * $Id: backgroundwriter.h,v 1.4 2007-01-07 05:36:30 phintuka Exp $ * */ @@ -16,34 +16,31 @@ #include #include - -class cBackgroundWriter : public cThread { - - private: +// +// cBackgroundWriterI +// - generic interface for buffered output +// +class cBackgroundWriterI : public cThread +{ + protected: cRingBufferLinear m_RingBuffer; volatile bool m_Active; - bool m_Raw; /* stream without stream_tcp_header_t */ - int m_fd; + int m_fd; + bool m_IsSocket; uint64_t m_PutPos; uint64_t m_DiscardStart; uint64_t m_DiscardEnd; - int m_BufferOverflows; + int m_BufferOverflows; protected: - virtual void Action(void); - - int Put(const uchar *Header, int HeaderCount, - const uchar *Data, int DataCount); + virtual void Action(void) = 0; public: - cBackgroundWriter(int fd, int Size = KILOBYTE(512), bool Raw = false); - virtual ~cBackgroundWriter() ; - - // Return largest possible Put size - int Free(void); + cBackgroundWriterI(int fd, int Size = KILOBYTE(512), int Margin = 0); + virtual ~cBackgroundWriterI(); // Add PES frame to buffer // @@ -52,12 +49,112 @@ class cBackgroundWriter : public cThread { // Error: 0 (write error ; socket disconnected) // Buffer full: -Count (no bytes will be pushed to queue) // - int Put(uint64_t StreamPos, const uchar *Data, int DataCount); + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount) = 0; + + int Free(void); // Return largest possible Put size + void Clear(void); // Drop all data (only complete frames) from buffer + bool Flush(int TimeoutMs); // Flush buffer (wait for data to be sent) +}; + + +// +// cTcpWriter +// - xineliboutput TCP data steam +// - stream_tcp_header_t encapsulated PES frames +// +class cTcpWriter : public cBackgroundWriterI +{ + protected: + virtual void Action(void); + + int Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount); + + public: + cTcpWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cTcpWriter() {}; + + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount); +}; + + +// +// cRawWriter +// - Raw PES stream +// - Used with HTTP +// +class cRawWriter : public cBackgroundWriterI +{ + protected: + virtual void Action(void); + + public: + cRawWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cRawWriter() {}; - // Drop all data (only complete frames) from buffer - void Clear(void); + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount); +}; + + +// +// cTsWriter +// - Demux PES stream to PS +// +class cTsWriter : public cBackgroundWriterI +{ + protected: + virtual void Action(void); + + public: + cTsWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cTsWriter() {}; + + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount); +}; + + +// +// cRtspMuxWriter +// - RTSP multiplexed control+data +// - Each encapsulated PES frame is written atomically to socket buffer +// - Atomic control data can be written directly to socket +// from another thread to bypass buffer +// + +class cRtspMuxWriter : public cBackgroundWriterI +{ + protected: + virtual void Action(void); + + public: + cRtspMuxWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cRtspMuxWriter() {}; + + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount); +}; + + +// +// cRtspRemuxWriter +// - RTSP multiplexed control+data +// - Demux PES stream to independent ES streams +// - encapsulate ES to RTP/AVP compatible frames +// - Mux RTP/AVP ES streams to pipelined RTCP control connection +// - Each encapsulated frame is written atomically to socket buffer +// - Atomic control data can be written directly to socket +// from another thread to bypass buffer +// + +class cRtspRemuxWriter : public cBackgroundWriterI +{ + protected: + virtual void Action(void); + + public: + cRtspRemuxWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cRtspRemuxWriter() {}; - bool Flush(int TimeoutMs); + virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount); }; -- cgit v1.2.3