diff options
Diffstat (limited to 'tools/backgroundwriter.c')
-rw-r--r-- | tools/backgroundwriter.c | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c new file mode 100644 index 00000000..26e1f34b --- /dev/null +++ b/tools/backgroundwriter.c @@ -0,0 +1,215 @@ +/* + * backgroundwriter.h: Buffered socket/file writing thread + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: backgroundwriter.c,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#include <stdint.h> +#include <unistd.h> + +#include <vdr/tools.h> + +#include "../logdefs.h" +#include "../xine_input_vdr_net.h" // stream_tcp_header_t + +#include "backgroundwriter.h" + +//#define DISABLE_DISCARD +//#define LOG_DISCARDS + + +cBackgroundWriter::cBackgroundWriter(int fd, int Size) + : m_RingBuffer(Size, sizeof(stream_tcp_header_t)) +{ + m_fd = fd; + m_RingBuffer.SetTimeouts(0, 100); + m_Active = true; + + m_PutPos = 0; + m_DiscardStart = 0; + m_DiscardEnd = 0; + + LOGDBG("cBackgroundWriter initialized (buffer %d kb)", Size/1024); + + Start(); +} + +cBackgroundWriter::~cBackgroundWriter() +{ + m_Active = false; + Cancel(3); +} + +int cBackgroundWriter::Free(void) +{ + return m_RingBuffer.Free(); +} + +void cBackgroundWriter::Action(void) +{ + uint64_t NextHeaderPos = 0ULL; + uint64_t GetPos = 0ULL; + cPoller Poller(m_fd, true); + + while(m_Active) { + + if(Poller.Poll(100)) { + + int Count = 0; + uchar *Data = m_RingBuffer.Get(Count); + + if(Data && Count > 0) { + +#ifndef DISABLE_DISCARD + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + if(m_DiscardEnd > GetPos) { + +# ifdef LOG_DISCARDS + LOGMSG("TCP: queue: discard request: queue %d bytes, " + "next point %d bytes forward (Count=%d)", + m_RingBuffer.Available(), + NextHeaderPos - GetPos, + Count); +# endif + if(NextHeaderPos == GetPos) { + // we're at frame boundary +# ifdef LOG_DISCARDS + uint8_t *pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); + } +# endif + Count = min(Count, (int)(m_DiscardEnd - GetPos)); +# ifdef LOG_DISCARDS + LOGMSG("Flushing %d bytes", Count); +#endif + Unlock(); + + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; +# ifdef LOG_DISCARDS + LOGMSG("Queue now %d bytes", m_RingBuffer.Available()); + pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); +# endif + continue; + } + } + Unlock(); +#endif + +#ifndef DISABLE_DISCARD + if(GetPos == NextHeaderPos) { + if(Count < (int)sizeof(stream_tcp_header_t)) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + + stream_tcp_header_t *header = (stream_tcp_header_t*)Data; + if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t))) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = ntohl(header->len) + sizeof(stream_tcp_header_t); + NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t); + } else { + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } +#endif + + errno = 0; + int n = write(m_fd, Data, Count); + + if(n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; + + } else if(n < 0) { + + if (errno == EINTR || errno == EWOULDBLOCK) { + TRACE("cBackgroundWriter: EINTR while writing to file handle " + <<m_fd<<" - retrying"); + continue; + + } else { + LOGERR("cBackgroundWriter: TCP write error"); + break; + } + } + + GetPos += n; + m_RingBuffer.Del(n); + } + } + } + + m_RingBuffer.Clear(); + m_Active = false; +} + +void cBackgroundWriter::Clear(void) +{ + // Can't just drop buffer contents or PES frames will be broken. + + // Serialize with Put + LOCK_THREAD; +#ifdef LOG_DISCARDS + LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos); +#endif + m_DiscardEnd = m_PutPos; +} + +bool cBackgroundWriter::Flush(int TimeoutMs) +{ + uint64_t WaitEnd = cTimeMs::Now(); + + if(TimeoutMs > 0) + WaitEnd += (uint64_t)TimeoutMs; + + while(cTimeMs::Now() < WaitEnd && + m_Active && + m_RingBuffer.Available() > 0) + cCondWait::SleepMs(3); + + return m_RingBuffer.Available() <= 0; +} + +int cBackgroundWriter::Put(uint64_t StreamPos, + const uchar *Data, int DataCount) +{ + stream_tcp_header_t header; + header.pos = htonull(StreamPos); + header.len = htonl(DataCount); + return Put((uchar*)&header, sizeof(header), Data, DataCount); +} + +int cBackgroundWriter::Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount) +{ + if(m_Active) { + + // Serialize Put access to keep Data and Header together + LOCK_THREAD; + + if(m_RingBuffer.Free() < HeaderCount+DataCount) { + LOGMSG("cXinelibServer: TCP buffer overflow !"); + return -HeaderCount-DataCount; + } + int n = m_RingBuffer.Put(Header, HeaderCount) + + m_RingBuffer.Put(Data, DataCount); + if(n == HeaderCount+DataCount) { + m_PutPos += n; + return n; + } + + LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); + m_RingBuffer.Clear(); + m_Active = false; + } + + return 0; +} |