summaryrefslogtreecommitdiff
path: root/client/filter.c
diff options
context:
space:
mode:
authorFrank Schmirler <vdr@schmirler.de>2010-12-02 08:59:14 +0100
committerFrank Schmirler <vdr@schmirler.de>2010-12-02 08:59:14 +0100
commit5a270cc3ab659a98b4bb674acb77982f7e1ecb14 (patch)
tree5f2f51c096f192a9b32af9ffd8244eeb6637ad06 /client/filter.c
parente6249bf957a943920b11abbd9efac1efa18b1d00 (diff)
downloadvdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.gz
vdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.bz2
Snapshot 2007-05-09
Diffstat (limited to 'client/filter.c')
-rw-r--r--client/filter.c265
1 files changed, 220 insertions, 45 deletions
diff --git a/client/filter.c b/client/filter.c
index daf534a..91af8b7 100644
--- a/client/filter.c
+++ b/client/filter.c
@@ -1,5 +1,5 @@
/*
- * $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $
+ * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $
*/
#include "client/filter.h"
@@ -7,113 +7,288 @@
#include "tools/select.h"
#include "common.h"
-#include <vdr/ringbuffer.h>
#include <vdr/device.h>
#if VDRVERSNUM >= 10300
+// --- cStreamdevFilter ------------------------------------------------------
+
+class cStreamdevFilter: public cListObject {
+private:
+ uchar m_Buffer[4096];
+ int m_Used;
+ int m_Pipe[2];
+ u_short m_Pid;
+ u_char m_Tid;
+ u_char m_Mask;
+
+public:
+ cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
+ virtual ~cStreamdevFilter();
+
+ bool Matches(u_short Pid, u_char Tid);
+ bool PutSection(const uchar *Data, int Length, bool Pusi);
+ int ReadPipe(void) const { return m_Pipe[0]; }
+
+ bool IsClosed(void);
+ void Reset(void);
+
+ u_short Pid(void) const { return m_Pid; }
+ u_char Tid(void) const { return m_Tid; }
+ u_char Mask(void) const { return m_Mask; }
+};
+
+inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
+ return m_Pid == Pid && m_Tid == (Tid & m_Mask);
+}
+
cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
m_Used = 0;
m_Pid = Pid;
m_Tid = Tid;
m_Mask = Mask;
+ m_Pipe[0] = m_Pipe[1] = -1;
- if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) {
- esyslog("streamev-client: coudln't open section filter pipe: %m");
- m_Pipe[0] = m_Pipe[1] = -1;
+#ifdef SOCK_SEQPACKET
+ // SOCK_SEQPACKET (since kernel 2.6.4)
+ if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) {
+ esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM");
+ }
+#endif
+ if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) {
+ esyslog("streamdev-client: couldn't open section filter socket: %m");
+ }
+
+ else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
+ fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) {
+ esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m");
}
}
cStreamdevFilter::~cStreamdevFilter() {
Dprintf("~cStreamdevFilter %p\n", this);
- if (m_Pipe[0] >= 0)
- close(m_Pipe[0]);
+
+ // ownership of handle m_Pipe[0] has been transferred to VDR section handler
+ //if (m_Pipe[0] >= 0)
+ // close(m_Pipe[0]);
if (m_Pipe[1] >= 0)
close(m_Pipe[1]);
}
-bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
+bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
+
+ if (!m_Used && !Pusi) /* wait for payload unit start indicator */
+ return true;
+ if (m_Used && Pusi) /* reset at payload unit start */
+ Reset();
+
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
Length);
- m_Used = 0;
+ Reset();
return true;
}
+
memcpy(m_Buffer + m_Used, Data, Length);
m_Used += Length;
-
if (m_Used > 3) {
int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
if (m_Used == length) {
- if (write(m_Pipe[1], m_Buffer, length) < 0)
- return false;
m_Used = 0;
+ if (write(m_Pipe[1], m_Buffer, length) < 0) {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ dsyslog("cStreamdevFilter::PutSection socket overflow, "
+ "Pid %4d Tid %3d", m_Pid, m_Tid);
+
+ else
+ return false;
+ }
}
+
+ if (m_Used > length) {
+ dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d "
+ "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length);
+ if(Length < TS_SIZE-5) {
+ // TS packet not full -> this must be last TS packet of section data -> safe to reset now
+ Reset();
+ }
+ }
+
}
return true;
}
+void cStreamdevFilter::Reset(void) {
+ if(m_Used)
+ dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used);
+ m_Used = 0;
+}
+
+bool cStreamdevFilter::IsClosed(void) {
+ char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */
+
+ // Test if pipe/socket has been closed by writing empty section
+ if (write(m_Pipe[1], m_Buffer, 3) < 0 &&
+ errno != EAGAIN &&
+ errno != EWOULDBLOCK) {
+
+ if (errno != ECONNREFUSED &&
+ errno != ECONNRESET &&
+ errno != EPIPE)
+ esyslog("cStreamdevFilter::IsClosed failed: %m");
+
+ return true;
+ }
+
+ return false;
+}
+
+// --- cStreamdevFilters -----------------------------------------------------
+
cStreamdevFilters::cStreamdevFilters(void):
cThread("streamdev-client: sections assembler") {
- m_Active = false;
- m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
- Start();
+ m_TSBuffer = NULL;
}
cStreamdevFilters::~cStreamdevFilters() {
- if (m_Active) {
- m_Active = false;
- Cancel(3);
- }
- delete m_RingBuffer;
+ SetConnection(-1);
}
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
+ CarbageCollect();
+
cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
+ int fh = f->ReadPipe();
+
+ Lock();
Add(f);
- return f->ReadPipe();
+ Unlock();
+
+ return fh;
+}
+
+void cStreamdevFilters::CarbageCollect(void) {
+ LOCK_THREAD;
+ for (cStreamdevFilter *fi = First(); fi;) {
+ if (fi->IsClosed()) {
+ if (errno == ECONNREFUSED ||
+ errno == ECONNRESET ||
+ errno == EPIPE) {
+ ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false);
+ Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)",
+ (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
+
+ cStreamdevFilter *next = Prev(fi);
+ Del(fi);
+ fi = next ? Next(next) : First();
+ } else {
+ esyslog("cStreamdevFilters::CarbageCollector() error: "
+ "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed",
+ (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
+ LOG_ERROR;
+ fi = Next(fi);
+ }
+ } else {
+ fi = Next(fi);
+ }
+ }
}
-cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
- for (cStreamdevFilter *f = First(); f; f = Next(f)) {
- if (f->Matches(Pid, Tid))
- return f;
+bool cStreamdevFilters::ReActivateFilters(void)
+{
+ LOCK_THREAD;
+
+ bool res = true;
+ CarbageCollect();
+ for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
+ res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res;
+ Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL");
}
- return NULL;
+ return res;
}
-void cStreamdevFilters::Put(const uchar *Data) {
- int p = m_RingBuffer->Put(Data, TS_SIZE);
- if (p != TS_SIZE)
- m_RingBuffer->ReportOverflow(TS_SIZE - p);
+void cStreamdevFilters::SetConnection(int Handle) {
+
+ Cancel(2);
+ DELETENULL(m_TSBuffer);
+
+ if (Handle >= 0) {
+ m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1);
+ ReActivateFilters();
+ Start();
+ }
}
void cStreamdevFilters::Action(void) {
- m_Active = true;
- while (m_Active) {
- int recvd;
- const uchar *block = m_RingBuffer->Get(recvd);
+ int fails = 0;
- if (block && recvd > 0) {
- cStreamdevFilter *f;
+ while (Running()) {
+ const uchar *block = m_TSBuffer->Get();
+ if (block) {
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
u_char tid = block[3];
-
- if ((f = Matches(pid, tid)) != NULL) {
- int len = block[4];
- if (!f->PutSection(block + 5, len)) {
- if (errno != EPIPE) {
- esyslog("streamdev-client: couldn't send section packet: %m");
+ bool Pusi = block[1] & 0x40;
+ int len = block[4];
+#if 0
+ if (block[1] == 0xff &&
+ block[2] == 0xff &&
+ block[3] == 0xff &&
+ block[4] == 0x7f)
+ isyslog("*********** TRANSPONDER -> %s **********", block+5);
+#endif
+ LOCK_THREAD;
+ cStreamdevFilter *f = First();
+ while (f) {
+ cStreamdevFilter *next = Next(f);
+ if (f->Matches(pid, tid)) {
+
+ if (f->PutSection(block + 5, len, Pusi))
+ break;
+
+ if (errno != ECONNREFUSED &&
+ errno != ECONNRESET &&
+ errno != EPIPE) {
Dprintf("FATAL ERROR: %m\n");
+ esyslog("streamdev-client: couldn't send section packet: %m");
}
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
Del(f);
+ // Filter was closed.
+ // - need to check remaining filters for another match
}
+ f = next;
}
- m_RingBuffer->Del(TS_SIZE);
- } else
- usleep(1);
+ } else {
+#if 1 // TODO: this should be fixed in vdr cTSBuffer
+ // Check disconnection
+ int fd = *ClientSocket.DataSocket(siLiveFilter);
+ if(fd < 0)
+ break;
+ cPoller Poller(fd);
+ if (Poller.Poll()) {
+ char tmp[1];
+ errno = 0;
+ Dprintf("cStreamdevFilters::Action(): checking connection");
+ if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) {
+ ++fails;
+ if (fails >= 10) {
+ esyslog("cStreamdevFilters::Action(): stream disconnected ?");
+ ClientSocket.CloseDataConnection(siLiveFilter);
+ break;
+ }
+ } else {
+ fails = 0;
+ }
+ } else {
+ fails = 0;
+ }
+ cCondWait::SleepMs(10);
+#endif
+ }
}
+
+ DELETENULL(m_TSBuffer);
+ dsyslog("StreamdevFilters::Action() ended");
}
#endif // VDRVERSNUM >= 10300