summaryrefslogtreecommitdiff
path: root/client/filter.c
diff options
context:
space:
mode:
authorlordjaxom <lordjaxom>2004-12-30 22:43:55 +0000
committerlordjaxom <lordjaxom>2004-12-30 22:43:55 +0000
commit302fa2e67276bd0674e81e2a9a01b9e91dd45d8c (patch)
treea454884a16e0ffa48b5ce3e4ce1a66eb874a9de0 /client/filter.c
downloadvdr-plugin-streamdev-302fa2e67276bd0674e81e2a9a01b9e91dd45d8c.tar.gz
vdr-plugin-streamdev-302fa2e67276bd0674e81e2a9a01b9e91dd45d8c.tar.bz2
Initial revision
Diffstat (limited to 'client/filter.c')
-rw-r--r--client/filter.c141
1 files changed, 141 insertions, 0 deletions
diff --git a/client/filter.c b/client/filter.c
new file mode 100644
index 0000000..dad86f3
--- /dev/null
+++ b/client/filter.c
@@ -0,0 +1,141 @@
+/*
+ * $Id: filter.c,v 1.1 2004/12/30 22:44:04 lordjaxom Exp $
+ */
+
+#include "client/filter.h"
+#include "client/socket.h"
+#include "tools/select.h"
+#include "common.h"
+
+#include <vdr/ringbuffer.h>
+
+#if VDRVERSNUM >= 10300
+
+cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
+ m_Used = 0;
+ m_Pid = Pid;
+ m_Tid = Tid;
+ m_Mask = Mask;
+
+ if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) {
+ esyslog("streamev-client: coudln't open section filter pipe: %m");
+ m_Pipe[0] = m_Pipe[1] = -1;
+ }
+}
+
+cStreamdevFilter::~cStreamdevFilter() {
+ Dprintf("~cStreamdevFilter %p\n", this);
+ if (m_Pipe[0] >= 0)
+ close(m_Pipe[0]);
+ if (m_Pipe[1] >= 0)
+ close(m_Pipe[1]);
+}
+
+bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
+ if (m_Used + Length >= (int)sizeof(m_Buffer)) {
+ esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
+ Length);
+ m_Used = 0;
+ return true;
+ }
+ memcpy(m_Buffer + m_Used, Data, Length);
+ m_Used += Length;
+
+ if (m_Used > 3) {
+ int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
+ if (m_Used == length) {
+ if (write(m_Pipe[1], m_Buffer, length) < 0)
+ return false;
+ m_Used = 0;
+ }
+ }
+ return true;
+}
+
+cStreamdevFilters::cStreamdevFilters(void):
+ cThread("streamdev-client: sections assembler") {
+ m_Active = false;
+ m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
+ Start();
+}
+
+cStreamdevFilters::~cStreamdevFilters() {
+ if (m_Active) {
+ m_Active = false;
+ Cancel(3);
+ }
+ delete m_RingBuffer;
+}
+
+int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
+ cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
+ Add(f);
+ return f->ReadPipe();
+}
+
+cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
+ for (cStreamdevFilter *f = First(); f; f = Next(f)) {
+ if (f->Matches(Pid, Tid))
+ return f;
+ }
+ return NULL;
+}
+
+void cStreamdevFilters::Put(const uchar *Data) {
+ static time_t firsterr = 0;
+ static int errcnt = 0;
+ static bool showerr = true;
+
+ int p = m_RingBuffer->Put(Data, TS_SIZE);
+ if (p != TS_SIZE) {
+ ++errcnt;
+ if (showerr) {
+ if (firsterr == 0)
+ firsterr = time_ms();
+ else if (firsterr + BUFOVERTIME > time_ms() && errcnt > BUFOVERCOUNT) {
+ esyslog("ERROR: too many buffer overflows, logging stopped");
+ showerr = false;
+ firsterr = time_ms();
+ }
+ } else if (firsterr + BUFOVERTIME < time_ms()) {
+ showerr = true;
+ firsterr = 0;
+ errcnt = 0;
+ }
+
+ if (showerr)
+ esyslog("ERROR: ring buffer overflow (%d bytes dropped)", TS_SIZE - p);
+ else
+ firsterr = time_ms();
+ }
+}
+
+void cStreamdevFilters::Action(void) {
+ m_Active = true;
+ while (m_Active) {
+ int recvd;
+ const uchar *block = m_RingBuffer->Get(recvd);
+
+ if (block && recvd > 0) {
+ cStreamdevFilter *f;
+ u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
+ u_char tid = block[3];
+
+ if ((f = Matches(pid, tid)) != NULL) {
+ int len = block[4];
+ if (!f->PutSection(block + 5, len)) {
+ if (errno != EPIPE) {
+ esyslog("streamdev-client: couldn't send section packet: %m");
+ Dprintf("FATAL ERROR: %m\n");
+ }
+ ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
+ Del(f);
+ }
+ }
+ m_RingBuffer->Del(TS_SIZE);
+ } else
+ usleep(1);
+ }
+}
+
+#endif // VDRVERSNUM >= 10300