summaryrefslogtreecommitdiff
path: root/client/filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'client/filter.c')
-rw-r--r--client/filter.c155
1 files changed, 100 insertions, 55 deletions
diff --git a/client/filter.c b/client/filter.c
index 8606770..cf18fd5 100644
--- a/client/filter.c
+++ b/client/filter.c
@@ -6,12 +6,16 @@
#include "client/socket.h"
#include "tools/select.h"
#include "common.h"
+#include <sys/ioctl.h>
+#include <string.h>
#include <vdr/device.h>
#define PID_MASK_HI 0x1F
// --- cStreamdevFilter ------------------------------------------------------
+static int FilterSockBufSize_warn = 0;
+
class cStreamdevFilter: public cListObject {
private:
uchar m_Buffer[4096];
@@ -20,6 +24,10 @@ private:
u_short m_Pid;
u_char m_Tid;
u_char m_Mask;
+#ifdef TIOCOUTQ
+ unsigned long m_maxq;
+ unsigned long m_flushed;
+#endif
public:
cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
@@ -29,7 +37,6 @@ public:
bool PutSection(const uchar *Data, int Length, bool Pusi);
int ReadPipe(void) const { return m_Pipe[0]; }
- bool IsClosed(void);
void Reset(void);
u_short Pid(void) const { return m_Pid; }
@@ -47,6 +54,10 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
m_Tid = Tid;
m_Mask = Mask;
m_Pipe[0] = m_Pipe[1] = -1;
+#ifdef TIOCOUTQ
+ m_flushed = 0;
+ m_maxq = 0;
+#endif
#ifdef SOCK_SEQPACKET
// SOCK_SEQPACKET (since kernel 2.6.4)
@@ -58,7 +69,46 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
esyslog("streamdev-client: couldn't open section filter socket: %m");
}
- else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
+ // Set buffer for socketpair. During certain situations, such as startup, channel/transponder
+ // change, VDR may lag in reading data. Instead of discarding it, we can buffer it.
+ // Buffer size required may be up to 4MByte.
+
+ if(StreamdevClientSetup.FilterSockBufSize) {
+ int sbs = StreamdevClientSetup.FilterSockBufSize;
+ int sbs2;
+ unsigned int sbss = sizeof(sbs);
+ int r;
+
+ r = setsockopt(m_Pipe[1], SOL_SOCKET, SO_SNDBUF, (char *)&sbs, sbss);
+
+ if(r < 0) {
+ isyslog("streamdev-client: setsockopt(SO_SNDBUF, %d) = %s", sbs, strerror(errno));
+ }
+ sbs2 = 0;
+ r = getsockopt(m_Pipe[1], SOL_SOCKET, SO_SNDBUF, (char *)&sbs2, &sbss);
+ if(r < 0 || !sbss || !sbs2) {
+ isyslog("streamdev-client: getsockopt(SO_SNDBUF, &%d, &%d) = %s", sbs2, sbss, strerror(errno));
+ } else {
+ // Linux actually returns double the requested size
+ // if everything works fine. And it actually buffers up to that double amount
+ // as can be seen from observing TIOCOUTQ (kernel 3.7/2014).
+
+ if(sbs2 > sbs)
+ sbs2 /= 2;
+ if(sbs2 < sbs) {
+ if(FilterSockBufSize_warn != sbs2) {
+ isyslog("streamdev-client: ******************************************************");
+ isyslog("streamdev-client: getsockopt(SO_SNDBUF) = %d < %d (configured).", sbs2, sbs);
+ isyslog("streamdev-client: Consider increasing system buffer size:");
+ isyslog("streamdev-client: 'sysctl net.core.wmem_max=%d'", sbs);
+ isyslog("streamdev-client: ******************************************************");
+ FilterSockBufSize_warn = sbs2;
+ }
+ }
+ }
+ }
+
+ if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) {
esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m");
}
@@ -67,11 +117,12 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
cStreamdevFilter::~cStreamdevFilter() {
Dprintf("~cStreamdevFilter %p\n", this);
- // ownership of handle m_Pipe[0] has been transferred to VDR section handler
- //if (m_Pipe[0] >= 0)
- // close(m_Pipe[0]);
- if (m_Pipe[1] >= 0)
+ if (m_Pipe[0] >= 0) {
+ close(m_Pipe[0]);
+ }
+ if (m_Pipe[1] >= 0) {
close(m_Pipe[1]);
+ }
}
bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
@@ -94,13 +145,42 @@ bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
if (m_Used == length) {
m_Used = 0;
- if (write(m_Pipe[1], m_Buffer, length) < 0) {
- if(errno == EAGAIN || errno == EWOULDBLOCK)
- dsyslog("cStreamdevFilter::PutSection socket overflow, "
- "Pid %4d Tid %3d", m_Pid, m_Tid);
+#ifdef TIOCOUTQ
+ // If we can determine the queue size of the socket,
+ // we flush rather then let the socket drop random packets.
+ // This ensures that we have more contiguous set of packets
+ // on the receiver side.
+ if(m_flushed) {
+ unsigned long queue = 0;
+ ioctl(m_Pipe[1], TIOCOUTQ, &queue);
+ if(queue > m_maxq)
+ m_maxq = queue;
+ if(queue * 2 < m_maxq) {
+ dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): "
+ "Flushed %ld bytes, max queue: %ld",
+ m_Pid, m_Tid, m_flushed, m_maxq);
+ m_flushed = m_maxq = 0;
- else
+ } else {
+ m_flushed += length;
+ }
+ }
+ if(!m_flushed)
+#endif
+ if(write(m_Pipe[1], m_Buffer, length) < 0) {
+ if(errno != EAGAIN && errno != EWOULDBLOCK) {
+ dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): error: %s",
+ m_Pid, m_Tid, strerror(errno));
return false;
+ } else {
+#ifdef TIOCOUTQ
+ m_flushed += length;
+#else
+ dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): "
+ "Dropping packet %ld bytes (queue overflow)",
+ m_Pid, m_Tid, length);
+#endif
+ }
}
}
@@ -123,25 +203,6 @@ void cStreamdevFilter::Reset(void) {
m_Used = 0;
}
-bool cStreamdevFilter::IsClosed(void) {
- char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */
-
- // Test if pipe/socket has been closed by writing empty section
- if (write(m_Pipe[1], m_Buffer, 3) < 0 &&
- errno != EAGAIN &&
- errno != EWOULDBLOCK) {
-
- if (errno != ECONNREFUSED &&
- errno != ECONNRESET &&
- errno != EPIPE)
- esyslog("cStreamdevFilter::IsClosed failed: %m");
-
- return true;
- }
-
- return false;
-}
-
// --- cStreamdevFilters -----------------------------------------------------
cStreamdevFilters::cStreamdevFilters(cClientSocket *ClientSocket):
@@ -155,8 +216,6 @@ cStreamdevFilters::~cStreamdevFilters() {
}
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
- CarbageCollect();
-
cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
int fh = f->ReadPipe();
@@ -167,31 +226,18 @@ int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
return fh;
}
-void cStreamdevFilters::CarbageCollect(void) {
+void cStreamdevFilters::CloseFilter(int Handle) {
LOCK_THREAD;
- for (cStreamdevFilter *fi = First(); fi;) {
- if (fi->IsClosed()) {
- if (errno == ECONNREFUSED ||
- errno == ECONNRESET ||
- errno == EPIPE) {
- m_ClientSocket->SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false);
- Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)",
- (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
-
- cStreamdevFilter *next = Prev(fi);
- Del(fi);
- fi = next ? Next(next) : First();
- } else {
- esyslog("cStreamdevFilters::CarbageCollector() error: "
- "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed",
- (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
- LOG_ERROR;
- fi = Next(fi);
- }
- } else {
- fi = Next(fi);
+
+ for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
+ if(fi->ReadPipe() == Handle) {
+ // isyslog("cStreamdevFilters::CloseFilter(%d): Pid %4d, Tid %3d, Mask %2x (%d filters left)\n",
+ // Handle, (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
+ Del(fi);
+ return;
}
}
+ esyslog("cStreamdevFilters::CloseFilter(%d): failed (%d filters left)\n", Handle, Count()-1);
}
bool cStreamdevFilters::ReActivateFilters(void)
@@ -199,7 +245,6 @@ bool cStreamdevFilters::ReActivateFilters(void)
LOCK_THREAD;
bool res = true;
- CarbageCollect();
for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
res = m_ClientSocket->SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res;
Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL");