summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrank Schmirler <vdr@schmirler.de>2010-12-02 08:59:14 +0100
committerFrank Schmirler <vdr@schmirler.de>2010-12-02 08:59:14 +0100
commit5a270cc3ab659a98b4bb674acb77982f7e1ecb14 (patch)
tree5f2f51c096f192a9b32af9ffd8244eeb6637ad06
parente6249bf957a943920b11abbd9efac1efa18b1d00 (diff)
downloadvdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.gz
vdr-plugin-streamdev-5a270cc3ab659a98b4bb674acb77982f7e1ecb14.tar.bz2
Snapshot 2007-05-09
-rw-r--r--Makefile4
-rw-r--r--client/device.c155
-rw-r--r--client/device.h10
-rw-r--r--client/filter.c265
-rw-r--r--client/filter.h45
-rw-r--r--client/socket.c20
-rw-r--r--client/socket.h3
-rw-r--r--common.c2
-rw-r--r--common.h3
-rw-r--r--server/connection.c9
-rw-r--r--server/connection.h6
-rw-r--r--server/connectionHTTP.c8
-rw-r--r--server/connectionVTP.c117
-rw-r--r--server/connectionVTP.h18
-rw-r--r--server/livefilter.c18
-rw-r--r--server/livefilter.h18
-rw-r--r--server/livestreamer.c506
-rw-r--r--server/livestreamer.h66
-rw-r--r--streamdev-server.c4
-rw-r--r--tools/socket.c14
-rw-r--r--tools/socket.h3
-rw-r--r--tools/source.c2
22 files changed, 1038 insertions, 258 deletions
diff --git a/Makefile b/Makefile
index 7be2627..8529750 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
#
# Makefile for a Video Disk Recorder plugin
#
-# $Id: Makefile,v 1.7 2006/09/14 10:30:16 schmirl Exp $
+# $Id: Makefile,v 1.8 2007/04/16 11:01:02 schmirl Exp $
# The official name of this plugin.
# This name will be used in the '-P...' option of VDR to load the plugin.
@@ -16,7 +16,7 @@ VERSION = $(shell grep 'const char \*VERSION *=' common.c | awk '{ print $$5 }'
### The C++ compiler and options:
CXX ?= g++
-CXXFLAGS ?= -fPIC -W -Woverloaded-virtual
+CXXFLAGS ?= -fPIC -Wall -Woverloaded-virtual
### The directory environment:
diff --git a/client/device.c b/client/device.c
index a2e4580..be79bb9 100644
--- a/client/device.c
+++ b/client/device.c
@@ -1,5 +1,5 @@
/*
- * $Id: device.c,v 1.8 2007/01/15 12:15:12 schmirl Exp $
+ * $Id: device.c,v 1.13 2007/05/07 12:18:18 schmirl Exp $
*/
#include "client/device.h"
@@ -42,6 +42,8 @@ cStreamdevDevice::cStreamdevDevice(void) {
#endif
m_Device = this;
+ m_Pids = 0;
+ m_DvrClosed = true;
if (StreamdevClientSetup.SyncEPG)
ClientSocket.SynchronizeEPG();
@@ -59,13 +61,22 @@ cStreamdevDevice::~cStreamdevDevice() {
bool cStreamdevDevice::ProvidesSource(int Source) const {
Dprintf("ProvidesSource, Source=%d\n", Source);
- return false;
+ return true;
}
bool cStreamdevDevice::ProvidesTransponder(const cChannel *Channel) const
{
Dprintf("ProvidesTransponder\n");
- return false;
+ return true;
+}
+
+bool cStreamdevDevice::IsTunedToTransponder(const cChannel *Channel)
+{
+ bool res = false;
+ if (ClientSocket.DataSocket(siLive) != NULL
+ && TRANSPONDER(Channel, m_Channel))
+ res = true;
+ return res;
}
bool cStreamdevDevice::ProvidesChannel(const cChannel *Channel, int Priority,
@@ -123,37 +134,118 @@ bool cStreamdevDevice::SetChannelDevice(const cChannel *Channel,
bool cStreamdevDevice::SetPid(cPidHandle *Handle, int Type, bool On) {
Dprintf("SetPid, Pid=%d, Type=%d, On=%d, used=%d\n", Handle->pid, Type, On,
Handle->used);
- if (Handle->pid && (On || !Handle->used))
- return ClientSocket.SetPid(Handle->pid, On);
- return true;
+ LOCK_THREAD;
+
+ if (On && !m_TSBuffer) {
+ Dprintf("SetPid: no data connection -> OpenDvr()");
+ OpenDvrInt();
+ }
+
+ bool res = true;
+ if (Handle->pid && (On || !Handle->used)) {
+ res = ClientSocket.SetPid(Handle->pid, On);
+
+ m_Pids += (!res) ? 0 : On ? 1 : -1;
+ if (m_Pids < 0)
+ m_Pids = 0;
+
+ if(m_Pids < 1 && m_DvrClosed) {
+ Dprintf("SetPid: 0 pids left -> CloseDvr()");
+ CloseDvrInt();
+ }
+ }
+
+ return res;
}
-bool cStreamdevDevice::OpenDvr(void) {
- Dprintf("OpenDvr\n");
- CloseDvr();
+bool cStreamdevDevice::OpenDvrInt(void) {
+ Dprintf("OpenDvrInt\n");
+ LOCK_THREAD;
+
+ CloseDvrInt();
+ if (m_TSBuffer) {
+ Dprintf("cStreamdevDevice::OpenDvrInt(): DVR connection already open\n");
+ return true;
+ }
+
+ Dprintf("cStreamdevDevice::OpenDvrInt(): Connecting ...\n");
if (ClientSocket.CreateDataConnection(siLive)) {
- //m_Assembler = new cStreamdevAssembler(ClientSocket.DataSocket(siLive));
- //m_TSBuffer = new cTSBuffer(m_Assembler->ReadPipe(), MEGABYTE(2), CardIndex() + 1);
m_TSBuffer = new cTSBuffer(*ClientSocket.DataSocket(siLive), MEGABYTE(2), CardIndex() + 1);
- Dprintf("waiting\n");
- //m_Assembler->WaitForFill();
- Dprintf("resuming\n");
return true;
}
+ esyslog("cStreamdevDevice::OpenDvrInt(): DVR connection FAILED");
return false;
}
-void cStreamdevDevice::CloseDvr(void) {
- Dprintf("CloseDvr\n");
+bool cStreamdevDevice::OpenDvr(void) {
+ Dprintf("OpenDvr\n");
+ LOCK_THREAD;
+
+ m_DvrClosed = false;
+ return OpenDvrInt();
+}
- //DELETENULL(m_Assembler);
+void cStreamdevDevice::CloseDvrInt(void) {
+ Dprintf("CloseDvrInt\n");
+ LOCK_THREAD;
+
+ if (ClientSocket.CheckConnection()) {
+ if (!m_DvrClosed) {
+ Dprintf("cStreamdevDevice::CloseDvrInt(): m_DvrClosed=false -> not closing yet\n");
+ return;
+ }
+ if (m_Pids > 0) {
+ Dprintf("cStreamdevDevice::CloseDvrInt(): %d active pids -> not closing yet\n", m_Pids);
+ return;
+ }
+ } else {
+ Dprintf("cStreamdevDevice::CloseDvrInt(): Control connection gone !\n");
+ }
+
+ Dprintf("cStreamdevDevice::CloseDvrInt(): Closing DVR connection\n");
DELETENULL(m_TSBuffer);
ClientSocket.CloseDvr();
}
+void cStreamdevDevice::CloseDvr(void) {
+ Dprintf("CloseDvr\n");
+ LOCK_THREAD;
+
+ m_DvrClosed = true;
+ CloseDvrInt();
+}
+
bool cStreamdevDevice::GetTSPacket(uchar *&Data) {
if (m_TSBuffer) {
Data = m_TSBuffer->Get();
+#if 1 // TODO: this should be fixed in vdr cTSBuffer
+ // simple disconnect detection
+ static int m_TSFails = 0;
+ if (!Data) {
+ LOCK_THREAD;
+ if(!ClientSocket.DataSocket(siLive)) {
+ return false; // triggers CloseDvr() + OpenDvr() in cDevice
+ }
+ cPoller Poller(*ClientSocket.DataSocket(siLive));
+ errno = 0;
+ if (Poller.Poll() && !errno) {
+ char tmp[1];
+ if (recv(*ClientSocket.DataSocket(siLive), tmp, 1, MSG_PEEK) == 0 && !errno) {
+esyslog("cStreamDevice::GetTSPacket: GetChecked: NOTHING (%d)", m_TSFails);
+ m_TSFails++;
+ if (m_TSFails > 10) {
+ isyslog("cStreamdevDevice::GetTSPacket(): disconnected");
+ m_Pids = 0;
+ CloseDvrInt();
+ m_TSFails = 0;
+ return false;
+ }
+ return true;
+ }
+ }
+ m_TSFails = 0;
+ }
+#endif
return true;
}
return false;
@@ -162,11 +254,24 @@ bool cStreamdevDevice::GetTSPacket(uchar *&Data) {
#if VDRVERSNUM >= 10300
int cStreamdevDevice::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
Dprintf("OpenFilter\n");
- if (StreamdevClientSetup.StreamFilters
- && ClientSocket.SetFilter(Pid, Tid, Mask, true)) {
- return m_Filters->OpenFilter(Pid, Tid, Mask);
- } else
+
+ if (!StreamdevClientSetup.StreamFilters)
return -1;
+
+
+ if (!ClientSocket.DataSocket(siLiveFilter)) {
+ if (ClientSocket.CreateDataConnection(siLiveFilter)) {
+ m_Filters->SetConnection(*ClientSocket.DataSocket(siLiveFilter));
+ } else {
+ isyslog("cStreamdevDevice::OpenFilter: connect failed: %m");
+ return -1;
+ }
+ }
+
+ if (ClientSocket.SetFilter(Pid, Tid, Mask, true))
+ return m_Filters->OpenFilter(Pid, Tid, Mask);
+
+ return -1;
}
#endif
@@ -177,11 +282,17 @@ bool cStreamdevDevice::Init(void) {
}
bool cStreamdevDevice::ReInit(void) {
+ if(m_Device) {
+ m_Device->Lock();
+ m_Device->m_Filters->SetConnection(-1);
+ m_Device->m_Pids = 0;
+ }
ClientSocket.Quit();
ClientSocket.Reset();
if (m_Device != NULL) {
- DELETENULL(m_Device->m_TSBuffer);
+ //DELETENULL(m_Device->m_TSBuffer);
DELETENULL(m_Device->m_Assembler);
+ m_Device->Unlock();
}
return StreamdevClientSetup.StartClient ? Init() : true;
}
diff --git a/client/device.h b/client/device.h
index bdfabb6..525c1d4 100644
--- a/client/device.h
+++ b/client/device.h
@@ -1,5 +1,5 @@
/*
- * $Id: device.h,v 1.3 2005/02/08 15:21:19 lordjaxom Exp $
+ * $Id: device.h,v 1.5 2007/04/24 10:43:40 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_DEVICE_H
@@ -25,9 +25,14 @@ private:
#if VDRVERSNUM >= 10307
cStreamdevFilters *m_Filters;
#endif
+ int m_Pids;
+ bool m_DvrClosed;
static cStreamdevDevice *m_Device;
+ bool OpenDvrInt(void);
+ void CloseDvrInt(void);
+
protected:
virtual bool SetChannelDevice(const cChannel *Channel, bool LiveView);
virtual bool HasLock(int TimeoutMs)
@@ -51,9 +56,10 @@ public:
virtual ~cStreamdevDevice();
virtual bool ProvidesSource(int Source) const;
- virtual bool ProvidesTransponder(const cChannel *Channel) const;
+ virtual bool ProvidesTransponder(const cChannel *Channel) const;
virtual bool ProvidesChannel(const cChannel *Channel, int Priority = -1,
bool *NeedsDetachReceivers = NULL) const;
+ virtual bool IsTunedToTransponder(const cChannel *Channel);
static bool Init(void);
static bool ReInit(void);
diff --git a/client/filter.c b/client/filter.c
index daf534a..91af8b7 100644
--- a/client/filter.c
+++ b/client/filter.c
@@ -1,5 +1,5 @@
/*
- * $Id: filter.c,v 1.3 2005/11/06 16:43:58 lordjaxom Exp $
+ * $Id: filter.c,v 1.11 2007/04/24 11:23:16 schmirl Exp $
*/
#include "client/filter.h"
@@ -7,113 +7,288 @@
#include "tools/select.h"
#include "common.h"
-#include <vdr/ringbuffer.h>
#include <vdr/device.h>
#if VDRVERSNUM >= 10300
+// --- cStreamdevFilter ------------------------------------------------------
+
+class cStreamdevFilter: public cListObject {
+private:
+ uchar m_Buffer[4096];
+ int m_Used;
+ int m_Pipe[2];
+ u_short m_Pid;
+ u_char m_Tid;
+ u_char m_Mask;
+
+public:
+ cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
+ virtual ~cStreamdevFilter();
+
+ bool Matches(u_short Pid, u_char Tid);
+ bool PutSection(const uchar *Data, int Length, bool Pusi);
+ int ReadPipe(void) const { return m_Pipe[0]; }
+
+ bool IsClosed(void);
+ void Reset(void);
+
+ u_short Pid(void) const { return m_Pid; }
+ u_char Tid(void) const { return m_Tid; }
+ u_char Mask(void) const { return m_Mask; }
+};
+
+inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
+ return m_Pid == Pid && m_Tid == (Tid & m_Mask);
+}
+
cStreamdevFilter::cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask) {
m_Used = 0;
m_Pid = Pid;
m_Tid = Tid;
m_Mask = Mask;
+ m_Pipe[0] = m_Pipe[1] = -1;
- if (pipe(m_Pipe) != 0 || fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0) {
- esyslog("streamev-client: coudln't open section filter pipe: %m");
- m_Pipe[0] = m_Pipe[1] = -1;
+#ifdef SOCK_SEQPACKET
+ // SOCK_SEQPACKET (since kernel 2.6.4)
+ if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, m_Pipe) != 0) {
+ esyslog("streamdev-client: socketpair(SOCK_SEQPACKET) failed: %m, trying SOCK_DGRAM");
+ }
+#endif
+ if (m_Pipe[0] < 0 && socketpair(AF_UNIX, SOCK_DGRAM, 0, m_Pipe) != 0) {
+ esyslog("streamdev-client: couldn't open section filter socket: %m");
+ }
+
+ else if(fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK) != 0 ||
+ fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK) != 0) {
+ esyslog("streamdev-client: couldn't set section filter socket to non-blocking mode: %m");
}
}
cStreamdevFilter::~cStreamdevFilter() {
Dprintf("~cStreamdevFilter %p\n", this);
- if (m_Pipe[0] >= 0)
- close(m_Pipe[0]);
+
+ // ownership of handle m_Pipe[0] has been transferred to VDR section handler
+ //if (m_Pipe[0] >= 0)
+ // close(m_Pipe[0]);
if (m_Pipe[1] >= 0)
close(m_Pipe[1]);
}
-bool cStreamdevFilter::PutSection(const uchar *Data, int Length) {
+bool cStreamdevFilter::PutSection(const uchar *Data, int Length, bool Pusi) {
+
+ if (!m_Used && !Pusi) /* wait for payload unit start indicator */
+ return true;
+ if (m_Used && Pusi) /* reset at payload unit start */
+ Reset();
+
if (m_Used + Length >= (int)sizeof(m_Buffer)) {
esyslog("ERROR: Streamdev: Section handler buffer overflow (%d bytes lost)",
Length);
- m_Used = 0;
+ Reset();
return true;
}
+
memcpy(m_Buffer + m_Used, Data, Length);
m_Used += Length;
-
if (m_Used > 3) {
int length = (((m_Buffer[1] & 0x0F) << 8) | m_Buffer[2]) + 3;
if (m_Used == length) {
- if (write(m_Pipe[1], m_Buffer, length) < 0)
- return false;
m_Used = 0;
+ if (write(m_Pipe[1], m_Buffer, length) < 0) {
+ if(errno == EAGAIN || errno == EWOULDBLOCK)
+ dsyslog("cStreamdevFilter::PutSection socket overflow, "
+ "Pid %4d Tid %3d", m_Pid, m_Tid);
+
+ else
+ return false;
+ }
}
+
+ if (m_Used > length) {
+ dsyslog("cStreamdevFilter::PutSection: m_Used > length ! Pid %2d, Tid%2d "
+ "(len %3d, got %d/%d)", m_Pid, m_Tid, Length, m_Used, length);
+ if(Length < TS_SIZE-5) {
+ // TS packet not full -> this must be last TS packet of section data -> safe to reset now
+ Reset();
+ }
+ }
+
}
return true;
}
+void cStreamdevFilter::Reset(void) {
+ if(m_Used)
+ dsyslog("cStreamdevFilter::Reset skipping %d bytes", m_Used);
+ m_Used = 0;
+}
+
+bool cStreamdevFilter::IsClosed(void) {
+ char m_Buffer[3] = {0,0,0}; /* tid 0, 0 bytes */
+
+ // Test if pipe/socket has been closed by writing empty section
+ if (write(m_Pipe[1], m_Buffer, 3) < 0 &&
+ errno != EAGAIN &&
+ errno != EWOULDBLOCK) {
+
+ if (errno != ECONNREFUSED &&
+ errno != ECONNRESET &&
+ errno != EPIPE)
+ esyslog("cStreamdevFilter::IsClosed failed: %m");
+
+ return true;
+ }
+
+ return false;
+}
+
+// --- cStreamdevFilters -----------------------------------------------------
+
cStreamdevFilters::cStreamdevFilters(void):
cThread("streamdev-client: sections assembler") {
- m_Active = false;
- m_RingBuffer = new cRingBufferLinear(MEGABYTE(1), TS_SIZE * 2, true);
- Start();
+ m_TSBuffer = NULL;
}
cStreamdevFilters::~cStreamdevFilters() {
- if (m_Active) {
- m_Active = false;
- Cancel(3);
- }
- delete m_RingBuffer;
+ SetConnection(-1);
}
int cStreamdevFilters::OpenFilter(u_short Pid, u_char Tid, u_char Mask) {
+ CarbageCollect();
+
cStreamdevFilter *f = new cStreamdevFilter(Pid, Tid, Mask);
+ int fh = f->ReadPipe();
+
+ Lock();
Add(f);
- return f->ReadPipe();
+ Unlock();
+
+ return fh;
+}
+
+void cStreamdevFilters::CarbageCollect(void) {
+ LOCK_THREAD;
+ for (cStreamdevFilter *fi = First(); fi;) {
+ if (fi->IsClosed()) {
+ if (errno == ECONNREFUSED ||
+ errno == ECONNRESET ||
+ errno == EPIPE) {
+ ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), false);
+ Dprintf("cStreamdevFilters::CarbageCollector: filter closed: Pid %4d, Tid %3d, Mask %2x (%d filters left)",
+ (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
+
+ cStreamdevFilter *next = Prev(fi);
+ Del(fi);
+ fi = next ? Next(next) : First();
+ } else {
+ esyslog("cStreamdevFilters::CarbageCollector() error: "
+ "Pid %4d, Tid %3d, Mask %2x (%d filters left) failed",
+ (int)fi->Pid(), (int)fi->Tid(), fi->Mask(), Count()-1);
+ LOG_ERROR;
+ fi = Next(fi);
+ }
+ } else {
+ fi = Next(fi);
+ }
+ }
}
-cStreamdevFilter *cStreamdevFilters::Matches(u_short Pid, u_char Tid) {
- for (cStreamdevFilter *f = First(); f; f = Next(f)) {
- if (f->Matches(Pid, Tid))
- return f;
+bool cStreamdevFilters::ReActivateFilters(void)
+{
+ LOCK_THREAD;
+
+ bool res = true;
+ CarbageCollect();
+ for (cStreamdevFilter *fi = First(); fi; fi = Next(fi)) {
+ res = ClientSocket.SetFilter(fi->Pid(), fi->Tid(), fi->Mask(), true) && res;
+ Dprintf("ReActivateFilters(%d, %d, %d) -> %s", fi->Pid(), fi->Tid(), fi->Mask(), res ? "Ok" :"FAIL");
}
- return NULL;
+ return res;
}
-void cStreamdevFilters::Put(const uchar *Data) {
- int p = m_RingBuffer->Put(Data, TS_SIZE);
- if (p != TS_SIZE)
- m_RingBuffer->ReportOverflow(TS_SIZE - p);
+void cStreamdevFilters::SetConnection(int Handle) {
+
+ Cancel(2);
+ DELETENULL(m_TSBuffer);
+
+ if (Handle >= 0) {
+ m_TSBuffer = new cTSBuffer(Handle, MEGABYTE(1), 1);
+ ReActivateFilters();
+ Start();
+ }
}
void cStreamdevFilters::Action(void) {
- m_Active = true;
- while (m_Active) {
- int recvd;
- const uchar *block = m_RingBuffer->Get(recvd);
+ int fails = 0;
- if (block && recvd > 0) {
- cStreamdevFilter *f;
+ while (Running()) {
+ const uchar *block = m_TSBuffer->Get();
+ if (block) {
u_short pid = (((u_short)block[1] & PID_MASK_HI) << 8) | block[2];
u_char tid = block[3];
-
- if ((f = Matches(pid, tid)) != NULL) {
- int len = block[4];
- if (!f->PutSection(block + 5, len)) {
- if (errno != EPIPE) {
- esyslog("streamdev-client: couldn't send section packet: %m");
+ bool Pusi = block[1] & 0x40;
+ int len = block[4];
+#if 0
+ if (block[1] == 0xff &&
+ block[2] == 0xff &&
+ block[3] == 0xff &&
+ block[4] == 0x7f)
+ isyslog("*********** TRANSPONDER -> %s **********", block+5);
+#endif
+ LOCK_THREAD;
+ cStreamdevFilter *f = First();
+ while (f) {
+ cStreamdevFilter *next = Next(f);
+ if (f->Matches(pid, tid)) {
+
+ if (f->PutSection(block + 5, len, Pusi))
+ break;
+
+ if (errno != ECONNREFUSED &&
+ errno != ECONNRESET &&
+ errno != EPIPE) {
Dprintf("FATAL ERROR: %m\n");
+ esyslog("streamdev-client: couldn't send section packet: %m");
}
ClientSocket.SetFilter(f->Pid(), f->Tid(), f->Mask(), false);
Del(f);
+ // Filter was closed.
+ // - need to check remaining filters for another match
}
+ f = next;
}
- m_RingBuffer->Del(TS_SIZE);
- } else
- usleep(1);
+ } else {
+#if 1 // TODO: this should be fixed in vdr cTSBuffer
+ // Check disconnection
+ int fd = *ClientSocket.DataSocket(siLiveFilter);
+ if(fd < 0)
+ break;
+ cPoller Poller(fd);
+ if (Poller.Poll()) {
+ char tmp[1];
+ errno = 0;
+ Dprintf("cStreamdevFilters::Action(): checking connection");
+ if (recv(fd, tmp, 1, MSG_PEEK) == 0 && errno != EAGAIN) {
+ ++fails;
+ if (fails >= 10) {
+ esyslog("cStreamdevFilters::Action(): stream disconnected ?");
+ ClientSocket.CloseDataConnection(siLiveFilter);
+ break;
+ }
+ } else {
+ fails = 0;
+ }
+ } else {
+ fails = 0;
+ }
+ cCondWait::SleepMs(10);
+#endif
+ }
}
+
+ DELETENULL(m_TSBuffer);
+ dsyslog("StreamdevFilters::Action() ended");
}
#endif // VDRVERSNUM >= 10300
diff --git a/client/filter.h b/client/filter.h
index cb46bf0..e0a1575 100644
--- a/client/filter.h
+++ b/client/filter.h
@@ -1,5 +1,5 @@
/*
- * $Id: filter.h,v 1.1.1.1 2004/12/30 22:44:04 lordjaxom Exp $
+ * $Id: filter.h,v 1.4 2007/04/24 11:23:16 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_FILTER_H
@@ -12,52 +12,25 @@
#include <vdr/tools.h>
#include <vdr/thread.h>
-class cRingBufferFrame;
-class cRingBufferLinear;
-
-class cStreamdevFilter: public cListObject {
-private:
- uchar m_Buffer[4096];
- int m_Used;
- int m_Pipe[2];
- u_short m_Pid;
- u_char m_Tid;
- u_char m_Mask;
- cRingBufferFrame *m_RingBuffer;
-
-public:
- cStreamdevFilter(u_short Pid, u_char Tid, u_char Mask);
- virtual ~cStreamdevFilter();
-
- bool Matches(u_short Pid, u_char Tid);
- bool PutSection(const uchar *Data, int Length);
- int ReadPipe(void) const { return m_Pipe[0]; }
-
- u_short Pid(void) const { return m_Pid; }
- u_char Tid(void) const { return m_Tid; }
- u_char Mask(void) const { return m_Mask; }
-
-};
-
-inline bool cStreamdevFilter::Matches(u_short Pid, u_char Tid) {
- return m_Pid == Pid && m_Tid == (Tid & m_Mask);
-}
+class cTSBuffer;
+class cStreamdevFilter;
class cStreamdevFilters: public cList<cStreamdevFilter>, public cThread {
private:
- bool m_Active;
- cRingBufferLinear *m_RingBuffer;
-
+ cTSBuffer *m_TSBuffer;
+
protected:
virtual void Action(void);
+ void CarbageCollect(void);
+
+ bool ReActivateFilters(void);
public:
cStreamdevFilters(void);
virtual ~cStreamdevFilters();
+ void SetConnection(int Handle);
int OpenFilter(u_short Pid, u_char Tid, u_char Mask);
- cStreamdevFilter *Matches(u_short Pid, u_char Tid);
- void Put(const uchar *Data);
};
# endif // VDRVERSNUM >= 10300
diff --git a/client/socket.c b/client/socket.c
index e59c705..5db6efe 100644
--- a/client/socket.c
+++ b/client/socket.c
@@ -1,5 +1,5 @@
/*
- * $Id: socket.c,v 1.7 2007/01/15 11:45:48 schmirl Exp $
+ * $Id: socket.c,v 1.8 2007/04/24 10:57:34 schmirl Exp $
*/
#include <tools/select.h>
@@ -215,6 +215,24 @@ bool cClientSocket::CreateDataConnection(eSocketId Id) {
return true;
}
+bool cClientSocket::CloseDataConnection(eSocketId Id) {
+ //if (!CheckConnection()) return false;
+
+ CMD_LOCK;
+
+ if(Id == siLive || Id == siLiveFilter)
+ if (m_DataSockets[Id] != NULL) {
+ std::string command = (std::string)"ABRT " + (const char*)itoa(Id);
+ if (!Command(command, 220)) {
+ if (errno == 0)
+ esyslog("ERROR: Streamdev: Couldn't cleanly close data connection");
+ //return false;
+ }
+ DELETENULL(m_DataSockets[Id]);
+ }
+ return true;
+}
+
bool cClientSocket::SetChannelDevice(const cChannel *Channel) {
if (!CheckConnection()) return false;
diff --git a/client/socket.h b/client/socket.h
index 17478ce..e839223 100644
--- a/client/socket.h
+++ b/client/socket.h
@@ -1,5 +1,5 @@
/*
- * $Id: socket.h,v 1.3 2005/02/08 17:22:35 lordjaxom Exp $
+ * $Id: socket.h,v 1.4 2007/04/24 10:57:34 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_CLIENT_CONNECTION_H
@@ -47,6 +47,7 @@ public:
bool CheckConnection(void);
bool ProvidesChannel(const cChannel *Channel, int Priority);
bool CreateDataConnection(eSocketId Id);
+ bool CloseDataConnection(eSocketId Id);
bool SetChannelDevice(const cChannel *Channel);
bool SetPid(int Pid, bool On);
#if VDRVERSNUM >= 10300
diff --git a/common.c b/common.c
index d2a5fed..34495c1 100644
--- a/common.c
+++ b/common.c
@@ -11,7 +11,7 @@
using namespace std;
-const char *VERSION = "0.3.3-20070403";
+const char *VERSION = "0.3.3-20070509";
const char *StreamTypes[st_Count] = {
"TS",
diff --git a/common.h b/common.h
index e5f143d..1d14883 100644
--- a/common.h
+++ b/common.h
@@ -1,5 +1,5 @@
/*
- * $Id: common.h,v 1.7 2005/11/06 16:43:58 lordjaxom Exp $
+ * $Id: common.h,v 1.8 2007/04/24 10:50:13 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_COMMON_H
@@ -83,6 +83,7 @@ enum eSuspendMode {
enum eSocketId {
siLive,
siReplay,
+ siLiveFilter,
si_Count
};
diff --git a/server/connection.c b/server/connection.c
index dff1945..629ed1d 100644
--- a/server/connection.c
+++ b/server/connection.c
@@ -1,5 +1,5 @@
/*
- * $Id: connection.c,v 1.8 2007/01/15 12:00:19 schmirl Exp $
+ * $Id: connection.c,v 1.10 2007/05/07 12:25:11 schmirl Exp $
*/
#include "server/connection.h"
@@ -101,9 +101,16 @@ bool cServerConnection::Respond(const char *Message, bool Last, ...)
length = vasprintf(&buffer, Message, ap);
va_end(ap);
+ if (length < 0) {
+ esyslog("ERROR: streamdev: buffer allocation failed (%s) for %s:%d",
+ m_Protocol, RemoteIp().c_str(), RemotePort());
+ return false;
+ }
+
if (m_WriteBytes + length + 2 > sizeof(m_WriteBuffer)) {
esyslog("ERROR: streamdev: output buffer overflow (%s) for %s:%d",
m_Protocol, RemoteIp().c_str(), RemotePort());
+ free(buffer);
return false;
}
Dprintf("OUT: |%s|\n", buffer);
diff --git a/server/connection.h b/server/connection.h
index 2df850e..fe828d9 100644
--- a/server/connection.h
+++ b/server/connection.h
@@ -1,5 +1,5 @@
/*
- * $Id: connection.h,v 1.4 2007/04/02 10:32:34 schmirl Exp $
+ * $Id: connection.h,v 1.5 2007/04/16 11:01:02 schmirl Exp $
*/
#ifndef VDR_STREAMDEV_SERVER_CONNECTION_H
@@ -38,8 +38,8 @@ protected:
Only one line at a time may be sent. If there are lines to follow, set
Last to false. Command(NULL) will be called in the next cycle, so you can
post the next line. */
- virtual bool Respond(const char *Message, bool Last = true, ...)
- __attribute__ ((format (printf, 2, 4)));
+ virtual bool Respond(const char *Message, bool Last = true, ...);
+ //__attribute__ ((format (printf, 2, 4)));
public:
/* If you derive, specify a short string such as HTTP for Protocol, which
diff --git a/server/connectionHTTP.c b/server/connectionHTTP.c
index a526da9..8bde3aa 100644
--- a/server/connectionHTTP.c
+++ b/server/connectionHTTP.c
@@ -1,5 +1,5 @@
/*
- * $Id: connectionHTTP.c,v 1.10 2006/01/26 19:40:18 lordjaxom Exp $
+ * $Id: connectionHTTP.c,v 1.12 2007/05/09 09:12:42 schmirl Exp $
*/
#include <ctype.h>
@@ -41,6 +41,8 @@ bool cConnectionHTTP::Command(char *Cmd)
}
Dprintf("header\n");
return true;
+ default:
+ break;
}
return false; // ??? shouldn't happen
}
@@ -69,6 +71,8 @@ bool cConnectionHTTP::ProcessRequest(void)
device->SwitchChannel(m_Channel, false);
if (m_LiveStreamer->SetChannel(m_Channel, m_StreamType, m_Apid)) {
m_LiveStreamer->SetDevice(device);
+ if (!SetDSCP())
+ LOG_ERROR_STR("unable to set DSCP sockopt");
if (m_StreamType == stES && (m_Apid != 0 || ISRADIO(m_Channel))) {
return Respond("HTTP/1.0 200 OK")
&& Respond("Content-Type: audio/mpeg")
@@ -153,7 +157,7 @@ bool cConnectionHTTP::CmdGET(const std::string &Opts)
{
const char *sp = Opts.c_str(), *ptr = sp, *ep;
const cChannel *chan;
- int apid = 0, pos;
+ int apid = 0;
ptr = skipspace(ptr);
while (*ptr == '/')
diff --git a/server/connectionVTP.c b/server/connectionVTP.c
index 18ea353..c847bf3 100644
--- a/server/connectionVTP.c
+++ b/server/connectionVTP.c
@@ -1,5 +1,5 @@
/*
- * $Id: connectionVTP.c,v 1.8 2007/03/02 15:27:07 schmirl Exp $
+ * $Id: connectionVTP.c,v 1.14 2007/05/09 09:12:42 schmirl Exp $
*/
#include "server/connectionVTP.h"
@@ -153,8 +153,6 @@ cLSTEHandler::~cLSTEHandler()
bool cLSTEHandler::Next(bool &Last)
{
- char *buffer;
-
if (m_Error != NULL) {
Last = true;
cString str(m_Error, true);
@@ -468,6 +466,8 @@ cConnectionVTP::cConnectionVTP(void):
cServerConnection("VTP"),
m_LiveSocket(NULL),
m_LiveStreamer(NULL),
+ m_FilterSocket(NULL),
+ m_FilterStreamer(NULL),
m_LastCommand(NULL),
m_NoTSPIDS(false),
m_LSTEHandler(NULL),
@@ -482,11 +482,18 @@ cConnectionVTP::~cConnectionVTP()
free(m_LastCommand);
delete m_LiveStreamer;
delete m_LiveSocket;
+ delete m_FilterStreamer;
+ delete m_FilterSocket;
delete m_LSTTHandler;
delete m_LSTCHandler;
delete m_LSTEHandler;
}
+inline bool cConnectionVTP::Abort(void) const
+{
+ return m_LiveStreamer && m_LiveStreamer->Abort();
+}
+
void cConnectionVTP::Welcome(void)
{
Respond(220, "Welcome to Video Disk Recorder (VTP)");
@@ -500,12 +507,14 @@ void cConnectionVTP::Reject(void)
void cConnectionVTP::Detach(void)
{
- if (m_LiveStreamer != NULL) m_LiveStreamer->Detach();
+ if (m_LiveStreamer) m_LiveStreamer->Detach();
+ if (m_FilterStreamer) m_FilterStreamer->Detach();
}
void cConnectionVTP::Attach(void)
{
- if (m_LiveStreamer != NULL) m_LiveStreamer->Attach();
+ if (m_LiveStreamer) m_LiveStreamer->Attach();
+ if (m_FilterStreamer) m_FilterStreamer->Attach();
}
bool cConnectionVTP::Command(char *Cmd)
@@ -549,8 +558,8 @@ bool cConnectionVTP::Command(char *Cmd)
else if (strcasecmp(Cmd, "ADDF") == 0) return CmdADDF(param);
else if (strcasecmp(Cmd, "DELF") == 0) return CmdDELF(param);
else if (strcasecmp(Cmd, "ABRT") == 0) return CmdABRT(param);
- else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT(param);
- else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP(param);
+ else if (strcasecmp(Cmd, "QUIT") == 0) return CmdQUIT();
+ else if (strcasecmp(Cmd, "SUSP") == 0) return CmdSUSP();
// Commands adopted from SVDRP
//else if (strcasecmp(Cmd, "DELR") == 0) return CmdDELR(param);
else if (strcasecmp(Cmd, "MODT") == 0) return CmdMODT(param);
@@ -562,8 +571,6 @@ bool cConnectionVTP::Command(char *Cmd)
bool cConnectionVTP::CmdCAPS(char *Opts)
{
- char *buffer;
-
if (strcasecmp(Opts, "TS") == 0) {
m_NoTSPIDS = true;
return Respond(220, "Ignored, capability \"%s\" accepted for "
@@ -575,6 +582,14 @@ bool cConnectionVTP::CmdCAPS(char *Opts)
return Respond(220, "Capability \"%s\" accepted", Opts);
}
+#if VDRVERSNUM >= 10300
+ //
+ // Deliver section filters data in separate, channel-independent data stream
+ //
+ if (strcasecmp(Opts, "FILTERS") == 0)
+ return Respond(220, "Capability \"%s\" accepted", Opts);
+#endif
+
return Respond(561, "Capability \"%s\" not known", Opts);
}
@@ -607,9 +622,14 @@ bool cConnectionVTP::CmdPORT(char *Opts)
id = strtoul(Opts, &ep, 10);
if (ep == Opts || !isspace(*ep))
return Respond(500, "Use: PORT Id Destination");
-
- if (id != 0)
+
+#if VDRVERSNUM >= 10300
+ if (id != siLive && id != siLiveFilter)
+ return Respond(501, "Wrong connection id %d", id);
+#else
+ if (id != siLive)
return Respond(501, "Wrong connection id %d", id);
+#endif
Opts = skipspace(ep);
n = 0;
@@ -636,6 +656,33 @@ bool cConnectionVTP::CmdPORT(char *Opts)
isyslog("Streamdev: Setting data connection to %s:%d", dataip, dataport);
+#if VDRVERSNUM >= 10300
+ if (id == siLiveFilter) {
+ if(m_FilterStreamer)
+ m_FilterStreamer->Stop();
+ delete m_FilterSocket;
+
+ m_FilterSocket = new cTBSocket(SOCK_STREAM);
+ if (!m_FilterSocket->Connect(dataip, dataport)) {
+ esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s",
+ dataip, dataport, strerror(errno));
+ DELETENULL(m_FilterSocket);
+ return Respond(551, "Couldn't open data connection");
+ }
+
+ if(!m_FilterStreamer)
+ m_FilterStreamer = new cStreamdevFilterStreamer;
+ m_FilterStreamer->Start(m_FilterSocket);
+ m_FilterStreamer->Activate(true);
+
+ return Respond(220, "Port command ok, data connection opened");
+ }
+#endif
+
+ if(m_LiveSocket && m_LiveStreamer)
+ m_LiveStreamer->Stop();
+ delete m_LiveSocket;
+
m_LiveSocket = new cTBSocket(SOCK_STREAM);
if (!m_LiveSocket->Connect(dataip, dataport)) {
esyslog("ERROR: Streamdev: Couldn't open data connection to %s:%d: %s",
@@ -644,10 +691,12 @@ bool cConnectionVTP::CmdPORT(char *Opts)
return Respond(551, "Couldn't open data connection");
}
- if (id == siLive)
+ if (!m_LiveSocket->SetDSCP())
+ LOG_ERROR_STR("unable to set DSCP sockopt");
+ if (m_LiveStreamer)
m_LiveStreamer->Start(m_LiveSocket);
- return Respond(220, "Port command ok, data connection opened");
+ return Respond(220, "Port command ok, data connection opened");
}
bool cConnectionVTP::CmdTUNE(char *Opts)
@@ -668,7 +717,16 @@ bool cConnectionVTP::CmdTUNE(char *Opts)
m_LiveStreamer = new cStreamdevLiveStreamer(1);
m_LiveStreamer->SetChannel(chan, m_NoTSPIDS ? stTS : stTSPIDS);
m_LiveStreamer->SetDevice(dev);
+ if(m_LiveSocket)
+ m_LiveStreamer->Start(m_LiveSocket);
+#if VDRVERSNUM >= 10300
+ if(!m_FilterStreamer)
+ m_FilterStreamer = new cStreamdevFilterStreamer;
+ m_FilterStreamer->SetDevice(dev);
+ //m_FilterStreamer->SetChannel(chan);
+#endif
+
return Respond(220, "Channel tuned");
}
@@ -706,8 +764,8 @@ bool cConnectionVTP::CmdADDF(char *Opts)
int pid, tid, mask;
char *ep;
- if (m_LiveStreamer == NULL)
- return Respond(560, "Can't set filters without a stream");
+ if (m_FilterStreamer == NULL)
+ return Respond(560, "Can't set filters without a filter stream");
pid = strtol(Opts, &ep, 10);
if (ep == Opts || (*ep != ' '))
@@ -721,7 +779,7 @@ bool cConnectionVTP::CmdADDF(char *Opts)
if (ep == Opts || (*ep != '\0' && *ep != ' '))
return Respond(500, "Use: ADDF Pid Tid Mask");
- return m_LiveStreamer->SetFilter(pid, tid, mask, true)
+ return m_FilterStreamer->SetFilter(pid, tid, mask, true)
? Respond(220, "Filter %d transferring", pid)
: Respond(560, "Filter %d not available", pid);
#else
@@ -735,7 +793,7 @@ bool cConnectionVTP::CmdDELF(char *Opts)
int pid, tid, mask;
char *ep;
- if (m_LiveStreamer == NULL)
+ if (m_FilterStreamer == NULL)
return Respond(560, "Can't delete filters without a stream");
pid = strtol(Opts, &ep, 10);
@@ -750,9 +808,8 @@ bool cConnectionVTP::CmdDELF(char *Opts)
if (ep == Opts || (*ep != '\0' && *ep != ' '))
return Respond(500, "Use: DELF Pid Tid Mask");
- return m_LiveStreamer->SetFilter(pid, tid, mask, false)
- ? Respond(220, "Filter %d stopped", pid)
- : Respond(560, "Filter %d not transferring", pid);
+ m_FilterStreamer->SetFilter(pid, tid, mask, false);
+ return Respond(220, "Filter %d stopped", pid);
#else
return Respond(500, "DELF known but unimplemented with VDR < 1.3.0");
#endif
@@ -768,20 +825,32 @@ bool cConnectionVTP::CmdABRT(char *Opts)
return Respond(500, "Use: ABRT Id");
switch (id) {
- case 0: DELETENULL(m_LiveStreamer); break;
+ case siLive:
+ DELETENULL(m_LiveStreamer);
+ DELETENULL(m_LiveSocket);
+ break;
+#if VDRVERSNUM >= 10300
+ case siLiveFilter:
+ DELETENULL(m_FilterStreamer);
+ DELETENULL(m_FilterSocket);
+ break;
+#endif
+ default:
+ return Respond(501, "Wrong connection id %d", id);
+ break;
+
}
- DELETENULL(m_LiveSocket);
return Respond(220, "Data connection closed");
}
-bool cConnectionVTP::CmdQUIT(char *Opts)
+bool cConnectionVTP::CmdQUIT(void)
{
DeferClose();
return Respond(221, "Video Disk Recorder closing connection");
}
-bool cConnectionVTP::CmdSUSP(char *Opts)
+bool cConnectionVTP::CmdSUSP(void)
{
if (StreamdevServerSetup.SuspendMode == smAlways || cSuspendCtl::IsActive())
return Respond(220, "Server is suspended");
diff --git a/server/connectionVTP.h b/server/connectionVTP.h
index c6ab223..fffff4f 100644
--- a/server/connectionVTP.h
+++ b/server/connectionVTP.h
@@ -2,9 +2,10 @@
#define VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H
#include "server/connection.h"
-#include "server/livestreamer.h"
class cTBSocket;
+class cStreamdevLiveStreamer;
+class cStreamdevFilterStreamer;
class cLSTEHandler;
class cLSTCHandler;
class cLSTTHandler;
@@ -16,8 +17,10 @@ class cConnectionVTP: public cServerConnection {
using cServerConnection::Respond;
private:
- cTBSocket *m_LiveSocket;
- cStreamdevLiveStreamer *m_LiveStreamer;
+ cTBSocket *m_LiveSocket;
+ cStreamdevLiveStreamer *m_LiveStreamer;
+ cTBSocket *m_FilterSocket;
+ cStreamdevFilterStreamer *m_FilterStreamer;
char *m_LastCommand;
bool m_NoTSPIDS;
@@ -53,8 +56,8 @@ public:
bool CmdADDF(char *Opts);
bool CmdDELF(char *Opts);
bool CmdABRT(char *Opts);
- bool CmdQUIT(char *Opts);
- bool CmdSUSP(char *Opts);
+ bool CmdQUIT(void);
+ bool CmdSUSP(void);
// Thread-safe implementations of SVDRP commands
bool CmdLSTE(char *Opts);
@@ -73,9 +76,4 @@ public:
__attribute__ ((format (printf, 3, 4)));
};
-inline bool cConnectionVTP::Abort(void) const
-{
- return m_LiveStreamer && m_LiveStreamer->Abort();
-}
-
#endif // VDR_STREAMDEV_SERVERS_CONNECTIONVTP_H
diff --git a/server/livefilter.c b/server/livefilter.c
index 4524a88..e7d896c 100644
--- a/server/livefilter.c
+++ b/server/livefilter.c
@@ -1,20 +1,24 @@
/*
- * $Id: livefilter.c,v 1.2 2005/02/08 13:59:16 lordjaxom Exp $
+ * $Id: livefilter.c,v 1.4 2007/04/24 11:06:12 schmirl Exp $
*/
#include "server/livefilter.h"
-#include "server/livestreamer.h"
+#include "server/streamer.h"
#include "common.h"
+#ifndef TS_SIZE
+# define TS_SIZE 188
+#endif
+#ifndef TS_SYNC_BYTE
+# define TS_SYNC_BYTE 0x47
+#endif
+
#if VDRVERSNUM >= 10300
-cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer) {
+cStreamdevLiveFilter::cStreamdevLiveFilter(cStreamdevStreamer *Streamer) {
m_Streamer = Streamer;
}
-cStreamdevLiveFilter::~cStreamdevLiveFilter() {
-}
-
void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length)
{
uchar buffer[TS_SIZE];
@@ -24,7 +28,7 @@ void cStreamdevLiveFilter::Process(u_short Pid, u_char Tid, const u_char *Data,
while (length > 0) {
int chunk = min(length, TS_SIZE - 5);
buffer[0] = TS_SYNC_BYTE;
- buffer[1] = (Pid >> 8) & 0xff;
+ buffer[1] = ((Pid >> 8) & 0x3f) | (pos==0 ? 0x40 : 0); /* bit 6: payload unit start indicator (PUSI) */
buffer[2] = Pid & 0xff;
buffer[3] = Tid;
buffer[4] = (uchar)chunk;
diff --git a/server/livefilter.h b/server/livefilter.h
index a30cba0..99c69d4 100644
--- a/server/livefilter.h
+++ b/server/livefilter.h
@@ -1,5 +1,5 @@
/*
- * $Id: livefilter.h,v 1.2 2005/11/07 19:28:41 lordjaxom Exp $
+ * $Id: livefilter.h,v 1.4 2007/04/24 11:29:29 schmirl Exp $
*/
#ifndef VDR_STREAMEV_LIVEFILTER_H
@@ -11,20 +11,24 @@
#include <vdr/filter.h>
-class cStreamdevLiveStreamer;
+class cStreamdevStreamer;
class cStreamdevLiveFilter: public cFilter {
- friend class cStreamdevLiveStreamer;
-
private:
- cStreamdevLiveStreamer *m_Streamer;
+ cStreamdevStreamer *m_Streamer;
protected:
virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length);
public:
- cStreamdevLiveFilter(cStreamdevLiveStreamer *Streamer);
- virtual ~cStreamdevLiveFilter();
+ cStreamdevLiveFilter(cStreamdevStreamer *Streamer);
+
+ void Set(u_short Pid, u_char Tid, u_char Mask) {
+ cFilter::Set(Pid, Tid, Mask);
+ }
+ void Del(u_short Pid, u_char Tid, u_char Mask) {
+ cFilter::Del(Pid, Tid, Mask);
+ }
};
# endif // VDRVERSNUM >= 10300
diff --git a/server/livestreamer.c b/server/livestreamer.c
index 6148720..1bbeddb 100644
--- a/server/livestreamer.c
+++ b/server/livestreamer.c
@@ -1,6 +1,12 @@
+#include <assert.h>
+
+#include <libsi/section.h>
+#include <libsi/descriptor.h>
+
#include <vdr/ringbuffer.h>
#include "server/livestreamer.h"
+#include "server/livefilter.h"
#include "remux/ts2ps.h"
#include "remux/ts2es.h"
#include "remux/extern.h"
@@ -8,12 +14,31 @@
// --- cStreamdevLiveReceiver -------------------------------------------------
+class cStreamdevLiveReceiver: public cReceiver {
+ friend class cStreamdevStreamer;
+
+private:
+ cStreamdevStreamer *m_Streamer;
+
+protected:
+ virtual void Activate(bool On);
+ virtual void Receive(uchar *Data, int Length);
+
+public:
#if VDRVERSNUM < 10500
-cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca,
+ cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca, int Priority, const int *Pids);
+#else
+ cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids);
+#endif
+ virtual ~cStreamdevLiveReceiver();
+};
+
+#if VDRVERSNUM < 10500
+cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, int Ca,
int Priority, const int *Pids):
cReceiver(Ca, Priority, 0, Pids),
#else
-cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID,
+cStreamdevLiveReceiver::cStreamdevLiveReceiver(cStreamdevStreamer *Streamer, tChannelID ChannelID,
int Priority, const int *Pids):
cReceiver(ChannelID, Priority, 0, Pids),
#endif
@@ -33,6 +58,228 @@ void cStreamdevLiveReceiver::Receive(uchar *Data, int Length) {
m_Streamer->ReportOverflow(Length - p);
}
+inline void cStreamdevLiveReceiver::Activate(bool On)
+{
+ Dprintf("LiveReceiver->Activate(%d)\n", On);
+ m_Streamer->Activate(On);
+}
+
+// --- cStreamdevPatFilter ----------------------------------------------------
+
+class cStreamdevPatFilter : public cFilter {
+private:
+ int pmtPid;
+ int pmtSid;
+ int pmtVersion;
+
+ const cChannel *m_Channel;
+ cStreamdevLiveStreamer *m_Streamer;
+
+ virtual void Process(u_short Pid, u_char Tid, const u_char *Data, int Length);
+
+ int GetPid(SI::PMT::Stream& stream);
+public:
+ cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel);
+};
+
+cStreamdevPatFilter::cStreamdevPatFilter(cStreamdevLiveStreamer *Streamer, const cChannel *Channel)
+{
+ Dprintf("cStreamdevPatFilter(\"%s\")", Channel->Name());
+ assert(Streamer);
+ m_Channel = Channel;
+ m_Streamer = Streamer;
+ pmtPid = 0;
+ pmtSid = 0;
+ pmtVersion = -1;
+ Set(0x00, 0x00); // PAT
+}
+
+static const char * const psStreamTypes[] = {
+ "UNKNOWN",
+ "ISO/IEC 11172 Video",
+ "ISO/IEC 13818-2 Video",
+ "ISO/IEC 11172 Audio",
+ "ISO/IEC 13818-3 Audio",
+ "ISO/IEC 13818-1 Privete sections",
+ "ISO/IEC 13818-1 Private PES data",
+ "ISO/IEC 13512 MHEG",
+ "ISO/IEC 13818-1 Annex A DSM CC",
+ "0x09",
+ "ISO/IEC 13818-6 Multiprotocol encapsulation",
+ "ISO/IEC 13818-6 DSM-CC U-N Messages",
+ "ISO/IEC 13818-6 Stream Descriptors",
+ "ISO/IEC 13818-6 Sections (any type, including private data)",
+ "ISO/IEC 13818-1 auxiliary",
+ "ISO/IEC 13818-7 Audio with ADTS transport sytax",
+ "ISO/IEC 14496-2 Visual (MPEG-4)",
+ "ISO/IEC 14496-3 Audio with LATM transport syntax",
+ "0x12", "0x13", "0x14", "0x15", "0x16", "0x17", "0x18", "0x19", "0x1a",
+ "ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264)",
+ "",
+};
+
+int cStreamdevPatFilter::GetPid(SI::PMT::Stream& stream)
+{
+ SI::Descriptor *d;
+
+ if (!stream.getPid())
+ return 0;
+
+ switch (stream.getStreamType()) {
+ case 0x01: // ISO/IEC 11172 Video
+ case 0x02: // ISO/IEC 13818-2 Video
+ case 0x03: // ISO/IEC 11172 Audio
+ case 0x04: // ISO/IEC 13818-3 Audio
+#if 0
+ case 0x07: // ISO/IEC 13512 MHEG
+ case 0x08: // ISO/IEC 13818-1 Annex A DSM CC
+ case 0x0a: // ISO/IEC 13818-6 Multiprotocol encapsulation
+ case 0x0b: // ISO/IEC 13818-6 DSM-CC U-N Messages
+ case 0x0c: // ISO/IEC 13818-6 Stream Descriptors
+ case 0x0d: // ISO/IEC 13818-6 Sections (any type, including private data)
+ case 0x0e: // ISO/IEC 13818-1 auxiliary
+#endif
+ case 0x0f: // ISO/IEC 13818-7 Audio with ADTS transport syntax
+ case 0x10: // ISO/IEC 14496-2 Visual (MPEG-4)
+ case 0x11: // ISO/IEC 14496-3 Audio with LATM transport syntax
+ case 0x1b: // ISO/IEC 14496-10 Video (MPEG-4 part 10/AVC, aka H.264)
+ Dprintf("cStreamdevPatFilter PMT scanner adding PID %d (%s)",
+ stream.getPid(), psStreamTypes[stream.getStreamType()]);
+ return stream.getPid();
+ case 0x05: // ISO/IEC 13818-1 private sections
+ case 0x06: // ISO/IEC 13818-1 PES packets containing private data
+ for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) {
+ switch (d->getDescriptorTag()) {
+ case SI::AC3DescriptorTag:
+ Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
+ stream.getPid(), psStreamTypes[stream.getStreamType()], "AC3");
+ return stream.getPid();
+ case SI::TeletextDescriptorTag:
+ Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
+ stream.getPid(), psStreamTypes[stream.getStreamType()], "Teletext");
+ return stream.getPid();
+ case SI::SubtitlingDescriptorTag:
+ Dprintf("cStreamdevPatFilter PMT scanner: adding PID %d (%s) %s",
+ stream.getPid(), psStreamTypes[stream.getStreamType()], "DVBSUB");
+ return stream.getPid();
+ default:
+ Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s",
+ stream.getPid(), psStreamTypes[stream.getStreamType()], "UNKNOWN");
+ break;
+ }
+ delete d;
+ }
+ break;
+ default:
+ /* This following section handles all the cases where the audio track
+ * info is stored in PMT user info with stream id >= 0x80
+ * we check the registration format identifier to see if it
+ * holds "AC-3"
+ */
+ if (stream.getStreamType() >= 0x80) {
+ bool found = false;
+ for (SI::Loop::Iterator it; (d = stream.streamDescriptors.getNext(it)); ) {
+ switch (d->getDescriptorTag()) {
+ case SI::RegistrationDescriptorTag:
+ /* unfortunately libsi does not implement RegistrationDescriptor */
+ if (d->getLength() >= 4) {
+ found = true;
+ SI::CharArray rawdata = d->getData();
+ if (/*rawdata[0] == 5 && rawdata[1] >= 4 && */
+ rawdata[2] == 'A' && rawdata[3] == 'C' &&
+ rawdata[4] == '-' && rawdata[5] == '3') {
+ isyslog("cStreamdevPatFilter PMT scanner:"
+ "Adding pid %d (type 0x%x) RegDesc len %d (%c%c%c%c)",
+ stream.getPid(), stream.getStreamType(),
+ d->getLength(), rawdata[2], rawdata[3],
+ rawdata[4], rawdata[5]);
+ return stream.getPid();
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ delete d;
+ }
+ if(!found) {
+ isyslog("Adding pid %d (type 0x%x) RegDesc not found -> assume AC-3",
+ stream.getPid(), stream.getStreamType());
+ return stream.getPid();
+ }
+ }
+ Dprintf("cStreamdevPatFilter PMT scanner: NOT adding PID %d (%s) %s",
+ stream.getPid(), psStreamTypes[stream.getStreamType()<0x1c?stream.getStreamType():0], "UNKNOWN");
+ break;
+ }
+ return 0;
+}
+
+void cStreamdevPatFilter::Process(u_short Pid, u_char Tid, const u_char *Data, int Length)
+{
+ if (Pid == 0x00) {
+ if (Tid == 0x00 && !pmtPid) {
+ SI::PAT pat(Data, false);
+ if (!pat.CheckCRCAndParse())
+ return;
+ SI::PAT::Association assoc;
+ for (SI::Loop::Iterator it; pat.associationLoop.getNext(assoc, it); ) {
+ if (!assoc.isNITPid()) {
+ const cChannel *Channel = Channels.GetByServiceID(Source(), Transponder(), assoc.getServiceId());
+ if (Channel && (Channel == m_Channel)) {
+ if (0 != (pmtPid = assoc.getPid())) {
+ Dprintf("cStreamdevPatFilter: PMT pid for channel %s: %d", Channel->Name(), pmtPid);
+ pmtSid = assoc.getServiceId();
+ if (Length < TS_SIZE-5) {
+ // repack PAT to TS frame and send to client
+ uint8_t pat_ts[TS_SIZE] = {TS_SYNC_BYTE, 0x40 /* pusi=1 */, 0 /* pid=0 */, 0x10 /* adaption=1 */, 0 /* pointer */};
+ memcpy(pat_ts + 5, Data, Length);
+ m_Streamer->Put(pat_ts, TS_SIZE);
+ } else
+ isyslog("cStreamdevPatFilter: PAT size %d too large to fit in one TS", Length);
+ m_Streamer->SetPids(pmtPid);
+ Add(pmtPid, 0x02);
+ pmtVersion = -1;
+ return;
+ }
+ }
+ }
+ }
+ }
+ } else if (Pid == pmtPid && Tid == SI::TableIdPMT && Source() && Transponder()) {
+ SI::PMT pmt(Data, false);
+ if (!pmt.CheckCRCAndParse())
+ return;
+ if (pmt.getServiceId() != pmtSid)
+ return; // skip broken PMT records
+ if (pmtVersion != -1) {
+ if (pmtVersion != pmt.getVersionNumber()) {
+ Dprintf("cStreamdevPatFilter: PMT version changed, detaching all pids");
+ Del(pmtPid, 0x02);
+ pmtPid = 0; // this triggers PAT scan
+ }
+ return;
+ }
+ pmtVersion = pmt.getVersionNumber();
+
+ SI::PMT::Stream stream;
+ int pids[MAXRECEIVEPIDS + 1], npids = 0;
+ pids[npids++] = pmtPid;
+#if 0
+ pids[npids++] = 0x10; // pid 0x10, tid 0x40: NIT
+ pids[npids++] = 0x11; // pid 0x11, tid 0x42: SDT
+ pids[npids++] = 0x12; // pid 0x12, tid 0x4E...0x6F: EIT
+ pids[npids++] = 0x14; // pid 0x14, tid 0x70: TDT
+#endif
+ for (SI::Loop::Iterator it; pmt.streamLoop.getNext(stream, it); )
+ if (0 != (pids[npids] = GetPid(stream)) && npids < MAXRECEIVEPIDS)
+ npids++;
+
+ pids[npids] = 0;
+ m_Streamer->SetPids(pmt.getPCRPid(), pids);
+ }
+}
+
// --- cStreamdevLiveStreamer -------------------------------------------------
cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
@@ -43,6 +290,7 @@ cStreamdevLiveStreamer::cStreamdevLiveStreamer(int Priority):
m_Channel(NULL),
m_Device(NULL),
m_Receiver(NULL),
+ m_PatFilter(NULL),
m_PESRemux(NULL),
m_ESRemux(NULL),
m_PSRemux(NULL),
@@ -54,14 +302,24 @@ cStreamdevLiveStreamer::~cStreamdevLiveStreamer()
{
Dprintf("Desctructing Live streamer\n");
Stop();
- delete m_Receiver;
+ if(m_PatFilter) {
+ Detach();
+ DELETENULL(m_PatFilter);
+ }
+ DELETENULL(m_Receiver);
delete m_PESRemux;
delete m_ESRemux;
delete m_PSRemux;
delete m_ExtRemux;
-#if VDRVERSNUM >= 10300
- //delete m_Filter; TODO
-#endif
+}
+
+bool cStreamdevLiveStreamer::HasPid(int Pid)
+{
+ int idx;
+ for (idx = 0; idx < m_NumPids; ++idx)
+ if (m_Pids[idx] == Pid)
+ return true;
+ return false;
}
bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
@@ -93,6 +351,44 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
}
}
+ StartReceiver();
+ return true;
+}
+
+bool cStreamdevLiveStreamer::SetPids(int Pid, const int *Pids1, const int *Pids2, const int *Pids3)
+{
+ m_NumPids = 0;
+
+ if (Pid)
+ m_Pids[m_NumPids++] = Pid;
+
+ if (Pids1)
+ for ( ; *Pids1 && m_NumPids < MAXRECEIVEPIDS; Pids1++)
+ if (!HasPid(*Pids1))
+ m_Pids[m_NumPids++] = *Pids1;
+
+ if (Pids2)
+ for ( ; *Pids2 && m_NumPids < MAXRECEIVEPIDS; Pids2++)
+ if (!HasPid(*Pids2))
+ m_Pids[m_NumPids++] = *Pids2;
+
+ if (Pids3)
+ for ( ; *Pids3 && m_NumPids < MAXRECEIVEPIDS; Pids3++)
+ if (!HasPid(*Pids3))
+ m_Pids[m_NumPids++] = *Pids3;
+
+ if (m_NumPids >= MAXRECEIVEPIDS) {
+ esyslog("ERROR: Streamdev: No free slot to receive pid %d\n", Pid);
+ return false;
+ }
+
+ m_Pids[m_NumPids] = 0;
+ StartReceiver();
+ return true;
+}
+
+void cStreamdevLiveStreamer::StartReceiver(void)
+{
DELETENULL(m_Receiver);
if (m_NumPids > 0) {
Dprintf("Creating Receiver to respect changed pids\n");
@@ -106,15 +402,19 @@ bool cStreamdevLiveStreamer::SetPid(int Pid, bool On)
Attach();
}
}
- return true;
}
bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid)
{
Dprintf("Initializing Remuxer for full channel transfer\n");
- printf("ca pid: %d\n", Channel->Ca());
+ //printf("ca pid: %d\n", Channel->Ca());
m_Channel = Channel;
m_StreamType = StreamType;
+
+ int apid[2] = { Apid, 0 };
+ const int *Apids = Apid ? apid : m_Channel->Apids();
+ const int *Dpids = Apid ? NULL : m_Channel->Dpids();
+
switch (m_StreamType) {
case stES:
{
@@ -122,51 +422,33 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
if (Apid != 0)
pid = Apid;
m_ESRemux = new cTS2ESRemux(pid);
- return SetPid(pid, true);
+ return SetPids(pid);
}
case stPES:
- Dprintf("PES\n");
m_PESRemux = new cRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids(), false);
- if (Apid != 0)
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(Apid, true);
- else
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(m_Channel->Apid(0), true)
- && SetPid(m_Channel->Dpid(0), true);
+ return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stPS:
m_PSRemux = new cTS2PSRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids());
- if (Apid != 0)
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(Apid, true);
- else
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(m_Channel->Apid(0), true)
- && SetPid(m_Channel->Dpid(0), true);
+ return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stTS:
- if (Apid != 0)
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(Apid, true);
- else
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(m_Channel->Apid(0), true)
- && SetPid(m_Channel->Dpid(0), true);
+ // This should never happen, but ...
+ if (m_PatFilter) {
+ Detach();
+ DELETENULL(m_PatFilter);
+ }
+ // Set pids from PMT
+ m_PatFilter = new cStreamdevPatFilter(this, m_Channel);
+ return true;
case stExtern:
m_ExtRemux = new cExternRemux(m_Channel->Vpid(), m_Channel->Apids(), m_Channel->Dpids(),
m_Channel->Spids());
- if (Apid != 0)
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(Apid, true);
- else
- return SetPid(m_Channel->Vpid(), true)
- && SetPid(m_Channel->Apid(0), true)
- && SetPid(m_Channel->Dpid(0), true);
+ return SetPids(m_Channel->Vpid(), Apids, Dpids, m_Channel->Spids());
case stTSPIDS:
Dprintf("pid streaming mode\n");
@@ -175,25 +457,6 @@ bool cStreamdevLiveStreamer::SetChannel(const cChannel *Channel, eStreamType Str
return false;
}
-bool cStreamdevLiveStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On)
-{
-#if 0
- Dprintf("setting filter\n");
- if (On) {
- if (m_Filter == NULL) {
- m_Filter = new cStreamdevLiveFilter(this);
- Dprintf("attaching filter to device\n");
- m_Device->AttachFilter(m_Filter);
- }
- m_Filter->Set(Pid, Tid, Mask);
- } else if (m_Filter != NULL)
- m_Filter->Del(Pid, Tid, Mask);
- return true;
-#else
- return false;
-#endif
-}
-
int cStreamdevLiveStreamer::Put(const uchar *Data, int Count)
{
switch (m_StreamType) {
@@ -270,14 +533,28 @@ void cStreamdevLiveStreamer::Del(int Count)
void cStreamdevLiveStreamer::Attach(void)
{
- printf("RIGHT ATTACH\n");
- m_Device->AttachReceiver(m_Receiver);
+ Dprintf("cStreamdevLiveStreamer::Attach()\n");
+ if (m_Device) {
+ if (m_Receiver) {
+ m_Device->Detach(m_Receiver);
+ m_Device->AttachReceiver(m_Receiver);
+ }
+ if (m_PatFilter) {
+ m_Device->Detach(m_PatFilter);
+ m_Device->AttachFilter(m_PatFilter);
+ }
+ }
}
void cStreamdevLiveStreamer::Detach(void)
{
- printf("RIGHT DETACH\n");
- m_Device->Detach(m_Receiver);
+ Dprintf("cStreamdevLiveStreamer::Detach()\n");
+ if (m_Device) {
+ if (m_Receiver)
+ m_Device->Detach(m_Receiver);
+ if (m_PatFilter)
+ m_Device->Detach(m_PatFilter);
+ }
}
std::string cStreamdevLiveStreamer::Report(void)
@@ -296,3 +573,110 @@ std::string cStreamdevLiveStreamer::Report(void)
result += "\n";
return result;
}
+
+// --- cStreamdevFilterStreamer -------------------------------------------------
+
+#if VDRVERSNUM >= 10300
+cStreamdevFilterStreamer::cStreamdevFilterStreamer():
+ cStreamdevStreamer("streamdev-filterstreaming"),
+ m_Device(NULL),
+ m_Filter(NULL)/*,
+ m_Channel(NULL)*/
+{
+}
+
+cStreamdevFilterStreamer::~cStreamdevFilterStreamer()
+{
+ Dprintf("Desctructing Filter streamer\n");
+ Detach();
+ m_Device = NULL;
+ DELETENULL(m_Filter);
+ Stop();
+}
+
+void cStreamdevFilterStreamer::Attach(void)
+{
+ Dprintf("cStreamdevFilterStreamer::Attach()\n");
+ LOCK_THREAD;
+ if(m_Device && m_Filter)
+ m_Device->AttachFilter(m_Filter);
+}
+
+void cStreamdevFilterStreamer::Detach(void)
+{
+ Dprintf("cStreamdevFilterStreamer::Detach()\n");
+ LOCK_THREAD;
+ if(m_Device && m_Filter)
+ m_Device->Detach(m_Filter);
+}
+
+#if 0
+void cStreamdevFilterStreamer::SetChannel(const cChannel *Channel)
+{
+ LOCK_THREAD;
+ Dprintf("cStreamdevFilterStreamer::SetChannel(%s : %s)", Channel?Channel->Name():"<null>",
+ Channel ? *Channel->GetChannelID().ToString() : "");
+ m_Channel = Channel;
+}
+#endif
+
+void cStreamdevFilterStreamer::SetDevice(cDevice *Device)
+{
+ Dprintf("cStreamdevFilterStreamer::SetDevice()\n");
+ LOCK_THREAD;
+ if(Device != m_Device) {
+ Detach();
+ m_Device = Device;
+ //m_Channel = NULL;
+ Attach();
+ }
+}
+
+bool cStreamdevFilterStreamer::SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On)
+{
+ Dprintf("cStreamdevFilterStreamer::SetFilter(%u,0x%x,0x%x,%s)\n", Pid, Tid, Mask, On?"On":"Off");
+
+ if(!m_Device)
+ return false;
+
+ if (On) {
+ if (m_Filter == NULL) {
+ m_Filter = new cStreamdevLiveFilter(this);
+ Dprintf("attaching filter to device\n");
+ Attach();
+ }
+ m_Filter->Set(Pid, Tid, Mask);
+ } else if (m_Filter != NULL)
+ m_Filter->Del(Pid, Tid, Mask);
+
+ return true;
+}
+
+#if 0
+void cStreamdevFilterStreamer::ChannelSwitch(const cDevice *Device, int ChannelNumber) {
+ LOCK_THREAD;
+ if(Device == m_Device) {
+ if(ChannelNumber > 0) {
+ cChannel *ch = Channels.GetByNumber(ChannelNumber);
+ if(ch != NULL) {
+ if(m_Filter != NULL &&
+ m_Channel != NULL &&
+ (! TRANSPONDER(ch, m_Channel))) {
+
+ isyslog("***** LiveFilterStreamer: transponder changed ! %s",
+ *ch->GetChannelID().ToString());
+
+ uchar buffer[TS_SIZE] = {TS_SYNC_BYTE, 0xff, 0xff, 0xff, 0x7f, 0};
+ strcpy((char*)(buffer + 5), ch->GetChannelID().ToString());
+ int p = Put(buffer, TS_SIZE);
+ if (p != TS_SIZE)
+ ReportOverflow(TS_SIZE - p);
+ }
+ m_Channel = ch;
+ }
+ }
+ }
+}
+#endif
+
+#endif // if VDRVERSNUM >= 10300
diff --git a/server/livestreamer.h b/server/livestreamer.h
index 0c525bf..1973f71 100644
--- a/server/livestreamer.h
+++ b/server/livestreamer.h
@@ -5,34 +5,14 @@
#include <vdr/receiver.h>
#include "server/streamer.h"
-#include "server/livefilter.h"
#include "common.h"
class cTS2PSRemux;
class cTS2ESRemux;
class cExternRemux;
class cRemux;
-
-// --- cStreamdevLiveReceiver -------------------------------------------------
-
-class cStreamdevLiveReceiver: public cReceiver {
- friend class cStreamdevLiveStreamer;
-
-private:
- cStreamdevLiveStreamer *m_Streamer;
-
-protected:
- virtual void Activate(bool On);
- virtual void Receive(uchar *Data, int Length);
-
-public:
-#if VDRVERSNUM < 10500
- cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, int Ca, int Priority, const int *Pids);
-#else
- cStreamdevLiveReceiver(cStreamdevLiveStreamer *Streamer, tChannelID ChannelID, int Priority, const int *Pids);
-#endif
- virtual ~cStreamdevLiveReceiver();
-};
+class cStreamdevPatFilter;
+class cStreamdevLiveReceiver;
// --- cStreamdevLiveStreamer -------------------------------------------------
@@ -45,19 +25,23 @@ private:
const cChannel *m_Channel;
cDevice *m_Device;
cStreamdevLiveReceiver *m_Receiver;
+ cStreamdevPatFilter *m_PatFilter;
cRemux *m_PESRemux;
cTS2ESRemux *m_ESRemux;
cTS2PSRemux *m_PSRemux;
cExternRemux *m_ExtRemux;
+ void StartReceiver(void);
+ bool HasPid(int Pid);
+
public:
cStreamdevLiveStreamer(int Priority);
virtual ~cStreamdevLiveStreamer();
void SetDevice(cDevice *Device) { m_Device = Device; }
bool SetPid(int Pid, bool On);
+ bool SetPids(int Pid, const int *Pids1 = NULL, const int *Pids2 = NULL, const int *Pids3 = NULL);
bool SetChannel(const cChannel *Channel, eStreamType StreamType, int Apid = 0);
- bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On);
virtual int Put(const uchar *Data, int Count);
virtual uchar *Get(int &Count);
@@ -70,12 +54,36 @@ public:
virtual std::string Report(void);
};
-// --- cStreamdevLiveReceiver reverse inlines ---------------------------------
-inline void cStreamdevLiveReceiver::Activate(bool On)
-{
- Dprintf("LiveReceiver->Activate(%d)\n", On);
- m_Streamer->Activate(On);
-}
+// --- cStreamdevFilterStreamer -------------------------------------------------
+
+# if VDRVERSNUM >= 10300
+
+//#include <vdr/status.h>
+
+class cStreamdevLiveFilter;
+
+class cStreamdevFilterStreamer: public cStreamdevStreamer /*, public cStatus*/ {
+private:
+ cDevice *m_Device;
+ cStreamdevLiveFilter *m_Filter;
+ //const cChannel *m_Channel;
+
+public:
+ cStreamdevFilterStreamer();
+ virtual ~cStreamdevFilterStreamer();
+
+ void SetDevice(cDevice *Device);
+ //void SetChannel(const cChannel *Channel);
+ bool SetFilter(u_short Pid, u_char Tid, u_char Mask, bool On);
+
+ virtual void Attach(void);
+ virtual void Detach(void);
+
+ // cStatus message handlers
+ //virtual void ChannelSwitch(const cDevice *Device, int ChannelNumber);
+};
+
+# endif // if VDRVERSNUM >= 10300
#endif // VDR_STREAMDEV_LIVESTREAMER_H
diff --git a/streamdev-server.c b/streamdev-server.c
index af5f104..1d0c097 100644
--- a/streamdev-server.c
+++ b/streamdev-server.c
@@ -3,7 +3,7 @@
*
* See the README file for copyright information and how to reach the author.
*
- * $Id: streamdev-server.c,v 1.5 2007/02/19 12:08:16 schmirl Exp $
+ * $Id: streamdev-server.c,v 1.6 2007/04/16 11:01:02 schmirl Exp $
*/
#include <getopt.h>
@@ -63,7 +63,7 @@ bool cPluginStreamdevServer::Start(void)
if (!StreamdevHosts.Load(STREAMDEVHOSTSPATH, true, true)) {
esyslog("streamdev-server: error while loading %s", STREAMDEVHOSTSPATH);
- fprintf(stderr, "streamdev-server: error while loading %s\n");
+ fprintf(stderr, "streamdev-server: error while loading %s\n", STREAMDEVHOSTSPATH);
if (access(STREAMDEVHOSTSPATH, F_OK) != 0) {
fprintf(stderr, " Please install streamdevhosts.conf into the path "
"printed above. Without it\n"
diff --git a/tools/socket.c b/tools/socket.c
index 4b5167d..e9266c5 100644
--- a/tools/socket.c
+++ b/tools/socket.c
@@ -6,6 +6,15 @@
#include <errno.h>
#include <fcntl.h>
+// default class: best effort
+#define DSCP_BE 0
+// gold class (video): assured forwarding 4 with lowest drop precedence
+#define DSCP_AF41 34 << 2
+// premium class (voip): expedited forwarding
+#define DSCP_EF 46 << 2
+// actual DSCP value used
+#define STREAMDEV_DSCP DSCP_AF41
+
cTBSocket::cTBSocket(int Type) {
memset(&m_LocalAddr, 0, sizeof(m_LocalAddr));
memset(&m_RemoteAddr, 0, sizeof(m_RemoteAddr));
@@ -141,3 +150,8 @@ bool cTBSocket::Shutdown(int how) {
return ::shutdown(*this, how) != -1;
}
+
+bool cTBSocket::SetDSCP(void) {
+ int dscp = STREAMDEV_DSCP;
+ return ::setsockopt(*this, SOL_IP, IP_TOS, &dscp, sizeof(dscp)) != -1;
+}
diff --git a/tools/socket.h b/tools/socket.h
index d1a7d62..23272ec 100644
--- a/tools/socket.h
+++ b/tools/socket.h
@@ -68,6 +68,9 @@ public:
an appropriate value. */
virtual bool Accept(const cTBSocket &Listener);
+ /* Sets DSCP sockopt */
+ bool SetDSCP(void);
+
/* LocalPort() returns the port number this socket is connected to locally.
The result is undefined for a non-open socket. */
int LocalPort(void) const { return ntohs(m_LocalAddr.sin_port); }
diff --git a/tools/source.c b/tools/source.c
index 80625e5..c328d7c 100644
--- a/tools/source.c
+++ b/tools/source.c
@@ -110,7 +110,7 @@ bool cTBSource::SafeWrite(const void *Buffer, size_t Length) {
ssize_t cTBSource::ReadUntil(void *Buffer, size_t Length, const char *Seq,
uint TimeoutMs) {
- int seqlen, ms;
+ int ms;
size_t len;
cTBSelect sel;