summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pes2ts.c45
-rw-r--r--pes2ts.h58
-rw-r--r--streamdevice.c70
-rw-r--r--streamdevice.h3
-rw-r--r--tools/socket.c26
-rw-r--r--tsworker.c87
6 files changed, 158 insertions, 131 deletions
diff --git a/pes2ts.c b/pes2ts.c
index f7d7f55..5f17ed0 100644
--- a/pes2ts.c
+++ b/pes2ts.c
@@ -13,26 +13,44 @@
#include "pes2ts.h"
-cPES2TSRemux::cPES2TSRemux(int VPid, int APid):
+//////////////////////////////////////////////////////////////////////////////
+cPESRemux::cPESRemux(int inputBufferSize, int outputBufferSize):
+ m_InputBuffer(new cRingBufferLinear(inputBufferSize, outputBufferSize))
+{
+ OutputLocked = false;
+ m_InputBuffer->SetTimeouts(0, 1000); // IMPORTANT to avoid busy wait in threads main loop and thus a high CPU load
+}
+
+cPESRemux::~cPESRemux()
+{
+ delete m_InputBuffer;
+}
+
+int cPESRemux::Put(const uchar *Data, int Count)
+{
+ InputMutex.Lock();
+ int result = m_InputBuffer->Put(Data, Count);
+ InputMutex.Unlock();
+ return ( result);
+}
+
+
+//////////////////////////////////////////////////////////////////////////////
+cPES2TSRemux::cPES2TSRemux(int VPid, int APid): cPESRemux(INPUTBUFSIZE, IPACKS),
cThread("[ffnetdev] PES2TS remux"),
m_OutputBuffer(new cRingBufferLinear(OUTPUTBUFSIZE, TS_SIZE * 2)),
- m_InputBuffer(new cRingBufferLinear(INPUTBUFSIZE, IPACKS)),
m_Active(false),
m_PlayModeChanged(false)
{
vpid = VPid;
apid = APid;
- m_InputBuffer->SetTimeouts(0, 1000); // IMPORTANT to avoid busy wait in threads main loop and thus a high CPU load
Start();
- OutputLocked = false;
}
cPES2TSRemux::~cPES2TSRemux()
{
m_Active = false;
- delete m_InputBuffer;
delete m_OutputBuffer;
-
}
void cPES2TSRemux::Action(void)
@@ -57,8 +75,8 @@ void cPES2TSRemux::Action(void)
if (m_PlayModeChanged)
{
- cCondWait::SleepMs(1500);
- m_PlayModeChanged = false;
+ cCondWait::SleepMs(1500);
+ m_PlayModeChanged = false;
}
if (m_InputBuffer->Available() < (int)IPACKS*10) {
@@ -199,12 +217,11 @@ void cPES2TSRemux::Action(void)
}
-int cPES2TSRemux::Put(const uchar *Data, int Count)
+//////////////////////////////////////////////////////////////////////////////
+cPES2PESRemux::cPES2PESRemux(): cPESRemux(INPUTBUFSIZE + OUTPUTBUFSIZE, IPACKS)
{
- InputMutex.Lock();
- int result = m_InputBuffer->Put(Data, Count);
- InputMutex.Unlock();
- return ( result);
}
-
+cPES2PESRemux::~cPES2PESRemux()
+{
+}
diff --git a/pes2ts.h b/pes2ts.h
index 43fa3a1..2674bd8 100644
--- a/pes2ts.h
+++ b/pes2ts.h
@@ -16,15 +16,40 @@
#define TS_SIZE 188
#define IPACKS 2048
-class cPES2TSRemux: public cThread {
+class cPESRemux {
+private:
+ bool OutputLocked;
+
+protected:
+ cRingBufferLinear *m_InputBuffer;
+ cMutex InputMutex;
+
+public:
+ cPESRemux(int inputBufferSize, int outputBufferSize);
+ virtual ~cPESRemux();
+
+ int Put(const uchar *Data, int Count);
+ void DelInput (int Count) { InputMutex.Lock(); m_InputBuffer ->Del(Count); InputMutex.Unlock(); }
+ void ClearInput () { InputMutex.Lock(); m_InputBuffer ->Clear(); InputMutex.Unlock(); }
+
+ virtual int Available(void) = 0;
+ virtual int Free(void) = 0;
+ virtual int InputFree(void) = 0;
+ virtual uchar *Get(int &Count) = 0;
+ virtual void DelOutput(int Count) = 0;
+ virtual void ClearOutput() = 0;
+ void LockOutput() { while (OutputLocked) cCondWait::SleepMs(1); OutputLocked = true; }
+ void UnlockOutput() { OutputLocked = false; }
+ virtual void PlayModeChange() {};
+
+};
+
+class cPES2TSRemux: public cPESRemux, cThread {
private:
cRingBufferLinear *m_OutputBuffer;
- cRingBufferLinear *m_InputBuffer;
bool m_Active;
unsigned short vpid;
unsigned short apid;
- bool OutputLocked;
- cMutex InputMutex;
bool m_PlayModeChanged;
protected:
@@ -34,17 +59,28 @@ public:
cPES2TSRemux(int VPid, int APid);
virtual ~cPES2TSRemux();
- int Free(void) { return m_InputBuffer->Free(); }
- int Available(void) { return m_OutputBuffer->Available(); }
- int Put(const uchar *Data, int Count);
+ int Available(void) { return m_OutputBuffer->Available(); }
+ int Free(void) { return m_OutputBuffer->Free(); }
+ int InputFree(void) { return m_InputBuffer->Free(); }
uchar *Get(int &Count) { return m_OutputBuffer->Get(Count); }
void DelOutput(int Count) { m_OutputBuffer->Del(Count); }
- void DelInput (int Count) { InputMutex.Lock(); m_InputBuffer ->Del(Count); InputMutex.Unlock(); }
void ClearOutput() { LockOutput(); m_OutputBuffer->Clear(); UnlockOutput(); }
- void ClearInput () { InputMutex.Lock(); m_InputBuffer ->Clear(); InputMutex.Unlock(); }
- void LockOutput() { while (OutputLocked) cCondWait::SleepMs(1); OutputLocked = true; }
- void UnlockOutput() { OutputLocked = false; }
void PlayModeChange() { m_PlayModeChanged = true; }
+
+};
+
+class cPES2PESRemux: public cPESRemux {
+public:
+ cPES2PESRemux();
+ virtual ~cPES2PESRemux();
+
+ int Available(void) { return m_InputBuffer->Available(); }
+ int Free(void) { return m_InputBuffer->Free(); }
+ int InputFree(void) { return m_InputBuffer->Free(); }
+ uchar *Get(int &Count) { return m_InputBuffer->Get(Count); }
+ void DelOutput(int Count) { m_InputBuffer->Del(Count); }
+ void ClearOutput() { LockOutput(); m_InputBuffer->Clear(); UnlockOutput(); }
+
};
#endif // PES2TSREMUX_H
diff --git a/streamdevice.c b/streamdevice.c
index 354bad5..7380e9e 100644
--- a/streamdevice.c
+++ b/streamdevice.c
@@ -12,103 +12,97 @@
cStreamDevice::cStreamDevice(void)
{
-#ifdef DEBUG
- fprintf(stderr,"[ffnetdev] Device: Constructor cStreamDevice \n");
-#endif
- m_Remux = new cPES2TSRemux(TS_VPID, TS_APID);
-
+ dsyslog("[ffnetdev] Device: Constructor cStreamDevice \n");
+ //m_Remux = new cPES2TSRemux(TS_VPID, TS_APID);
+ m_Remux = new cPES2PESRemux();
}
cStreamDevice::~cStreamDevice(void)
{
-#ifdef DEBUG
- fprintf(stderr,"[ffnetdev] Device: Destructor cStreamDevice \n");
-#endif
- DELETENULL(m_Remux);
+ dsyslog("[ffnetdev] Device: Destructor cStreamDevice \n");
+ DELETENULL(m_Remux);
}
void cStreamDevice::MakePrimaryDevice(bool On)
{
-#ifdef DEBUG
- fprintf(stderr,"[ffnetdev] Device: ffnetdev becomes primary device. Registering our OSD provider...\n");
-#endif
+ dsyslog("[ffnetdev] Device: ffnetdev becomes primary device. Registering our OSD provider...\n");
new cNetOSDProvider();
}
int cStreamDevice::ProvidesCa(const cChannel *Channel) const
{
- return 0;
+ return 0;
}
bool cStreamDevice::HasDecoder(void) const
{
- return true; // We can decode MPEG2
+ return true; // We can decode MPEG2
}
bool cStreamDevice::CanReplay(void) const
{
- return true; // We can replay
+ return true; // We can replay
}
bool cStreamDevice::SetPlayMode(ePlayMode PlayMode)
{
- fprintf(stderr, "[ffnetdev] Device: Setting playmode(not implemented). Mode: %d\n",PlayMode);
- cOSDWorker::SendPlayMode(PlayMode);
- m_Remux->ClearInput();
- m_Remux->ClearOutput();
- m_Remux->PlayModeChange();
- return true;
+ dsyslog("[ffnetdev] Device: Setting playmode. Mode: %d\n",PlayMode);
+ cOSDWorker::SendPlayMode(PlayMode);
+ m_Remux->ClearInput();
+ m_Remux->ClearOutput();
+ m_Remux->PlayModeChange();
+ return true;
}
void cStreamDevice::TrickSpeed(int Speed)
{
- fprintf(stderr,"[ffnetdev] Device: Trickspeed(not implemented). Speed: %d\n", Speed);
- m_Remux->ClearInput();
- m_Remux->ClearOutput();
- m_Remux->PlayModeChange();
+ dsyslog("[ffnetdev] Device: Trickspeed(not implemented). Speed: %d\n", Speed);
+ m_Remux->ClearInput();
+ m_Remux->ClearOutput();
+ m_Remux->PlayModeChange();
}
void cStreamDevice::Clear(void)
{
- fprintf(stderr,"[ffnetdev] Device: Clear(not implemented).\n");
- m_Remux->ClearInput();
- m_Remux->ClearOutput();
- m_Remux->PlayModeChange();
+ dsyslog("[ffnetdev] Device: Clear(not implemented).\n");
+ m_Remux->ClearInput();
+ m_Remux->ClearOutput();
+ m_Remux->PlayModeChange();
// cDevice::Clear();
}
void cStreamDevice::Play(void)
{
- fprintf(stderr,"[ffnetdev] Device: Play(not implemented).\n");
+ dsyslog("[ffnetdev] Device: Play(not implemented).\n");
// cDevice::Play();
}
void cStreamDevice::Freeze(void)
{
- fprintf(stderr,"[ffnetdev] Device: Freeze(not implemented).\n");
+ dsyslog("[ffnetdev] Device: Freeze(not implemented).\n");
// cDevice::Freeze();
}
void cStreamDevice::Mute(void)
{
- fprintf(stderr,"[ffnetdev] Device: Mute(not implemented).\n");
+ dsyslog("[ffnetdev] Device: Mute(not implemented).\n");
// cDevice::Mute();
}
void cStreamDevice::SetVolumeDevice(int Volume)
{
- fprintf (stderr, "[ffnetdev] Device: Setting volume to %d (not implemented).\n", Volume);
+ dsyslog("[ffnetdev] Device: Setting volume to %d (not implemented).\n", Volume);
}
void cStreamDevice::StillPicture(const uchar *Data, int Length)
{
- fprintf(stderr,"[ffnetdev] Device: StillPicture(not implemented).\n");
+ dsyslog("[ffnetdev] Device: StillPicture(not implemented).\n");
}
bool cStreamDevice::Poll(cPoller &Poller, int TimeoutMs)
{
- //fprintf(stderr,"[ffnetdev] Device: Poll TimeoutMs: %d ....\n",TimeoutMs);
- return true;
+ //dsyslog("[ffnetdev] Device: Poll TimeoutMs: %d ....\n",TimeoutMs);
+ return true;
}
/* ----------------------------------------------------------------------------
*/
@@ -120,7 +114,7 @@ int cStreamDevice::PlayAudio(const uchar *Data, int Length, uchar Id)
{
if (cTSWorker::HaveStreamClient())
{
- while ((m_Remux->Free() < Length) && cTSWorker::HaveStreamClient())
+ while ((m_Remux->InputFree() < Length) && cTSWorker::HaveStreamClient())
cCondWait::SleepMs(1);
int result=m_Remux->Put(Data, Length);
if (result!=Length) {
@@ -150,7 +144,7 @@ int cStreamDevice::PlayVideo(const uchar *Data, int Length)
if (cTSWorker::HaveStreamClient())
{
- while ((m_Remux->Free() < Length) && cTSWorker::HaveStreamClient())
+ while ((m_Remux->InputFree() < Length) && cTSWorker::HaveStreamClient())
cCondWait::SleepMs(1);
int result=m_Remux->Put(Data, Length);
if (result!=Length) {
diff --git a/streamdevice.h b/streamdevice.h
index 9bb2ff3..6cd3fa5 100644
--- a/streamdevice.h
+++ b/streamdevice.h
@@ -17,7 +17,7 @@
class cStreamDevice: public cDevice {
private:
- cPES2TSRemux *m_Remux;
+ cPESRemux *m_Remux;
protected:
public:
cStreamDevice(void);
@@ -47,6 +47,7 @@ public:
void Del(int Count) { m_Remux->DelOutput(Count); }
void ClearOutput() { m_Remux->ClearOutput(); }
int Available(void) { return m_Remux->Available(); }
+ int Free(void) { return m_Remux->Free(); }
};
#endif
diff --git a/tools/socket.c b/tools/socket.c
index 8bb7359..5e5c505 100644
--- a/tools/socket.c
+++ b/tools/socket.c
@@ -8,7 +8,8 @@
#include <errno.h>
#include <fcntl.h>
-#define UDP_TX_BUF_SIZE ((188*7+3)*20)
+#define UDP_TX_BUF_SIZE (188*348)
+//#define TCP_TX_BUF_SIZE (1024 * 30)
cTBSocket::cTBSocket(int Type) {
memset(&m_LocalAddr, 0, sizeof(m_LocalAddr));
@@ -21,7 +22,7 @@ cTBSocket::~cTBSocket() {
}
bool cTBSocket::OpenUDP(const std::string &Host, unsigned int Port) {
- int socket, tmp;
+ int socket;
struct sockaddr_in my_addr;
if (IsOpen()) Close();
@@ -36,16 +37,18 @@ bool cTBSocket::OpenUDP(const std::string &Host, unsigned int Port) {
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(Port);
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
-/* tmp = 1;
- if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, sizeof(tmp)) < 0)
- goto closefd;*/
-
+
/* limit the tx buf size to limit latency */
- tmp = UDP_TX_BUF_SIZE;
+#ifdef UDP_TX_BUF_SIZE
+ int tmp = UDP_TX_BUF_SIZE;
if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char*)&tmp, sizeof(tmp)) < 0)
goto closefd;
+#endif
+/* tmp = 1;
+ if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, sizeof(tmp)) < 0)
+ goto closefd;*/
+
/* the bind is needed to give a port to the socket now */
/* if (bind(socket,(struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
goto closefd;*/
@@ -108,6 +111,13 @@ bool cTBSocket::Listen(const std::string &Ip, unsigned int Port, int BackLog) {
if ((socket = ::socket(PF_INET, m_Type, IPPROTO_IP)) == -1)
return false;
+ /* limit the tx buf size to limit latency */
+#ifdef TCP_TX_BUF_SIZE
+ int tmp = TCP_TX_BUF_SIZE;
+ if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char*)&tmp, sizeof(tmp)) < 0)
+ goto closefd;
+#endif
+
val = 1;
if (::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1)
goto closefd; //return false;
diff --git a/tsworker.c b/tsworker.c
index cf10b52..756e222 100644
--- a/tsworker.c
+++ b/tsworker.c
@@ -4,7 +4,7 @@
* See the README file for copyright information and how to reach the author.
*
*/
-
+
#include <sys/time.h>
#include <vdr/tools.h>
@@ -17,9 +17,11 @@
#define TS_PACKET_SIZE (188)
#define UDP_PACKET_SIZE (TS_PACKET_SIZE * 7)
-#define UDP_MAX_BITRATE 7000000
+#define UDP_MAX_BITRATE 8112832
#define UDP_SEND_INTERVALL 1000
+#define TCP_SEND_SIZE (1024 * 10)
+// 8388608 = 8MBit
struct TSData
{
char packNr;
@@ -188,6 +190,7 @@ void cTSWorker::ActionTCP(void) {
m_StreamDevice->LockOutput();
uchar *buffer = m_StreamDevice->Get(count);
if (buffer!=NULL) {
+ count = (count > TCP_SEND_SIZE) ? TCP_SEND_SIZE : count;
int available = count;
int done = 0;
int written = 0;
@@ -225,15 +228,16 @@ void cTSWorker::ActionTCP(void) {
bytessend += count;
if (curtime.tv_sec > oldtime.tv_sec + 10)
{
- double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000
- - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000;
+ double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000.0)) / 1000
+ - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000.0)) / 1000;
double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024;
+ int bufstat = m_StreamDevice->Available() * 100 / (m_StreamDevice->Available() + m_StreamDevice->Free());
#ifdef DEBUG
- fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
- rate, bytessend - oldbytessend);
+ fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send, %d%% Buffer used\n",
+ rate, bytessend - oldbytessend, bufstat);
#endif
- dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
- rate, bytessend - oldbytessend);
+ dsyslog("[ffnetdev] Streamer: Rate %2.3f MBit/Sec, %d Bytes send, %d%% Buffer used\n",
+ rate, bytessend - oldbytessend, bufstat);
oldbytessend = bytessend;
oldtime = curtime;
@@ -337,12 +341,13 @@ void cTSWorker::ActionUDP(void)
int written = 0;
char data[100];
int rcvCount;
+ int sleepTime;
- rcvCount=m_StreamClient.Read(data, 10);
+ /* rcvCount=m_StreamClient.Read(data, 10);
if (rcvCount > 0)
{
isyslog("[ffnetdev] Streamer: empfangen:%d Bytes\n", rcvCount);
- }
+ }*/
if (oldPacketTime == 0)
oldPacketTime = get_time()- UDP_SEND_INTERVALL;
@@ -350,52 +355,21 @@ void cTSWorker::ActionUDP(void)
while ((available > 0) && (have_Streamclient == true) &&
(!close_Streamclient_request))
{
- while ((tsData.packsCount * TS_PACKET_SIZE < UDP_PACKET_SIZE) && (available > 0))
- {
- int moveCount = (available >= TS_PACKET_SIZE) ? TS_PACKET_SIZE : available;
- moveCount = (moveCount + restData >= TS_PACKET_SIZE) ? TS_PACKET_SIZE - restData : moveCount;
- memcpy(&tsData.data[(tsData.packsCount * TS_PACKET_SIZE) + restData], &buffer[done], moveCount);
- available -= moveCount;
- done += moveCount;
- if (restData + moveCount == TS_PACKET_SIZE)
- {
- char *data = &tsData.data[tsData.packsCount * TS_PACKET_SIZE];
- for (int i = 0; i < 4; i++)
- {
- tsData.tsHeaderCRC += (char)*(data + i);
- }
- restData = 0;
- tsData.packsCount ++;
- }
- else
- {
- restData = moveCount;
- continue;
- }
- }
-
- if (restData > 0)
- continue;
-
- while (get_time() < oldPacketTime + UDP_SEND_INTERVALL)
- cCondWait::SleepMs(1);
+ while ((sleepTime = oldPacketTime + UDP_SEND_INTERVALL - get_time()) > 0)
+ usleep(sleepTime);
if (toSend == 0)
toSend = (long)(UDP_MAX_BITRATE * (((double)get_time() - oldPacketTime) / 1000000) / 8);
- int sendcount = tsData.packsCount * TS_PACKET_SIZE + 3;
- if (toSend < sendcount)
- {
- toSend = 0;
- oldPacketTime = get_time();
- continue;
- }
+ int sendcount = (available > toSend) ? toSend : available;
+ sendcount = (sendcount > UDP_PACKET_SIZE) ? UDP_PACKET_SIZE : sendcount;
+
+ available -= sendcount;
- char* pTsData = (char*)&tsData;
while ((sendcount > 0) && (have_Streamclient == true) &&
(!close_Streamclient_request))
{
- if (((written=m_StreamClient.Write(pTsData, sendcount)) < 0) &&
+ if (((written=m_StreamClient.Write(&buffer[done], sendcount)) < 0) &&
(errno != EAGAIN))
{
isyslog("[ffnetdev] Streamer: Couldn't send data: %d %s Len:%d\n", errno, strerror(errno), sendcount);
@@ -404,22 +378,17 @@ void cTSWorker::ActionUDP(void)
if (written > 0)
{
+ done += written;
sendcount -= written;
- pTsData += written;
toSend -= written;
+ if (toSend == 0)
+ oldPacketTime = get_time();
}
else
{
cCondWait::SleepMs(5);
}
}
-
- if (sendcount == 0)
- {
- tsData.packsCount = 0;
- tsData.tsHeaderCRC = 0;
- tsData.packNr ++;
- }
}
m_StreamDevice->Del(count);
@@ -432,10 +401,10 @@ void cTSWorker::ActionUDP(void)
}
bytessend += count;
- if (curtime.tv_sec > oldtime.tv_sec + 10)
+ if (curtime.tv_sec > oldtime.tv_sec + 3)
{
- double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000
- - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000;
+ double secs = (curtime.tv_sec * 1000 + ((double)curtime.tv_usec / 1000.0)) / 1000
+ - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000.0)) / 1000;
double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024;
#ifdef DEBUG
fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",