diff options
Diffstat (limited to 'client/filter.c')
-rw-r--r-- | client/filter.c | 155 |
1 files changed, 100 insertions, 55 deletions
diff --git a/client/filter.c b/client/filter.c index 8606770..cf18fd5 100644 --- a/client/filter.c +++ b/client/filter.c @@ -6,12 +6,16 @@ #include "client/socket.h" #include "tools/select.h" #include "common.h" +#include <sys/ioctl.h> +#include <string.h> #include <vdr/device.h> #define PID_MASK_HI 0x1F // --- cStreamdevFilter ------------------------------------------------------ +static int FilterSockBufSize_warn = 0; + class cStreamdevFilter: public cListObject { private: uchar m_Buffer[4096]; @@ -20,6 +24,10 @@ private: u_short m_Pid; u_char m_Tid; u_char m_Mask; +#ifdef TIOCOUTQ + unsigned long m_maxq; + unsigned long m_flushed; +#endif public: cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); @@ -29,7 +37,6 @@ public: bool PutSection(const uchar *Data, int Length, bool Pusi); int ReadPipe(void) const { return m_Pipe[0]; } - bool IsClosed(void); void Reset(void); u_short Pid(void) const { return m_Pid; } @@ -47,6 +54,10 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { m_Tid = Tid; m_Mask = Mask; m_Pipe[0] = m_Pipe[1] = -1; +#ifdef TIOCOUTQ + m_flushed = 0; + m_maxq = 0; +#endif #ifdef SOCK_SEQPACKET // SOCK_SEQPACKET (since kernel 2.6.4) @@ -58,7 +69,46 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { esyslog("streamdev-client: couldn't open section filter socket: %m"); } - else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || + // Set buffer for socketpair. During certain situations, such as startup, channel/transponder + // change, VDR may lag in reading data. Instead of discarding it, we can buffer it. + // Buffer size required may be up to 4MByte. + + if(StreamdevClientSetup.FilterSockBufSize) { + int sbs = StreamdevClientSetup.FilterSockBufSize; + int sbs2; + unsigned int sbss = sizeof(sbs); + int r; + + r = setsockopt(m_Pipe[1], SOL_SOCKET, SO_SNDBUF, (char *)&sbs, sbss); + + if(r < 0) { + isyslog("streamdev-client: setsockopt(SO_SNDBUF, %d) = %s", sbs, strerror(errno)); + } + sbs2 = 0; + r = getsockopt(m_Pipe[1], SOL_SOCKET, SO_SNDBUF, (char *)&sbs2, &sbss); + if(r < 0 || !sbss || !sbs2) { + isyslog("streamdev-client: getsockopt(SO_SNDBUF, &%d, &%d) = %s", sbs2, sbss, strerror(errno)); + } else { + // Linux actually returns double the requested size + // if everything works fine. And it actually buffers up to that double amount + // as can be seen from observing TIOCOUTQ (kernel 3.7/2014). + + if(sbs2 > sbs) + sbs2 /= 2; + if(sbs2 < sbs) { + if(FilterSockBufSize_warn != sbs2) { + isyslog("streamdev-client: ******************************************************"); + isyslog("streamdev-client: getsockopt(SO_SNDBUF) = %d < %d (configured).", sbs2, sbs); + isyslog("streamdev-client: Consider increasing system buffer size:"); + isyslog("streamdev-client: 'sysctl net.core.wmem_max=%d'", sbs); + isyslog("streamdev-client: ******************************************************"); + FilterSockBufSize_warn = sbs2; + } + } + } + } + + if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) { esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m"); } @@ -67,11 +117,12 @@ cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { cStreamdevFilter::~cStreamdevFilter() { Dprintf("~cStreamdevFilter %p\n", this); - // ownership of handle m_Pipe[0] has been transferred to VDR section handler - //if (m_Pipe[0] >= 0) - // close(m_Pipe[0]); - if (m_Pipe[1] >= 0) + if (m_Pipe[0] >= 0) { + close(m_Pipe[0]); + } + if (m_Pipe[1] >= 0) { close(m_Pipe[1]); + } } bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { @@ -94,13 +145,42 @@ bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; if (m_Used == length) { m_Used = 0; - if (write(m_Pipe[1], m_Buffer, length) < 0) { - if(errno == EAGAIN || errno == EWOULDBLOCK) - dsyslog("cStreamdevFilter::PutSection socket overflow, " - "Pid %4d Tid %3d", m_Pid, m_Tid); +#ifdef TIOCOUTQ + // If we can determine the queue size of the socket, + // we flush rather then let the socket drop random packets. + // This ensures that we have more contiguous set of packets + // on the receiver side. + if(m_flushed) { + unsigned long queue = 0; + ioctl(m_Pipe[1], TIOCOUTQ, &queue); + if(queue > m_maxq) + m_maxq = queue; + if(queue * 2 < m_maxq) { + dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): " + "Flushed %ld bytes, max queue: %ld", + m_Pid, m_Tid, m_flushed, m_maxq); + m_flushed = m_maxq = 0; - else + } else { + m_flushed += length; + } + } + if(!m_flushed) +#endif + if(write(m_Pipe[1], m_Buffer, length) < 0) { + if(errno != EAGAIN && errno != EWOULDBLOCK) { + dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): error: %s", + m_Pid, m_Tid, strerror(errno)); return false; + } else { +#ifdef TIOCOUTQ + m_flushed += length; +#else + dsyslog("cStreamdevFilter::PutSection(Pid:%d Tid: %d): " + "Dropping packet %ld bytes (queue overflow)", + m_Pid, m_Tid, length); +#endif + } } } @@ -123,25 +203,6 @@ void cStreamdevFilter::Reset(void) { m_Used = 0; } -bool cStreamdevFilter::IsClosed(void) { - char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */ - - // Test if pipe/socket has been closed by writing empty section - if (write(m_Pipe[1], m_Buffer, 3) < 0 && - errno != EAGAIN && - errno != EWOULDBLOCK) { - - if (errno != ECONNREFUSED && - errno != ECONNRESET && - errno != EPIPE) - esyslog("cStreamdevFilter::IsClosed failed: %m"); - - return true; - } - - return false; -} - // --- cStreamdevFilters ----------------------------------------------------- cStreamdevFilters::cStreamdevFilters(cClientSocket *ClientSocket): @@ -155,8 +216,6 @@ cStreamdevFilters::~cStreamdevFilters() { } int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { - CarbageCollect(); - cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); int fh = f->ReadPipe(); @@ -167,31 +226,18 @@ int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { return fh; } -void cStreamdevFilters::CarbageCollect(void) { +void cStreamdevFilters::CloseFilter(int Handle) { LOCK_THREAD; - for (cStreamdevFilter *fi = First(); fi;) { - if (fi->IsClosed()) { - if (errno == ECONNREFUSED || - errno == ECONNRESET || - errno == EPIPE) { - m_ClientSocket->SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false); - Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)", - (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); - - cStreamdevFilter *next = Prev(fi); - Del(fi); - fi = next ? Next(next) : First(); - } else { - esyslog("cStreamdevFilters::CarbageCollector() error: " - "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed", - (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); - LOG_ERROR; - fi = Next(fi); - } - } else { - fi = Next(fi); + + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + if(fi->ReadPipe() == Handle) { + // isyslog("cStreamdevFilters::CloseFilter(%d): Pid %4d, Tid %3d, Mask %2x (%d filters left)\n", + // Handle, (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + Del(fi); + return; } } + esyslog("cStreamdevFilters::CloseFilter(%d): failed (%d filters left)\n", Handle, Count()-1); } bool cStreamdevFilters::ReActivateFilters(void) @@ -199,7 +245,6 @@ bool cStreamdevFilters::ReActivateFilters(void) LOCK_THREAD; bool res = true; - CarbageCollect(); for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { res = m_ClientSocket->SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); |