summaryrefslogtreecommitdiff
path: root/tsworker.c
diff options
context:
space:
mode:
authorzwer <zwer@1f4bef6d-8e0a-0410-8695-e467da8aaccf>2007-01-06 22:04:00 +0000
committerzwer <zwer@1f4bef6d-8e0a-0410-8695-e467da8aaccf>2007-01-06 22:04:00 +0000
commitc6b150c7a364c5cb9d5149a92c81f854eb71d6d4 (patch)
tree023c9451d073c84056496a2bf5cb3df75d13cba0 /tsworker.c
parenta1dc189c5334990a62f7b15bfa45071eec1d6db9 (diff)
downloadvdr-plugin-ffnetdev-c6b150c7a364c5cb9d5149a92c81f854eb71d6d4.tar.gz
vdr-plugin-ffnetdev-c6b150c7a364c5cb9d5149a92c81f854eb71d6d4.tar.bz2
- tests mit UDP-Streaming
git-svn-id: svn://svn.berlios.de/ffnetdev/trunk@24 1f4bef6d-8e0a-0410-8695-e467da8aaccf
Diffstat (limited to 'tsworker.c')
-rw-r--r--tsworker.c265
1 files changed, 249 insertions, 16 deletions
diff --git a/tsworker.c b/tsworker.c
index 98e893e..cf10b52 100644
--- a/tsworker.c
+++ b/tsworker.c
@@ -15,7 +15,18 @@
#include "tsworker.h"
#include "config.h"
-#define MINSENDBYTES KILOBYTE(500)
+#define TS_PACKET_SIZE (188)
+#define UDP_PACKET_SIZE (TS_PACKET_SIZE * 7)
+#define UDP_MAX_BITRATE 7000000
+#define UDP_SEND_INTERVALL 1000
+
+struct TSData
+{
+ char packNr;
+ char packsCount;
+ char tsHeaderCRC;
+ char data[UDP_PACKET_SIZE];
+};
//////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -25,7 +36,7 @@ cTSWorker *cTSWorker::m_Instance = NULL;
cTSWorker::cTSWorker(void)
: cThread("[ffnetdev] TS streamer")
{
- m_Active = false;
+ m_Active = false;
m_StreamClient = NULL;
origPrimaryDevice = -1;
@@ -42,6 +53,7 @@ void cTSWorker::Init(cStreamDevice *StreamDevice, int tsport, cPluginFFNetDev *p
m_Instance->TSPort = tsport;
m_Instance->Start();
m_Instance->m_pPlugin = pPlugin;
+ m_Instance->close_Streamclient_request = false;
}
}
@@ -65,8 +77,13 @@ void cTSWorker::CloseStreamClient(void) {
}
void cTSWorker::Action(void) {
+ ActionTCP();
+ //ActionUDP();
+}
+
+
+void cTSWorker::ActionTCP(void) {
cTBSelect select;
- //cTBSocket m_StreamListen(SOCK_DGRAM);
cTBSocket m_StreamListen;
struct timeval oldtime;
long bytessend = 0;
@@ -181,18 +198,17 @@ void cTSWorker::Action(void) {
if (((written=m_StreamClient->Write(&buffer[done], available)) < 0) &&
(errno != EAGAIN))
{
- CloseStreamClient();
+ CloseStreamClient();
}
if (written > 0)
{
-
- available -= written;
- done += written;
+ available -= written;
+ done += written;
}
else
{
- cCondWait::SleepMs(5);
+ cCondWait::SleepMs(5);
}
}
m_StreamDevice->Del(count);
@@ -209,17 +225,18 @@ void cTSWorker::Action(void) {
bytessend += count;
if (curtime.tv_sec > oldtime.tv_sec + 10)
{
- double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000
+ double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000
- (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000;
+ double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024;
#ifdef DEBUG
- fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %d Byte/Sec, %d Bytes send\n",
- (int)((bytessend - oldbytessend) / secs), bytessend - oldbytessend);
+ fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
+ rate, bytessend - oldbytessend);
#endif
- dsyslog("[ffnetdev] Streamer: current TransferRate %d Byte/Sec, %d Bytes send\n",
- (int)((bytessend - oldbytessend) / secs), bytessend - oldbytessend);
-
- oldbytessend = bytessend;
- oldtime = curtime;
+ dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
+ rate, bytessend - oldbytessend);
+
+ oldbytessend = bytessend;
+ oldtime = curtime;
}
}
m_StreamDevice->UnlockOutput();
@@ -246,3 +263,219 @@ void cTSWorker::Action(void) {
}
+
+void cTSWorker::ActionUDP(void)
+{
+ cTBSocket m_StreamClient(SOCK_DGRAM);
+ struct timeval oldtime, curtime;
+ u64 oldPacketTime = 0;
+ long bytessend = 0;
+ long oldbytessend = 0;
+ long toSend = 0;
+ int restData = 0;
+ TSData tsData;
+
+ const char* StreamIp = "192.168.0.61";
+ uint StreamPort = TSPort;
+
+ m_Active = true;
+ have_Streamclient = true;
+
+ if (!m_StreamClient.OpenUDP(StreamIp, StreamPort))
+ {
+ isyslog("[ffnetdev] Streamer: Couldn't create UDP-Socket: %s", strerror(errno));
+ m_Active = false;
+ }
+ else
+ isyslog("[ffnetdev] Streamer: UDP-Socket create successful");
+
+ gettimeofday(&oldtime, 0);
+ tsData.packNr = 0;
+ tsData.packsCount = 0;
+ tsData.tsHeaderCRC = 0;
+
+ while (m_Active)
+ {
+ /* Check for closed streaming client connection */
+ if (have_Streamclient==true)
+ {
+ if (close_Streamclient_request==true)
+ {
+ close_Streamclient_request = false;
+ have_Streamclient = false;
+
+ m_pPlugin->RestorePrimaryDevice();
+
+ if ( m_StreamClient.Close() )
+ {
+#ifdef DEBUG
+ fprintf(stderr, "[ffnetdev] Streamer: Client socket closed successfully.\n");
+#endif
+ isyslog("[ffnetdev] Streamer: Connection closed: client %s:%d",
+ m_StreamClient.RemoteIp().c_str(), m_StreamClient.RemotePort());
+ }
+ else
+ {
+#ifdef DEBUG
+ fprintf(stderr, "[ffnetdev] Streamer: Error closing client socket.\n");
+#endif
+ esyslog("[ffnetdev] Streamer: Error closing connection.");
+ m_Active=false;
+ continue;
+ }
+
+ }
+
+ int count=0;
+
+ m_StreamDevice->LockOutput();
+ uchar *buffer = m_StreamDevice->Get(count);
+ if (buffer!=NULL)
+ {
+ int available = count;
+ int done = 0;
+ int written = 0;
+ char data[100];
+ int rcvCount;
+
+ rcvCount=m_StreamClient.Read(data, 10);
+ if (rcvCount > 0)
+ {
+ isyslog("[ffnetdev] Streamer: empfangen:%d Bytes\n", rcvCount);
+ }
+
+ if (oldPacketTime == 0)
+ oldPacketTime = get_time()- UDP_SEND_INTERVALL;
+
+ while ((available > 0) && (have_Streamclient == true) &&
+ (!close_Streamclient_request))
+ {
+ while ((tsData.packsCount * TS_PACKET_SIZE < UDP_PACKET_SIZE) && (available > 0))
+ {
+ int moveCount = (available >= TS_PACKET_SIZE) ? TS_PACKET_SIZE : available;
+ moveCount = (moveCount + restData >= TS_PACKET_SIZE) ? TS_PACKET_SIZE - restData : moveCount;
+ memcpy(&tsData.data[(tsData.packsCount * TS_PACKET_SIZE) + restData], &buffer[done], moveCount);
+ available -= moveCount;
+ done += moveCount;
+ if (restData + moveCount == TS_PACKET_SIZE)
+ {
+ char *data = &tsData.data[tsData.packsCount * TS_PACKET_SIZE];
+ for (int i = 0; i < 4; i++)
+ {
+ tsData.tsHeaderCRC += (char)*(data + i);
+ }
+ restData = 0;
+ tsData.packsCount ++;
+ }
+ else
+ {
+ restData = moveCount;
+ continue;
+ }
+ }
+
+ if (restData > 0)
+ continue;
+
+ while (get_time() < oldPacketTime + UDP_SEND_INTERVALL)
+ cCondWait::SleepMs(1);
+
+ if (toSend == 0)
+ toSend = (long)(UDP_MAX_BITRATE * (((double)get_time() - oldPacketTime) / 1000000) / 8);
+
+ int sendcount = tsData.packsCount * TS_PACKET_SIZE + 3;
+ if (toSend < sendcount)
+ {
+ toSend = 0;
+ oldPacketTime = get_time();
+ continue;
+ }
+
+ char* pTsData = (char*)&tsData;
+ while ((sendcount > 0) && (have_Streamclient == true) &&
+ (!close_Streamclient_request))
+ {
+ if (((written=m_StreamClient.Write(pTsData, sendcount)) < 0) &&
+ (errno != EAGAIN))
+ {
+ isyslog("[ffnetdev] Streamer: Couldn't send data: %d %s Len:%d\n", errno, strerror(errno), sendcount);
+ CloseStreamClient();
+ }
+
+ if (written > 0)
+ {
+ sendcount -= written;
+ pTsData += written;
+ toSend -= written;
+ }
+ else
+ {
+ cCondWait::SleepMs(5);
+ }
+ }
+
+ if (sendcount == 0)
+ {
+ tsData.packsCount = 0;
+ tsData.tsHeaderCRC = 0;
+ tsData.packNr ++;
+ }
+ }
+ m_StreamDevice->Del(count);
+
+ gettimeofday(&curtime, 0);
+ if (oldtime.tv_sec == 0)
+ {
+ oldtime = curtime;
+ bytessend = 0;
+ oldbytessend = 0;
+ }
+
+ bytessend += count;
+ if (curtime.tv_sec > oldtime.tv_sec + 10)
+ {
+ double secs = (curtime.tv_sec * 1000 + (curtime.tv_usec / 1000)) / 1000
+ - (oldtime.tv_sec * 1000 + (oldtime.tv_usec / 1000)) / 1000;
+ double rate = (double)((bytessend - oldbytessend) / secs) * 8 / 1024 / 1024;
+#ifdef DEBUG
+ fprintf(stderr, "[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
+ rate, bytessend - oldbytessend);
+#endif
+ dsyslog("[ffnetdev] Streamer: current TransferRate %2.3f MBit/Sec, %d Bytes send\n",
+ rate, bytessend - oldbytessend);
+
+ oldbytessend = bytessend;
+ oldtime = curtime;
+ }
+ }
+ m_StreamDevice->UnlockOutput();
+
+ }
+ else
+ {
+ /* simply discard all data in ringbuffer */
+ int count=0;
+ if ( (m_StreamDevice->Get(count)) !=NULL )
+ {
+ m_StreamDevice->Del(count);
+#ifdef DEBUG
+ fprintf (stderr, "[ffnetdev] Streamer: Bytes not sent, but deleted from ringbuffer: %d\n",count);
+#endif
+ dsyslog("[ffnetdev] Streamer: Bytes not sent, but deleted from ringbuffer: %d\n",count);
+ }
+ }
+ cCondWait::SleepMs(3);
+
+ } // while(m_Active)
+
+}
+
+/* Returns time since 1970 in microseconds */
+u64 cTSWorker::get_time(void)
+{
+ struct timeval tv;
+ struct timezone tz={0,0};
+
+ gettimeofday(&tv,&tz);
+ return ((u64)tv.tv_sec)*1000000+((u64)tv.tv_usec);
+}