diff options
author | lordjaxom <lordjaxom> | 2004-12-30 22:43:55 +0000 |
---|---|---|
committer | lordjaxom <lordjaxom> | 2004-12-30 22:43:55 +0000 |
commit | 302fa2e67276bd0674e81e2a9a01b9e91dd45d8c (patch) | |
tree | a454884a16e0ffa48b5ce3e4ce1a66eb874a9de0 /client/filter.c | |
download | vdr-plugin-streamdev-302fa2e67276bd0674e81e2a9a01b9e91dd45d8c.tar.gz vdr-plugin-streamdev-302fa2e67276bd0674e81e2a9a01b9e91dd45d8c.tar.bz2 |
Initial revision
Diffstat (limited to 'client/filter.c')
-rw-r--r-- | client/filter.c | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/client/filter.c b/client/filter.c new file mode 100644 index 0000000..dad86f3 --- /dev/null +++ b/client/filter.c @@ -0,0 +1,141 @@ +/* + * $Id: filter.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $ + */ + +#include "client/filter.h" +#include "client/socket.h" +#include "tools/select.h" +#include "common.h" + +#include <vdr/ringbuffer.h> + +#if VDRVERSNUM >= 10300 + +cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { + m_Used = 0; + m_Pid = Pid; + m_Tid = Tid; + m_Mask = Mask; + + if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) { + esyslog("streamev-client: coudln't open section filter pipe: %m"); + m_Pipe[0] = m_Pipe[1] = -1; + } +} + +cStreamdevFilter::~cStreamdevFilter() { + Dprintf("~cStreamdevFilter %p\n", this); + if (m_Pipe[0] >= 0) + close(m_Pipe[0]); + if (m_Pipe[1] >= 0) + close(m_Pipe[1]); +} + +bool cStreamdevFilter::PutSection(const uchar *Data, int Length) { + if (m_Used + Length >= (int)sizeof(m_Buffer)) { + esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", + Length); + m_Used = 0; + return true; + } + memcpy(m_Buffer + m_Used, Data, Length); + m_Used += Length; + + if (m_Used > 3) { + int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; + if (m_Used == length) { + if (write(m_Pipe[1], m_Buffer, length) < 0) + return false; + m_Used = 0; + } + } + return true; +} + +cStreamdevFilters::cStreamdevFilters(void): + cThread("streamdev-client: sections assembler") { + m_Active = false; + m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true); + Start(); +} + +cStreamdevFilters::~cStreamdevFilters() { + if (m_Active) { + m_Active = false; + Cancel(3); + } + delete m_RingBuffer; +} + +int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); + Add(f); + return f->ReadPipe(); +} + +cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { + for (cStreamdevFilter *f = First(); f; f = Next(f)) { + if (f->Matches(Pid, Tid)) + return f; + } + return NULL; +} + +void cStreamdevFilters::Put(const uchar *Data) { + static time_t firsterr = 0; + static int errcnt = 0; + static bool showerr = true; + + int p = m_RingBuffer->Put(Data, TS_SIZE); + if (p != TS_SIZE) { + ++errcnt; + if (showerr) { + if (firsterr == 0) + firsterr = time_ms(); + else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) { + esyslog("ERROR: too many buffer overflows, logging stopped"); + showerr = false; + firsterr = time_ms(); + } + } else if (firsterr + BUFOVERTIME < time_ms()) { + showerr = true; + firsterr = 0; + errcnt = 0; + } + + if (showerr) + esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p); + else + firsterr = time_ms(); + } +} + +void cStreamdevFilters::Action(void) { + m_Active = true; + while (m_Active) { + int recvd; + const uchar *block = m_RingBuffer->Get(recvd); + + if (block && recvd > 0) { + cStreamdevFilter *f; + u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; + u_char tid = block[3]; + + if ((f = Matches(pid, tid)) != NULL) { + int len = block[4]; + if (!f->PutSection(block + 5, len)) { + if (errno != EPIPE) { + esyslog("streamdev-client: couldn't send section packet: %m"); + Dprintf("FATAL ERROR: %m\n"); + } + ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); + Del(f); + } + } + m_RingBuffer->Del(TS_SIZE); + } else + usleep(1); + } +} + +#endif // VDRVERSNUM >= 10300 |