diff options
author | Frank Schmirler <vdr@schmirler.de> | 2010-12-02 08:59:14 +0100 |
---|---|---|
committer | Frank Schmirler <vdr@schmirler.de> | 2010-12-02 08:59:14 +0100 |
commit | 5a270cc3ab659a98b4bb674acb77982f7e1ecb14 (patch) | |
tree | 5f2f51c096f192a9b32af9ffd8244eeb6637ad06 /client/filter.c | |
parent | e6249bf957a943920b11abbd9efac1efa18b1d00 (diff) | |
download | vdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.gz vdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.bz2 |
Snapshot 2007-05-09
Diffstat (limited to 'client/filter.c')
-rw-r--r-- | client/filter.c | 265 |
1 files changed, 220 insertions, 45 deletions
diff --git a/client/filter.c b/client/filter.c index daf534a..91af8b7 100644 --- a/client/filter.c +++ b/client/filter.c @@ -1,5 +1,5 @@ /* - * $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $ + * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $ */ #include "client/filter.h" @@ -7,113 +7,288 @@ #include "tools/select.h" #include "common.h" -#include <vdr/ringbuffer.h> #include <vdr/device.h> #if VDRVERSNUM >= 10300 +// --- cStreamdevFilter ------------------------------------------------------ + +class cStreamdevFilter: public cListObject { +private: + uchar m_Buffer[4096]; + int m_Used; + int m_Pipe[2]; + u_short m_Pid; + u_char m_Tid; + u_char m_Mask; + +public: + cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); + virtual ~cStreamdevFilter(); + + bool Matches(u_short Pid, u_char Tid); + bool PutSection(const uchar *Data, int Length, bool Pusi); + int ReadPipe(void) const { return m_Pipe[0]; } + + bool IsClosed(void); + void Reset(void); + + u_short Pid(void) const { return m_Pid; } + u_char Tid(void) const { return m_Tid; } + u_char Mask(void) const { return m_Mask; } +}; + +inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { + return m_Pid == Pid && m_Tid == (Tid & m_Mask); +} + cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { m_Used = 0; m_Pid = Pid; m_Tid = Tid; m_Mask = Mask; + m_Pipe[0] = m_Pipe[1] = -1; - if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) { - esyslog("streamev-client: coudln't open section filter pipe: %m"); - m_Pipe[0] = m_Pipe[1] = -1; +#ifdef SOCK_SEQPACKET + // SOCK_SEQPACKET (since kernel 2.6.4) + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) { + esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM"); + } +#endif + if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) { + esyslog("streamdev-client: couldn't open section filter socket: %m"); + } + + else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) { + esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m"); } } cStreamdevFilter::~cStreamdevFilter() { Dprintf("~cStreamdevFilter %p\n", this); - if (m_Pipe[0] >= 0) - close(m_Pipe[0]); + + // ownership of handle m_Pipe[0] has been transferred to VDR section handler + //if (m_Pipe[0] >= 0) + // close(m_Pipe[0]); if (m_Pipe[1] >= 0) close(m_Pipe[1]); } -bool cStreamdevFilter::PutSection(const uchar *Data, int Length) { +bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { + + if (!m_Used && !Pusi) /* wait for payload unit start indicator */ + return true; + if (m_Used && Pusi) /* reset at payload unit start */ + Reset(); + if (m_Used + Length >= (int)sizeof(m_Buffer)) { esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", Length); - m_Used = 0; + Reset(); return true; } + memcpy(m_Buffer + m_Used, Data, Length); m_Used += Length; - if (m_Used > 3) { int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; if (m_Used == length) { - if (write(m_Pipe[1], m_Buffer, length) < 0) - return false; m_Used = 0; + if (write(m_Pipe[1], m_Buffer, length) < 0) { + if(errno == EAGAIN || errno == EWOULDBLOCK) + dsyslog("cStreamdevFilter::PutSection socket overflow, " + "Pid %4d Tid %3d", m_Pid, m_Tid); + + else + return false; + } } + + if (m_Used > length) { + dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d " + "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length); + if(Length < TS_SIZE-5) { + // TS packet not full -> this must be last TS packet of section data -> safe to reset now + Reset(); + } + } + } return true; } +void cStreamdevFilter::Reset(void) { + if(m_Used) + dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used); + m_Used = 0; +} + +bool cStreamdevFilter::IsClosed(void) { + char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */ + + // Test if pipe/socket has been closed by writing empty section + if (write(m_Pipe[1], m_Buffer, 3) < 0 && + errno != EAGAIN && + errno != EWOULDBLOCK) { + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) + esyslog("cStreamdevFilter::IsClosed failed: %m"); + + return true; + } + + return false; +} + +// --- cStreamdevFilters ----------------------------------------------------- + cStreamdevFilters::cStreamdevFilters(void): cThread("streamdev-client: sections assembler") { - m_Active = false; - m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true); - Start(); + m_TSBuffer = NULL; } cStreamdevFilters::~cStreamdevFilters() { - if (m_Active) { - m_Active = false; - Cancel(3); - } - delete m_RingBuffer; + SetConnection(-1); } int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + CarbageCollect(); + cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); + int fh = f->ReadPipe(); + + Lock(); Add(f); - return f->ReadPipe(); + Unlock(); + + return fh; +} + +void cStreamdevFilters::CarbageCollect(void) { + LOCK_THREAD; + for (cStreamdevFilter *fi = First(); fi;) { + if (fi->IsClosed()) { + if (errno == ECONNREFUSED || + errno == ECONNRESET || + errno == EPIPE) { + ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false); + Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + + cStreamdevFilter *next = Prev(fi); + Del(fi); + fi = next ? Next(next) : First(); + } else { + esyslog("cStreamdevFilters::CarbageCollector() error: " + "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + LOG_ERROR; + fi = Next(fi); + } + } else { + fi = Next(fi); + } + } } -cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { - for (cStreamdevFilter *f = First(); f; f = Next(f)) { - if (f->Matches(Pid, Tid)) - return f; +bool cStreamdevFilters::ReActivateFilters(void) +{ + LOCK_THREAD; + + bool res = true; + CarbageCollect(); + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; + Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); } - return NULL; + return res; } -void cStreamdevFilters::Put(const uchar *Data) { - int p = m_RingBuffer->Put(Data, TS_SIZE); - if (p != TS_SIZE) - m_RingBuffer->ReportOverflow(TS_SIZE - p); +void cStreamdevFilters::SetConnection(int Handle) { + + Cancel(2); + DELETENULL(m_TSBuffer); + + if (Handle >= 0) { + m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1); + ReActivateFilters(); + Start(); + } } void cStreamdevFilters::Action(void) { - m_Active = true; - while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); + int fails = 0; - if (block && recvd > 0) { - cStreamdevFilter *f; + while (Running()) { + const uchar *block = m_TSBuffer->Get(); + if (block) { u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; u_char tid = block[3]; - - if ((f = Matches(pid, tid)) != NULL) { - int len = block[4]; - if (!f->PutSection(block + 5, len)) { - if (errno != EPIPE) { - esyslog("streamdev-client: couldn't send section packet: %m"); + bool Pusi = block[1] & 0x40; + int len = block[4]; +#if 0 + if (block[1] == 0xff && + block[2] == 0xff && + block[3] == 0xff && + block[4] == 0x7f) + isyslog("*********** TRANSPONDER -> %s **********", block+5); +#endif + LOCK_THREAD; + cStreamdevFilter *f = First(); + while (f) { + cStreamdevFilter *next = Next(f); + if (f->Matches(pid, tid)) { + + if (f->PutSection(block + 5, len, Pusi)) + break; + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) { Dprintf("FATAL ERROR: %m\n"); + esyslog("streamdev-client: couldn't send section packet: %m"); } ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); Del(f); + // Filter was closed. + // - need to check remaining filters for another match } + f = next; } - m_RingBuffer->Del(TS_SIZE); - } else - usleep(1); + } else { +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // Check disconnection + int fd = *ClientSocket.DataSocket(siLiveFilter); + if(fd < 0) + break; + cPoller Poller(fd); + if (Poller.Poll()) { + char tmp[1]; + errno = 0; + Dprintf("cStreamdevFilters::Action(): checking connection"); + if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) { + ++fails; + if (fails >= 10) { + esyslog("cStreamdevFilters::Action(): stream disconnected ?"); + ClientSocket.CloseDataConnection(siLiveFilter); + break; + } + } else { + fails = 0; + } + } else { + fails = 0; + } + cCondWait::SleepMs(10); +#endif + } } + + DELETENULL(m_TSBuffer); + dsyslog("StreamdevFilters::Action() ended"); } #endif // VDRVERSNUM >= 10300 |