diff options
-rw-r--r-- | pes2ts.c | 45 | ||||
-rw-r--r-- | pes2ts.h | 58 | ||||
-rw-r--r-- | streamdevice.c | 70 | ||||
-rw-r--r-- | streamdevice.h | 3 | ||||
-rw-r--r-- | tools/socket.c | 26 | ||||
-rw-r--r-- | tsworker.c | 87 |
6 files changed, 158 insertions, 131 deletions
@@ -13,26 +13,44 @@ #include "pes2ts.h" -cPES2TSRemux::cPES2TSRemux(int VPid, int APid): +////////////////////////////////////////////////////////////////////////////// +cPESRemux::cPESRemux(int inputBufferSize, int outputBufferSize): + m_InputBuffer(new cRingBufferLinear(inputBufferSize, outputBufferSize)) +{ + OutputLocked = false; + m_InputBuffer->SetTimeouts(0, 1000); // IMPORTANT to avoid busy wait in threads main loop and thus a high CPU load +} + +cPESRemux::~cPESRemux() +{ + delete m_InputBuffer; +} + +int cPESRemux::Put(const uchar *Data, int Count) +{ + InputMutex.Lock(); + int result = m_InputBuffer->Put(Data, Count); + InputMutex.Unlock(); + return ( result); +} + + +////////////////////////////////////////////////////////////////////////////// +cPES2TSRemux::cPES2TSRemux(int VPid, int APid): cPESRemux(INPUTBUFSIZE, IPACKS), cThread("[ffnetdev] PES2TS remux"), m_OutputBuffer(new cRingBufferLinear(OUTPUTBUFSIZE, TS_SIZE * 2)), - m_InputBuffer(new cRingBufferLinear(INPUTBUFSIZE, IPACKS)), m_Active(false), m_PlayModeChanged(false) { vpid = VPid; apid = APid; - m_InputBuffer->SetTimeouts(0, 1000); // IMPORTANT to avoid busy wait in threads main loop and thus a high CPU load Start(); - OutputLocked = false; } cPES2TSRemux::~cPES2TSRemux() { m_Active = false; - delete m_InputBuffer; delete m_OutputBuffer; - } void cPES2TSRemux::Action(void) @@ -57,8 +75,8 @@ void cPES2TSRemux::Action(void) if (m_PlayModeChanged) { - cCondWait::SleepMs(1500); - m_PlayModeChanged = false; + cCondWait::SleepMs(1500); + m_PlayModeChanged = false; } if (m_InputBuffer->Available() < (int)IPACKS*10) { @@ -199,12 +217,11 @@ void cPES2TSRemux::Action(void) } -int cPES2TSRemux::Put(const uchar *Data, int Count) +////////////////////////////////////////////////////////////////////////////// +cPES2PESRemux::cPES2PESRemux(): cPESRemux(INPUTBUFSIZE + OUTPUTBUFSIZE, IPACKS) { - InputMutex.Lock(); - int result = m_InputBuffer->Put(Data, Count); - InputMutex.Unlock(); - return ( result); } - +cPES2PESRemux::~cPES2PESRemux() +{ +} @@ -16,15 +16,40 @@ #define TS_SIZE 188 #define IPACKS 2048 -class cPES2TSRemux: public cThread { +class cPESRemux { +private: + bool OutputLocked; + +protected: + cRingBufferLinear *m_InputBuffer; + cMutex InputMutex; + +public: + cPESRemux(int inputBufferSize, int outputBufferSize); + virtual ~cPESRemux(); + + int Put(const uchar *Data, int Count); + void DelInput (int Count) { InputMutex.Lock(); m_InputBuffer ->Del(Count); InputMutex.Unlock(); } + void ClearInput () { InputMutex.Lock(); m_InputBuffer ->Clear(); InputMutex.Unlock(); } + + virtual int Available(void) = 0; + virtual int Free(void) = 0; + virtual int InputFree(void) = 0; + virtual uchar *Get(int &Count) = 0; + virtual void DelOutput(int Count) = 0; + virtual void ClearOutput() = 0; + void LockOutput() { while (OutputLocked) cCondWait::SleepMs(1); OutputLocked = true; } + void UnlockOutput() { OutputLocked = false; } + virtual void PlayModeChange() {}; + +}; + +class cPES2TSRemux: public cPESRemux, cThread { private: cRingBufferLinear *m_OutputBuffer; - cRingBufferLinear *m_InputBuffer; bool m_Active; unsigned short vpid; unsigned short apid; - bool OutputLocked; - cMutex InputMutex; bool m_PlayModeChanged; protected: @@ -34,17 +59,28 @@ public: cPES2TSRemux(int VPid, int APid); virtual ~cPES2TSRemux(); - int Free(void) { return m_InputBuffer->Free(); } - int Available(void) { return m_OutputBuffer->Available(); } - int Put(const uchar *Data, int Count); + int Available(void) { return m_OutputBuffer->Available(); } + int Free(void) { return m_OutputBuffer->Free(); } + int InputFree(void) { return m_InputBuffer->Free(); } uchar *Get(int &Count) { return m_OutputBuffer->Get(Count); } void DelOutput(int Count) { m_OutputBuffer->Del(Count); } - void DelInput (int Count) { InputMutex.Lock(); m_InputBuffer ->Del(Count); InputMutex.Unlock(); } void ClearOutput() { LockOutput(); m_OutputBuffer->Clear(); UnlockOutput(); } - void ClearInput () { InputMutex.Lock(); m_InputBuffer ->Clear(); InputMutex.Unlock(); } - void LockOutput() { while (OutputLocked) cCondWait::SleepMs(1); OutputLocked = true; } - void UnlockOutput() { OutputLocked = false; } void PlayModeChange() { m_PlayModeChanged = true; } + +}; + +class cPES2PESRemux: public cPESRemux { +public: + cPES2PESRemux(); + virtual ~cPES2PESRemux(); + + int Available(void) { return m_InputBuffer->Available(); } + int Free(void) { return m_InputBuffer->Free(); } + int InputFree(void) { return m_InputBuffer->Free(); } + uchar *Get(int &Count) { return m_InputBuffer->Get(Count); } + void DelOutput(int Count) { m_InputBuffer->Del(Count); } + void ClearOutput() { LockOutput(); m_InputBuffer->Clear(); UnlockOutput(); } + }; #endif // PES2TSREMUX_H diff --git a/streamdevice.c b/streamdevice.c index 354bad5..7380e9e 100644 --- a/streamdevice.c +++ b/streamdevice.c @@ -12,103 +12,97 @@ cStreamDevice::cStreamDevice(void) { -#ifdef DEBUG - fprintf(stderr,"[ffnetdev] Device: Constructor cStreamDevice \n"); -#endif - m_Remux = new cPES2TSRemux(TS_VPID, TS_APID); - + dsyslog("[ffnetdev] Device: Constructor cStreamDevice \n"); + //m_Remux = new cPES2TSRemux(TS_VPID, TS_APID); + m_Remux = new cPES2PESRemux(); } cStreamDevice::~cStreamDevice(void) { -#ifdef DEBUG - fprintf(stderr,"[ffnetdev] Device: Destructor cStreamDevice \n"); -#endif - DELETENULL(m_Remux); + dsyslog("[ffnetdev] Device: Destructor cStreamDevice \n"); + DELETENULL(m_Remux); } void cStreamDevice::MakePrimaryDevice(bool On) { -#ifdef DEBUG - fprintf(stderr,"[ffnetdev] Device: ffnetdev becomes primary device. Registering our OSD provider...\n"); -#endif + dsyslog("[ffnetdev] Device: ffnetdev becomes primary device. Registering our OSD provider...\n"); new cNetOSDProvider(); } int cStreamDevice::ProvidesCa(const cChannel *Channel) const { - return 0; + return 0; } bool cStreamDevice::HasDecoder(void) const { - return true; // We can decode MPEG2 + return true; // We can decode MPEG2 } bool cStreamDevice::CanReplay(void) const { - return true; // We can replay + return true; // We can replay } bool cStreamDevice::SetPlayMode(ePlayMode PlayMode) { - fprintf(stderr, "[ffnetdev] Device: Setting playmode(not implemented). Mode: %d\n",PlayMode); - cOSDWorker::SendPlayMode(PlayMode); - m_Remux->ClearInput(); - m_Remux->ClearOutput(); - m_Remux->PlayModeChange(); - return true; + dsyslog("[ffnetdev] Device: Setting playmode. Mode: %d\n",PlayMode); + cOSDWorker::SendPlayMode(PlayMode); + m_Remux->ClearInput(); + m_Remux->ClearOutput(); + m_Remux->PlayModeChange(); + return true; } void cStreamDevice::TrickSpeed(int Speed) { - fprintf(stderr,"[ffnetdev] Device: Trickspeed(not implemented). Speed: %d\n", Speed); - m_Remux->ClearInput(); - m_Remux->ClearOutput(); - m_Remux->PlayModeChange(); + dsyslog("[ffnetdev] Device: Trickspeed(not implemented). Speed: %d\n", Speed); + m_Remux->ClearInput(); + m_Remux->ClearOutput(); + m_Remux->PlayModeChange(); } void cStreamDevice::Clear(void) { - fprintf(stderr,"[ffnetdev] Device: Clear(not implemented).\n"); - m_Remux->ClearInput(); - m_Remux->ClearOutput(); - m_Remux->PlayModeChange(); + dsyslog("[ffnetdev] Device: Clear(not implemented).\n"); + m_Remux->ClearInput(); + m_Remux->ClearOutput(); + m_Remux->PlayModeChange(); // cDevice::Clear(); } void cStreamDevice::Play(void) { - fprintf(stderr,"[ffnetdev] Device: Play(not implemented).\n"); + dsyslog("[ffnetdev] Device: Play(not implemented).\n"); // cDevice::Play(); } void cStreamDevice::Freeze(void) { - fprintf(stderr,"[ffnetdev] Device: Freeze(not implemented).\n"); + dsyslog("[ffnetdev] Device: Freeze(not implemented).\n"); // cDevice::Freeze(); } void cStreamDevice::Mute(void) { - fprintf(stderr,"[ffnetdev] Device: Mute(not implemented).\n"); + dsyslog("[ffnetdev] Device: Mute(not implemented).\n"); // cDevice::Mute(); } void cStreamDevice::SetVolumeDevice(int Volume) { - fprintf (stderr, "[ffnetdev] Device: Setting volume to %d (not implemented).\n", Volume); + dsyslog("[ffnetdev] Device: Setting volume to %d (not implemented).\n", Volume); } void cStreamDevice::StillPicture(const uchar *Data, int Length) { - fprintf(stderr,"[ffnetdev] Device: StillPicture(not implemented).\n"); + dsyslog("[ffnetdev] Device: StillPicture(not implemented).\n"); } bool cStreamDevice::Poll(cPoller &Poller, int TimeoutMs) { - //fprintf(stderr,"[ffnetdev] Device: Poll TimeoutMs: %d ....\n",TimeoutMs); - return true; + //dsyslog("[ffnetdev] Device: Poll TimeoutMs: %d ....\n",TimeoutMs); + return true; } /* ---------------------------------------------------------------------------- */ @@ -120,7 +114,7 @@ int cStreamDevice::PlayAudio(const uchar *Data, int Length, uchar Id) { if (cTSWorker::HaveStreamClient()) { - while ((m_Remux->Free() < Length) && cTSWorker::HaveStreamClient()) + while ((m_Remux->InputFree() < Length) && cTSWorker::HaveStreamClient()) cCondWait::SleepMs(1); int result=m_Remux->Put(Data, Length); if (result!=Length) { @@ -150,7 +144,7 @@ int cStreamDevice::PlayVideo(const uchar *Data, int Length) if (cTSWorker::HaveStreamClient()) { - while ((m_Remux->Free() < Length) && cTSWorker::HaveStreamClient()) + while ((m_Remux->InputFree() < Length) && cTSWorker::HaveStreamClient()) cCondWait::SleepMs(1); int result=m_Remux->Put(Data, Length); if (result!=Length) { diff --git a/streamdevice.h b/streamdevice.h index 9bb2ff3..6cd3fa5 100644 --- a/streamdevice.h +++ b/streamdevice.h @@ -17,7 +17,7 @@ class cStreamDevice: public cDevice { private: - cPES2TSRemux *m_Remux; + cPESRemux *m_Remux; protected: public: cStreamDevice(void); @@ -47,6 +47,7 @@ public: void Del(int Count) { m_Remux->DelOutput(Count); } void ClearOutput() { m_Remux->ClearOutput(); } int Available(void) { return m_Remux->Available(); } + int Free(void) { return m_Remux->Free(); } }; #endif diff --git a/tools/socket.c b/tools/socket.c index 8bb7359..5e5c505 100644 --- a/tools/socket.c +++ b/tools/socket.c @@ -8,7 +8,8 @@ #include <errno.h> #include <fcntl.h> -#define UDP_TX_BUF_SIZE ((188*7+3)*20) +#define UDP_TX_BUF_SIZE (188*348) +//#define TCP_TX_BUF_SIZE (1024 * 30) cTBSocket::cTBSocket(int Type) { memset(&m_LocalAddr, 0, sizeof(m_LocalAddr)); @@ -21,7 +22,7 @@ cTBSocket::~cTBSocket() { } bool cTBSocket::OpenUDP(const std::string &Host, unsigned int Port) { - int socket, tmp; + int socket; struct sockaddr_in my_addr; if (IsOpen()) Close(); @@ -36,16 +37,18 @@ bool cTBSocket::OpenUDP(const std::string &Host, unsigned int Port) { my_addr.sin_family = AF_INET; my_addr.sin_port = htons(Port); my_addr.sin_addr.s_addr = htonl(INADDR_ANY); - -/* tmp = 1; - if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, sizeof(tmp)) < 0) - goto closefd;*/ - + /* limit the tx buf size to limit latency */ - tmp = UDP_TX_BUF_SIZE; +#ifdef UDP_TX_BUF_SIZE + int tmp = UDP_TX_BUF_SIZE; if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char*)&tmp, sizeof(tmp)) < 0) goto closefd; +#endif +/* tmp = 1; + if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, sizeof(tmp)) < 0) + goto closefd;*/ + /* the bind is needed to give a port to the socket now */ /* if (bind(socket,(struct sockaddr *)&my_addr, sizeof(my_addr)) < 0) goto closefd;*/ @@ -108,6 +111,13 @@ bool cTBSocket::Listen(const std::string &Ip, unsigned int Port, int BackLog) { if ((socket = ::socket(PF_INET, m_Type, IPPROTO_IP)) == -1) return false; + /* limit the tx buf size to limit latency */ +#ifdef TCP_TX_BUF_SIZE + int tmp = TCP_TX_BUF_SIZE; + if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char*)&tmp, sizeof(tmp)) < 0) + goto closefd; +#endif + val = 1; if (::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) goto closefd; //return false; @@ -4,7 +4,7 @@ * See the README file for copyright information and how to reach the author. * */ - + #include <sys/time.h> #include <vdr/tools.h> @@ -17,9 +17,11 @@ #define TS_PACKET_SIZE (188) #define UDP_PACKET_SIZE (TS_PACKET_SIZE * 7) -#define UDP_MAX_BITRATE 7000000 +#define UDP_MAX_BITRATE 8112832 #define UDP_SEND_INTERVALL 1000 +#define TCP_SEND_SIZE (1024 * 10) +// 8388608 = 8MBit struct TSData { char packNr; @@ -188,6 +190,7 @@ void cTSWorker::ActionTCP(void) { m_StreamDevice->LockOutput(); uchar *buffer = m_StreamDevice->Get(count); if (buffer!=NULL) { + count = (count > TCP_SEND_SIZE) ? TCP_SEND_SIZE : count; int available = count; int done = 0; int written = 0; @@ -225,15 +228,16 @@ void cTSWorker::ActionTCP(void) { bytessend += count; if (curtime.tv_sec > oldtime.tv_sec + 10) { - double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000 - - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000; + double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000.0)) / 1000 + - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000.0)) / 1000; double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024; + int bufstat = m_StreamDevice->Available() * 100 / (m_StreamDevice->Available() + m_StreamDevice->Free()); #ifdef DEBUG - fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", - rate, bytessend - oldbytessend); + fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send, %d%% Buffer used\n", + rate, bytessend - oldbytessend, bufstat); #endif - dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", - rate, bytessend - oldbytessend); + dsyslog("[ffnetdev] Streamer: Rate %2.3f MBit/Sec, %d Bytes send, %d%% Buffer used\n", + rate, bytessend - oldbytessend, bufstat); oldbytessend = bytessend; oldtime = curtime; @@ -337,12 +341,13 @@ void cTSWorker::ActionUDP(void) int written = 0; char data[100]; int rcvCount; + int sleepTime; - rcvCount=m_StreamClient.Read(data, 10); + /* rcvCount=m_StreamClient.Read(data, 10); if (rcvCount > 0) { isyslog("[ffnetdev] Streamer: empfangen:%d Bytes\n", rcvCount); - } + }*/ if (oldPacketTime == 0) oldPacketTime = get_time()- UDP_SEND_INTERVALL; @@ -350,52 +355,21 @@ void cTSWorker::ActionUDP(void) while ((available > 0) && (have_Streamclient == true) && (!close_Streamclient_request)) { - while ((tsData.packsCount * TS_PACKET_SIZE < UDP_PACKET_SIZE) && (available > 0)) - { - int moveCount = (available >= TS_PACKET_SIZE) ? TS_PACKET_SIZE : available; - moveCount = (moveCount + restData >= TS_PACKET_SIZE) ? TS_PACKET_SIZE - restData : moveCount; - memcpy(&tsData.data[(tsData.packsCount * TS_PACKET_SIZE) + restData], &buffer[done], moveCount); - available -= moveCount; - done += moveCount; - if (restData + moveCount == TS_PACKET_SIZE) - { - char *data = &tsData.data[tsData.packsCount * TS_PACKET_SIZE]; - for (int i = 0; i < 4; i++) - { - tsData.tsHeaderCRC += (char)*(data + i); - } - restData = 0; - tsData.packsCount ++; - } - else - { - restData = moveCount; - continue; - } - } - - if (restData > 0) - continue; - - while (get_time() < oldPacketTime + UDP_SEND_INTERVALL) - cCondWait::SleepMs(1); + while ((sleepTime = oldPacketTime + UDP_SEND_INTERVALL - get_time()) > 0) + usleep(sleepTime); if (toSend == 0) toSend = (long)(UDP_MAX_BITRATE * (((double)get_time() - oldPacketTime) / 1000000) / 8); - int sendcount = tsData.packsCount * TS_PACKET_SIZE + 3; - if (toSend < sendcount) - { - toSend = 0; - oldPacketTime = get_time(); - continue; - } + int sendcount = (available > toSend) ? toSend : available; + sendcount = (sendcount > UDP_PACKET_SIZE) ? UDP_PACKET_SIZE : sendcount; + + available -= sendcount; - char* pTsData = (char*)&tsData; while ((sendcount > 0) && (have_Streamclient == true) && (!close_Streamclient_request)) { - if (((written=m_StreamClient.Write(pTsData, sendcount)) < 0) && + if (((written=m_StreamClient.Write(&buffer[done], sendcount)) < 0) && (errno != EAGAIN)) { isyslog("[ffnetdev] Streamer: Couldn't send data: %d %s Len:%d\n", errno, strerror(errno), sendcount); @@ -404,22 +378,17 @@ void cTSWorker::ActionUDP(void) if (written > 0) { + done += written; sendcount -= written; - pTsData += written; toSend -= written; + if (toSend == 0) + oldPacketTime = get_time(); } else { cCondWait::SleepMs(5); } } - - if (sendcount == 0) - { - tsData.packsCount = 0; - tsData.tsHeaderCRC = 0; - tsData.packNr ++; - } } m_StreamDevice->Del(count); @@ -432,10 +401,10 @@ void cTSWorker::ActionUDP(void) } bytessend += count; - if (curtime.tv_sec > oldtime.tv_sec + 10) + if (curtime.tv_sec > oldtime.tv_sec + 3) { - double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000 - - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000; + double secs = (curtime.tv_sec * 1000 + ((double)curtime.tv_usec / 1000.0)) / 1000 + - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000.0)) / 1000; double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024; #ifdef DEBUG fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", |