diff options
author | zwer <zwer@1f4bef6d-8e0a-0410-8695-e467da8aaccf> | 2007-01-06 22:04:00 +0000 |
---|---|---|
committer | zwer <zwer@1f4bef6d-8e0a-0410-8695-e467da8aaccf> | 2007-01-06 22:04:00 +0000 |
commit | c6b150c7a364c5cb9d5149a92c81f854eb71d6d4 (patch) | |
tree | 023c9451d073c84056496a2bf5cb3df75d13cba0 /tsworker.c | |
parent | a1dc189c5334990a62f7b15bfa45071eec1d6db9 (diff) | |
download | vdr-plugin-ffnetdev-c6b150c7a364c5cb9d5149a92c81f854eb71d6d4.tar.gz vdr-plugin-ffnetdev-c6b150c7a364c5cb9d5149a92c81f854eb71d6d4.tar.bz2 |
- tests mit UDP-Streaming
git-svn-id: svn://svn.berlios.de/ffnetdev/trunk@24 1f4bef6d-8e0a-0410-8695-e467da8aaccf
Diffstat (limited to 'tsworker.c')
-rw-r--r-- | tsworker.c | 265 |
1 files changed, 249 insertions, 16 deletions
@@ -15,7 +15,18 @@ #include "tsworker.h" #include "config.h" -#define MINSENDBYTES KILOBYTE(500) +#define TS_PACKET_SIZE (188) +#define UDP_PACKET_SIZE (TS_PACKET_SIZE * 7) +#define UDP_MAX_BITRATE 7000000 +#define UDP_SEND_INTERVALL 1000 + +struct TSData +{ + char packNr; + char packsCount; + char tsHeaderCRC; + char data[UDP_PACKET_SIZE]; +}; ////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -25,7 +36,7 @@ cTSWorker *cTSWorker::m_Instance = NULL; cTSWorker::cTSWorker(void) : cThread("[ffnetdev] TS streamer") { - m_Active = false; + m_Active = false; m_StreamClient = NULL; origPrimaryDevice = -1; @@ -42,6 +53,7 @@ void cTSWorker::Init(cStreamDevice *StreamDevice, int tsport, cPluginFFNetDev *p m_Instance->TSPort = tsport; m_Instance->Start(); m_Instance->m_pPlugin = pPlugin; + m_Instance->close_Streamclient_request = false; } } @@ -65,8 +77,13 @@ void cTSWorker::CloseStreamClient(void) { } void cTSWorker::Action(void) { + ActionTCP(); + //ActionUDP(); +} + + +void cTSWorker::ActionTCP(void) { cTBSelect select; - //cTBSocket m_StreamListen(SOCK_DGRAM); cTBSocket m_StreamListen; struct timeval oldtime; long bytessend = 0; @@ -181,18 +198,17 @@ void cTSWorker::Action(void) { if (((written=m_StreamClient->Write(&buffer[done], available)) < 0) && (errno != EAGAIN)) { - CloseStreamClient(); + CloseStreamClient(); } if (written > 0) { - - available -= written; - done += written; + available -= written; + done += written; } else { - cCondWait::SleepMs(5); + cCondWait::SleepMs(5); } } m_StreamDevice->Del(count); @@ -209,17 +225,18 @@ void cTSWorker::Action(void) { bytessend += count; if (curtime.tv_sec > oldtime.tv_sec + 10) { - double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000 + double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000 - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000; + double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024; #ifdef DEBUG - fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %d Byte/Sec, %d Bytes send\n", - (int)((bytessend - oldbytessend) / secs), bytessend - oldbytessend); + fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", + rate, bytessend - oldbytessend); #endif - dsyslog("[ffnetdev] Streamer: current TransferRate %d Byte/Sec, %d Bytes send\n", - (int)((bytessend - oldbytessend) / secs), bytessend - oldbytessend); - - oldbytessend = bytessend; - oldtime = curtime; + dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", + rate, bytessend - oldbytessend); + + oldbytessend = bytessend; + oldtime = curtime; } } m_StreamDevice->UnlockOutput(); @@ -246,3 +263,219 @@ void cTSWorker::Action(void) { } + +void cTSWorker::ActionUDP(void) +{ + cTBSocket m_StreamClient(SOCK_DGRAM); + struct timeval oldtime, curtime; + u64 oldPacketTime = 0; + long bytessend = 0; + long oldbytessend = 0; + long toSend = 0; + int restData = 0; + TSData tsData; + + const char* StreamIp = "192.168.0.61"; + uint StreamPort = TSPort; + + m_Active = true; + have_Streamclient = true; + + if (!m_StreamClient.OpenUDP(StreamIp, StreamPort)) + { + isyslog("[ffnetdev] Streamer: Couldn't create UDP-Socket: %s", strerror(errno)); + m_Active = false; + } + else + isyslog("[ffnetdev] Streamer: UDP-Socket create successful"); + + gettimeofday(&oldtime, 0); + tsData.packNr = 0; + tsData.packsCount = 0; + tsData.tsHeaderCRC = 0; + + while (m_Active) + { + /* Check for closed streaming client connection */ + if (have_Streamclient==true) + { + if (close_Streamclient_request==true) + { + close_Streamclient_request = false; + have_Streamclient = false; + + m_pPlugin->RestorePrimaryDevice(); + + if ( m_StreamClient.Close() ) + { +#ifdef DEBUG + fprintf(stderr, "[ffnetdev] Streamer: Client socket closed successfully.\n"); +#endif + isyslog("[ffnetdev] Streamer: Connection closed: client %s:%d", + m_StreamClient.RemoteIp().c_str(), m_StreamClient.RemotePort()); + } + else + { +#ifdef DEBUG + fprintf(stderr, "[ffnetdev] Streamer: Error closing client socket.\n"); +#endif + esyslog("[ffnetdev] Streamer: Error closing connection."); + m_Active=false; + continue; + } + + } + + int count=0; + + m_StreamDevice->LockOutput(); + uchar *buffer = m_StreamDevice->Get(count); + if (buffer!=NULL) + { + int available = count; + int done = 0; + int written = 0; + char data[100]; + int rcvCount; + + rcvCount=m_StreamClient.Read(data, 10); + if (rcvCount > 0) + { + isyslog("[ffnetdev] Streamer: empfangen:%d Bytes\n", rcvCount); + } + + if (oldPacketTime == 0) + oldPacketTime = get_time()- UDP_SEND_INTERVALL; + + while ((available > 0) && (have_Streamclient == true) && + (!close_Streamclient_request)) + { + while ((tsData.packsCount * TS_PACKET_SIZE < UDP_PACKET_SIZE) && (available > 0)) + { + int moveCount = (available >= TS_PACKET_SIZE) ? TS_PACKET_SIZE : available; + moveCount = (moveCount + restData >= TS_PACKET_SIZE) ? TS_PACKET_SIZE - restData : moveCount; + memcpy(&tsData.data[(tsData.packsCount * TS_PACKET_SIZE) + restData], &buffer[done], moveCount); + available -= moveCount; + done += moveCount; + if (restData + moveCount == TS_PACKET_SIZE) + { + char *data = &tsData.data[tsData.packsCount * TS_PACKET_SIZE]; + for (int i = 0; i < 4; i++) + { + tsData.tsHeaderCRC += (char)*(data + i); + } + restData = 0; + tsData.packsCount ++; + } + else + { + restData = moveCount; + continue; + } + } + + if (restData > 0) + continue; + + while (get_time() < oldPacketTime + UDP_SEND_INTERVALL) + cCondWait::SleepMs(1); + + if (toSend == 0) + toSend = (long)(UDP_MAX_BITRATE * (((double)get_time() - oldPacketTime) / 1000000) / 8); + + int sendcount = tsData.packsCount * TS_PACKET_SIZE + 3; + if (toSend < sendcount) + { + toSend = 0; + oldPacketTime = get_time(); + continue; + } + + char* pTsData = (char*)&tsData; + while ((sendcount > 0) && (have_Streamclient == true) && + (!close_Streamclient_request)) + { + if (((written=m_StreamClient.Write(pTsData, sendcount)) < 0) && + (errno != EAGAIN)) + { + isyslog("[ffnetdev] Streamer: Couldn't send data: %d %s Len:%d\n", errno, strerror(errno), sendcount); + CloseStreamClient(); + } + + if (written > 0) + { + sendcount -= written; + pTsData += written; + toSend -= written; + } + else + { + cCondWait::SleepMs(5); + } + } + + if (sendcount == 0) + { + tsData.packsCount = 0; + tsData.tsHeaderCRC = 0; + tsData.packNr ++; + } + } + m_StreamDevice->Del(count); + + gettimeofday(&curtime, 0); + if (oldtime.tv_sec == 0) + { + oldtime = curtime; + bytessend = 0; + oldbytessend = 0; + } + + bytessend += count; + if (curtime.tv_sec > oldtime.tv_sec + 10) + { + double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000 + - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000; + double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024; +#ifdef DEBUG + fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", + rate, bytessend - oldbytessend); +#endif + dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n", + rate, bytessend - oldbytessend); + + oldbytessend = bytessend; + oldtime = curtime; + } + } + m_StreamDevice->UnlockOutput(); + + } + else + { + /* simply discard all data in ringbuffer */ + int count=0; + if ( (m_StreamDevice->Get(count)) !=NULL ) + { + m_StreamDevice->Del(count); +#ifdef DEBUG + fprintf (stderr, "[ffnetdev] Streamer: Bytes not sent, but deleted from ringbuffer: %d\n",count); +#endif + dsyslog("[ffnetdev] Streamer: Bytes not sent, but deleted from ringbuffer: %d\n",count); + } + } + cCondWait::SleepMs(3); + + } // while(m_Active) + +} + +/* Returns time since 1970 in microseconds */ +u64 cTSWorker::get_time(void) +{ + struct timeval tv; + struct timezone tz={0,0}; + + gettimeofday(&tv,&tz); + return ((u64)tv.tv_sec)*1000000+((u64)tv.tv_usec); +} |