diff options
| author | kwacker <vdr@w-i-r.com> | 2010-04-11 13:46:11 +0200 |
|---|---|---|
| committer | kwacker <vdr@w-i-r.com> | 2010-04-11 13:46:11 +0200 |
| commit | 9b144d30e0ea8ce900c37b96ba2cbdda14b0ae88 (patch) | |
| tree | 3a52de029f950dcd9f9856a53fd67abef8519e68 /plugins/streamdev/streamdev-cvs/client | |
| parent | 9cd931834ecadbf5efefdf484abb966e9248ebbb (diff) | |
| download | x-vdr-9b144d30e0ea8ce900c37b96ba2cbdda14b0ae88.tar.gz x-vdr-9b144d30e0ea8ce900c37b96ba2cbdda14b0ae88.tar.bz2 | |
Burn 0.2.0-beta3 und Streamdev mit Paches aktualisiert
Diffstat (limited to 'plugins/streamdev/streamdev-cvs/client')
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/CVS/Entries | 9 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/CVS/Repository | 1 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/CVS/Root | 1 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/device.c | 306 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/device.h | 68 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/filter.c | 292 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/filter.h | 33 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/setup.c | 80 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/setup.h | 39 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/socket.c | 374 | ||||
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/client/socket.h | 60 |
11 files changed, 1263 insertions, 0 deletions
diff --git a/plugins/streamdev/streamdev-cvs/client/CVS/Entries b/plugins/streamdev/streamdev-cvs/client/CVS/Entries new file mode 100644 index 0000000..8647f66 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/CVS/Entries @@ -0,0 +1,9 @@ +/device.c/1.25/Wed Feb 17 12:39:03 2010// +/device.h/1.9/Tue Jun 23 10:26:54 2009// +/filter.c/1.14/Fri Feb 13 13:02:39 2009// +/filter.h/1.5/Mon Apr 7 14:27:28 2008// +/setup.c/1.9/Fri Sep 18 10:43:26 2009// +/setup.h/1.6/Fri Sep 18 10:43:26 2009// +/socket.c/1.12/Tue Apr 8 14:18:16 2008// +/socket.h/1.6/Mon Apr 7 14:40:40 2008// +D diff --git a/plugins/streamdev/streamdev-cvs/client/CVS/Repository b/plugins/streamdev/streamdev-cvs/client/CVS/Repository new file mode 100644 index 0000000..885b4a3 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/CVS/Repository @@ -0,0 +1 @@ +streamdev/client diff --git a/plugins/streamdev/streamdev-cvs/client/CVS/Root b/plugins/streamdev/streamdev-cvs/client/CVS/Root new file mode 100644 index 0000000..2c7f6ce --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/CVS/Root @@ -0,0 +1 @@ +:pserver:anoncvs@vdr-developer.org:/var/cvsroot diff --git a/plugins/streamdev/streamdev-cvs/client/device.c b/plugins/streamdev/streamdev-cvs/client/device.c new file mode 100644 index 0000000..551d7c2 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/device.c @@ -0,0 +1,306 @@ +/* + * $Id: device.c,v 1.25 2010/02/17 12:39:03 schmirl Exp $ + */ + +#include "client/device.h" +#include "client/setup.h" +#include "client/filter.h" + +#include "tools/select.h" + +#include <vdr/channels.h> +#include <vdr/ringbuffer.h> +#include <vdr/eit.h> +#include <vdr/timers.h> + +#include <time.h> +#include <iostream> + +using namespace std; + +#define VIDEOBUFSIZE MEGABYTE(3) + +cStreamdevDevice *cStreamdevDevice::m_Device = NULL; + +cStreamdevDevice::cStreamdevDevice(void) { + m_Channel = NULL; + m_TSBuffer = NULL; + + m_Filters = new cStreamdevFilters; + StartSectionHandler(); + isyslog("streamdev-client: got device number %d", CardIndex() + 1); + + m_Device = this; + m_Pids = 0; + m_DvrClosed = true; + + if (StreamdevClientSetup.SyncEPG) + ClientSocket.SynchronizeEPG(); +} + +cStreamdevDevice::~cStreamdevDevice() { + Dprintf("Device gets destructed\n"); + + Lock(); + m_Device = NULL; + m_Filters->SetConnection(-1); + ClientSocket.Quit(); + ClientSocket.Reset(); + Unlock(); + + Cancel(3); + +#if APIVERSNUM >= 10515 + StopSectionHandler(); +#endif + DELETENULL(m_Filters); + DELETENULL(m_TSBuffer); +} + +bool cStreamdevDevice::ProvidesSource(int Source) const { + Dprintf("ProvidesSource, Source=%d\n", Source); + return true; +} + +bool cStreamdevDevice::ProvidesTransponder(const cChannel *Channel) const +{ + Dprintf("ProvidesTransponder\n"); + return true; +} + +bool cStreamdevDevice::IsTunedToTransponder(const cChannel *Channel) +{ + bool res = false; + if (ClientSocket.DataSocket(siLive) != NULL + && TRANSPONDER(Channel, m_Channel) + && Channel->Ca() == CA_FTA + && m_Channel->Ca() == CA_FTA) + res = true; + return res; +} + +bool cStreamdevDevice::ProvidesChannel(const cChannel *Channel, int Priority, + bool *NeedsDetachReceivers) const { + bool res = false; + bool prio = Priority < 0 || Priority > this->Priority(); + bool ndr = false; + + if (!StreamdevClientSetup.StartClient) + return false; + + Dprintf("ProvidesChannel, Channel=%s, Prio=%d\n", Channel->Name(), Priority); + + if (StreamdevClientSetup.MinPriority <= StreamdevClientSetup.MaxPriority) + { + if (Priority < StreamdevClientSetup.MinPriority || + Priority > StreamdevClientSetup.MaxPriority) + return false; + } + else + { + if (Priority < StreamdevClientSetup.MinPriority && + Priority > StreamdevClientSetup.MaxPriority) + return false; + } + + if (ClientSocket.DataSocket(siLive) != NULL + && TRANSPONDER(Channel, m_Channel)) + res = true; + else { + res = prio && ClientSocket.ProvidesChannel(Channel, Priority); + ndr = true; + } + + if (NeedsDetachReceivers) + *NeedsDetachReceivers = ndr; + Dprintf("prov res = %d, ndr = %d\n", res, ndr); + return res; +} + +bool cStreamdevDevice::SetChannelDevice(const cChannel *Channel, + bool LiveView) { + Dprintf("SetChannelDevice Channel: %s, LiveView: %s\n", Channel->Name(), + LiveView ? "true" : "false"); + + if (LiveView) + return false; + + if (ClientSocket.DataSocket(siLive) != NULL + && TRANSPONDER(Channel, m_Channel) + && Channel->Ca() == CA_FTA + && m_Channel->Ca() == CA_FTA) + return true; + + DetachAllReceivers(); + m_Channel = Channel; + bool r = ClientSocket.SetChannelDevice(m_Channel); + Dprintf("setchanneldevice r=%d\n", r); + return r; +} + +bool cStreamdevDevice::SetPid(cPidHandle *Handle, int Type, bool On) { + Dprintf("SetPid, Pid=%d, Type=%d, On=%d, used=%d\n", Handle->pid, Type, On, + Handle->used); + LOCK_THREAD; + + if (On && !m_TSBuffer) { + Dprintf("SetPid: no data connection -> OpenDvr()"); + OpenDvrInt(); + } + + bool res = true; + if (Handle->pid && (On || !Handle->used)) { + res = ClientSocket.SetPid(Handle->pid, On); + + m_Pids += (!res) ? 0 : On ? 1 : -1; + if (m_Pids < 0) + m_Pids = 0; + + if(m_Pids < 1 && m_DvrClosed) { + Dprintf("SetPid: 0 pids left -> CloseDvr()"); + CloseDvrInt(); + } + } + + return res; +} + +bool cStreamdevDevice::OpenDvrInt(void) { + Dprintf("OpenDvrInt\n"); + LOCK_THREAD; + + CloseDvrInt(); + if (m_TSBuffer) { + Dprintf("cStreamdevDevice::OpenDvrInt(): DVR connection already open\n"); + return true; + } + + Dprintf("cStreamdevDevice::OpenDvrInt(): Connecting ...\n"); + if (ClientSocket.CreateDataConnection(siLive)) { + m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1); + return true; + } + esyslog("cStreamdevDevice::OpenDvrInt(): DVR connection FAILED"); + return false; +} + +bool cStreamdevDevice::OpenDvr(void) { + Dprintf("OpenDvr\n"); + LOCK_THREAD; + + m_DvrClosed = false; + return OpenDvrInt(); +} + +void cStreamdevDevice::CloseDvrInt(void) { + Dprintf("CloseDvrInt\n"); + LOCK_THREAD; + + if (ClientSocket.CheckConnection()) { + if (!m_DvrClosed) { + Dprintf("cStreamdevDevice::CloseDvrInt(): m_DvrClosed=false -> not closing yet\n"); + return; + } + if (m_Pids > 0) { + Dprintf("cStreamdevDevice::CloseDvrInt(): %d active pids -> not closing yet\n", m_Pids); + return; + } + } else { + Dprintf("cStreamdevDevice::CloseDvrInt(): Control connection gone !\n"); + } + + Dprintf("cStreamdevDevice::CloseDvrInt(): Closing DVR connection\n"); + // Hack for VDR 1.5.x clients (sometimes sending ABRT after TUNE) + // TODO: Find a clean solution to fix this + ClientSocket.SetChannelDevice(m_Channel); + ClientSocket.CloseDvr(); + DELETENULL(m_TSBuffer); +} + +void cStreamdevDevice::CloseDvr(void) { + Dprintf("CloseDvr\n"); + LOCK_THREAD; + + m_DvrClosed = true; + CloseDvrInt(); +} + +bool cStreamdevDevice::GetTSPacket(uchar *&Data) { + if (m_TSBuffer && m_Device) { + Data = m_TSBuffer->Get(); +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // simple disconnect detection + static int m_TSFails = 0; + if (!Data) { + LOCK_THREAD; + if(!ClientSocket.DataSocket(siLive)) { + return false; // triggers CloseDvr() + OpenDvr() in cDevice + } + cPoller Poller(*ClientSocket.DataSocket(siLive)); + errno = 0; + if (Poller.Poll() && !errno) { + char tmp[1]; + if (recv(*ClientSocket.DataSocket(siLive), tmp, 1, MSG_PEEK) == 0 && !errno) { +esyslog("cStreamDevice::GetTSPacket: GetChecked: NOTHING (%d)", m_TSFails); + m_TSFails++; + if (m_TSFails > 10) { + isyslog("cStreamdevDevice::GetTSPacket(): disconnected"); + m_Pids = 0; + CloseDvrInt(); + m_TSFails = 0; + return false; + } + return true; + } + } + m_TSFails = 0; + } +#endif + return true; + } + return false; +} + +int cStreamdevDevice::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + Dprintf("OpenFilter\n"); + + if (!StreamdevClientSetup.StreamFilters) + return -1; + + + if (!ClientSocket.DataSocket(siLiveFilter)) { + if (ClientSocket.CreateDataConnection(siLiveFilter)) { + m_Filters->SetConnection(*ClientSocket.DataSocket(siLiveFilter)); + } else { + isyslog("cStreamdevDevice::OpenFilter: connect failed: %m"); + return -1; + } + } + + if (ClientSocket.SetFilter(Pid, Tid, Mask, true)) + return m_Filters->OpenFilter(Pid, Tid, Mask); + + return -1; +} + +bool cStreamdevDevice::Init(void) { + if (m_Device == NULL && StreamdevClientSetup.StartClient) + new cStreamdevDevice; + return true; +} + +bool cStreamdevDevice::ReInit(void) { + if(m_Device) { + m_Device->Lock(); + m_Device->m_Filters->SetConnection(-1); + m_Device->m_Pids = 0; + } + ClientSocket.Quit(); + ClientSocket.Reset(); + if (m_Device != NULL) { + //DELETENULL(m_Device->m_TSBuffer); + m_Device->Unlock(); + } + return StreamdevClientSetup.StartClient ? Init() : true; +} + diff --git a/plugins/streamdev/streamdev-cvs/client/device.h b/plugins/streamdev/streamdev-cvs/client/device.h new file mode 100644 index 0000000..e96b05f --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/device.h @@ -0,0 +1,68 @@ +/* + * $Id: device.h,v 1.9 2009/06/23 10:26:54 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_DEVICE_H +#define VDR_STREAMDEV_DEVICE_H + +#include <vdr/device.h> + +#include "client/socket.h" +#include "client/filter.h" + +class cTBString; + +#define CMD_LOCK_OBJ(x) cMutexLock CmdLock((cMutex*)&(x)->m_Mutex) + +class cStreamdevDevice: public cDevice { + friend class cRemoteRecordings; + +private: + const cChannel *m_Channel; + cTSBuffer *m_TSBuffer; + cStreamdevFilters *m_Filters; + int m_Pids; + bool m_DvrClosed; + + static cStreamdevDevice *m_Device; + + bool OpenDvrInt(void); + void CloseDvrInt(void); + +protected: + virtual bool SetChannelDevice(const cChannel *Channel, bool LiveView); + virtual bool HasLock(int TimeoutMs) + { + //printf("HasLock is %d\n", (ClientSocket.DataSocket(siLive) != NULL)); + //return ClientSocket.DataSocket(siLive) != NULL; + return true; + } + + virtual bool SetPid(cPidHandle *Handle, int Type, bool On); + virtual bool OpenDvr(void); + virtual void CloseDvr(void); + virtual bool GetTSPacket(uchar *&Data); + + virtual int OpenFilter(u_short Pid, u_char Tid, u_char Mask); + +public: + cStreamdevDevice(void); + virtual ~cStreamdevDevice(); + + virtual bool HasInternalCam(void) { return true; } + virtual bool ProvidesSource(int Source) const; + virtual bool ProvidesTransponder(const cChannel *Channel) const; + virtual bool ProvidesChannel(const cChannel *Channel, int Priority = -1, + bool *NeedsDetachReceivers = NULL) const; +#if APIVERSNUM >= 10700 + virtual int NumProvidedSystems(void) const { return 1; } +#endif + virtual bool IsTunedToTransponder(const cChannel *Channel); + + static bool Init(void); + static bool ReInit(void); + + static cStreamdevDevice *GetDevice(void) { return m_Device; } +}; + +#endif // VDR_STREAMDEV_DEVICE_H diff --git a/plugins/streamdev/streamdev-cvs/client/filter.c b/plugins/streamdev/streamdev-cvs/client/filter.c new file mode 100644 index 0000000..c187e05 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/filter.c @@ -0,0 +1,292 @@ +/* + * $Id: filter.c,v 1.14 2009/02/13 13:02:39 schmirl Exp $ + */ + +#include "client/filter.h" +#include "client/socket.h" +#include "tools/select.h" +#include "common.h" + +#include <vdr/device.h> + +#define PID_MASK_HI 0x1F +// --- cStreamdevFilter ------------------------------------------------------ + +class cStreamdevFilter: public cListObject { +private: + uchar m_Buffer[4096]; + int m_Used; + int m_Pipe[2]; + u_short m_Pid; + u_char m_Tid; + u_char m_Mask; + +public: + cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask); + virtual ~cStreamdevFilter(); + + bool Matches(u_short Pid, u_char Tid); + bool PutSection(const uchar *Data, int Length, bool Pusi); + int ReadPipe(void) const { return m_Pipe[0]; } + + bool IsClosed(void); + void Reset(void); + + u_short Pid(void) const { return m_Pid; } + u_char Tid(void) const { return m_Tid; } + u_char Mask(void) const { return m_Mask; } +}; + +inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) { + return m_Pid == Pid && m_Tid == (Tid & m_Mask); +} + +cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) { + m_Used = 0; + m_Pid = Pid; + m_Tid = Tid; + m_Mask = Mask; + m_Pipe[0] = m_Pipe[1] = -1; + +#ifdef SOCK_SEQPACKET + // SOCK_SEQPACKET (since kernel 2.6.4) + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) { + esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM"); + } +#endif + if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) { + esyslog("streamdev-client: couldn't open section filter socket: %m"); + } + + else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) { + esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m"); + } +} + +cStreamdevFilter::~cStreamdevFilter() { + Dprintf("~cStreamdevFilter %p\n", this); + + // ownership of handle m_Pipe[0] has been transferred to VDR section handler + //if (m_Pipe[0] >= 0) + // close(m_Pipe[0]); + if (m_Pipe[1] >= 0) + close(m_Pipe[1]); +} + +bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) { + + if (!m_Used && !Pusi) /* wait for payload unit start indicator */ + return true; + if (m_Used && Pusi) /* reset at payload unit start */ + Reset(); + + if (m_Used + Length >= (int)sizeof(m_Buffer)) { + esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)", + Length); + Reset(); + return true; + } + + memcpy(m_Buffer + m_Used, Data, Length); + m_Used += Length; + if (m_Used > 3) { + int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3; + if (m_Used == length) { + m_Used = 0; + if (write(m_Pipe[1], m_Buffer, length) < 0) { + if(errno == EAGAIN || errno == EWOULDBLOCK) + dsyslog("cStreamdevFilter::PutSection socket overflow, " + "Pid %4d Tid %3d", m_Pid, m_Tid); + + else + return false; + } + } + + if (m_Used > length) { + dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d " + "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length); + if(Length < TS_SIZE-5) { + // TS packet not full -> this must be last TS packet of section data -> safe to reset now + Reset(); + } + } + + } + return true; +} + +void cStreamdevFilter::Reset(void) { + if(m_Used) + dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used); + m_Used = 0; +} + +bool cStreamdevFilter::IsClosed(void) { + char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */ + + // Test if pipe/socket has been closed by writing empty section + if (write(m_Pipe[1], m_Buffer, 3) < 0 && + errno != EAGAIN && + errno != EWOULDBLOCK) { + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) + esyslog("cStreamdevFilter::IsClosed failed: %m"); + + return true; + } + + return false; +} + +// --- cStreamdevFilters ----------------------------------------------------- + +cStreamdevFilters::cStreamdevFilters(void): + cThread("streamdev-client: sections assembler") { + m_TSBuffer = NULL; +} + +cStreamdevFilters::~cStreamdevFilters() { + SetConnection(-1); +} + +int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) { + CarbageCollect(); + + cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask); + int fh = f->ReadPipe(); + + Lock(); + Add(f); + Unlock(); + + return fh; +} + +void cStreamdevFilters::CarbageCollect(void) { + LOCK_THREAD; + for (cStreamdevFilter *fi = First(); fi;) { + if (fi->IsClosed()) { + if (errno == ECONNREFUSED || + errno == ECONNRESET || + errno == EPIPE) { + ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false); + Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + + cStreamdevFilter *next = Prev(fi); + Del(fi); + fi = next ? Next(next) : First(); + } else { + esyslog("cStreamdevFilters::CarbageCollector() error: " + "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed", + (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1); + LOG_ERROR; + fi = Next(fi); + } + } else { + fi = Next(fi); + } + } +} + +bool cStreamdevFilters::ReActivateFilters(void) +{ + LOCK_THREAD; + + bool res = true; + CarbageCollect(); + for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) { + res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res; + Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL"); + } + return res; +} + +void cStreamdevFilters::SetConnection(int Handle) { + + Cancel(2); + DELETENULL(m_TSBuffer); + + if (Handle >= 0) { + m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1); + ReActivateFilters(); + Start(); + } +} + +void cStreamdevFilters::Action(void) { + int fails = 0; + + while (Running()) { + const uchar *block = m_TSBuffer->Get(); + if (block) { + u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2]; + u_char tid = block[3]; + bool Pusi = block[1] & 0x40; + // proprietary extension + int len = block[4]; +#if 0 + if (block[1] == 0xff && + block[2] == 0xff && + block[3] == 0xff && + block[4] == 0x7f) + isyslog("*********** TRANSPONDER -> %s **********", block+5); +#endif + LOCK_THREAD; + cStreamdevFilter *f = First(); + while (f) { + cStreamdevFilter *next = Next(f); + if (f->Matches(pid, tid)) { + + if (f->PutSection(block + 5, len, Pusi)) + break; + + if (errno != ECONNREFUSED && + errno != ECONNRESET && + errno != EPIPE) { + Dprintf("FATAL ERROR: %m\n"); + esyslog("streamdev-client: couldn't send section packet: %m"); + } + ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false); + Del(f); + // Filter was closed. + // - need to check remaining filters for another match + } + f = next; + } + } else { +#if 1 // TODO: this should be fixed in vdr cTSBuffer + // Check disconnection + int fd = *ClientSocket.DataSocket(siLiveFilter); + if(fd < 0) + break; + cPoller Poller(fd); + if (Poller.Poll()) { + char tmp[1]; + errno = 0; + Dprintf("cStreamdevFilters::Action(): checking connection"); + if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) { + ++fails; + if (fails >= 10) { + esyslog("cStreamdevFilters::Action(): stream disconnected ?"); + ClientSocket.CloseDataConnection(siLiveFilter); + break; + } + } else { + fails = 0; + } + } else { + fails = 0; + } + cCondWait::SleepMs(10); +#endif + } + } + + DELETENULL(m_TSBuffer); + dsyslog("StreamdevFilters::Action() ended"); +} diff --git a/plugins/streamdev/streamdev-cvs/client/filter.h b/plugins/streamdev/streamdev-cvs/client/filter.h new file mode 100644 index 0000000..889006a --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/filter.h @@ -0,0 +1,33 @@ +/* + * $Id: filter.h,v 1.5 2008/04/07 14:27:28 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_FILTER_H +#define VDR_STREAMDEV_FILTER_H + +#include <vdr/config.h> +#include <vdr/tools.h> +#include <vdr/thread.h> + +class cTSBuffer; +class cStreamdevFilter; + +class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread { +private: + cTSBuffer *m_TSBuffer; + +protected: + virtual void Action(void); + void CarbageCollect(void); + + bool ReActivateFilters(void); + +public: + cStreamdevFilters(void); + virtual ~cStreamdevFilters(); + + void SetConnection(int Handle); + int OpenFilter(u_short Pid, u_char Tid, u_char Mask); +}; + +#endif // VDR_STREAMDEV_FILTER_H diff --git a/plugins/streamdev/streamdev-cvs/client/setup.c b/plugins/streamdev/streamdev-cvs/client/setup.c new file mode 100644 index 0000000..2ac359f --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/setup.c @@ -0,0 +1,80 @@ +/* + * $Id: setup.c,v 1.9 2009/09/18 10:43:26 schmirl Exp $ + */ + +#include <vdr/menuitems.h> + +#include "client/setup.h" +#include "client/device.h" + +cStreamdevClientSetup StreamdevClientSetup; + +cStreamdevClientSetup::cStreamdevClientSetup(void) { + StartClient = false; + RemotePort = 2004; + StreamFilters = false; + SyncEPG = false; + HideMenuEntry = false; + MinPriority = -1; + MaxPriority = MAXPRIORITY; + strcpy(RemoteIp, ""); +} + +bool cStreamdevClientSetup::SetupParse(const char *Name, const char *Value) { + if (strcmp(Name, "StartClient") == 0) StartClient = atoi(Value); + else if (strcmp(Name, "RemoteIp") == 0) { + if (strcmp(Value, "-none-") == 0) + strcpy(RemoteIp, ""); + else + strcpy(RemoteIp, Value); + } + else if (strcmp(Name, "RemotePort") == 0) RemotePort = atoi(Value); + else if (strcmp(Name, "StreamFilters") == 0) StreamFilters = atoi(Value); + else if (strcmp(Name, "SyncEPG") == 0) SyncEPG = atoi(Value); + else if (strcmp(Name, "HideMenuEntry") == 0) HideMenuEntry = atoi(Value); + else if (strcmp(Name, "MinPriority") == 0) MinPriority = atoi(Value); + else if (strcmp(Name, "MaxPriority") == 0) MaxPriority = atoi(Value); + else return false; + return true; +} + +cStreamdevClientMenuSetupPage::cStreamdevClientMenuSetupPage(void) { + m_NewSetup = StreamdevClientSetup; + + Add(new cMenuEditBoolItem(tr("Hide Mainmenu Entry"), &m_NewSetup.HideMenuEntry)); + Add(new cMenuEditBoolItem(tr("Start Client"), &m_NewSetup.StartClient)); + Add(new cMenuEditIpItem (tr("Remote IP"), m_NewSetup.RemoteIp)); + Add(new cMenuEditIntItem (tr("Remote Port"), &m_NewSetup.RemotePort, 0, 65535)); + Add(new cMenuEditBoolItem(tr("Filter Streaming"), &m_NewSetup.StreamFilters)); + Add(new cMenuEditBoolItem(tr("Synchronize EPG"), &m_NewSetup.SyncEPG)); + Add(new cMenuEditIntItem (tr("Minimum Priority"), &m_NewSetup.MinPriority, -1, MAXPRIORITY)); + Add(new cMenuEditIntItem (tr("Maximum Priority"), &m_NewSetup.MaxPriority, -1, MAXPRIORITY)); + SetCurrent(Get(0)); +} + +cStreamdevClientMenuSetupPage::~cStreamdevClientMenuSetupPage() { +} + +void cStreamdevClientMenuSetupPage::Store(void) { + if (m_NewSetup.StartClient != StreamdevClientSetup.StartClient) { + if (m_NewSetup.StartClient) + cStreamdevDevice::Init(); + } + + SetupStore("StartClient", m_NewSetup.StartClient); + if (strcmp(m_NewSetup.RemoteIp, "") == 0) + SetupStore("RemoteIp", "-none-"); + else + SetupStore("RemoteIp", m_NewSetup.RemoteIp); + SetupStore("RemotePort", m_NewSetup.RemotePort); + SetupStore("StreamFilters", m_NewSetup.StreamFilters); + SetupStore("SyncEPG", m_NewSetup.SyncEPG); + SetupStore("HideMenuEntry", m_NewSetup.HideMenuEntry); + SetupStore("MinPriority", m_NewSetup.MinPriority); + SetupStore("MaxPriority", m_NewSetup.MaxPriority); + + StreamdevClientSetup = m_NewSetup; + + cStreamdevDevice::ReInit(); +} + diff --git a/plugins/streamdev/streamdev-cvs/client/setup.h b/plugins/streamdev/streamdev-cvs/client/setup.h new file mode 100644 index 0000000..f7cba08 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/setup.h @@ -0,0 +1,39 @@ +/* + * $Id: setup.h,v 1.6 2009/09/18 10:43:26 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_SETUPCLIENT_H +#define VDR_STREAMDEV_SETUPCLIENT_H + +#include "common.h" + +struct cStreamdevClientSetup { + cStreamdevClientSetup(void); + + bool SetupParse(const char *Name, const char *Value); + + int StartClient; + char RemoteIp[20]; + int RemotePort; + int StreamFilters; + int SyncEPG; + int HideMenuEntry; + int MinPriority; + int MaxPriority; +}; + +extern cStreamdevClientSetup StreamdevClientSetup; + +class cStreamdevClientMenuSetupPage: public cMenuSetupPage { +private: + cStreamdevClientSetup m_NewSetup; + +protected: + virtual void Store(void); + +public: + cStreamdevClientMenuSetupPage(void); + virtual ~cStreamdevClientMenuSetupPage(); +}; + +#endif // VDR_STREAMDEV_SETUPCLIENT_H diff --git a/plugins/streamdev/streamdev-cvs/client/socket.c b/plugins/streamdev/streamdev-cvs/client/socket.c new file mode 100644 index 0000000..02f501d --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/socket.c @@ -0,0 +1,374 @@ +/* + * $Id: socket.c,v 1.12 2008/04/08 14:18:16 schmirl Exp $ + */ + +#include <tools/select.h> +#include <string.h> +#include <errno.h> +#include <stdlib.h> +#include <stdint.h> +#include <time.h> + +#define MINLOGREPEAT 10 //don't log connect failures too often (seconds) + +#include "client/socket.h" +#include "client/setup.h" +#include "common.h" + +cClientSocket ClientSocket; + +cClientSocket::cClientSocket(void) +{ + memset(m_DataSockets, 0, sizeof(cTBSocket*) * si_Count); + Reset(); +} + +cClientSocket::~cClientSocket() +{ + Reset(); + if (IsOpen()) Quit(); +} + +void cClientSocket::Reset(void) +{ + for (int it = 0; it < si_Count; ++it) { + if (m_DataSockets[it] != NULL) + DELETENULL(m_DataSockets[it]); + } +} + +cTBSocket *cClientSocket::DataSocket(eSocketId Id) const { + return m_DataSockets[Id]; +} + +bool cClientSocket::Command(const std::string &Command, uint Expected, uint TimeoutMs) +{ + errno = 0; + + std::string pkt = Command + "\015\012"; + Dprintf("OUT: |%s|\n", Command.c_str()); + + cTimeMs starttime; + if (!TimedWrite(pkt.c_str(), pkt.size(), TimeoutMs)) { + esyslog("Streamdev: Lost connection to %s:%d: %s", RemoteIp().c_str(), RemotePort(), + strerror(errno)); + Close(); + return false; + } + + uint64_t elapsed = starttime.Elapsed(); + if (Expected != 0) { // XXX+ What if elapsed > TimeoutMs? + TimeoutMs -= elapsed; + return Expect(Expected, NULL, TimeoutMs); + } + + return true; +} + +bool cClientSocket::Expect(uint Expected, std::string *Result, uint TimeoutMs) { + char *endptr; + int bufcount; + bool res; + + errno = 0; + + if ((bufcount = ReadUntil(m_Buffer, sizeof(m_Buffer) - 1, "\012", TimeoutMs)) == -1) { + esyslog("Streamdev: Lost connection to %s:%d: %s", RemoteIp().c_str(), RemotePort(), + strerror(errno)); + Close(); + return false; + } + if (m_Buffer[bufcount - 1] == '\015') + --bufcount; + m_Buffer[bufcount] = '\0'; + Dprintf("IN: |%s|\n", m_Buffer); + + if (Result != NULL) + *Result = m_Buffer; + + res = strtoul(m_Buffer, &endptr, 10) == Expected; + return res; +} + +bool cClientSocket::CheckConnection(void) { + CMD_LOCK; + + if (IsOpen()) { + cTBSelect select; + + Dprintf("connection open\n"); + + // XXX+ check if connection is still alive (is there a better way?) + // There REALLY shouldn't be anything readable according to PROTOCOL here + // If there is, assume it's an eof signal (subseq. read would return 0) + select.Add(*this, false); + int res; + if ((res = select.Select(0)) == 0) { + Dprintf("select said nothing happened\n"); + return true; + } + Dprintf("closing connection (res was %d)", res); + Close(); + } + + if (!Connect(StreamdevClientSetup.RemoteIp, StreamdevClientSetup.RemotePort)){ + static time_t lastTime = 0; + if (time(NULL) - lastTime > MINLOGREPEAT) { + esyslog("ERROR: Streamdev: Couldn't connect to %s:%d: %s", + (const char*)StreamdevClientSetup.RemoteIp, + StreamdevClientSetup.RemotePort, strerror(errno)); + lastTime = time(NULL); + } + return false; + } + + if (!Expect(220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Didn't receive greeting from %s:%d", + RemoteIp().c_str(), RemotePort()); + Close(); + return false; + } + + if (!Command("CAPS TSPIDS", 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't negotiate capabilities on %s:%d", + RemoteIp().c_str(), RemotePort()); + Close(); + return false; + } + + const char *Filters = ""; + if(Command("CAPS FILTERS", 220)) + Filters = ",FILTERS"; + + isyslog("Streamdev: Connected to server %s:%d using capabilities TSPIDS%s", + RemoteIp().c_str(), RemotePort(), Filters); + return true; +} + +bool cClientSocket::ProvidesChannel(const cChannel *Channel, int Priority) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + std::string command = (std::string)"PROV " + (const char*)itoa(Priority) + " " + + (const char*)Channel->GetChannelID().ToString(); + if (!Command(command)) + return false; + + std::string buffer; + if (!Expect(220, &buffer)) { + if (buffer.substr(0, 3) != "560" && errno == 0) + esyslog("ERROR: Streamdev: Couldn't check if %s:%d provides channel %s", + RemoteIp().c_str(), RemotePort(), Channel->Name()); + return false; + } + return true; +} + +bool cClientSocket::CreateDataConnection(eSocketId Id) { + cTBSocket listen(SOCK_STREAM); + + if (!CheckConnection()) return false; + + if (m_DataSockets[Id] != NULL) + DELETENULL(m_DataSockets[Id]); + + if (!listen.Listen(LocalIp(), 0, 1)) { + esyslog("ERROR: Streamdev: Couldn't create data connection: %s", + strerror(errno)); + return false; + } + + std::string command = (std::string)"PORT " + (const char*)itoa(Id) + " " + + LocalIp().c_str() + "," + + (const char*)itoa((listen.LocalPort() >> 8) & 0xff) + "," + + (const char*)itoa(listen.LocalPort() & 0xff); + size_t idx = 4; + while ((idx = command.find('.', idx + 1)) != (size_t)-1) + command[idx] = ','; + + CMD_LOCK; + + if (!Command(command, 220)) { + Dprintf("error: %m\n"); + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't establish data connection to %s:%d", + RemoteIp().c_str(), RemotePort()); + return false; + } + + /* The server SHOULD do the following: + * - get PORT command + * - connect to socket + * - return 220 + */ + + m_DataSockets[Id] = new cTBSocket; + if (!m_DataSockets[Id]->Accept(listen)) { + esyslog("ERROR: Streamdev: Couldn't establish data connection to %s:%d%s%s", + RemoteIp().c_str(), RemotePort(), errno == 0 ? "" : ": ", + errno == 0 ? "" : strerror(errno)); + DELETENULL(m_DataSockets[Id]); + return false; + } + + return true; +} + +bool cClientSocket::CloseDataConnection(eSocketId Id) { + //if (!CheckConnection()) return false; + + CMD_LOCK; + + if(Id == siLive || Id == siLiveFilter) + if (m_DataSockets[Id] != NULL) { + std::string command = (std::string)"ABRT " + (const char*)itoa(Id); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't cleanly close data connection"); + //return false; + } + DELETENULL(m_DataSockets[Id]); + } + return true; +} + +bool cClientSocket::SetChannelDevice(const cChannel *Channel) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + std::string command = (std::string)"TUNE " + + (const char*)Channel->GetChannelID().ToString(); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't tune %s:%d to channel %s", + RemoteIp().c_str(), RemotePort(), Channel->Name()); + return false; + } + return true; +} + +bool cClientSocket::SetPid(int Pid, bool On) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + std::string command = (std::string)(On ? "ADDP " : "DELP ") + (const char*)itoa(Pid); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("Streamdev: Pid %d not available from %s:%d", Pid, LocalIp().c_str(), + LocalPort()); + return false; + } + return true; +} + +bool cClientSocket::SetFilter(ushort Pid, uchar Tid, uchar Mask, bool On) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + std::string command = (std::string)(On ? "ADDF " : "DELF ") + (const char*)itoa(Pid) + + " " + (const char*)itoa(Tid) + " " + (const char*)itoa(Mask); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("Streamdev: Filter %hu, %hhu, %hhu not available from %s:%d", + Pid, Tid, Mask, LocalIp().c_str(), LocalPort()); + return false; + } + return true; +} + +bool cClientSocket::CloseDvr(void) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + if (m_DataSockets[siLive] != NULL) { + std::string command = (std::string)"ABRT " + (const char*)itoa(siLive); + if (!Command(command, 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't cleanly close data connection"); + return false; + } + + DELETENULL(m_DataSockets[siLive]); + } + return true; +} + +bool cClientSocket::SynchronizeEPG(void) { + std::string buffer; + bool result; + FILE *epgfd; + + if (!CheckConnection()) return false; + + isyslog("Streamdev: Synchronizing EPG from server\n"); + + CMD_LOCK; + + if (!Command("LSTE")) + return false; + + if ((epgfd = tmpfile()) == NULL) { + esyslog("ERROR: Streamdev: Error while processing EPG data: %s", + strerror(errno)); + return false; + } + + while ((result = Expect(215, &buffer))) { + if (buffer[3] == ' ') break; + fputs(buffer.c_str() + 4, epgfd); + fputc('\n', epgfd); + } + + if (!result) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't fetch EPG data from %s:%d", + RemoteIp().c_str(), RemotePort()); + fclose(epgfd); + return false; + } + + rewind(epgfd); + if (cSchedules::Read(epgfd)) + cSchedules::Cleanup(true); + else { + esyslog("ERROR: Streamdev: Parsing EPG data failed"); + fclose(epgfd); + return false; + } + fclose(epgfd); + return true; +} + +bool cClientSocket::Quit(void) { + bool res; + + if (!CheckConnection()) return false; + + if (!(res = Command("QUIT", 221))) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't quit command connection to %s:%d", + RemoteIp().c_str(), RemotePort()); + } + Close(); + return res; +} + +bool cClientSocket::SuspendServer(void) { + if (!CheckConnection()) return false; + + CMD_LOCK; + + if (!Command("SUSP", 220)) { + if (errno == 0) + esyslog("ERROR: Streamdev: Couldn't suspend server"); + return false; + } + return true; +} diff --git a/plugins/streamdev/streamdev-cvs/client/socket.h b/plugins/streamdev/streamdev-cvs/client/socket.h new file mode 100644 index 0000000..a0400e6 --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/client/socket.h @@ -0,0 +1,60 @@ +/* + * $Id: socket.h,v 1.6 2008/04/07 14:40:40 schmirl Exp $ + */ + +#ifndef VDR_STREAMDEV_CLIENT_CONNECTION_H +#define VDR_STREAMDEV_CLIENT_CONNECTION_H + +#include <tools/socket.h> + +#include "common.h" + +#include <string> + +#define CMD_LOCK cMutexLock CmdLock((cMutex*)&m_Mutex) + +class cPES2TSRemux; + +class cClientSocket: public cTBSocket { +private: + cTBSocket *m_DataSockets[si_Count]; + cMutex m_Mutex; + char m_Buffer[BUFSIZ + 1]; // various uses + +protected: + /* Send Command, and return true if the command results in Expected. + Returns false on failure, setting errno appropriately if it has been + a system failure. If Expected is zero, returns immediately after + sending the command. */ + bool Command(const std::string &Command, uint Expected = 0, uint TimeoutMs = 1500); + + /* Fetch results from an ongoing Command called with Expected == 0. Returns + true if the response has the code Expected, returning an internal buffer + in the array pointer pointed to by Result. Returns false on failure, + setting errno appropriately if it has been a system failure. */ + bool Expect(uint Expected, std::string *Result = NULL, uint TimeoutMs = 1500); + +public: + cClientSocket(void); + virtual ~cClientSocket(); + + void Reset(void); + + bool CheckConnection(void); + bool ProvidesChannel(const cChannel *Channel, int Priority); + bool CreateDataConnection(eSocketId Id); + bool CloseDataConnection(eSocketId Id); + bool SetChannelDevice(const cChannel *Channel); + bool SetPid(int Pid, bool On); + bool SetFilter(ushort Pid, uchar Tid, uchar Mask, bool On); + bool CloseDvr(void); + bool SynchronizeEPG(void); + bool SuspendServer(void); + bool Quit(void); + + cTBSocket *DataSocket(eSocketId Id) const; +}; + +extern class cClientSocket ClientSocket; + +#endif // VDR_STREAMDEV_CLIENT_CONNECTION_H |
