diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/device.c | 155 | ||||
-rw-r--r-- | client/device.h | 10 | ||||
-rw-r--r-- | client/filter.c | 265 | ||||
-rw-r--r-- | client/filter.h | 45 | ||||
-rw-r--r-- | client/socket.c | 20 | ||||
-rw-r--r-- | client/socket.h | 3 |
6 files changed, 391 insertions, 107 deletions
diff --git a/client/device.c b/client/device.c index a2e4580..be79bb9 100644 --- a/client/device.c +++ b/client/device.c @@ -1,5 +1,5 @@ /* - * $Id: device.c,v 1.8 2007/01/15 12:15:12 schmirl Exp $ + * $Id: device.c,v 1.13 2007/05/07 12:18:18 schmirl Exp $ */ #include "client/device.h" @@ -42,6 +42,8 @@ cStreamdevDevice::cStreamdevDevice(void) { #endif m_Device = this; + m_Pids = 0; + m_DvrClosed = true; if (StreamdevClientSetup.SyncEPG) ClientSocket.SynchronizeEPG(); @@ -59,13 +61,22 @@ cStreamdevDevice::~cStreamdevDevice() { bool cStreamdevDevice::ProvidesSource(int Source) const { Dprintf("ProvidesSource, Source=%d\n", Source); - return false; + return true; } bool cStreamdevDevice::ProvidesTransponder(const cChannel *Channel) const { Dprintf("ProvidesTransponder\n"); - return false; + return true; +} + +bool cStreamdevDevice::IsTunedToTransponder(const cChannel *Channel) +{ + bool res = false; + if (ClientSocket.DataSocket(siLive) != NULL + && TRANSPONDER(Channel, m_Channel)) + res = true; + return res; } bool cStreamdevDevice::ProvidesChannel(const cChannel *Channel, int Priority, @@ -123,37 +134,118 @@ bool cStreamdevDevice::SetChannelDevice(const cChannel *Channel, bool cStreamdevDevice::SetPid(cPidHandle *Handle, int Type, bool On) { Dprintf("SetPid, Pid=%d, Type=%d, On=%d, used=%d\n", Handle->pid, Type, On, Handle->used); - if (Handle->pid && (On || !Handle->used)) - return ClientSocket.SetPid(Handle->pid, On); - return true; + LOCK_THREAD; + + if (On && !m_TSBuffer) { + Dprintf("SetPid: no data connection -> OpenDvr()"); + OpenDvrInt(); + } + + bool res = true; + if (Handle->pid && (On || !Handle->used)) { + res = ClientSocket.SetPid(Handle->pid, On); + + m_Pids += (!res) ? 0 : On ? 1 : -1; + if (m_Pids < 0) + m_Pids = 0; + + if(m_Pids < 1 && m_DvrClosed) { + Dprintf("SetPid: 0 pids left -> CloseDvr()"); + CloseDvrInt(); + } + } + + return res; } -bool cStreamdevDevice::OpenDvr(void) { - Dprintf("OpenDvr\n"); - CloseDvr(); +bool cStreamdevDevice::OpenDvrInt(void) { + Dprintf("OpenDvrInt\n"); + LOCK_THREAD; + + CloseDvrInt(); + if (m_TSBuffer) { + Dprintf("cStreamdevDevice::OpenDvrInt(): DVR connection already open\n"); + return true; + } + + Dprintf("cStreamdevDevice::OpenDvrInt(): Connecting ...\n"); if (ClientSocket.CreateDataConnection(siLive)) { - //m_Assembler = new cStreamdevAssembler(ClientSocket.DataSocket(siLive)); - //m_TSBuffer = new cTSBuffer(m_Assembler->ReadPipe(), MEGABYTE(2), CardIndex() + 1); m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1); - Dprintf("waiting\n"); - //m_Assembler->WaitForFill(); - Dprintf("resuming\n"); return true; } + esyslog("cStreamdevDevice::OpenDvrInt(): DVR connection FAILED"); return false; } -void cStreamdevDevice::CloseDvr(void) { - Dprintf("CloseDvr\n"); +bool cStreamdevDevice::OpenDvr(void) { + Dprintf("OpenDvr\n"); + LOCK_THREAD; + + m_DvrClosed = false; + return OpenDvrInt(); +} - //DELETENULL(m_Assembler); +void cStreamdevDevice::CloseDvrInt(void) { + Dprintf("CloseDvrInt\n"); + LOCK_THREAD; + + if (ClientSocket.CheckConnection()) { + if (!m_DvrClosed) { + Dprintf("cStreamdevDevice::CloseDvrInt(): m_DvrClosed=false -> not closing yet\n"); + return; + } + if (m_Pids > 0) { + Dprintf("cStreamdevDevice::CloseDvrInt(): %d active pids -> not closing yet\n", m_Pids); + return; + } + } else { + Dprintf("cStreamdevDevice::CloseDvrInt(): Control connection gone !\n"); + } + + Dprintf("cStreamdevDevice::CloseDvrInt(): Closing DVR connection\n"); DELETENULL(m_TSBuffer); ClientSocket.CloseDvr(); } +void cStreamdevDevice::CloseDvr(void) { + Dprintf("CloseDvr\n"); + LOCK_THREAD; + + m_DvrClosed = true; + CloseDvrInt(); +} + bool cStreamdevDevice::GetTSPacket(uchar *&Data) { if (m_TSBuffer) { Data = m_TSBuffer->Get(); +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // simple disconnect detection + static int m_TSFails = 0; + if (!Data) { + LOCK_THREAD; + if(!ClientSocket.DataSocket(siLive)) { + return false; // triggers CloseDvr() + OpenDvr() in cDevice + } + cPoller Poller(*ClientSocket.DataSocket(siLive)); + errno = 0; + if (Poller.Poll() && !errno) { + char tmp[1]; + if (recv(*ClientSocket.DataSocket(siLive), tmp, 1, MSG_PEEK) == 0 && !errno) { +esyslog("cStreamDevice::GetTSPacket: GetChecked: NOTHING (%d)", m_TSFails); + m_TSFails++; + if (m_TSFails > 10) { + isyslog("cStreamdevDevice::GetTSPacket(): disconnected"); + m_Pids = 0; + CloseDvrInt(); + m_TSFails = 0; + return false; + } + return true; + } + } + m_TSFails = 0; + } +#endif return true; } return false; @@ -162,11 +254,24 @@ bool cStreamdevDevice::GetTSPacket(uchar *&Data) { #if VDRVERSNUM >= 10300 int cStreamdevDevice::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { Dprintf("OpenFilter\n"); - if (StreamdevClientSetup.StreamFilters - && ClientSocket.SetFilter(Pid, Tid, Mask, true)) { - return m_Filters->OpenFilter(Pid, Tid, Mask); - } else + + if (!StreamdevClientSetup.StreamFilters) return -1; + + + if (!ClientSocket.DataSocket(siLiveFilter)) { + if (ClientSocket.CreateDataConnection(siLiveFilter)) { + m_Filters->SetConnection(*ClientSocket.DataSocket(siLiveFilter)); + } else { + isyslog("cStreamdevDevice::OpenFilter: connect failed: %m"); + return -1; + } + } + + if (ClientSocket.SetFilter(Pid, Tid, Mask, true)) + return m_Filters->OpenFilter(Pid, Tid, Mask); + + return -1; } #endif @@ -177,11 +282,17 @@ bool cStreamdevDevice::Init(void) { } bool cStreamdevDevice::ReInit(void) { + if(m_Device) { + m_Device->Lock(); + m_Device->m_Filters->SetConnection(-1); + m_Device->m_Pids = 0; + } ClientSocket.Quit(); ClientSocket.Reset(); if (m_Device != NULL) { - DELETENULL(m_Device->m_TSBuffer); + //DELETENULL(m_Device->m_TSBuffer); DELETENULL(m_Device->m_Assembler); + m_Device->Unlock(); } return StreamdevClientSetup.StartClient ? Init() : true; } diff --git a/client/device.h b/client/device.h index bdfabb6..525c1d4 100644 --- a/client/device.h +++ b/client/device.h @@ -1,5 +1,5 @@ /* - * $Id: device.h,v 1.3 2005/02/08 15:21:19 lordjaxom Exp $ + * $Id: device.h,v 1.5 2007/04/24 10:43:40 schmirl Exp $ */ #ifndef VDR_STREAMDEV_DEVICE_H @@ -25,9 +25,14 @@ private: #if VDRVERSNUM >= 10307 cStreamdevFilters *m_Filters; #endif + int m_Pids; + bool m_DvrClosed; static cStreamdevDevice *m_Device; + bool OpenDvrInt(void); + void CloseDvrInt(void); + protected: virtual bool SetChannelDevice(const cChannel *Channel, bool LiveView); virtual bool HasLock(int TimeoutMs) @@ -51,9 +56,10 @@ public: virtual ~cStreamdevDevice(); virtual bool ProvidesSource(int Source) const; - virtual bool ProvidesTransponder(const cChannel *Channel) const; + virtual bool ProvidesTransponder(const cChannel *Channel) const; virtual bool ProvidesChannel(const cChannel *Channel, int Priority = -1, bool *NeedsDetachReceivers = NULL) const; + virtual bool IsTunedToTransponder(const cChannel *Channel); static bool Init(void); static bool ReInit(void); diff --git a/client/filter.c b/client/filter.c index daf534a..91af8b7 100644 --- a/client/filter.c +++ b/client/filter.c @@ -1,5 +1,5 @@ /* - * $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $ + * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $ */ #include "client/filter.h" @@ -7,113 +7,288 @@ #include "tools/select.h" #include "common.h" -#include <vdr/ringbuffer.h> #include <vdr/device.h> #if VDRVERSNUM >= 10300 +// --- cStreamdevFilter ------------------------------------------------------ + +class cStreamdevFilter: public cListObject { +private: + uchar m_Buffer[4096]; + int m_Used; + int m_Pipe[2]; + u_short m_Pid; + u_char m_Tid; + u_char m_Mask; + +public: + cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); + virtual ~cStreamdevFilter(); + + bool Matches(u_short Pid, u_char Tid); + bool PutSection(const uchar *Data, int Length, bool Pusi); + int ReadPipe(void) const { return m_Pipe[0]; } + + bool IsClosed(void); + void Reset(void); + + u_short Pid(void) const { return m_Pid; } + u_char Tid(void) const { return m_Tid; } + u_char Mask(void) const { return m_Mask; } +}; + +inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { + return m_Pid == Pid && m_Tid == (Tid & m_Mask); +} + cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { m_Used = 0; m_Pid = Pid; m_Tid = Tid; m_Mask = Mask; + m_Pipe[0] = m_Pipe[1] = -1; - if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) { - esyslog("streamev-client: coudln't open section filter pipe: %m"); - m_Pipe[0] = m_Pipe[1] = -1; +#ifdef SOCK_SEQPACKET + // SOCK_SEQPACKET (since kernel 2.6.4) + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) { + esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM"); + } +#endif + if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) { + esyslog("streamdev-client: couldn't open section filter socket: %m"); + } + + else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) { + esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m"); } } cStreamdevFilter::~cStreamdevFilter() { Dprintf("~cStreamdevFilter %p\n", this); - if (m_Pipe[0] >= 0) - close(m_Pipe[0]); + + // ownership of handle m_Pipe[0] has been transferred to VDR section handler + //if (m_Pipe[0] >= 0) + // close(m_Pipe[0]); if (m_Pipe[1] >= 0) close(m_Pipe[1]); } -bool cStreamdevFilter::PutSection(const uchar *Data, int Length) { +bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { + + if (!m_Used && !Pusi) /* wait for payload unit start indicator */ + return true; + if (m_Used && Pusi) /* reset at payload unit start */ + Reset(); + if (m_Used + Length >= (int)sizeof(m_Buffer)) { esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", Length); - m_Used = 0; + Reset(); return true; } + memcpy(m_Buffer + m_Used, Data, Length); m_Used += Length; - if (m_Used > 3) { int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; if (m_Used == length) { - if (write(m_Pipe[1], m_Buffer, length) < 0) - return false; m_Used = 0; + if (write(m_Pipe[1], m_Buffer, length) < 0) { + if(errno == EAGAIN || errno == EWOULDBLOCK) + dsyslog("cStreamdevFilter::PutSection socket overflow, " + "Pid %4d Tid %3d", m_Pid, m_Tid); + + else + return false; + } } + + if (m_Used > length) { + dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d " + "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length); + if(Length < TS_SIZE-5) { + // TS packet not full -> this must be last TS packet of section data -> safe to reset now + Reset(); + } + } + } return true; } +void cStreamdevFilter::Reset(void) { + if(m_Used) + dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used); + m_Used = 0; +} + +bool cStreamdevFilter::IsClosed(void) { + char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */ + + // Test if pipe/socket has been closed by writing empty section + if (write(m_Pipe[1], m_Buffer, 3) < 0 && + errno != EAGAIN && + errno != EWOULDBLOCK) { + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) + esyslog("cStreamdevFilter::IsClosed failed: %m"); + + return true; + } + + return false; +} + +// --- cStreamdevFilters ----------------------------------------------------- + cStreamdevFilters::cStreamdevFilters(void): cThread("streamdev-client: sections assembler") { - m_Active = false; - m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true); - Start(); + m_TSBuffer = NULL; } cStreamdevFilters::~cStreamdevFilters() { - if (m_Active) { - m_Active = false; - Cancel(3); - } - delete m_RingBuffer; + SetConnection(-1); } int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + CarbageCollect(); + cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); + int fh = f->ReadPipe(); + + Lock(); Add(f); - return f->ReadPipe(); + Unlock(); + + return fh; +} + +void cStreamdevFilters::CarbageCollect(void) { + LOCK_THREAD; + for (cStreamdevFilter *fi = First(); fi;) { + if (fi->IsClosed()) { + if (errno == ECONNREFUSED || + errno == ECONNRESET || + errno == EPIPE) { + ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false); + Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + + cStreamdevFilter *next = Prev(fi); + Del(fi); + fi = next ? Next(next) : First(); + } else { + esyslog("cStreamdevFilters::CarbageCollector() error: " + "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + LOG_ERROR; + fi = Next(fi); + } + } else { + fi = Next(fi); + } + } } -cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) { - for (cStreamdevFilter *f = First(); f; f = Next(f)) { - if (f->Matches(Pid, Tid)) - return f; +bool cStreamdevFilters::ReActivateFilters(void) +{ + LOCK_THREAD; + + bool res = true; + CarbageCollect(); + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; + Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); } - return NULL; + return res; } -void cStreamdevFilters::Put(const uchar *Data) { - int p = m_RingBuffer->Put(Data, TS_SIZE); - if (p != TS_SIZE) - m_RingBuffer->ReportOverflow(TS_SIZE - p); +void cStreamdevFilters::SetConnection(int Handle) { + + Cancel(2); + DELETENULL(m_TSBuffer); + + if (Handle >= 0) { + m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1); + ReActivateFilters(); + Start(); + } } void cStreamdevFilters::Action(void) { - m_Active = true; - while (m_Active) { - int recvd; - const uchar *block = m_RingBuffer->Get(recvd); + int fails = 0; - if (block && recvd > 0) { - cStreamdevFilter *f; + while (Running()) { + const uchar *block = m_TSBuffer->Get(); + if (block) { u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; u_char tid = block[3]; - - if ((f = Matches(pid, tid)) != NULL) { - int len = block[4]; - if (!f->PutSection(block + 5, len)) { - if (errno != EPIPE) { - esyslog("streamdev-client: couldn't send section packet: %m"); + bool Pusi = block[1] & 0x40; + int len = block[4]; +#if 0 + if (block[1] == 0xff && + block[2] == 0xff && + block[3] == 0xff && + block[4] == 0x7f) + isyslog("*********** TRANSPONDER -> %s **********", block+5); +#endif + LOCK_THREAD; + cStreamdevFilter *f = First(); + while (f) { + cStreamdevFilter *next = Next(f); + if (f->Matches(pid, tid)) { + + if (f->PutSection(block + 5, len, Pusi)) + break; + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) { Dprintf("FATAL ERROR: %m\n"); + esyslog("streamdev-client: couldn't send section packet: %m"); } ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); Del(f); + // Filter was closed. + // - need to check remaining filters for another match } + f = next; } - m_RingBuffer->Del(TS_SIZE); - } else - usleep(1); + } else { +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // Check disconnection + int fd = *ClientSocket.DataSocket(siLiveFilter); + if(fd < 0) + break; + cPoller Poller(fd); + if (Poller.Poll()) { + char tmp[1]; + errno = 0; + Dprintf("cStreamdevFilters::Action(): checking connection"); + if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) { + ++fails; + if (fails >= 10) { + esyslog("cStreamdevFilters::Action(): stream disconnected ?"); + ClientSocket.CloseDataConnection(siLiveFilter); + break; + } + } else { + fails = 0; + } + } else { + fails = 0; + } + cCondWait::SleepMs(10); +#endif + } } + + DELETENULL(m_TSBuffer); + dsyslog("StreamdevFilters::Action() ended"); } #endif // VDRVERSNUM >= 10300 diff --git a/client/filter.h b/client/filter.h index cb46bf0..e0a1575 100644 --- a/client/filter.h +++ b/client/filter.h @@ -1,5 +1,5 @@ /* - * $Id: filter.h,v 1.1.1.1 2004/12/30 22:44:04 lordjaxom Exp $ + * $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $ */ #ifndef VDR_STREAMDEV_FILTER_H @@ -12,52 +12,25 @@ #include <vdr/tools.h> #include <vdr/thread.h> -class cRingBufferFrame; -class cRingBufferLinear; - -class cStreamdevFilter: public cListObject { -private: - uchar m_Buffer[4096]; - int m_Used; - int m_Pipe[2]; - u_short m_Pid; - u_char m_Tid; - u_char m_Mask; - cRingBufferFrame *m_RingBuffer; - -public: - cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); - virtual ~cStreamdevFilter(); - - bool Matches(u_short Pid, u_char Tid); - bool PutSection(const uchar *Data, int Length); - int ReadPipe(void) const { return m_Pipe[0]; } - - u_short Pid(void) const { return m_Pid; } - u_char Tid(void) const { return m_Tid; } - u_char Mask(void) const { return m_Mask; } - -}; - -inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { - return m_Pid == Pid && m_Tid == (Tid & m_Mask); -} +class cTSBuffer; +class cStreamdevFilter; class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread { private: - bool m_Active; - cRingBufferLinear *m_RingBuffer; - + cTSBuffer *m_TSBuffer; + protected: virtual void Action(void); + void CarbageCollect(void); + + bool ReActivateFilters(void); public: cStreamdevFilters(void); virtual ~cStreamdevFilters(); + void SetConnection(int Handle); int OpenFilter(u_short Pid, u_char Tid, u_char Mask); - cStreamdevFilter *Matches(u_short Pid, u_char Tid); - void Put(const uchar *Data); }; # endif // VDRVERSNUM >= 10300 diff --git a/client/socket.c b/client/socket.c index e59c705..5db6efe 100644 --- a/client/socket.c +++ b/client/socket.c @@ -1,5 +1,5 @@ /* - * $Id: socket.c,v 1.7 2007/01/15 11:45:48 schmirl Exp $ + * $Id: socket.c,v 1.8 2007/04/24 10:57:34 schmirl Exp $ */ #include <tools/select.h> @@ -215,6 +215,24 @@ bool cClientSocket::CreateDataConnection(eSocketId Id) { return true; } +bool cClientSocket::CloseDataConnection(eSocketId Id) { + //if (!CheckConnection()) return false; + + CMD_LOCK; + + if(Id == siLive || Id == siLiveFilter) + if (m_DataSockets[Id] != NULL) { + std::string command = (std::string)"ABRT " + (const char*)itoa(Id); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't cleanly close data connection"); + //return false; + } + DELETENULL(m_DataSockets[Id]); + } + return true; +} + bool cClientSocket::SetChannelDevice(const cChannel *Channel) { if (!CheckConnection()) return false; diff --git a/client/socket.h b/client/socket.h index 17478ce..e839223 100644 --- a/client/socket.h +++ b/client/socket.h @@ -1,5 +1,5 @@ /* - * $Id: socket.h,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $ + * $Id: socket.h,v 1.4 2007/04/24 10:57:34 schmirl Exp $ */ #ifndef VDR_STREAMDEV_CLIENT_CONNECTION_H @@ -47,6 +47,7 @@ public: bool CheckConnection(void); bool ProvidesChannel(const cChannel *Channel, int Priority); bool CreateDataConnection(eSocketId Id); + bool CloseDataConnection(eSocketId Id); bool SetChannelDevice(const cChannel *Channel); bool SetPid(int Pid, bool On); #if VDRVERSNUM >= 10300 |