From 518886b08b5913cf8278f69905e7981424d46825 Mon Sep 17 00:00:00 2001 From: schmirl Date: Tue, 24 Apr 2007 11:23:16 +0000 Subject: client_filter-data-handling.patch by Petri Hintukainen - regonize PUSI flag in TS packets (bullet-proof section start+end indicator) - Use own TS buffer to read directly from socket, no need for ring buffer anymore - Re-activate all active filters after re-connection to server - Simplify thread start/stop/running detection to current VDR style - Update "filter closed by VDR" detection (datagram sockets return different errno's than pipes) - Deliver data to first matching and active filter (do not drop data if first matching filter has been closed, there is quite likely new filter for it) - Add disconnect detection to avoid 100% CPU usage in cTSBuffer::Action() Modified Files: client/filter.c client/filter.h --- client/filter.c | 125 ++++++++++++++++++++++++++++++++++++++++---------------- client/filter.h | 13 +++--- 2 files changed, 96 insertions(+), 42 deletions(-) (limited to 'client') diff --git a/client/filter.c b/client/filter.c index db4a7c7..91af8b7 100644 --- a/client/filter.c +++ b/client/filter.c @@ -1,5 +1,5 @@ /* - * $Id: filter.c,v 1.10 2007/04/23 12:52:28 schmirl Exp $ + * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $ */ #include "client/filter.h" @@ -27,7 +27,7 @@ public: virtual ~cStreamdevFilter(); bool Matches(u_short Pid, u_char Tid); - bool PutSection(const uchar *Data, int Length); + bool PutSection(const uchar *Data, int Length, bool Pusi); int ReadPipe(void) const { return m_Pipe[0]; } bool IsClosed(void); @@ -75,7 +75,13 @@ cStreamdevFilter::~cStreamdevFilter() { close(m_Pipe[1]); } -bool cStreamdevFilter::PutSection(const uchar *Data, int Length) { +bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { + + if (!m_Used && !Pusi) /* wait for payload unit start indicator */ + return true; + if (m_Used && Pusi) /* reset at payload unit start */ + Reset(); + if (m_Used + Length >= (int)sizeof(m_Buffer)) { esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", Length); @@ -141,17 +147,11 @@ bool cStreamdevFilter::IsClosed(void) { cStreamdevFilters::cStreamdevFilters(void): cThread("streamdev-client: sections assembler") { - m_Active = false; - m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true); - Start(); + m_TSBuffer = NULL; } cStreamdevFilters::~cStreamdevFilters() { - if (m_Active) { - m_Active = false; - Cancel(3); - } - delete m_RingBuffer; + SetConnection(-1); } int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { @@ -194,46 +194,101 @@ void cStreamdevFilters::CarbageCollect(void) { } } -cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { - for (cStreamdevFilter *f = First(); f; f = Next(f)) { - if (f->Matches(Pid, Tid)) - return f; +bool cStreamdevFilters::ReActivateFilters(void) +{ + LOCK_THREAD; + + bool res = true; + CarbageCollect(); + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; + Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); } - return NULL; + return res; } -void cStreamdevFilters::Put(const uchar *Data) { - int p = m_RingBuffer->Put(Data, TS_SIZE); - if (p != TS_SIZE) - m_RingBuffer->ReportOverflow(TS_SIZE - p); +void cStreamdevFilters::SetConnection(int Handle) { + + Cancel(2); + DELETENULL(m_TSBuffer); + + if (Handle >= 0) { + m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1); + ReActivateFilters(); + Start(); + } } void cStreamdevFilters::Action(void) { - m_Active = true; - while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); + int fails = 0; - if (block && recvd > 0) { - cStreamdevFilter *f; + while (Running()) { + const uchar *block = m_TSBuffer->Get(); + if (block) { u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; u_char tid = block[3]; - - if ((f = Matches(pid, tid)) != NULL) { - int len = block[4]; - if (!f->PutSection(block + 5, len)) { - if (errno != EPIPE) { - esyslog("streamdev-client: couldn't send section packet: %m"); + bool Pusi = block[1] & 0x40; + int len = block[4]; +#if 0 + if (block[1] == 0xff && + block[2] == 0xff && + block[3] == 0xff && + block[4] == 0x7f) + isyslog("*********** TRANSPONDER -> %s **********", block+5); +#endif + LOCK_THREAD; + cStreamdevFilter *f = First(); + while (f) { + cStreamdevFilter *next = Next(f); + if (f->Matches(pid, tid)) { + + if (f->PutSection(block + 5, len, Pusi)) + break; + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) { Dprintf("FATAL ERROR: %m\n"); + esyslog("streamdev-client: couldn't send section packet: %m"); } ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); Del(f); + // Filter was closed. + // - need to check remaining filters for another match } + f = next; } - m_RingBuffer->Del(TS_SIZE); - } else - usleep(1); + } else { +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // Check disconnection + int fd = *ClientSocket.DataSocket(siLiveFilter); + if(fd < 0) + break; + cPoller Poller(fd); + if (Poller.Poll()) { + char tmp[1]; + errno = 0; + Dprintf("cStreamdevFilters::Action(): checking connection"); + if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) { + ++fails; + if (fails >= 10) { + esyslog("cStreamdevFilters::Action(): stream disconnected ?"); + ClientSocket.CloseDataConnection(siLiveFilter); + break; + } + } else { + fails = 0; + } + } else { + fails = 0; + } + cCondWait::SleepMs(10); +#endif + } } + + DELETENULL(m_TSBuffer); + dsyslog("StreamdevFilters::Action() ended"); } #endif // VDRVERSNUM >= 10300 diff --git a/client/filter.h b/client/filter.h index 04e8c75..e0a1575 100644 --- a/client/filter.h +++ b/client/filter.h @@ -1,5 +1,5 @@ /* - * $Id: filter.h,v 1.3 2007/04/23 12:52:28 schmirl Exp $ + * $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $ */ #ifndef VDR_STREAMDEV_FILTER_H @@ -12,26 +12,25 @@ #include #include -class cRingBufferLinear; class cTSBuffer; class cStreamdevFilter; class cStreamdevFilters: public cList, public cThread { private: - bool m_Active; - cRingBufferLinear *m_RingBuffer; - + cTSBuffer *m_TSBuffer; + protected: virtual void Action(void); void CarbageCollect(void); + bool ReActivateFilters(void); + public: cStreamdevFilters(void); virtual ~cStreamdevFilters(); + void SetConnection(int Handle); int OpenFilter(u_short Pid, u_char Tid, u_char Mask); - cStreamdevFilter *Matches(u_short Pid, u_char Tid); - void Put(const uchar *Data); }; # endif // VDRVERSNUM >= 10300 -- cgit v1.2.3