summaryrefslogtreecommitdiff
path: root/tools/udp_pes_scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/udp_pes_scheduler.c')
-rw-r--r--tools/udp_pes_scheduler.c595
1 files changed, 595 insertions, 0 deletions
diff --git a/tools/udp_pes_scheduler.c b/tools/udp_pes_scheduler.c
new file mode 100644
index 00000000..5da4be74
--- /dev/null
+++ b/tools/udp_pes_scheduler.c
@@ -0,0 +1,595 @@
+/*
+ * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: udp_pes_scheduler.c,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>
+
+#include <vdr/config.h>
+#include <vdr/tools.h>
+
+#include "../logdefs.h"
+
+#include "udp_buffer.h"
+#include "pes.h"
+
+#include "udp_pes_scheduler.h"
+
+#include "../xine_input_vdr_net.h" // frame headers
+
+
+//----------------------- cTimePts ------------------------------------------
+
+cTimePts::cTimePts(void)
+{
+ Set();
+}
+
+int64_t cTimePts::Now(void)
+{
+ struct timeval t;
+
+ if (gettimeofday(&t, NULL) == 0) {
+ t.tv_sec -= tbegin.tv_sec;
+ if(t.tv_usec < tbegin.tv_usec) {
+ t.tv_sec--;
+ t.tv_usec += 1000000;
+ }
+ t.tv_usec -= tbegin.tv_usec;
+
+ return (uint64(t.tv_sec)) * 90000LL +
+ (uint64(t.tv_usec)) * 90LL / 1000LL +
+ begin;
+ }
+
+ return 0;
+}
+
+void cTimePts::Set(int64_t Pts)
+{
+ gettimeofday(&tbegin, NULL);
+ begin = Pts;
+}
+
+//----------------------- cUdpScheduler -------------------------------------
+
+//#define LOG_RESEND
+//#define LOG_SCR
+
+const int MAX_QUEUE_SIZE = 64; // ~ 65 ms with typical DVB stream
+const int MAX_LIVE_QUEUE_SIZE = (64+32); // ~ 100 ms with typical DVB stream
+const int HARD_LIMIT = (4*1024); // ~ 40 Mbit/s === 4 Mb/s
+
+// initial burst length after seek (500ms = ~13 video frames)
+const int64_t INITIAL_BURST_TIME = (int64_t)(45000); // pts units (90kHz)
+
+// assume seek when when pts difference between two frames exceeds this (1.5 seconds)
+const int64_t JUMP_LIMIT_TIME = (int64_t)(3*90000/2); // pts units (90kHz)
+
+cUdpScheduler::cUdpScheduler()
+{
+
+ // Scheduler data
+
+ current_audio_vtime = 0;
+ current_video_vtime = 0;
+
+#ifdef LOG_SCR
+ data_sent = 0;
+ frames_sent = 0;
+ frame_rate = 2000;
+ prev_frames = 200;
+#endif
+
+ last_delay_time = 0;
+
+ // queuing
+
+ int i;
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ m_Handles[i] = -1;
+
+ m_BackLog = new cUdpBackLog;
+
+ m_QueueNextSeq = 0;
+ m_QueuePending = 0;
+
+ // Thread
+
+ m_Running = 1;
+
+ Start();
+}
+
+cUdpScheduler::~cUdpScheduler()
+{
+ m_Lock.Lock();
+ m_Running = 0;
+ m_Cond.Broadcast();
+ m_Lock.Unlock();
+
+ Cancel(3);
+
+ delete m_BackLog;
+}
+
+bool cUdpScheduler::AddHandle(int fd)
+{
+ cMutexLock ml(&m_Lock);
+
+ int i;
+
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ if(m_Handles[i] < 0 || m_Handles[i] == fd) {
+ m_Handles[i] = fd;
+ return true;
+ }
+
+ return false;
+}
+
+void cUdpScheduler::RemoveHandle(int fd)
+{
+ cMutexLock ml(&m_Lock);
+
+ int i;
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ if(m_Handles[i] == fd)
+ break;
+
+ for(; i<MAX_UDP_HANDLES-1; i++)
+ m_Handles[i] = m_Handles[i+1];
+
+ m_Handles[MAX_UDP_HANDLES-1] = -1;
+
+ if(m_Handles[0] < 0) {
+ // No clients left ...
+
+ // Flush all buffers
+ m_QueueNextSeq = 0;
+ m_QueuePending = 0;
+ delete m_BackLog;
+ m_BackLog = new cUdpBackLog;
+ }
+}
+
+bool cUdpScheduler::Poll(int TimeoutMs, bool Master)
+{
+ cMutexLock ml(&m_Lock);
+
+ m_Master = Master;
+
+ if(m_Handles[0] < 0) {
+ // no clients, so we can eat all data we are given ...
+ return true;
+ }
+
+ uint64_t WaitEnd = cTimeMs::Now();
+ if(TimeoutMs >= 0)
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ int limit = m_Master ? MAX_QUEUE_SIZE : MAX_LIVE_QUEUE_SIZE;
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Running &&
+ m_QueuePending >= limit)
+ m_Cond.TimedWait(m_Lock, 5);
+
+ return m_QueuePending < limit;
+}
+
+bool cUdpScheduler::Flush(int TimeoutMs)
+{
+ uint64_t WaitEnd = cTimeMs::Now();
+ if(TimeoutMs >= 0)
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ cMutexLock ml(&m_Lock);
+
+ if(m_Handles[0] < 0)
+ return true;
+
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Running &&
+ m_QueuePending > 0)
+ m_Cond.TimedWait(m_Lock, 25);
+
+ return m_QueuePending == 0;
+}
+
+void cUdpScheduler::Clear(void)
+{
+ cMutexLock ml(&m_Lock);
+
+ m_BackLog->Clear(m_QueuePending);
+ m_QueuePending = 0;
+
+ m_Cond.Broadcast();
+}
+
+bool cUdpScheduler::Queue(uint64_t StreamPos, const uchar *Data, int Length)
+{
+ cMutexLock ml(&m_Lock);
+
+ if(m_Handles[0] < 0)
+ return true;
+
+ if(m_QueuePending >= MAX_QUEUE_SIZE)
+ return false;
+
+ m_BackLog->MakeFrame(StreamPos, Data, Length);
+ m_QueuePending++;
+
+ m_Cond.Broadcast();
+
+ return true;
+}
+
+int cUdpScheduler::calc_elapsed_vtime(int64_t pts, bool Audio)
+{
+ int64_t diff = 0;
+
+ if(!Audio /*Video*/) {
+ /* #warning TODO: should be possible to use video pts too (if ac3 or muted ...) */
+ diff = pts - current_video_vtime;
+ if(diff < 0) diff = -diff;
+ if(diff > JUMP_LIMIT_TIME) { // 1 s (must be > GOP)
+ // RESET
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler RESET (Video jump %lld->%lld)",
+ current_audio_vtime, pts);
+ data_sent = frames_sent = 0;
+#endif
+ //diff = 0;
+ current_video_vtime = pts;
+ return -1;
+ }
+ current_video_vtime = pts;
+
+ } else if(Audio) {
+ diff = pts - current_audio_vtime;
+ if(diff < 0) diff = -diff;
+ if(diff > JUMP_LIMIT_TIME) { // 1 sec
+ // RESET
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler RESET (Audio jump %lld->%lld)",
+ current_audio_vtime, pts);
+ data_sent = frames_sent = 0;
+#endif
+ //diff = 0;
+ current_audio_vtime = pts;
+
+ // Use audio pts for sync (audio has constant and increasing intervals)
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+
+ return -1;
+ }
+ current_audio_vtime = pts;
+ }
+
+#ifdef LOG_SCR
+ if(diff && Audio) {
+ frame_rate = (int)(90000*frames_sent/(int)diff);
+ LOGDBG("rate %d kbit/s (%d frames/s)",
+ (int)(90*data_sent/((int)diff)*8), frame_rate);
+ prev_frames = frames_sent;
+ data_sent = frames_sent = 0;
+ }
+#endif
+
+ return (int) diff;
+}
+
+void cUdpScheduler::Schedule(const uchar *Data, int Length)
+{
+ bool Audio=false, Video=false;
+ int64_t pts = pes_extract_pts(Data, Length, Audio, Video);
+ int elapsed = pts>0 ? calc_elapsed_vtime(pts, Audio) : 0;
+
+#ifdef LOG_SCR
+ if(elapsed > 0)
+ LOGMSG("PTS: %lld (%s) elapsed %d ms",
+ pts, Video?"Video":Audio?"Audio":"?", elapsed/90);
+#endif
+
+ if(elapsed > 0 && Audio/*Video*/) {
+ int64_t now = MasterClock.Now();
+ if(now > current_audio_vtime && (now - current_audio_vtime)>JUMP_LIMIT_TIME) {
+#ifdef LOG_SCR
+ LOGMSG("cUdpScheduler MasterClock init (was in past)");
+ elapsed = -1;
+#endif
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+ } else if(now < current_audio_vtime && (current_audio_vtime-now)>JUMP_LIMIT_TIME) {
+#ifdef LOG_SCR
+ LOGMSG("cUdpScheduler MasterClock init (was in future)");
+ elapsed = -1;
+#endif
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+ } else if(!last_delay_time) {
+ // first burst done, no delay yet ???
+ // (queue up to xxx bytes first)
+ } else {
+ if(current_audio_vtime > now) {
+ int delay_ms = (int)(current_audio_vtime - now)/90;
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler sleeping %d ms "
+ "(time reference: %s, beat interval %d ms)",
+ delay_ms, (Audio?"Audio PTS":"Video PTS"), elapsed);
+#endif
+ if(delay_ms > 3) {
+ //LOGMSG("sleep %d ms (%d f)", delay_ms, prev_frames);
+ CondWait.Wait(delay_ms);
+ }
+ }
+ }
+ last_delay_time = now;
+ }
+
+#if 0
+ static int win = 0;
+ static int64_t prev;
+
+ if(data_sent == 0 || elapsed < 0) {
+ win = 0;
+ prev = MasterClock.Now();
+ }
+ win ++;
+ int mrate = 3*frame_rate/2;
+ if(mrate < 100) mrate = 100;
+ if(mrate > 2000) mrate = 2000;
+ if(MasterClock.Now() - prev >= win*90000 / frame_rate) {
+ LOGMSG("sleep:3");
+ CondWait.Wait(3);
+ }
+#endif
+
+#ifdef LOG_SCR
+ data_sent += Length;
+ frames_sent ++;
+#endif
+}
+
+void cUdpScheduler::Action(void)
+{
+#if 0
+ {
+ // Request real-time scheduling
+ sched_param temp;
+ temp.sched_priority = 2;
+
+ if (!pthread_setschedparam(pthread_self(), SCHED_RR, &temp)) {
+ LOGMSG("cUdpScheduler priority set successful SCHED_RR %d [%d,%d]",
+ temp.sched_priority,
+ sched_get_priority_min(SCHED_RR),
+ sched_get_priority_max(SCHED_RR));
+ } else {
+ LOGMSG("cUdpScheduer: Can't set priority to SCHED_RR %d [%d,%d]",
+ temp.sched_priority,
+ sched_get_priority_min(SCHED_RR),
+ sched_get_priority_max(SCHED_RR));
+ }
+
+ /* UDP Scheduler needs high priority */
+ SetPriority(0);
+ SetPriority(-1);
+ SetPriority(-2);
+ SetPriority(-3);
+ SetPriority(-4);
+ SetPriority(-5);
+ }
+#endif
+
+ m_Lock.Lock();
+
+ while(m_Running) {
+
+ if(m_Handles[0] < 0) {
+ m_Cond.TimedWait(m_Lock, 5000);
+ continue;
+ }
+
+ // Wait until we have outgoing data in queue
+ if(m_QueuePending <= 0) {
+ m_Cond.TimedWait(m_Lock, 100);
+ if(m_QueuePending <= 0) {
+ static unsigned char padding[] = {0x00,0x00,0x01,0xBE,0x00,0x02,0xff,0xff};
+ int prevseq = (m_QueueNextSeq + UDP_BUFFER_SIZE - 1) & UDP_BUFFER_MASK;
+ stream_udp_header_t *frame = m_BackLog->Get(prevseq);
+ if(frame)
+ m_BackLog->MakeFrame(ntohll(frame->pos), padding, 8);
+ else
+ m_BackLog->MakeFrame(0, padding, 8);
+ m_QueuePending++;
+ }
+ continue; // to check m_Running
+ }
+
+ // Take next frame from queue
+ stream_udp_header_t *frame = m_BackLog->Get(m_QueueNextSeq);
+ int PayloadSize = m_BackLog->PayloadSize(m_QueueNextSeq);
+ int UdpPacketLen = PayloadSize + sizeof(stream_udp_header_t);
+ m_QueueNextSeq = (m_QueueNextSeq + 1) & UDP_BUFFER_MASK;
+ m_QueuePending--;
+
+ m_Cond.Broadcast();
+
+ m_Lock.Unlock();
+
+#if 0 /* debugging checks */
+ {
+ if(!frame)
+ LOGMSG("frame == NULL !");
+ uint8_t *p = UDP_PAYLOAD(frame);
+
+ if(p[0] || p[1] || p[2]!=1)
+ LOGMSG("cUdpScheduler: invalid content");
+
+ int n = sizeof(stream_udp_header_t) + (p[4]<<8) + p[5] + 6;
+ if(n != UdpPacketLen)
+ LOGMSG("cUdpScheduler: length error -- %d != %d", n, UdpPacketLen);
+
+ static int seq = 0;
+ if(seq != ntohs(frame->seq))
+ LOGMSG("cUdpScheduler: SEQ jump %d -> %d !", seq, ntohs(frame->seq));
+ seq = (ntohs(frame->seq) + 1) & UDP_BUFFER_MASK;
+
+ if(PayloadSize != 8) {
+ static uint64_t pos = 0;
+ if(pos != ntohull(frame->pos))
+ LOGMSG("cUdpScheduler: POS jump %lld -> %lld !", pos, ntohull(frame->pos));
+ pos = ntohull(frame->pos) + PayloadSize;
+ }
+ }
+#endif
+
+ // Schedule frame
+ if(m_Master)
+ Schedule(UDP_PAYLOAD(frame), PayloadSize);
+
+ /* need some limit here for ex. sequence of stills when moving cutting marks very fast
+ (no audio or PTS available) */
+#if 1
+ // hard limit for used bandwidth:
+ // - ~1 frames/ms & 8kb/ms -> 8mb/s -> ~ 80 Mbit/s ( / client)
+ // - max burst 15 frames or 30kb
+ static int cnt = 0, bytes = 0;
+ static uint64_t dbg_timer = cTimeMs::Now();
+ static int dbg_bytes = 0;
+ cnt++;
+ bytes += PayloadSize;
+ if(cnt>=15 && bytes >= 30000) {
+ CondWait.Wait(4);
+ dbg_bytes += bytes;
+ cnt = 0;
+ bytes = 0;
+ if(dbg_timer+60000 <= cTimeMs::Now()) {
+ LOGDBG("UDP rate: %4d Kbps (queue %d)", dbg_bytes/(60*1024/8),
+ m_QueuePending);
+ dbg_bytes = 0;
+ dbg_timer = cTimeMs::Now();
+ }
+ }
+#endif
+
+ for(int i=0; i<MAX_UDP_HANDLES && m_Handles[i]>=0; i++) {
+
+ //
+ // use TIOCOUTQ ioctl instead of poll/select.
+ // - poll/select for UDP/RTP may return true even when queue
+ // is (almost) full
+ // - kernel silently drops frames it cant send
+ // -> poll() + send() just causes frames to be dropped
+ //
+ int size = 0;
+ if(!ioctl(m_Handles[i], TIOCOUTQ, &size))
+ if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer
+ int wmem=0;
+ socklen_t l = sizeof(int);
+ if(!getsockopt(m_Handles[i], SOL_SOCKET, SO_SNDBUF, &wmem, &l)) {
+#if 0
+// Large bursts cause client to loose data :(
+ if(size >= (wmem/2 - 8128)) {
+ LOGMSG("cUdpScheduler: kernel transmit queue > ~%dkb ! (master=%d)",
+ (wmem/2-8128)/1024, m_Master);
+ CondWait.Wait(2);
+ }
+ else
+#endif
+ {
+ if(m_QueuePending > (MAX_QUEUE_SIZE-5))
+ LOGMSG("cUdpScheduler: kernel transmit queue > ~30kb ! (master=%d ; Queue=%d)",
+ m_Master, m_QueuePending);
+ CondWait.Wait(2);
+ }
+ }
+ }
+
+ if(send(m_Handles[i], frame, UdpPacketLen, 0) <= 0)
+ LOGERR("cUdpScheduler: UDP send() failed !");
+ }
+
+ m_Lock.Lock();
+ }
+
+ m_Lock.Unlock();
+}
+
+void cUdpScheduler::ReSend(int fd, uint64_t Pos, int Seq1, int Seq2)
+{
+ cMutexLock ml(&m_Lock); // keeps also scheduler thread suspended ...
+
+ // Handle buffer wrap
+ if(Seq1 > Seq2)
+ Seq2 += UDP_BUFFER_SIZE;
+
+ if(Seq2-Seq1 > 64) {
+ LOGDBG("cUdpScheduler::ReSend: requested range too large (%d-%d)",
+ Seq1, Seq2);
+ return;
+ }
+
+ // re-send whole range
+ for(; Seq1 <= Seq2; Seq1++) {
+
+ // Wait if kernel queue is full
+ int size=0;
+ if(!ioctl(fd, TIOCOUTQ, &size))
+ if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer
+ LOGDBG("cUdpScheduler::ReSend: kernel transmit queue > ~30kb !");
+ cCondWait::SleepMs(2);
+ }
+
+ stream_udp_header_t *frame = m_BackLog->Get(Seq1);
+
+ if(frame) {
+ if(ntohull(frame->pos) == Pos) {
+ send(fd,
+ frame,
+ m_BackLog->PayloadSize(Seq1) + sizeof(stream_udp_header_t),
+ 0);
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: %d (%d bytes) @%lld sent",
+ Seq1, m_BackLog->PayloadSize(Seq1), Pos);
+#endif
+ Pos += m_BackLog->PayloadSize(Seq1);
+ continue;
+ } else {
+ // buffer has been lost long time ago...
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: Requested position does not match "
+ "(%lld ; has %lld)", Pos, ntohll(frame->pos));
+#endif
+ }
+ } else {
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: %d @%lld missing", Seq1, Pos);
+#endif
+ }
+ // buffer has been lost
+ // send packet missing info
+ char udp_ctrl[64];
+ ((stream_udp_header_t *)udp_ctrl)->seq = (uint16_t)(-1);
+ ((stream_udp_header_t *)udp_ctrl)->pos = (uint64_t)(-1);
+
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: missing %d-%d @%d (hdr 0x%llx 0x%x)",
+ Seq1, Seq1, Pos,
+ ((stream_udp_header_t *)udp_ctrl)->pos,
+ ((stream_udp_header_t *)udp_ctrl)->seq);
+#endif
+ sprintf((udp_ctrl+sizeof(stream_udp_header_t)),
+ "UDP MISSING %d-%d %lld",
+ Seq1, Seq1, Pos);
+
+ send(fd, udp_ctrl, 64, 0);
+ }
+}