summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/backgroundwriter.c331
-rw-r--r--tools/backgroundwriter.h139
2 files changed, 397 insertions, 73 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c
index 0602ed7c..1f56054a 100644
--- a/tools/backgroundwriter.c
+++ b/tools/backgroundwriter.c
@@ -4,12 +4,13 @@
* See the main source file 'xineliboutput.c' for copyright information and
* how to reach the author.
*
- * $Id: backgroundwriter.c,v 1.3 2006-12-15 16:35:31 phintuka Exp $
+ * $Id: backgroundwriter.c,v 1.4 2007-01-07 05:36:30 phintuka Exp $
*
*/
#include <stdint.h>
#include <unistd.h>
+#include <netinet/tcp.h> // CORK, NODELAY
#include <vdr/tools.h>
@@ -21,15 +22,19 @@
//#define DISABLE_DISCARD
//#define LOG_DISCARDS
-#define MAX_OVERFLOWS_BEFORE_DISCONNECT 500 // ~ 1 second
+#define MAX_OVERFLOWS_BEFORE_DISCONNECT 1000 // ~ 1 second
-cBackgroundWriter::cBackgroundWriter(int fd, int Size, bool Raw)
- : m_RingBuffer(Size, sizeof(stream_tcp_header_t))
+
+//
+// cBackgroundWriterI
+//
+
+cBackgroundWriterI::cBackgroundWriterI(int fd, int Size, int Margin)
+ : m_RingBuffer(Size, Margin)
{
- m_fd = fd;
+ m_fd = fd;
m_RingBuffer.SetTimeouts(0, 100);
m_Active = true;
- m_Raw = Raw;
m_PutPos = 0;
m_DiscardStart = 0;
@@ -37,24 +42,77 @@ cBackgroundWriter::cBackgroundWriter(int fd, int Size, bool Raw)
m_BufferOverflows = 0;
- LOGDBG("cBackgroundWriter%s initialized (buffer %d kb)",
- Raw?"(RAW)":"", Size/1024);
+ int iCork = 1;
+ if(setsockopt(m_fd, IPPROTO_TCP, TCP_CORK, &iCork, sizeof(int))) {
+ if(errno != ENOTSOCK)
+ LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed");
+ m_IsSocket = false;
+ errno = 0;
+ } else {
+ m_IsSocket = true;
+ }
- Start();
+ LOGDBG("cBackgroundWriterI initialized (buffer %d kb)", Size/1024);
}
-cBackgroundWriter::~cBackgroundWriter()
+cBackgroundWriterI::~cBackgroundWriterI()
{
m_Active = false;
Cancel(3);
}
-int cBackgroundWriter::Free(void)
+int cBackgroundWriterI::Free(void)
{
return m_RingBuffer.Free();
}
-void cBackgroundWriter::Action(void)
+void cBackgroundWriterI::Clear(void)
+{
+ // Can't just drop buffer contents or PES frames will be broken.
+ // Serialize with Put
+ LOCK_THREAD;
+ m_DiscardEnd = m_PutPos;
+}
+
+bool cBackgroundWriterI::Flush(int TimeoutMs)
+{
+ uint64_t WaitEnd = cTimeMs::Now();
+
+ // wait for ring buffer to drain
+ if(TimeoutMs > 0) {
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Active &&
+ m_RingBuffer.Available() > 0)
+ cCondWait::SleepMs(3);
+ }
+
+ if(m_IsSocket && m_RingBuffer.Available() <= 0) {
+ // flush corked data too
+ int i = 1;
+ if(setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(int))) {
+ LOGERR("cBackgroundWriter: setsockopt(TCP_CORK) failed");
+ errno = 0;
+ }
+ }
+
+ return m_RingBuffer.Available() <= 0;
+}
+
+
+//
+// cTcpWriter
+//
+
+cTcpWriter::cTcpWriter(int fd, int Size) :
+ cBackgroundWriterI(fd, Size, sizeof(stream_tcp_header_t))
+{
+ LOGDBG("cTcpWriter initialized (buffer %d kb)", Size/1024);
+ Start();
+}
+
+void cTcpWriter::Action(void)
{
uint64_t NextHeaderPos = 0ULL;
uint64_t GetPos = 0ULL;
@@ -127,12 +185,7 @@ void cBackgroundWriter::Action(void)
#endif
errno = 0;
- if(!m_Raw)
- n = write(m_fd, Data, Count);
- else
- n = write(m_fd,
- Data + sizeof(stream_tcp_header_t),
- Count - sizeof(stream_tcp_header_t));
+ n = write(m_fd, Data, Count);
if(n == 0) {
LOGERR("cBackgroundWriter: Client disconnected data stream ?");
@@ -151,9 +204,6 @@ void cBackgroundWriter::Action(void)
}
}
- if(m_Raw)
- n += sizeof(stream_tcp_header_t);
-
GetPos += n;
m_RingBuffer.Del(n);
}
@@ -164,35 +214,8 @@ void cBackgroundWriter::Action(void)
m_Active = false;
}
-void cBackgroundWriter::Clear(void)
-{
- // Can't just drop buffer contents or PES frames will be broken.
-
- // Serialize with Put
- LOCK_THREAD;
-#ifdef LOG_DISCARDS
- LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos);
-#endif
- m_DiscardEnd = m_PutPos;
-}
-
-bool cBackgroundWriter::Flush(int TimeoutMs)
-{
- uint64_t WaitEnd = cTimeMs::Now();
-
- if(TimeoutMs > 0)
- WaitEnd += (uint64_t)TimeoutMs;
-
- while(cTimeMs::Now() < WaitEnd &&
- m_Active &&
- m_RingBuffer.Available() > 0)
- cCondWait::SleepMs(3);
-
- return m_RingBuffer.Available() <= 0;
-}
-
-int cBackgroundWriter::Put(uint64_t StreamPos,
- const uchar *Data, int DataCount)
+int cTcpWriter::Put(uint64_t StreamPos,
+ const uchar *Data, int DataCount)
{
stream_tcp_header_t header;
header.pos = htonull(StreamPos);
@@ -200,8 +223,8 @@ int cBackgroundWriter::Put(uint64_t StreamPos,
return Put((uchar*)&header, sizeof(header), Data, DataCount);
}
-int cBackgroundWriter::Put(const uchar *Header, int HeaderCount,
- const uchar *Data, int DataCount)
+int cTcpWriter::Put(const uchar *Header, int HeaderCount,
+ const uchar *Data, int DataCount)
{
if(m_Active) {
@@ -233,3 +256,207 @@ int cBackgroundWriter::Put(const uchar *Header, int HeaderCount,
return 0;
}
+
+
+//
+// cRawWriter
+//
+
+#include "pes.h"
+
+cRawWriter::cRawWriter(int fd, int Size) :
+ cBackgroundWriterI(fd, Size, 6)
+{
+ LOGDBG("cRawWriter initialized (buffer %d kb)", Size/1024);
+ Start();
+}
+
+void cRawWriter::Action(void)
+{
+ uint64_t NextHeaderPos = 0ULL;
+ uint64_t GetPos = 0ULL;
+ cPoller Poller(m_fd, true);
+
+ while(m_Active) {
+
+ if(Poller.Poll(100)) {
+
+ int Count = 0, n;
+ uchar *Data = m_RingBuffer.Get(Count);
+
+ if(Data && Count > 0) {
+
+#ifndef DISABLE_DISCARD
+ Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
+ if(m_DiscardEnd > GetPos) {
+
+ if(NextHeaderPos == GetPos) {
+ // we're at frame boundary
+ Count = min(Count, (int)(m_DiscardEnd - GetPos));
+ Unlock();
+
+ m_RingBuffer.Del(Count);
+ GetPos += Count;
+ NextHeaderPos = GetPos;
+ continue;
+ }
+ }
+ Unlock();
+#endif
+
+#ifndef DISABLE_DISCARD
+ if(GetPos == NextHeaderPos) {
+ if(Count < 6)
+ LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
+ bool dummy;
+ int packlen = pes_packet_len(Data, Count, dummy);
+ if(Count < packlen)
+ ;//LOGMSG("Count = %d < %d", Count,
+ // header->len + sizeof(stream_tcp_header_t));
+ else
+ Count = packlen;
+ NextHeaderPos = GetPos + packlen;
+ } else {
+ Count = min(Count, (int)(NextHeaderPos-GetPos));
+ }
+#endif
+
+ errno = 0;
+ n = write(m_fd, Data, Count);
+
+ if(n == 0) {
+ LOGERR("cBackgroundWriter: Client disconnected data stream ?");
+ break;
+
+ } else if(n < 0) {
+
+ if (errno == EINTR || errno == EWOULDBLOCK) {
+ TRACE("cBackgroundWriter: EINTR while writing to file handle "
+ <<m_fd<<" - retrying");
+ continue;
+
+ } else {
+ LOGERR("cBackgroundWriter: TCP write error");
+ break;
+ }
+ }
+
+ GetPos += n;
+ m_RingBuffer.Del(n);
+ }
+ }
+ }
+
+ m_RingBuffer.Clear();
+ m_Active = false;
+}
+
+int cRawWriter::Put(uint64_t StreamPos,
+ const uchar *Data, int DataCount)
+{
+ if(m_Active) {
+
+ // Serialize Put access to keep Data and Header together
+ LOCK_THREAD;
+
+ if(m_RingBuffer.Free() < DataCount) {
+ if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
+ LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client");
+ m_RingBuffer.Clear();
+ m_Active = false;
+ return 0;
+ }
+ return -DataCount;
+ }
+ int n = m_RingBuffer.Put(Data, DataCount);
+ if(n == DataCount) {
+ m_BufferOverflows = 0;
+ m_PutPos += n;
+ return n;
+ }
+
+ LOGMSG("cXinelibServer: TCP buffer internal error ?!?");
+ m_RingBuffer.Clear();
+ m_Active = false;
+ }
+
+ return 0;
+}
+
+
+//
+// cTsWriter
+// - Demux PES stream to PS
+//
+
+cTsWriter::cTsWriter(int fd, int Size) :
+ cBackgroundWriterI(fd, Size, 6)
+{
+ LOGDBG("cTsWriter initialized (buffer %d kb)", Size/1024);
+ Start();
+}
+
+
+void cTsWriter::Action(void)
+{
+}
+
+int cTsWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount)
+{
+ return 0;
+}
+
+
+//
+// cRtspMuxWriter
+// - RTSP multiplexed control+data
+// - Each encapsulated PES frame is written atomically to socket buffer
+// - Atomic control data can be written directly to socket
+// from another thread to bypass buffer
+//
+
+cRtspMuxWriter::cRtspMuxWriter(int fd, int Size) :
+ cBackgroundWriterI(fd, Size, 6)
+{
+ LOGDBG("cRtspMuxWriter initialized (buffer %d kb)", Size/1024);
+ Start();
+}
+
+void cRtspMuxWriter::Action(void)
+{
+}
+
+int cRtspMuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount)
+{
+ return 0;
+}
+
+
+//
+// cRtspRemuxWriter
+// - RTSP multiplexed control+data
+// - Demux PES stream to independent ES streams
+// - encapsulate ES to RTP/AVP compatible frames
+// - Mux RTP/AVP ES streams to pipelined RTCP control connection
+// - Each encapsulated frame is written atomically to socket buffer
+// - Atomic control data can be written directly to socket
+// from another thread to bypass buffer
+//
+
+cRtspRemuxWriter::cRtspRemuxWriter(int fd, int Size) :
+ cBackgroundWriterI(fd, Size, 6)
+{
+ LOGDBG("cRtspRemuxWriter initialized (buffer %d kb)", Size/1024);
+ Start();
+}
+
+void cRtspRemuxWriter::Action(void)
+{
+}
+
+int cRtspRemuxWriter::Put(uint64_t StreamPos, const uchar *Data, int DataCount)
+{
+ return 0;
+}
+
+
diff --git a/tools/backgroundwriter.h b/tools/backgroundwriter.h
index ea69de67..7119047e 100644
--- a/tools/backgroundwriter.h
+++ b/tools/backgroundwriter.h
@@ -4,7 +4,7 @@
* See the main source file 'xineliboutput.c' for copyright information and
* how to reach the author.
*
- * $Id: backgroundwriter.h,v 1.3 2006-12-15 16:35:31 phintuka Exp $
+ * $Id: backgroundwriter.h,v 1.4 2007-01-07 05:36:30 phintuka Exp $
*
*/
@@ -16,34 +16,31 @@
#include <vdr/thread.h>
#include <vdr/ringbuffer.h>
-
-class cBackgroundWriter : public cThread {
-
- private:
+//
+// cBackgroundWriterI
+// - generic interface for buffered output
+//
+class cBackgroundWriterI : public cThread
+{
+ protected:
cRingBufferLinear m_RingBuffer;
volatile bool m_Active;
- bool m_Raw; /* stream without stream_tcp_header_t */
- int m_fd;
+ int m_fd;
+ bool m_IsSocket;
uint64_t m_PutPos;
uint64_t m_DiscardStart;
uint64_t m_DiscardEnd;
- int m_BufferOverflows;
+ int m_BufferOverflows;
protected:
- virtual void Action(void);
-
- int Put(const uchar *Header, int HeaderCount,
- const uchar *Data, int DataCount);
+ virtual void Action(void) = 0;
public:
- cBackgroundWriter(int fd, int Size = KILOBYTE(512), bool Raw = false);
- virtual ~cBackgroundWriter() ;
-
- // Return largest possible Put size
- int Free(void);
+ cBackgroundWriterI(int fd, int Size = KILOBYTE(512), int Margin = 0);
+ virtual ~cBackgroundWriterI();
// Add PES frame to buffer
//
@@ -52,12 +49,112 @@ class cBackgroundWriter : public cThread {
// Error: 0 (write error ; socket disconnected)
// Buffer full: -Count (no bytes will be pushed to queue)
//
- int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount) = 0;
+
+ int Free(void); // Return largest possible Put size
+ void Clear(void); // Drop all data (only complete frames) from buffer
+ bool Flush(int TimeoutMs); // Flush buffer (wait for data to be sent)
+};
+
+
+//
+// cTcpWriter
+// - xineliboutput TCP data steam
+// - stream_tcp_header_t encapsulated PES frames
+//
+class cTcpWriter : public cBackgroundWriterI
+{
+ protected:
+ virtual void Action(void);
+
+ int Put(const uchar *Header, int HeaderCount,
+ const uchar *Data, int DataCount);
+
+ public:
+ cTcpWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cTcpWriter() {};
+
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+};
+
+
+//
+// cRawWriter
+// - Raw PES stream
+// - Used with HTTP
+//
+class cRawWriter : public cBackgroundWriterI
+{
+ protected:
+ virtual void Action(void);
+
+ public:
+ cRawWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cRawWriter() {};
- // Drop all data (only complete frames) from buffer
- void Clear(void);
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+};
+
+
+//
+// cTsWriter
+// - Demux PES stream to PS
+//
+class cTsWriter : public cBackgroundWriterI
+{
+ protected:
+ virtual void Action(void);
+
+ public:
+ cTsWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cTsWriter() {};
+
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+};
+
+
+//
+// cRtspMuxWriter
+// - RTSP multiplexed control+data
+// - Each encapsulated PES frame is written atomically to socket buffer
+// - Atomic control data can be written directly to socket
+// from another thread to bypass buffer
+//
+
+class cRtspMuxWriter : public cBackgroundWriterI
+{
+ protected:
+ virtual void Action(void);
+
+ public:
+ cRtspMuxWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cRtspMuxWriter() {};
+
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+};
+
+
+//
+// cRtspRemuxWriter
+// - RTSP multiplexed control+data
+// - Demux PES stream to independent ES streams
+// - encapsulate ES to RTP/AVP compatible frames
+// - Mux RTP/AVP ES streams to pipelined RTCP control connection
+// - Each encapsulated frame is written atomically to socket buffer
+// - Atomic control data can be written directly to socket
+// from another thread to bypass buffer
+//
+
+class cRtspRemuxWriter : public cBackgroundWriterI
+{
+ protected:
+ virtual void Action(void);
+
+ public:
+ cRtspRemuxWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cRtspRemuxWriter() {};
- bool Flush(int TimeoutMs);
+ virtual int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
};