summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorphintuka <phintuka>2006-06-03 10:04:49 +0000
committerphintuka <phintuka>2006-06-03 10:04:49 +0000
commit0e345486181ef82b681dd6047f3b6ccb44c77146 (patch)
treea5401c7f97ab047a0afa890e6806d8537564102a /tools
parent321eb114c9fe9abd954ce4270595d53df6cccbae (diff)
downloadxineliboutput-0_99rc4.tar.gz
xineliboutput-0_99rc4.tar.bz2
Initial importxineliboutput-0_99rc4
Diffstat (limited to 'tools')
-rw-r--r--tools/backgroundwriter.c215
-rw-r--r--tools/backgroundwriter.h62
-rw-r--r--tools/cxsocket.c3
-rw-r--r--tools/cxsocket.h192
-rw-r--r--tools/future.h84
-rw-r--r--tools/general_remote.h27
-rw-r--r--tools/listiter.h83
-rw-r--r--tools/pes.h258
-rw-r--r--tools/timer.c292
-rw-r--r--tools/timer.h296
-rw-r--r--tools/udp_buffer.h115
-rw-r--r--tools/udp_pes_scheduler.c595
-rw-r--r--tools/udp_pes_scheduler.h103
13 files changed, 2325 insertions, 0 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c
new file mode 100644
index 00000000..26e1f34b
--- /dev/null
+++ b/tools/backgroundwriter.c
@@ -0,0 +1,215 @@
+/*
+ * backgroundwriter.h: Buffered socket/file writing thread
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: backgroundwriter.c,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#include <stdint.h>
+#include <unistd.h>
+
+#include <vdr/tools.h>
+
+#include "../logdefs.h"
+#include "../xine_input_vdr_net.h" // stream_tcp_header_t
+
+#include "backgroundwriter.h"
+
+//#define DISABLE_DISCARD
+//#define LOG_DISCARDS
+
+
+cBackgroundWriter::cBackgroundWriter(int fd, int Size)
+ : m_RingBuffer(Size, sizeof(stream_tcp_header_t))
+{
+ m_fd = fd;
+ m_RingBuffer.SetTimeouts(0, 100);
+ m_Active = true;
+
+ m_PutPos = 0;
+ m_DiscardStart = 0;
+ m_DiscardEnd = 0;
+
+ LOGDBG("cBackgroundWriter initialized (buffer %d kb)", Size/1024);
+
+ Start();
+}
+
+cBackgroundWriter::~cBackgroundWriter()
+{
+ m_Active = false;
+ Cancel(3);
+}
+
+int cBackgroundWriter::Free(void)
+{
+ return m_RingBuffer.Free();
+}
+
+void cBackgroundWriter::Action(void)
+{
+ uint64_t NextHeaderPos = 0ULL;
+ uint64_t GetPos = 0ULL;
+ cPoller Poller(m_fd, true);
+
+ while(m_Active) {
+
+ if(Poller.Poll(100)) {
+
+ int Count = 0;
+ uchar *Data = m_RingBuffer.Get(Count);
+
+ if(Data && Count > 0) {
+
+#ifndef DISABLE_DISCARD
+ Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
+ if(m_DiscardEnd > GetPos) {
+
+# ifdef LOG_DISCARDS
+ LOGMSG("TCP: queue: discard request: queue %d bytes, "
+ "next point %d bytes forward (Count=%d)",
+ m_RingBuffer.Available(),
+ NextHeaderPos - GetPos,
+ Count);
+# endif
+ if(NextHeaderPos == GetPos) {
+ // we're at frame boundary
+# ifdef LOG_DISCARDS
+ uint8_t *pkt = TCP_PAYLOAD(Data);
+ if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) {
+ LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]);
+ }
+# endif
+ Count = min(Count, (int)(m_DiscardEnd - GetPos));
+# ifdef LOG_DISCARDS
+ LOGMSG("Flushing %d bytes", Count);
+#endif
+ Unlock();
+
+ m_RingBuffer.Del(Count);
+ GetPos += Count;
+ NextHeaderPos = GetPos;
+# ifdef LOG_DISCARDS
+ LOGMSG("Queue now %d bytes", m_RingBuffer.Available());
+ pkt = TCP_PAYLOAD(Data);
+ if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) {
+ LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]);
+# endif
+ continue;
+ }
+ }
+ Unlock();
+#endif
+
+#ifndef DISABLE_DISCARD
+ if(GetPos == NextHeaderPos) {
+ if(Count < (int)sizeof(stream_tcp_header_t))
+ LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");
+
+ stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
+ if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t)))
+ ;//LOGMSG("Count = %d < %d", Count,
+ // header->len + sizeof(stream_tcp_header_t));
+ else
+ Count = ntohl(header->len) + sizeof(stream_tcp_header_t);
+ NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t);
+ } else {
+ Count = min(Count, (int)(NextHeaderPos-GetPos));
+ }
+#endif
+
+ errno = 0;
+ int n = write(m_fd, Data, Count);
+
+ if(n == 0) {
+ LOGERR("cBackgroundWriter: Client disconnected data stream ?");
+ break;
+
+ } else if(n < 0) {
+
+ if (errno == EINTR || errno == EWOULDBLOCK) {
+ TRACE("cBackgroundWriter: EINTR while writing to file handle "
+ <<m_fd<<" - retrying");
+ continue;
+
+ } else {
+ LOGERR("cBackgroundWriter: TCP write error");
+ break;
+ }
+ }
+
+ GetPos += n;
+ m_RingBuffer.Del(n);
+ }
+ }
+ }
+
+ m_RingBuffer.Clear();
+ m_Active = false;
+}
+
+void cBackgroundWriter::Clear(void)
+{
+ // Can't just drop buffer contents or PES frames will be broken.
+
+ // Serialize with Put
+ LOCK_THREAD;
+#ifdef LOG_DISCARDS
+ LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos);
+#endif
+ m_DiscardEnd = m_PutPos;
+}
+
+bool cBackgroundWriter::Flush(int TimeoutMs)
+{
+ uint64_t WaitEnd = cTimeMs::Now();
+
+ if(TimeoutMs > 0)
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Active &&
+ m_RingBuffer.Available() > 0)
+ cCondWait::SleepMs(3);
+
+ return m_RingBuffer.Available() <= 0;
+}
+
+int cBackgroundWriter::Put(uint64_t StreamPos,
+ const uchar *Data, int DataCount)
+{
+ stream_tcp_header_t header;
+ header.pos = htonull(StreamPos);
+ header.len = htonl(DataCount);
+ return Put((uchar*)&header, sizeof(header), Data, DataCount);
+}
+
+int cBackgroundWriter::Put(const uchar *Header, int HeaderCount,
+ const uchar *Data, int DataCount)
+{
+ if(m_Active) {
+
+ // Serialize Put access to keep Data and Header together
+ LOCK_THREAD;
+
+ if(m_RingBuffer.Free() < HeaderCount+DataCount) {
+ LOGMSG("cXinelibServer: TCP buffer overflow !");
+ return -HeaderCount-DataCount;
+ }
+ int n = m_RingBuffer.Put(Header, HeaderCount) +
+ m_RingBuffer.Put(Data, DataCount);
+ if(n == HeaderCount+DataCount) {
+ m_PutPos += n;
+ return n;
+ }
+
+ LOGMSG("cXinelibServer: TCP buffer internal error ?!?");
+ m_RingBuffer.Clear();
+ m_Active = false;
+ }
+
+ return 0;
+}
diff --git a/tools/backgroundwriter.h b/tools/backgroundwriter.h
new file mode 100644
index 00000000..619b2a29
--- /dev/null
+++ b/tools/backgroundwriter.h
@@ -0,0 +1,62 @@
+/*
+ * backgroundwriter.h: Buffered socket/file writing thread
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: backgroundwriter.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#ifndef __BACKGROUNDWRITER_H
+#define __BACKGROUNDWRITER_H
+
+#include <stdint.h>
+
+#include <vdr/thread.h>
+#include <vdr/ringbuffer.h>
+
+
+class cBackgroundWriter : public cThread {
+
+ private:
+ cRingBufferLinear m_RingBuffer;
+
+ volatile bool m_Active;
+ int m_fd;
+
+ uint64_t m_PutPos;
+ uint64_t m_DiscardStart;
+ uint64_t m_DiscardEnd;
+
+ protected:
+ virtual void Action(void);
+
+ int Put(const uchar *Header, int HeaderCount,
+ const uchar *Data, int DataCount);
+
+ public:
+ cBackgroundWriter(int fd, int Size = KILOBYTE(512));
+ virtual ~cBackgroundWriter() ;
+
+ // Return largest possible Put size
+ int Free(void);
+
+ // Add PES frame to buffer
+ //
+ // Return value:
+ // Success: Count (all bytes pushed to queue)
+ // Error: 0 (write error ; socket disconnected)
+ // Buffer full: -Count (no bytes will be pushed to queue)
+ //
+ int Put(uint64_t StreamPos, const uchar *Data, int DataCount);
+
+ // Drop all data (only complete frames) from buffer
+ void Clear(void);
+
+ //bool Poll(int TimeoutMs);
+ bool Flush(int TimeoutMs);
+};
+
+
+#endif
diff --git a/tools/cxsocket.c b/tools/cxsocket.c
new file mode 100644
index 00000000..a8a1953e
--- /dev/null
+++ b/tools/cxsocket.c
@@ -0,0 +1,3 @@
+/*static*/ int cxsocket_c_dummy;
+
+/* #warning TODO: socket helper classes / functions */
diff --git a/tools/cxsocket.h b/tools/cxsocket.h
new file mode 100644
index 00000000..f3d823e5
--- /dev/null
+++ b/tools/cxsocket.h
@@ -0,0 +1,192 @@
+/*
+ * cxsocket.h: Socket wrapper classes
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: cxsocket.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#ifndef __CXSOCKET_H
+#define __CXSOCKET_H
+
+#define CLOSESOCKET(fd) do { if(fd>=0) { close(fd); fd=-1; } } while(0)
+
+//
+// Connect data socket to client (take address from fd_control)
+//
+static inline int sock_connect(int fd_control, int port, int type)
+{
+ struct sockaddr_in sin;
+ socklen_t len = sizeof(sin);
+ int s;
+
+ if(getpeername(fd_control, (struct sockaddr *)&sin, &len)) {
+ LOGERR("sock_connect: getpeername failed");
+ return -1;
+ }
+
+ uint32_t tmp = ntohl(sin.sin_addr.s_addr);
+ LOGMSG("Client address: %d.%d.%d.%d",
+ ((tmp>>24)&0xff), ((tmp>>16)&0xff),
+ ((tmp>>8)&0xff), ((tmp)&0xff));
+
+#if 0
+ if ((h = gethostbyname(tmp)) == NULL) {
+ LOGDBG("sock_connect: unable to resolve host name", tmp);
+ }
+#endif
+
+ if ((s = socket(PF_INET, type,
+ type==SOCK_DGRAM?IPPROTO_UDP:IPPROTO_TCP)) < 0) {
+ LOGERR("sock_connect: failed to create socket");
+ return -1;
+ }
+
+#if 1
+ // Set socket buffers: large send buffer, small receive buffer
+ {
+ int max_buf = KILOBYTE(128);
+ //while(max_buf) {
+ errno = 0;
+ if(setsockopt(s, SOL_SOCKET, SO_SNDBUF, &max_buf, sizeof(int))) {
+ LOGERR("setsockopt(SO_SNDBUF,%d) failed", max_buf);
+ max_buf >>= 1;
+ } else {
+ int tmp = 0;
+ int len = sizeof(int);
+ errno = 0;
+ if(getsockopt(s, SOL_SOCKET, SO_SNDBUF, &tmp, (socklen_t*)&len)) {
+ LOGERR("getsockopt(SO_SNDBUF,%d) failed", max_buf);
+ max_buf >>= 1;
+ } else if(tmp != max_buf) {
+ LOGDBG("setsockopt(SO_SNDBUF): got %d bytes", tmp);
+ max_buf >>= 1;
+ }
+ }
+ max_buf = 1024;
+ setsockopt(s, SOL_SOCKET, SO_RCVBUF, &max_buf, sizeof(int));
+ }
+#endif
+
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(port);
+
+ if (connect(s, (struct sockaddr *)&sin, sizeof(sin))==-1 &&
+ errno != EINPROGRESS) {
+ LOGERR("connect() failed");
+ CLOSESOCKET(s);
+ }
+
+ if (fcntl (s, F_SETFL, fcntl (s, F_GETFL) | O_NONBLOCK) == -1) {
+ LOGERR("can't put socket in non-blocking mode");
+ CLOSESOCKET(s);
+ return -1;
+ }
+
+ return s;
+}
+
+static inline int timed_write(int fd, const void *buffer, size_t size,
+ int timeout_ms)
+{
+ int written = size;
+ const unsigned char *ptr = (const unsigned char *)buffer;
+ cPoller poller(fd, true);
+
+ while (size > 0) {
+
+ if(!poller.Poll(timeout_ms)) {
+ LOGERR("timed_write: poll() failed");
+ return written-size;
+ }
+
+ errno = 0;
+ int p = write(fd, ptr, size);
+
+ if (p <= 0) {
+ if (errno == EINTR || errno == EAGAIN) {
+ LOGDBG("timed_write: EINTR during write(), retrying");
+ continue;
+ }
+ LOGERR("timed_write: write() error");
+ return p;
+ }
+
+ ptr += p;
+ size -= p;
+ }
+
+ return written;
+}
+
+//#include "xine_osd_command.h"
+
+static inline int write_osd_command(int fd, osd_command_t *cmd)
+{
+ if(8 != timed_write(fd, "OSDCMD\r\n", 8, 200)) {
+ LOGDBG("write_osd_command: write (command) failed");
+ return 0;
+ }
+ if(sizeof(osd_command_t) !=
+ timed_write(fd, cmd, sizeof(osd_command_t), 200)) {
+ LOGDBG("write_osd_command: write (data) failed");
+ return 0;
+ }
+ if(cmd->palette && cmd->colors &&
+ (int)(sizeof(xine_clut_t)*ntohl(cmd->colors)) !=
+ timed_write(fd, cmd->palette, (int)(sizeof(xine_clut_t)*ntohl(cmd->colors)), 200)) {
+ LOGDBG("write_osd_command: write (palette) failed");
+ return 0;
+ }
+ if(cmd->data && cmd->datalen &&
+ (int)ntohl(cmd->datalen) != timed_write(fd, cmd->data, ntohl(cmd->datalen), 1000)) {
+ LOGDBG("write_osd_command: write (bitmap) failed");
+ return 0;
+ }
+ return 1;
+}
+
+static inline int write_str(int fd, const char *str, int timeout_ms=-1)
+{
+ return timed_write(fd, str, strlen(str), timeout_ms);
+}
+
+static inline int write_cmd(int fd, const char *str)
+{
+ return write_str(fd, str, 10);
+}
+
+static inline int udp_discovery_broadcast(int fd_discovery, int m_Port)
+{
+ if(!xc.remote_usebcast) {
+ LOGDBG("UDP broadcasts (discovery) disabled in configuration");
+ return -1;
+ }
+
+ struct sockaddr_in sin;
+
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(DISCOVERY_PORT);
+ sin.sin_addr.s_addr = INADDR_BROADCAST;
+
+ char *test = NULL;
+ asprintf(&test,
+ "VDR xineliboutput DISCOVERY 1.0\r\n"
+ "Server port: %d\r\n"
+ "\r\n",
+ m_Port);
+ int testlen = strlen(test);
+ if(testlen != sendto(fd_discovery, test, testlen, 0,
+ (struct sockaddr *)&sin, sizeof(sin))) {
+ LOGERR("UDP broadcast send failed (discovery)");
+ return -1;
+ } else {
+ LOGDBG("UDP broadcast send succeed (discovery)");
+ }
+ return 1;
+}
+
+
+#endif // __CXSOCKET_H
diff --git a/tools/future.h b/tools/future.h
new file mode 100644
index 00000000..73d4bfec
--- /dev/null
+++ b/tools/future.h
@@ -0,0 +1,84 @@
+/*
+ * future.h: A variable that gets its value in future.
+ * Used to convert asynchronous IPCs to synchronous.
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: future.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#ifndef __FUTURE_H
+#define __FUTURE_H
+
+#include <vdr/thread.h>
+
+template <class T>
+class cFuture {
+
+ private:
+ cMutex mutex;
+ cCondVar cond;
+ bool m_Ready;
+ T m_Value;
+
+ public:
+
+ cFuture()
+ {
+ m_Ready = false;
+ }
+
+ void Reset(void)
+ {
+ cMutexLock l(&mutex);
+ m_Ready = false;
+ }
+
+ //
+ // Producer interface
+ //
+
+ void Set(T& Value)
+ {
+ cMutexLock l(&mutex);
+ m_Value = Value;
+ m_Ready = true;
+ cond.Broadcast();
+ }
+
+ //
+ // Consumer interface
+ //
+
+ bool Wait(int Timeout = -1)
+ {
+ cMutexLock l(&mutex);
+
+ if(Timeout >= 0)
+ return cond.TimedWait(mutex, Timeout);
+
+ while(!m_Ready)
+ cond.Wait(mutex);
+
+ return true;
+ }
+
+ bool IsReady(void)
+ {
+ cMutexLock l(&mutex);
+ return m_Ready;
+ }
+
+ T Value(void)
+ {
+ cMutexLock l(&mutex);
+ while(!m_Ready)
+ cond.Wait(mutex);
+ return m_Value;
+ }
+};
+
+
+#endif // __FUTURE_H
diff --git a/tools/general_remote.h b/tools/general_remote.h
new file mode 100644
index 00000000..aed60463
--- /dev/null
+++ b/tools/general_remote.h
@@ -0,0 +1,27 @@
+/*
+ * general_remote.h:
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: general_remote.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#ifndef __GENERAL_REMOTE_H
+#define __GENERAL_REMOTE_H
+
+
+//----------------------------- cGeneralRemote --------------------------------
+
+#include <vdr/remote.h>
+
+class cGeneralRemote : public cRemote {
+ public:
+ cGeneralRemote(const char *Name) : cRemote(Name) {};
+ bool Put(const char *Code, bool Repeat=false, bool Release=false)
+ { return cRemote::Put(Code, Repeat, Release); };
+};
+
+
+#endif
diff --git a/tools/listiter.h b/tools/listiter.h
new file mode 100644
index 00000000..9c88940e
--- /dev/null
+++ b/tools/listiter.h
@@ -0,0 +1,83 @@
+/*
+ * listiter.h:
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: listiter.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+
+#ifndef _LISTITER_H_
+#define _LISTITER_H_
+
+//------------------------------ list ----------------------------------------
+
+template <class LIST,class ITEM, class RESULT>
+void ForEach(LIST& List, RESULT (ITEM::*f)())
+{
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ (*it.*f)();
+}
+
+template <class LIST,class ITEM, class ARG1, class RESULT>
+void ForEach(LIST& List, RESULT (ITEM::*f)(ARG1), ARG1 arg1)
+{
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ (*it.*f)(arg1);
+}
+
+template <class LIST,class ITEM, class ARG1, class ARG2>
+void ForEach(LIST& List, void (ITEM::*f)(ARG1,ARG2), ARG1 arg1, ARG2 arg2)
+{
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ (*it.*f)(arg1,arg2);
+}
+
+template <class LIST,class ITEM, class ARG1, class RESULT>
+RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1), ARG1 arg1,
+ RESULT (*combiner)(RESULT,RESULT), RESULT def)
+{
+ RESULT result = def;
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ result = (*combiner)((*it.*f)(arg1),result);
+ return result;
+}
+
+template <class LIST,class ITEM, class ARG1, class ARG2, class RESULT>
+RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1,ARG2),
+ ARG1 arg1, ARG2 arg2,
+ RESULT (*combiner)(RESULT,RESULT), RESULT def)
+{
+ RESULT result = def;
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ result = (*combiner)((*it.*f)(arg1,arg2),result);
+ return result;
+}
+
+template <class LIST,class ITEM, class ARG1, class ARG2, class ARG3,
+ class RESULT>
+RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1,ARG2,ARG3),
+ ARG1 arg1, ARG2 arg2, ARG3 arg3,
+ RESULT (*combiner)(RESULT,RESULT), RESULT def)
+{
+ RESULT result = def;
+ for(ITEM *it = List.First(); it; it = List.Next(it))
+ result = (*combiner)((*it.*f)(arg1,arg2,arg3),result);
+ return result;
+}
+
+template<class T>
+T mmin(T a, T b) {return a<b ? a : b;}
+
+template<class T>
+T mmax(T a, T b) {return a>b ? a : b;}
+
+template<class T>
+T mand(T a, T b) {return a&&b;}
+
+template<class T>
+T mor(T a, T b) {return a||b;}
+
+#endif
diff --git a/tools/pes.h b/tools/pes.h
new file mode 100644
index 00000000..c01b98db
--- /dev/null
+++ b/tools/pes.h
@@ -0,0 +1,258 @@
+/*
+ * pes.h: PES header definitions
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: pes.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $
+ *
+ */
+
+#ifndef _PES_H_
+#define _PES_H_
+
+
+#define PES_CHUNK_SIZE 2048
+
+#define MAX_SCR ((int64_t)0x1ffffffffLL)
+
+// PES PIDs
+#define PRIVATE_STREAM1 0xBD
+#define PADDING_STREAM 0xBE
+#define PRIVATE_STREAM2 0xBF
+#define AUDIO_STREAM_S 0xC0 /* 1100 0000 */
+#define AUDIO_STREAM_E 0xDF /* 1101 1111 */
+#define VIDEO_STREAM_S 0xE0 /* 1110 0000 */
+#define VIDEO_STREAM_E 0xEF /* 1110 1111 */
+
+#define AUDIO_STREAM_MASK 0x1F /* 0001 1111 */
+#define VIDEO_STREAM_MASK 0x0F /* 0000 1111 */
+#define AUDIO_STREAM 0xC0 /* 1100 0000 */
+#define VIDEO_STREAM 0xE0 /* 1110 0000 */
+
+#define ECM_STREAM 0xF0
+#define EMM_STREAM 0xF1
+#define DSM_CC_STREAM 0xF2
+#define ISO13522_STREAM 0xF3
+#define PROG_STREAM_DIR 0xFF
+
+// "picture header"
+#define SC_PICTURE 0x00
+
+// Picture types
+#define NO_PICTURE 0
+#define I_FRAME 1
+#define P_FRAME 2
+#define B_FRAME 3
+
+static inline int64_t pes_extract_pts(const uchar *Data, int Length,
+ bool& Audio, bool& Video)
+{
+ /* assume mpeg2 pes header ... */
+ Audio = Video = false;
+
+ if((VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK) && (Video=true)) ||
+ (AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK) && (Audio=true)) ||
+ (PRIVATE_STREAM1 == Data[3] && (Audio=true))) {
+
+ if ((Data[6] & 0xC0) != 0x80)
+ return -1;
+ if ((Data[6] & 0x30) != 0)
+ return -1;
+
+ if((Length > 14) && (Data[7] & 0x80)) { /* pts avail */
+ int64_t pts;
+ pts = ((int64_t)(Data[ 9] & 0x0E)) << 29 ;
+ pts |= ((int64_t) Data[10]) << 22 ;
+ pts |= ((int64_t)(Data[11] & 0xFE)) << 14 ;
+ pts |= ((int64_t) Data[12]) << 7 ;
+ pts |= ((int64_t)(Data[13] & 0xFE)) >> 1 ;
+ return pts;
+ }
+ }
+ return -1ULL;
+}
+
+static inline void pes_change_pts(uchar *Data, int Length)
+{
+ /* assume mpeg2 pes header ... Assume header already HAS pts */
+ if((VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK)) ||
+ (AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK)) ||
+ (PRIVATE_STREAM1 == Data[3])) {
+
+ if ((Data[6] & 0xC0) != 0x80)
+ return;
+ if ((Data[6] & 0x30) != 0)
+ return;
+
+ if((Length > 14) && (Data[7] & 0x80)) { /* pts avail */
+ int64_t pts;
+ //pts = ((int64_t)(Data[ 9] & 0x0E)) << 29 ;
+ Data[ 9] |= ((pts >> 29) & 0x0E);
+ //pts |= ((int64_t) Data[10]) << 22 ;
+ Data[10] |= ((pts >> 22) & 0xFF);
+ //pts |= ((int64_t)(Data[11] & 0xFE)) << 14 ;
+ Data[11] |= ((pts >> 14) & 0xFE);
+ //pts |= ((int64_t) Data[12]) << 7 ;
+ Data[12] |= ((pts >> 7 ) & 0xFF);
+ //pts |= ((int64_t)(Data[13] & 0xFE)) >> 1 ;
+ Data[13] |= ((pts << 1 ) & 0xFE);
+ }
+ }
+}
+
+// Remove pts from PES packet (zero it)
+static inline void pes_strip_pts(uchar *Data, int Len)
+{
+ if(VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK) ||
+ AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK) ||
+ PRIVATE_STREAM1 == Data[3]) {
+
+ // MPEG1 PES
+ if ((Data[6] & 0xC0) != 0x80) {
+ Data += 6;
+ Len -= 6;
+
+ // skip stuffing
+ while ((Data[0] & 0x80) == 0x80) {
+ Data++;
+ Len--;
+ }
+ if ((Data[0] & 0xc0) == 0x40) {
+ // STD_buffer_scale, STD_buffer_size
+ Data += 2;
+ Len -= 2;
+ }
+
+ if(Len<5) return;
+ if ((Data[0] & 0xf0) == 0x20) {
+ // zero PTS
+ Data[0] &= ~0x0E;
+ Data[1] = 0;
+ Data[2] &= ~0xFE;
+ Data[3] = 0;
+ Data[4] &= ~0xFE;
+ return;
+ }
+ if(Len<10) return;
+ if ((Data[0] & 0xf0) == 0x30) {
+ // zero PTS & DTS
+//((uint32*)Data)[0] &= 0x0E00FE00;
+ Data[0] &= ~0x0E;
+ Data[1] = 0;
+ Data[2] &= ~0xFE;
+ Data[3] = 0;
+//((uint32*)Data)[1] &= 0xFE0E00FE;
+ Data[4] &= ~0xFE;
+ Data[5] &= ~0x0E;
+ Data[6] = 0;
+ Data[7] &= ~0xFE;
+//((uint32*)Data)[2] &= 0x00FEFFFF;
+ Data[8] = 0;
+ Data[9] &= ~0xFE;
+ return;
+ }
+
+ // MPEG2 PES
+ } else {
+ if ((Data[6] & 0xC0) != 0x80)
+ return;
+ if ((Data[6] & 0x30) != 0)
+ return;
+
+ if(Len<14) return;
+ if (Data[7] & 0x80) {
+ // PTS
+ if(Data[8]<5) return;
+ Data[ 9] &= ~0x0E;
+ Data[10] = 0;
+ Data[11] &= ~0xFE;
+ Data[12] = 0;
+ Data[13] &= ~0xFE;
+ }
+ if(Len<19) return;
+ if (Data[7] & 0x40) {
+ // DTS
+ if(Data[8]<10) return;
+ Data[14] &= ~0x0E;
+ Data[15] = 0;
+ Data[16] &= ~0xFE;
+ Data[17] = 0;
+ Data[18] &= ~0xFE;
+ }
+ }
+ }
+}
+
+static inline int pes_packet_len(const uchar *header, const int maxlen, bool &isMpeg1)
+{
+ if(VIDEO_STREAM == (header[3] & ~VIDEO_STREAM_MASK) ||
+ AUDIO_STREAM == (header[3] & ~AUDIO_STREAM_MASK) ||
+ PRIVATE_STREAM1 == header[3]) {
+ isMpeg1 = ((header[6] & 0xC0) != 0x80);
+ return 6 + (header[4] << 8 | header[5]);
+ } else if (header[3] == PADDING_STREAM) {
+ isMpeg1 = false;
+ return (6 + (header[4] << 8 | header[5]));
+ } else if (header[3] == 0xBA) {
+ if ((header[4] & 0x40) == 0) { /* mpeg1 */
+ isMpeg1 = true;
+ return 12;
+ } else { /* mpeg 2 */
+ isMpeg1 = false;
+ return 14 + (header[0xD] & 0x07);
+ }
+ } else if (header[3] <= 0xB9) {
+ int len=3;
+ return -3;
+ isMpeg1 = false;
+ while(len+2<maxlen) {
+ if(!header[len] && !header[len+1] && header[len+2] == 1)
+ return len;
+ len++;
+ }
+ return -len;
+ }
+ isMpeg1 = false;
+ return -(6 + (header[4] << 8 | header[5]));
+}
+
+// from vdr/remux.c:
+static inline int ScanVideoPacket(const uchar *Data, int Count, /*int Offset,*/
+ uchar &PictureType)
+{
+ // Scans the video packet starting at Offset and returns its length.
+ // If the return value is -1 the packet was not completely in the buffer.
+ int Offset = 0;
+ int Length = Count;
+ if (Length > 0 && Offset + Length <= Count) {
+ int i = Offset + 8; // the minimum length of the video packet header
+ i += Data[i] + 1; // possible additional header bytes
+ for (; i < Offset + Length; i++) {
+ if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1) {
+ switch (Data[i + 3]) {
+ case SC_PICTURE:
+ PictureType = (Data[i + 5] >> 3) & 0x07;
+ return Length;
+ }
+ }
+ }
+ PictureType = NO_PICTURE;
+ return Length;
+ }
+ return -1;
+}
+
+static inline const char *PictureTypeStr(int Type)
+{
+ switch(Type) {
+ case I_FRAME: return "I-Frame"; break;
+ case B_FRAME: return "B-Frame"; break;
+ case P_FRAME: return "P-Frame"; break;
+ case NO_PICTURE: return "(none)"; break;
+ default: break;
+ }
+ return "UNKNOWN";
+}
+
+#endif
diff --git a/tools/timer.c b/tools/timer.c
new file mode 100644
index 00000000..1730edab
--- /dev/null
+++ b/tools/timer.c
@@ -0,0 +1,292 @@
+/*
+ * timer.c: Threaded timer class
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: timer.c,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#include <sys/time.h>
+
+#include <vdr/config.h>
+#include <vdr/tools.h>
+#include <vdr/thread.h>
+
+#include "timer.h"
+
+//#define XINELIBOUTPUT_DEBUG
+//#define XINELIBOUTPUT_DEBUG_STDOUT
+#ifdef XINELIBOUTPUT_DEBUG
+# include "logdefs.h"
+#else
+# define TRACE(x)
+# define TRACEF(x)
+#endif
+
+#if VDRVERSNUM>10317
+ #define time_ms() cTimeMs::Now()
+#endif
+
+// ---------------------------- cTimerThreadEvent ----------------------------
+
+class cTimerThreadEvent : public cListObject {
+ public:
+ cTimerThreadEvent(cTimerCallback *Handler, unsigned int TimeoutMs,
+ bool DeleteOnCancel = false) :
+ m_Handler(Handler),
+ m_DeleteOnCancel(DeleteOnCancel),
+ m_TimeoutMs(TimeoutMs)
+ {
+ m_NextEventTime = time_ms();
+ UpdateEventTime();
+ }
+
+ ~cTimerThreadEvent()
+ {
+ if(m_DeleteOnCancel && m_Handler)
+ delete m_Handler;
+ }
+
+ void UpdateEventTime()
+ {
+ m_NextEventTime += m_TimeoutMs;
+ }
+
+ int TimeToNextEvent(void)
+ {
+ return m_NextEventTime - time_ms();
+ }
+
+ virtual bool operator< (const cListObject &ListObject)
+ {
+ const cTimerThreadEvent *o = (cTimerThreadEvent *)&ListObject;
+ return m_NextEventTime<o->m_NextEventTime;
+ }
+
+ virtual int Compare(const cListObject &ListObject) const
+ {
+ const cTimerThreadEvent *o = (cTimerThreadEvent *)&ListObject;
+ if(m_NextEventTime<o->m_NextEventTime)
+ return -1;
+ else if(m_NextEventTime>o->m_NextEventTime)
+ return 1;
+ return 0;
+ }
+
+ cTimerCallback *m_Handler;
+
+ protected:
+ bool m_DeleteOnCancel;
+ unsigned int m_TimeoutMs;
+ int64_t m_NextEventTime;
+};
+
+// ------------------------------- cTimerThread ------------------------------
+
+class cTimerThread : public cThread {
+ private:
+ cTimerThread(cTimerThread&); // copy not allowed
+
+ static cMutex m_InstanceLock;
+ static cTimerThread *m_Instance; // singleton
+
+ cMutex m_Lock;
+ cCondVar m_Signal;
+ cList<cTimerThreadEvent> m_Events;
+ cTimerThreadEvent *m_RunningEvent;
+ bool m_Finished;
+ bool m_HandlerRunning;
+
+ cTimerThread() :
+ m_RunningEvent(NULL),
+ m_Finished(false),
+ m_HandlerRunning(false)
+ {
+ }
+
+ virtual ~cTimerThread()
+ {
+ m_Lock.Lock();
+ cTimerThreadEvent *ev;
+ while(NULL != (ev = m_Events.First())) {
+ m_Events.Del(ev,true);
+ }
+ m_Lock.Unlock();
+ m_Signal.Broadcast();
+ Cancel(1);
+ }
+
+ protected:
+
+ virtual void Action()
+ {
+ TRACEF("cTimerThread::Action");
+ m_Lock.Lock();
+ while(m_Events.First()) {
+ m_Signal.TimedWait(m_Lock,
+ max(1, m_Events.First()->TimeToNextEvent()));
+ TRACE("cTimerThread::Action waked up");
+ while(NULL != (m_RunningEvent = m_Events.First()) &&
+ m_RunningEvent->TimeToNextEvent() <= 0) {
+ TRACE("cTimerThread::Action calling handler");
+ m_HandlerRunning=true;
+// m_Lock.Unlock();
+// - can't unlock or running timer handler may be deleted while
+// executing (or thread may be killed by Delete)
+ bool result = m_RunningEvent->m_Handler->TimerEvent();
+// m_Lock.Lock();
+ m_HandlerRunning=false;
+ if(!result) {
+ if(m_RunningEvent) { // check if event was cancelled in handler...
+ TRACE("cTimerThread::Action handler cancelled timer");
+ m_Events.Del(m_RunningEvent, true);
+ }
+ } else {
+ if(m_RunningEvent) {
+ TRACE("cTimerThread::Action timer re-scheduled");
+ m_RunningEvent->UpdateEventTime();
+ m_Events.Sort();
+ }
+ }
+ m_RunningEvent = NULL;
+ }
+ }
+ m_Finished = true;
+ m_Lock.Unlock();
+ }
+
+ void Add(cTimerThreadEvent *Event)
+ {
+ TRACEF("cTimerThread::Add");
+ //m_Events.Del(Event, false);
+ Event->Unlink();
+ Del(Event->m_Handler);
+ m_Events.Add(Event);
+ m_Events.Sort();
+ }
+
+ bool Del(cTimerCallback *Handler, void *TargetId=NULL,
+ bool inDestructor=false)
+ {
+ TRACEF("cTimerThread::Del");
+ cTimerThreadEvent *ev = m_Events.First();
+ while(ev) {
+ if(ev->m_Handler == Handler ||
+ (TargetId && ev->m_Handler->TargetId() == TargetId) ||
+ (Handler && ev->m_Handler->is(Handler,Handler->size()))) {
+ cTimerThreadEvent *nev = m_Events.Next(ev);
+ if(inDestructor) ev->m_Handler=NULL;
+ m_Events.Del(ev, true);
+ ev = nev;
+ } else
+ ev = m_Events.Next(ev);
+ }
+ if(m_RunningEvent &&
+ (m_RunningEvent->m_Handler == Handler ||
+ m_RunningEvent->m_Handler->TargetId() == TargetId))
+ m_RunningEvent = NULL;
+ return !m_HandlerRunning && !m_RunningEvent && !m_Events.First();
+ }
+
+ public:
+
+ static void AddEvent(cTimerCallback *Handler, unsigned int TimeoutMs,
+ bool DeleteOnCancel=false)
+ {
+ TRACEF("cTimerThread::AddEvent");
+ m_InstanceLock.Lock();
+ if(m_Instance && m_Instance->m_Finished) {
+ delete m_Instance;
+ m_Instance = NULL;
+ }
+ if(!m_Instance) {
+ m_Instance = new cTimerThread;
+ m_Instance->m_Lock.Lock();
+ m_Instance->Start();
+ } else {
+ m_Instance->m_Lock.Lock();
+ m_Instance->m_Signal.Broadcast();
+ }
+ m_Instance->Add(new cTimerThreadEvent(Handler, max(1U,TimeoutMs),
+ DeleteOnCancel));
+ m_Instance->m_Lock.Unlock();
+ m_InstanceLock.Unlock();
+ }
+
+ static void CancelEvent(cTimerCallback *Handler, void *TargetId = NULL,
+ bool inDestructor=false)
+ {
+ TRACEF("cTimerThread::CancelEvent");
+ m_InstanceLock.Lock();
+ if(m_Instance && !m_Instance->m_Finished) {
+ m_Instance->m_Lock.Lock();
+ if(m_Instance->Del(Handler, TargetId, inDestructor) && !inDestructor) {
+ m_Instance->m_Lock.Unlock();
+ delete m_Instance;
+ m_Instance = NULL;
+ } else
+ m_Instance->m_Lock.Unlock();
+ }
+ m_InstanceLock.Unlock();
+ }
+
+};
+
+cMutex cTimerThread::m_InstanceLock;
+cTimerThread *cTimerThread::m_Instance = NULL;
+
+// ------------------------------ cTimerCallback -----------------------------
+
+cTimerCallback::~cTimerCallback()
+{
+ TRACEF("cTimerCallback::~cTimerCallback");
+ cTimerThread::CancelEvent(this, NULL, true);
+}
+
+void cTimerCallback::Set(cTimerCallback *handler, unsigned int TimeoutMs)
+{
+ TRACEF("cTimerCallback::Set");
+ cTimerThread::AddEvent(handler, TimeoutMs);
+}
+
+void cTimerCallback::Cancel(cTimerCallback *handler)
+{
+ TRACEF("cTimerCallback::Cancel");
+ cTimerThread::CancelEvent(handler);
+}
+
+// ------------------------------- cTimerEvent -------------------------------
+
+//cTimerEvent::cTimerEvent(unsigned int TimeoutMs)
+//{
+// TRACEF("cTimerEvent::cTimerEvent");
+//// cTimerThread::AddEvent(this, TimeoutMs, true);
+//}
+
+void cTimerEvent::AddEvent(unsigned int TimeoutMs)
+{
+ TRACEF("cTimerEvent::AddEvent");
+ cTimerThread::AddEvent(this, TimeoutMs, true);
+}
+
+void cTimerEvent::Cancel(cTimerEvent *&event)
+{
+ TRACEF("cTimerEvent::Cancel");
+ cTimerThread::CancelEvent(event);
+ event = NULL;
+}
+
+void cTimerEvent::CancelAll(void *Target)
+{
+ TRACEF("cTimerEvent::CancelAll");
+ cTimerThread::CancelEvent(NULL, Target);
+}
+
+
+
+
+
+
+
diff --git a/tools/timer.h b/tools/timer.h
new file mode 100644
index 00000000..2ee8724b
--- /dev/null
+++ b/tools/timer.h
@@ -0,0 +1,296 @@
+/*
+ * timer.h: Threaded timer class
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: timer.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#ifndef __XINELIBOUTPUT_TIMER_H
+#define __XINELIBOUTPUT_TIMER_H
+
+//
+// cTimerCallback : timer callback handler interface
+//
+class cTimerCallback {
+ protected:
+ virtual bool TimerEvent() = 0; // return false to cancel timer
+
+ virtual void *TargetId() { return (void*)this; }
+ virtual int size() { return sizeof(*this); }
+ virtual bool is(void *data, int len)
+ {
+ return len==sizeof(*this) && TargetId()==data;
+ }
+
+ friend class cTimerThread;
+
+ public:
+ static void Set(cTimerCallback *, unsigned int TimeoutMs);
+ static void Cancel(cTimerCallback *);
+
+ virtual ~cTimerCallback();
+};
+
+//
+// cTimerEvent : base class for timer events
+//
+class cTimerEvent : protected cTimerCallback {
+ private:
+ cTimerEvent(cTimerEvent&);
+
+ protected:
+ cTimerEvent() {};
+
+ virtual void AddEvent(unsigned int TimeoutMs);
+
+ static void CancelAll(void *Target);
+
+ template<class TCLASS> friend void CancelTimerEvents(TCLASS*);
+ friend class cTimerThread;
+
+ public:
+ static void Cancel(cTimerEvent *&);
+};
+
+//
+// make gcc 3.4.5 happy
+//
+template<class TCLASS, class TRESULT>
+cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(void),
+ unsigned int TimeoutMs);
+template<class TCLASS, class TRESULT, class TARG1>
+cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(TARG1),
+ TARG1 arg1,
+ unsigned int TimeoutMs);
+template<class TCLASS>
+cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(void),
+ unsigned int TimeoutMs, bool runOnce = true);
+template<class TCLASS, class TARG1>
+cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(TARG1),
+ TARG1 arg1,
+ unsigned int TimeoutMs, bool runOnce = true);
+
+//
+// Timer event templates
+//
+
+template <class TCLASS, class TRESULT>
+class cTimerFunctorR0 : public cTimerEvent {
+
+ public:
+
+ protected:
+ typedef TRESULT (TCLASS::*TFUNC)(void);
+
+ cTimerFunctorR0(TCLASS *obj, TFUNC f, unsigned int TimeoutMs) :
+ m_obj(obj), m_f(f)
+ {
+ AddEvent(TimeoutMs);
+ }
+
+ virtual ~cTimerFunctorR0() {};
+
+ virtual bool TimerEvent(void)
+ {
+ return (*m_obj.*m_f)();
+ }
+
+ virtual void *TargetId() { return (void*)m_obj; }
+ virtual int size() { return sizeof(*this); }
+ virtual bool is(void *data, int len)
+ {
+ return sizeof(*this)==len && !memcmp(this,data,len);
+ }
+
+ private:
+ TCLASS *m_obj;
+ TFUNC m_f;
+
+ friend cTimerEvent *CreateTimerEvent<TCLASS,TRESULT>(TCLASS*,TFUNC,unsigned int);
+};
+
+template <class TCLASS, class TRESULT, class TARG1>
+class cTimerFunctorR1 : public cTimerEvent {
+
+ public:
+
+ protected:
+ typedef TRESULT (TCLASS::*TFUNC)(TARG1);
+
+ cTimerFunctorR1(TCLASS *obj, TFUNC f, TARG1 arg1, unsigned int TimeoutMs) :
+ m_obj(obj), m_f(f), m_arg1(arg1)
+ {
+ AddEvent(TimeoutMs);
+ }
+
+ virtual ~cTimerFunctorR1() {};
+
+ virtual bool TimerEvent(void)
+ {
+ return (*m_obj.*m_f)(m_arg1);
+ }
+
+ virtual void *TargetId() { return (void*)m_obj; }
+ virtual int size() { return sizeof(*this); }
+ virtual bool is(void *data, int len)
+ {
+ return sizeof(*this)==len && !memcmp(this,data,len);
+ }
+
+ private:
+ TCLASS *m_obj;
+ TFUNC m_f;
+ TARG1 m_arg1;
+
+ friend cTimerEvent *CreateTimerEvent<TCLASS,TRESULT,TARG1>(TCLASS*,TFUNC,TARG1,unsigned int);
+};
+
+template <class TCLASS>
+class cTimerFunctor0 : public cTimerEvent {
+
+ public:
+
+ protected:
+ typedef void (TCLASS::*TFUNC)(void);
+
+ cTimerFunctor0(TCLASS *obj, TFUNC f,
+ unsigned int TimeoutMs, bool runOnce) :
+ m_obj(obj), m_f(f), m_runAgain(!runOnce)
+ {
+ AddEvent(TimeoutMs);
+ }
+
+ virtual ~cTimerFunctor0() {};
+
+ virtual bool TimerEvent(void)
+ {
+ (*m_obj.*m_f)();
+ return m_runAgain;
+ }
+
+ virtual void *TargetId() { return (void*)m_obj; }
+ virtual int size() { return sizeof(*this); }
+ virtual bool is(void *data, int len)
+ {
+ return sizeof(*this)==len && !memcmp(this,data,len);
+ }
+
+ private:
+ TCLASS *m_obj;
+ TFUNC m_f;
+ bool m_runAgain;
+
+ friend cTimerEvent *CreateTimerEvent<TCLASS>(TCLASS*,TFUNC,unsigned int,bool);
+};
+
+template <class TCLASS, class TARG1>
+class cTimerFunctor1 : public cTimerEvent {
+
+ public:
+
+ protected:
+ typedef void (TCLASS::*TFUNC)(TARG1);
+
+ cTimerFunctor1(TCLASS *obj, TFUNC f, TARG1 arg1,
+ unsigned int TimeoutMs, bool runOnce) :
+ m_obj(obj), m_f(f), m_arg1(arg1), m_runAgain(!runOnce)
+ {
+ AddEvent(TimeoutMs);
+ }
+
+ virtual ~cTimerFunctor1() {};
+
+ virtual bool TimerEvent(void)
+ {
+ (*m_obj.*m_f)(m_arg1);
+ return m_runAgain;
+ }
+
+ virtual void *TargetId() { return (void*)m_obj; }
+ virtual int size() { return sizeof(*this); }
+ virtual bool is(void *data, int len)
+ {
+ return sizeof(*this)==len && !memcmp(this,data,len);
+ }
+
+ private:
+ TCLASS *m_obj;
+ TFUNC m_f;
+ TARG1 m_arg1;
+ bool m_runAgain;
+
+ friend cTimerEvent *CreateTimerEvent<TCLASS,TARG1>(TCLASS*,TFUNC,TARG1,unsigned int,bool);
+};
+
+//
+// Function templates for timer event creation and cancellation
+//
+
+template<class TCLASS, class TRESULT>
+cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(void),
+ unsigned int TimeoutMs)
+{
+ return new cTimerFunctorR0<TCLASS,TRESULT>(c,fp,TimeoutMs);
+}
+
+template<class TCLASS, class TRESULT, class TARG1>
+cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(TARG1),
+ TARG1 arg1,
+ unsigned int TimeoutMs)
+{
+ return new cTimerFunctorR1<TCLASS,TRESULT,TARG1>(c,fp,arg1,TimeoutMs);
+}
+
+template<class TCLASS>
+cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(void),
+ unsigned int TimeoutMs, bool runOnce = true)
+{
+ return new cTimerFunctor0<TCLASS>(c,fp,TimeoutMs,runOnce);
+}
+
+template<class TCLASS, class TARG1>
+cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(TARG1),
+ TARG1 arg1,
+ unsigned int TimeoutMs, bool runOnce = true)
+{
+ return new cTimerFunctor1<TCLASS,TARG1>(c,fp,arg1,TimeoutMs,runOnce);
+}
+
+template<class TCLASS>
+void CancelTimerEvents(TCLASS *c)
+{
+ cTimerEvent::CancelAll((void*)c);
+}
+
+
+// usage:
+//
+// 'this' derived from cTimerHandler:
+// Set timer:
+// cTimerCallback::Set(this, TimeoutMs);
+// Cancel timer:
+// - return false from handler or
+// - call cTimerCallback::Cancel(this); or
+// - delete 'this' object
+//
+// any function of any class:
+// Set timer:
+// - cTimerEvent *event = CreateTimerEvent(...);
+// example:
+// CreateTimerEvent(this, &cXinelibDevice::TimerEvent, 1, 1000);
+// -> calls this->cXinelibDevice::TimerEvent(1) every second until stopped.
+// Cancel timer:
+// - if handler returns bool: return false from handler
+// - handler is type of void: timer runs only once
+// - call cTimerEvent::Cancel(event)
+// Cancel all timers for object:
+// - Call CancelTimerEvents(object)
+// - Call CancelTimerEvents(this)
+
+
+#endif // __XINELIBOUTPUT_TIMER_H
+
+
diff --git a/tools/udp_buffer.h b/tools/udp_buffer.h
new file mode 100644
index 00000000..d3a5313d
--- /dev/null
+++ b/tools/udp_buffer.h
@@ -0,0 +1,115 @@
+/*
+ * udp_buffer.h: Ring buffer for UDP/RTP streams
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: udp_buffer.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#ifndef __UDP_BUFFER_H
+#define __UDP_BUFFER_H
+
+#include <stdint.h>
+
+#include "../xine_input_vdr_net.h" // frame headers
+
+
+#define UDP_BUFFER_SIZE 0x100 // 2^n
+#define UDP_BUFFER_MASK 0xff // 2^n - 1
+
+#if UDP_BUFFER_MASK != UDP_SEQ_MASK
+# error Buffer handling error !!!
+#endif
+
+
+class cUdpBackLog
+{
+ friend class cUdpScheduler;
+
+ private:
+
+ cUdpBackLog(cUdpBackLog&);
+
+ stream_udp_header_t *m_UdpBuffer[UDP_BUFFER_SIZE];
+ int m_UdpBufLen[UDP_BUFFER_SIZE]; /* size of allocated memory, not frame */
+ int m_PayloadSize[UDP_BUFFER_SIZE]; /* size of frame */
+ unsigned int m_SeqNo; /* next (outgoing) sequence number */
+
+ protected:
+
+ cUdpBackLog()
+ {
+ memset(m_UdpBuffer, 0, sizeof(stream_udp_header_t *)*UDP_BUFFER_SIZE);
+ memset(m_UdpBufLen, 0, sizeof(int) * UDP_BUFFER_SIZE);
+ memset(m_PayloadSize, 0, sizeof(int) * UDP_BUFFER_SIZE);
+ m_SeqNo = 0;
+ }
+
+ void Clear(int HowManyFrames)
+ {
+ // Clear n last frames from buffer.
+ // (called to adjust sequence numbering when some
+ // already allocated frames won't be sent)
+ //
+ // Note: Nothing is freed.
+ // To completely reset buffer it must be deleted and re-created.
+ //
+ m_SeqNo = (m_SeqNo + UDP_BUFFER_SIZE - HowManyFrames) & UDP_BUFFER_MASK;
+ }
+
+ virtual ~cUdpBackLog()
+ {
+ for(int i=0; i<UDP_BUFFER_SIZE; i++)
+ if(m_UdpBuffer[i]) {
+ //m_UdpBufLen[i] = 0;
+ DELETENULL(m_UdpBuffer[i]);
+ }
+ }
+
+ stream_udp_header_t *Get(int UdpSeqNo)
+ {
+ int BufIndex = UdpSeqNo & UDP_BUFFER_MASK;
+ return m_UdpBuffer[BufIndex];
+ }
+
+ int PayloadSize(int UdpSeqNo)
+ {
+ int BufIndex = UdpSeqNo & UDP_BUFFER_MASK;
+ return m_UdpBuffer[BufIndex] ? m_PayloadSize[BufIndex] : 0;
+ }
+
+ stream_udp_header_t *MakeFrame(uint64_t StreamPos,
+ const uchar *Data, int DataLen)
+ {
+ int UdpPacketLen = DataLen + sizeof(stream_udp_header_t);
+ int BufIndex = m_SeqNo & UDP_BUFFER_MASK;
+
+ // old buffer too small ? free it
+ if(m_UdpBuffer[BufIndex] && m_UdpBufLen[BufIndex] < UdpPacketLen)
+ DELETENULL(m_UdpBuffer[BufIndex]);
+
+ // no buffer ? alloc it
+ if(!m_UdpBuffer[BufIndex]) {
+ m_UdpBuffer[BufIndex] = (stream_udp_header_t*)new uchar[UdpPacketLen];
+ m_UdpBufLen[BufIndex] = UdpPacketLen;
+ }
+ m_PayloadSize[BufIndex] = DataLen;
+
+ // Fill frame to buffer
+ stream_udp_header_t *header = m_UdpBuffer[BufIndex];
+ uchar *Payload = UDP_PAYLOAD(header);
+
+ memcpy(Payload, Data, DataLen);
+ header->pos = htonll(StreamPos);
+ header->seq = htons(m_SeqNo);
+
+ m_SeqNo = (m_SeqNo + 1) & UDP_SEQ_MASK;
+
+ return header;
+ }
+};
+
+
+#endif
diff --git a/tools/udp_pes_scheduler.c b/tools/udp_pes_scheduler.c
new file mode 100644
index 00000000..5da4be74
--- /dev/null
+++ b/tools/udp_pes_scheduler.c
@@ -0,0 +1,595 @@
+/*
+ * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: udp_pes_scheduler.c,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>
+
+#include <vdr/config.h>
+#include <vdr/tools.h>
+
+#include "../logdefs.h"
+
+#include "udp_buffer.h"
+#include "pes.h"
+
+#include "udp_pes_scheduler.h"
+
+#include "../xine_input_vdr_net.h" // frame headers
+
+
+//----------------------- cTimePts ------------------------------------------
+
+cTimePts::cTimePts(void)
+{
+ Set();
+}
+
+int64_t cTimePts::Now(void)
+{
+ struct timeval t;
+
+ if (gettimeofday(&t, NULL) == 0) {
+ t.tv_sec -= tbegin.tv_sec;
+ if(t.tv_usec < tbegin.tv_usec) {
+ t.tv_sec--;
+ t.tv_usec += 1000000;
+ }
+ t.tv_usec -= tbegin.tv_usec;
+
+ return (uint64(t.tv_sec)) * 90000LL +
+ (uint64(t.tv_usec)) * 90LL / 1000LL +
+ begin;
+ }
+
+ return 0;
+}
+
+void cTimePts::Set(int64_t Pts)
+{
+ gettimeofday(&tbegin, NULL);
+ begin = Pts;
+}
+
+//----------------------- cUdpScheduler -------------------------------------
+
+//#define LOG_RESEND
+//#define LOG_SCR
+
+const int MAX_QUEUE_SIZE = 64; // ~ 65 ms with typical DVB stream
+const int MAX_LIVE_QUEUE_SIZE = (64+32); // ~ 100 ms with typical DVB stream
+const int HARD_LIMIT = (4*1024); // ~ 40 Mbit/s === 4 Mb/s
+
+// initial burst length after seek (500ms = ~13 video frames)
+const int64_t INITIAL_BURST_TIME = (int64_t)(45000); // pts units (90kHz)
+
+// assume seek when when pts difference between two frames exceeds this (1.5 seconds)
+const int64_t JUMP_LIMIT_TIME = (int64_t)(3*90000/2); // pts units (90kHz)
+
+cUdpScheduler::cUdpScheduler()
+{
+
+ // Scheduler data
+
+ current_audio_vtime = 0;
+ current_video_vtime = 0;
+
+#ifdef LOG_SCR
+ data_sent = 0;
+ frames_sent = 0;
+ frame_rate = 2000;
+ prev_frames = 200;
+#endif
+
+ last_delay_time = 0;
+
+ // queuing
+
+ int i;
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ m_Handles[i] = -1;
+
+ m_BackLog = new cUdpBackLog;
+
+ m_QueueNextSeq = 0;
+ m_QueuePending = 0;
+
+ // Thread
+
+ m_Running = 1;
+
+ Start();
+}
+
+cUdpScheduler::~cUdpScheduler()
+{
+ m_Lock.Lock();
+ m_Running = 0;
+ m_Cond.Broadcast();
+ m_Lock.Unlock();
+
+ Cancel(3);
+
+ delete m_BackLog;
+}
+
+bool cUdpScheduler::AddHandle(int fd)
+{
+ cMutexLock ml(&m_Lock);
+
+ int i;
+
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ if(m_Handles[i] < 0 || m_Handles[i] == fd) {
+ m_Handles[i] = fd;
+ return true;
+ }
+
+ return false;
+}
+
+void cUdpScheduler::RemoveHandle(int fd)
+{
+ cMutexLock ml(&m_Lock);
+
+ int i;
+ for(i=0; i<MAX_UDP_HANDLES; i++)
+ if(m_Handles[i] == fd)
+ break;
+
+ for(; i<MAX_UDP_HANDLES-1; i++)
+ m_Handles[i] = m_Handles[i+1];
+
+ m_Handles[MAX_UDP_HANDLES-1] = -1;
+
+ if(m_Handles[0] < 0) {
+ // No clients left ...
+
+ // Flush all buffers
+ m_QueueNextSeq = 0;
+ m_QueuePending = 0;
+ delete m_BackLog;
+ m_BackLog = new cUdpBackLog;
+ }
+}
+
+bool cUdpScheduler::Poll(int TimeoutMs, bool Master)
+{
+ cMutexLock ml(&m_Lock);
+
+ m_Master = Master;
+
+ if(m_Handles[0] < 0) {
+ // no clients, so we can eat all data we are given ...
+ return true;
+ }
+
+ uint64_t WaitEnd = cTimeMs::Now();
+ if(TimeoutMs >= 0)
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ int limit = m_Master ? MAX_QUEUE_SIZE : MAX_LIVE_QUEUE_SIZE;
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Running &&
+ m_QueuePending >= limit)
+ m_Cond.TimedWait(m_Lock, 5);
+
+ return m_QueuePending < limit;
+}
+
+bool cUdpScheduler::Flush(int TimeoutMs)
+{
+ uint64_t WaitEnd = cTimeMs::Now();
+ if(TimeoutMs >= 0)
+ WaitEnd += (uint64_t)TimeoutMs;
+
+ cMutexLock ml(&m_Lock);
+
+ if(m_Handles[0] < 0)
+ return true;
+
+ while(cTimeMs::Now() < WaitEnd &&
+ m_Running &&
+ m_QueuePending > 0)
+ m_Cond.TimedWait(m_Lock, 25);
+
+ return m_QueuePending == 0;
+}
+
+void cUdpScheduler::Clear(void)
+{
+ cMutexLock ml(&m_Lock);
+
+ m_BackLog->Clear(m_QueuePending);
+ m_QueuePending = 0;
+
+ m_Cond.Broadcast();
+}
+
+bool cUdpScheduler::Queue(uint64_t StreamPos, const uchar *Data, int Length)
+{
+ cMutexLock ml(&m_Lock);
+
+ if(m_Handles[0] < 0)
+ return true;
+
+ if(m_QueuePending >= MAX_QUEUE_SIZE)
+ return false;
+
+ m_BackLog->MakeFrame(StreamPos, Data, Length);
+ m_QueuePending++;
+
+ m_Cond.Broadcast();
+
+ return true;
+}
+
+int cUdpScheduler::calc_elapsed_vtime(int64_t pts, bool Audio)
+{
+ int64_t diff = 0;
+
+ if(!Audio /*Video*/) {
+ /* #warning TODO: should be possible to use video pts too (if ac3 or muted ...) */
+ diff = pts - current_video_vtime;
+ if(diff < 0) diff = -diff;
+ if(diff > JUMP_LIMIT_TIME) { // 1 s (must be > GOP)
+ // RESET
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler RESET (Video jump %lld->%lld)",
+ current_audio_vtime, pts);
+ data_sent = frames_sent = 0;
+#endif
+ //diff = 0;
+ current_video_vtime = pts;
+ return -1;
+ }
+ current_video_vtime = pts;
+
+ } else if(Audio) {
+ diff = pts - current_audio_vtime;
+ if(diff < 0) diff = -diff;
+ if(diff > JUMP_LIMIT_TIME) { // 1 sec
+ // RESET
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler RESET (Audio jump %lld->%lld)",
+ current_audio_vtime, pts);
+ data_sent = frames_sent = 0;
+#endif
+ //diff = 0;
+ current_audio_vtime = pts;
+
+ // Use audio pts for sync (audio has constant and increasing intervals)
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+
+ return -1;
+ }
+ current_audio_vtime = pts;
+ }
+
+#ifdef LOG_SCR
+ if(diff && Audio) {
+ frame_rate = (int)(90000*frames_sent/(int)diff);
+ LOGDBG("rate %d kbit/s (%d frames/s)",
+ (int)(90*data_sent/((int)diff)*8), frame_rate);
+ prev_frames = frames_sent;
+ data_sent = frames_sent = 0;
+ }
+#endif
+
+ return (int) diff;
+}
+
+void cUdpScheduler::Schedule(const uchar *Data, int Length)
+{
+ bool Audio=false, Video=false;
+ int64_t pts = pes_extract_pts(Data, Length, Audio, Video);
+ int elapsed = pts>0 ? calc_elapsed_vtime(pts, Audio) : 0;
+
+#ifdef LOG_SCR
+ if(elapsed > 0)
+ LOGMSG("PTS: %lld (%s) elapsed %d ms",
+ pts, Video?"Video":Audio?"Audio":"?", elapsed/90);
+#endif
+
+ if(elapsed > 0 && Audio/*Video*/) {
+ int64_t now = MasterClock.Now();
+ if(now > current_audio_vtime && (now - current_audio_vtime)>JUMP_LIMIT_TIME) {
+#ifdef LOG_SCR
+ LOGMSG("cUdpScheduler MasterClock init (was in past)");
+ elapsed = -1;
+#endif
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+ } else if(now < current_audio_vtime && (current_audio_vtime-now)>JUMP_LIMIT_TIME) {
+#ifdef LOG_SCR
+ LOGMSG("cUdpScheduler MasterClock init (was in future)");
+ elapsed = -1;
+#endif
+ MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME);
+ } else if(!last_delay_time) {
+ // first burst done, no delay yet ???
+ // (queue up to xxx bytes first)
+ } else {
+ if(current_audio_vtime > now) {
+ int delay_ms = (int)(current_audio_vtime - now)/90;
+#ifdef LOG_SCR
+ LOGDBG("cUdpScheduler sleeping %d ms "
+ "(time reference: %s, beat interval %d ms)",
+ delay_ms, (Audio?"Audio PTS":"Video PTS"), elapsed);
+#endif
+ if(delay_ms > 3) {
+ //LOGMSG("sleep %d ms (%d f)", delay_ms, prev_frames);
+ CondWait.Wait(delay_ms);
+ }
+ }
+ }
+ last_delay_time = now;
+ }
+
+#if 0
+ static int win = 0;
+ static int64_t prev;
+
+ if(data_sent == 0 || elapsed < 0) {
+ win = 0;
+ prev = MasterClock.Now();
+ }
+ win ++;
+ int mrate = 3*frame_rate/2;
+ if(mrate < 100) mrate = 100;
+ if(mrate > 2000) mrate = 2000;
+ if(MasterClock.Now() - prev >= win*90000 / frame_rate) {
+ LOGMSG("sleep:3");
+ CondWait.Wait(3);
+ }
+#endif
+
+#ifdef LOG_SCR
+ data_sent += Length;
+ frames_sent ++;
+#endif
+}
+
+void cUdpScheduler::Action(void)
+{
+#if 0
+ {
+ // Request real-time scheduling
+ sched_param temp;
+ temp.sched_priority = 2;
+
+ if (!pthread_setschedparam(pthread_self(), SCHED_RR, &temp)) {
+ LOGMSG("cUdpScheduler priority set successful SCHED_RR %d [%d,%d]",
+ temp.sched_priority,
+ sched_get_priority_min(SCHED_RR),
+ sched_get_priority_max(SCHED_RR));
+ } else {
+ LOGMSG("cUdpScheduer: Can't set priority to SCHED_RR %d [%d,%d]",
+ temp.sched_priority,
+ sched_get_priority_min(SCHED_RR),
+ sched_get_priority_max(SCHED_RR));
+ }
+
+ /* UDP Scheduler needs high priority */
+ SetPriority(0);
+ SetPriority(-1);
+ SetPriority(-2);
+ SetPriority(-3);
+ SetPriority(-4);
+ SetPriority(-5);
+ }
+#endif
+
+ m_Lock.Lock();
+
+ while(m_Running) {
+
+ if(m_Handles[0] < 0) {
+ m_Cond.TimedWait(m_Lock, 5000);
+ continue;
+ }
+
+ // Wait until we have outgoing data in queue
+ if(m_QueuePending <= 0) {
+ m_Cond.TimedWait(m_Lock, 100);
+ if(m_QueuePending <= 0) {
+ static unsigned char padding[] = {0x00,0x00,0x01,0xBE,0x00,0x02,0xff,0xff};
+ int prevseq = (m_QueueNextSeq + UDP_BUFFER_SIZE - 1) & UDP_BUFFER_MASK;
+ stream_udp_header_t *frame = m_BackLog->Get(prevseq);
+ if(frame)
+ m_BackLog->MakeFrame(ntohll(frame->pos), padding, 8);
+ else
+ m_BackLog->MakeFrame(0, padding, 8);
+ m_QueuePending++;
+ }
+ continue; // to check m_Running
+ }
+
+ // Take next frame from queue
+ stream_udp_header_t *frame = m_BackLog->Get(m_QueueNextSeq);
+ int PayloadSize = m_BackLog->PayloadSize(m_QueueNextSeq);
+ int UdpPacketLen = PayloadSize + sizeof(stream_udp_header_t);
+ m_QueueNextSeq = (m_QueueNextSeq + 1) & UDP_BUFFER_MASK;
+ m_QueuePending--;
+
+ m_Cond.Broadcast();
+
+ m_Lock.Unlock();
+
+#if 0 /* debugging checks */
+ {
+ if(!frame)
+ LOGMSG("frame == NULL !");
+ uint8_t *p = UDP_PAYLOAD(frame);
+
+ if(p[0] || p[1] || p[2]!=1)
+ LOGMSG("cUdpScheduler: invalid content");
+
+ int n = sizeof(stream_udp_header_t) + (p[4]<<8) + p[5] + 6;
+ if(n != UdpPacketLen)
+ LOGMSG("cUdpScheduler: length error -- %d != %d", n, UdpPacketLen);
+
+ static int seq = 0;
+ if(seq != ntohs(frame->seq))
+ LOGMSG("cUdpScheduler: SEQ jump %d -> %d !", seq, ntohs(frame->seq));
+ seq = (ntohs(frame->seq) + 1) & UDP_BUFFER_MASK;
+
+ if(PayloadSize != 8) {
+ static uint64_t pos = 0;
+ if(pos != ntohull(frame->pos))
+ LOGMSG("cUdpScheduler: POS jump %lld -> %lld !", pos, ntohull(frame->pos));
+ pos = ntohull(frame->pos) + PayloadSize;
+ }
+ }
+#endif
+
+ // Schedule frame
+ if(m_Master)
+ Schedule(UDP_PAYLOAD(frame), PayloadSize);
+
+ /* need some limit here for ex. sequence of stills when moving cutting marks very fast
+ (no audio or PTS available) */
+#if 1
+ // hard limit for used bandwidth:
+ // - ~1 frames/ms & 8kb/ms -> 8mb/s -> ~ 80 Mbit/s ( / client)
+ // - max burst 15 frames or 30kb
+ static int cnt = 0, bytes = 0;
+ static uint64_t dbg_timer = cTimeMs::Now();
+ static int dbg_bytes = 0;
+ cnt++;
+ bytes += PayloadSize;
+ if(cnt>=15 && bytes >= 30000) {
+ CondWait.Wait(4);
+ dbg_bytes += bytes;
+ cnt = 0;
+ bytes = 0;
+ if(dbg_timer+60000 <= cTimeMs::Now()) {
+ LOGDBG("UDP rate: %4d Kbps (queue %d)", dbg_bytes/(60*1024/8),
+ m_QueuePending);
+ dbg_bytes = 0;
+ dbg_timer = cTimeMs::Now();
+ }
+ }
+#endif
+
+ for(int i=0; i<MAX_UDP_HANDLES && m_Handles[i]>=0; i++) {
+
+ //
+ // use TIOCOUTQ ioctl instead of poll/select.
+ // - poll/select for UDP/RTP may return true even when queue
+ // is (almost) full
+ // - kernel silently drops frames it cant send
+ // -> poll() + send() just causes frames to be dropped
+ //
+ int size = 0;
+ if(!ioctl(m_Handles[i], TIOCOUTQ, &size))
+ if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer
+ int wmem=0;
+ socklen_t l = sizeof(int);
+ if(!getsockopt(m_Handles[i], SOL_SOCKET, SO_SNDBUF, &wmem, &l)) {
+#if 0
+// Large bursts cause client to loose data :(
+ if(size >= (wmem/2 - 8128)) {
+ LOGMSG("cUdpScheduler: kernel transmit queue > ~%dkb ! (master=%d)",
+ (wmem/2-8128)/1024, m_Master);
+ CondWait.Wait(2);
+ }
+ else
+#endif
+ {
+ if(m_QueuePending > (MAX_QUEUE_SIZE-5))
+ LOGMSG("cUdpScheduler: kernel transmit queue > ~30kb ! (master=%d ; Queue=%d)",
+ m_Master, m_QueuePending);
+ CondWait.Wait(2);
+ }
+ }
+ }
+
+ if(send(m_Handles[i], frame, UdpPacketLen, 0) <= 0)
+ LOGERR("cUdpScheduler: UDP send() failed !");
+ }
+
+ m_Lock.Lock();
+ }
+
+ m_Lock.Unlock();
+}
+
+void cUdpScheduler::ReSend(int fd, uint64_t Pos, int Seq1, int Seq2)
+{
+ cMutexLock ml(&m_Lock); // keeps also scheduler thread suspended ...
+
+ // Handle buffer wrap
+ if(Seq1 > Seq2)
+ Seq2 += UDP_BUFFER_SIZE;
+
+ if(Seq2-Seq1 > 64) {
+ LOGDBG("cUdpScheduler::ReSend: requested range too large (%d-%d)",
+ Seq1, Seq2);
+ return;
+ }
+
+ // re-send whole range
+ for(; Seq1 <= Seq2; Seq1++) {
+
+ // Wait if kernel queue is full
+ int size=0;
+ if(!ioctl(fd, TIOCOUTQ, &size))
+ if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer
+ LOGDBG("cUdpScheduler::ReSend: kernel transmit queue > ~30kb !");
+ cCondWait::SleepMs(2);
+ }
+
+ stream_udp_header_t *frame = m_BackLog->Get(Seq1);
+
+ if(frame) {
+ if(ntohull(frame->pos) == Pos) {
+ send(fd,
+ frame,
+ m_BackLog->PayloadSize(Seq1) + sizeof(stream_udp_header_t),
+ 0);
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: %d (%d bytes) @%lld sent",
+ Seq1, m_BackLog->PayloadSize(Seq1), Pos);
+#endif
+ Pos += m_BackLog->PayloadSize(Seq1);
+ continue;
+ } else {
+ // buffer has been lost long time ago...
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: Requested position does not match "
+ "(%lld ; has %lld)", Pos, ntohll(frame->pos));
+#endif
+ }
+ } else {
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: %d @%lld missing", Seq1, Pos);
+#endif
+ }
+ // buffer has been lost
+ // send packet missing info
+ char udp_ctrl[64];
+ ((stream_udp_header_t *)udp_ctrl)->seq = (uint16_t)(-1);
+ ((stream_udp_header_t *)udp_ctrl)->pos = (uint64_t)(-1);
+
+#ifdef LOG_RESEND
+ LOGDBG("cUdpScheduler::ReSend: missing %d-%d @%d (hdr 0x%llx 0x%x)",
+ Seq1, Seq1, Pos,
+ ((stream_udp_header_t *)udp_ctrl)->pos,
+ ((stream_udp_header_t *)udp_ctrl)->seq);
+#endif
+ sprintf((udp_ctrl+sizeof(stream_udp_header_t)),
+ "UDP MISSING %d-%d %lld",
+ Seq1, Seq1, Pos);
+
+ send(fd, udp_ctrl, 64, 0);
+ }
+}
diff --git a/tools/udp_pes_scheduler.h b/tools/udp_pes_scheduler.h
new file mode 100644
index 00000000..57fce688
--- /dev/null
+++ b/tools/udp_pes_scheduler.h
@@ -0,0 +1,103 @@
+/*
+ * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams
+ *
+ * See the main source file 'xineliboutput.c' for copyright information and
+ * how to reach the author.
+ *
+ * $Id: udp_pes_scheduler.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $
+ *
+ */
+
+#ifndef __UDP_PES_SCHEDULER_H
+#define __UDP_PES_SCHEDULER_H
+
+#include <stdint.h>
+
+#include <vdr/tools.h> // uchar
+#include <vdr/thread.h>
+
+//----------------------- cTimePts ------------------------------------------
+
+class cTimePts
+{
+ private:
+ int64_t begin;
+ struct timeval tbegin;
+
+ public:
+ cTimePts(void);
+
+ int64_t Now(void);
+ void Set(int64_t Pts = 0LL);
+};
+
+//----------------------- cUdpPesScheduler ----------------------------------
+
+#define MAX_UDP_HANDLES 16
+
+class cUdpBackLog;
+
+class cUdpScheduler : public cThread
+{
+ public:
+
+ cUdpScheduler();
+ virtual ~cUdpScheduler();
+
+ // fd should be binded & connected to IP:PORT (local+remote) pair !
+ bool AddHandle(int fd);
+ void RemoveHandle(int fd);
+
+ bool Poll(int TimeoutMs, bool Master);
+ bool Queue(uint64_t StreamPos, const uchar *Data, int Length);
+ void ReSend(int fd, uint64_t Pos, int Seq1, int Seq2);
+
+ void Clear(void);
+ bool Flush(int TimeoutMs);
+
+ protected:
+
+ // Data for payload handling & buffering
+
+ // Signalling
+ cCondVar m_Cond;
+ cMutex m_Lock;
+
+ // Clients
+ int m_Handles[MAX_UDP_HANDLES];
+
+ // Queue
+ int m_QueueNextSeq; /* next outgoing */
+ int m_QueuePending; /* outgoing queue size */
+ cUdpBackLog *m_BackLog; /* queue for incoming data (not yet send) and retransmissions */
+
+ // Data for scheduling algorithm
+
+ cTimePts RtpScr; // 90 kHz monotonic time source for RTP packets
+ cTimePts MasterClock; // Current MPEG PTS (synchronized with current stream)
+ cCondWait CondWait;
+
+ int64_t current_audio_vtime;
+ int64_t current_video_vtime;
+
+#if 0
+ int data_sent; /* in current time interval, bytes */
+ int frames_sent; /* in current time interval */
+ int frame_rate; /* pes frames / second */
+ int prev_frames;
+#endif
+
+ int64_t last_delay_time;
+
+ bool m_Master; /* if true, we are master metronom for playback */
+
+ // Scheduling
+
+ int calc_elapsed_vtime(int64_t pts, bool Audio);
+ void Schedule(const uchar *Data, int Length);
+
+ bool m_Running;
+ virtual void Action(void);
+};
+
+#endif