diff options
author | phintuka <phintuka> | 2006-06-03 10:04:49 +0000 |
---|---|---|
committer | phintuka <phintuka> | 2006-06-03 10:04:49 +0000 |
commit | 0e345486181ef82b681dd6047f3b6ccb44c77146 (patch) | |
tree | a5401c7f97ab047a0afa890e6806d8537564102a /tools | |
parent | 321eb114c9fe9abd954ce4270595d53df6cccbae (diff) | |
download | xineliboutput-0_99rc4.tar.gz xineliboutput-0_99rc4.tar.bz2 |
Initial importxineliboutput-0_99rc4
Diffstat (limited to 'tools')
-rw-r--r-- | tools/backgroundwriter.c | 215 | ||||
-rw-r--r-- | tools/backgroundwriter.h | 62 | ||||
-rw-r--r-- | tools/cxsocket.c | 3 | ||||
-rw-r--r-- | tools/cxsocket.h | 192 | ||||
-rw-r--r-- | tools/future.h | 84 | ||||
-rw-r--r-- | tools/general_remote.h | 27 | ||||
-rw-r--r-- | tools/listiter.h | 83 | ||||
-rw-r--r-- | tools/pes.h | 258 | ||||
-rw-r--r-- | tools/timer.c | 292 | ||||
-rw-r--r-- | tools/timer.h | 296 | ||||
-rw-r--r-- | tools/udp_buffer.h | 115 | ||||
-rw-r--r-- | tools/udp_pes_scheduler.c | 595 | ||||
-rw-r--r-- | tools/udp_pes_scheduler.h | 103 |
13 files changed, 2325 insertions, 0 deletions
diff --git a/tools/backgroundwriter.c b/tools/backgroundwriter.c new file mode 100644 index 00000000..26e1f34b --- /dev/null +++ b/tools/backgroundwriter.c @@ -0,0 +1,215 @@ +/* + * backgroundwriter.h: Buffered socket/file writing thread + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: backgroundwriter.c,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#include <stdint.h> +#include <unistd.h> + +#include <vdr/tools.h> + +#include "../logdefs.h" +#include "../xine_input_vdr_net.h" // stream_tcp_header_t + +#include "backgroundwriter.h" + +//#define DISABLE_DISCARD +//#define LOG_DISCARDS + + +cBackgroundWriter::cBackgroundWriter(int fd, int Size) + : m_RingBuffer(Size, sizeof(stream_tcp_header_t)) +{ + m_fd = fd; + m_RingBuffer.SetTimeouts(0, 100); + m_Active = true; + + m_PutPos = 0; + m_DiscardStart = 0; + m_DiscardEnd = 0; + + LOGDBG("cBackgroundWriter initialized (buffer %d kb)", Size/1024); + + Start(); +} + +cBackgroundWriter::~cBackgroundWriter() +{ + m_Active = false; + Cancel(3); +} + +int cBackgroundWriter::Free(void) +{ + return m_RingBuffer.Free(); +} + +void cBackgroundWriter::Action(void) +{ + uint64_t NextHeaderPos = 0ULL; + uint64_t GetPos = 0ULL; + cPoller Poller(m_fd, true); + + while(m_Active) { + + if(Poller.Poll(100)) { + + int Count = 0; + uchar *Data = m_RingBuffer.Get(Count); + + if(Data && Count > 0) { + +#ifndef DISABLE_DISCARD + Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32) + if(m_DiscardEnd > GetPos) { + +# ifdef LOG_DISCARDS + LOGMSG("TCP: queue: discard request: queue %d bytes, " + "next point %d bytes forward (Count=%d)", + m_RingBuffer.Available(), + NextHeaderPos - GetPos, + Count); +# endif + if(NextHeaderPos == GetPos) { + // we're at frame boundary +# ifdef LOG_DISCARDS + uint8_t *pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); + } +# endif + Count = min(Count, (int)(m_DiscardEnd - GetPos)); +# ifdef LOG_DISCARDS + LOGMSG("Flushing %d bytes", Count); +#endif + Unlock(); + + m_RingBuffer.Del(Count); + GetPos += Count; + NextHeaderPos = GetPos; +# ifdef LOG_DISCARDS + LOGMSG("Queue now %d bytes", m_RingBuffer.Available()); + pkt = TCP_PAYLOAD(Data); + if(pkt[0] || pkt[1] || pkt[2] != 1 || hdr->len > 2100) { + LOGMSG(" -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]); +# endif + continue; + } + } + Unlock(); +#endif + +#ifndef DISABLE_DISCARD + if(GetPos == NextHeaderPos) { + if(Count < (int)sizeof(stream_tcp_header_t)) + LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !"); + + stream_tcp_header_t *header = (stream_tcp_header_t*)Data; + if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t))) + ;//LOGMSG("Count = %d < %d", Count, + // header->len + sizeof(stream_tcp_header_t)); + else + Count = ntohl(header->len) + sizeof(stream_tcp_header_t); + NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t); + } else { + Count = min(Count, (int)(NextHeaderPos-GetPos)); + } +#endif + + errno = 0; + int n = write(m_fd, Data, Count); + + if(n == 0) { + LOGERR("cBackgroundWriter: Client disconnected data stream ?"); + break; + + } else if(n < 0) { + + if (errno == EINTR || errno == EWOULDBLOCK) { + TRACE("cBackgroundWriter: EINTR while writing to file handle " + <<m_fd<<" - retrying"); + continue; + + } else { + LOGERR("cBackgroundWriter: TCP write error"); + break; + } + } + + GetPos += n; + m_RingBuffer.Del(n); + } + } + } + + m_RingBuffer.Clear(); + m_Active = false; +} + +void cBackgroundWriter::Clear(void) +{ + // Can't just drop buffer contents or PES frames will be broken. + + // Serialize with Put + LOCK_THREAD; +#ifdef LOG_DISCARDS + LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos); +#endif + m_DiscardEnd = m_PutPos; +} + +bool cBackgroundWriter::Flush(int TimeoutMs) +{ + uint64_t WaitEnd = cTimeMs::Now(); + + if(TimeoutMs > 0) + WaitEnd += (uint64_t)TimeoutMs; + + while(cTimeMs::Now() < WaitEnd && + m_Active && + m_RingBuffer.Available() > 0) + cCondWait::SleepMs(3); + + return m_RingBuffer.Available() <= 0; +} + +int cBackgroundWriter::Put(uint64_t StreamPos, + const uchar *Data, int DataCount) +{ + stream_tcp_header_t header; + header.pos = htonull(StreamPos); + header.len = htonl(DataCount); + return Put((uchar*)&header, sizeof(header), Data, DataCount); +} + +int cBackgroundWriter::Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount) +{ + if(m_Active) { + + // Serialize Put access to keep Data and Header together + LOCK_THREAD; + + if(m_RingBuffer.Free() < HeaderCount+DataCount) { + LOGMSG("cXinelibServer: TCP buffer overflow !"); + return -HeaderCount-DataCount; + } + int n = m_RingBuffer.Put(Header, HeaderCount) + + m_RingBuffer.Put(Data, DataCount); + if(n == HeaderCount+DataCount) { + m_PutPos += n; + return n; + } + + LOGMSG("cXinelibServer: TCP buffer internal error ?!?"); + m_RingBuffer.Clear(); + m_Active = false; + } + + return 0; +} diff --git a/tools/backgroundwriter.h b/tools/backgroundwriter.h new file mode 100644 index 00000000..619b2a29 --- /dev/null +++ b/tools/backgroundwriter.h @@ -0,0 +1,62 @@ +/* + * backgroundwriter.h: Buffered socket/file writing thread + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: backgroundwriter.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#ifndef __BACKGROUNDWRITER_H +#define __BACKGROUNDWRITER_H + +#include <stdint.h> + +#include <vdr/thread.h> +#include <vdr/ringbuffer.h> + + +class cBackgroundWriter : public cThread { + + private: + cRingBufferLinear m_RingBuffer; + + volatile bool m_Active; + int m_fd; + + uint64_t m_PutPos; + uint64_t m_DiscardStart; + uint64_t m_DiscardEnd; + + protected: + virtual void Action(void); + + int Put(const uchar *Header, int HeaderCount, + const uchar *Data, int DataCount); + + public: + cBackgroundWriter(int fd, int Size = KILOBYTE(512)); + virtual ~cBackgroundWriter() ; + + // Return largest possible Put size + int Free(void); + + // Add PES frame to buffer + // + // Return value: + // Success: Count (all bytes pushed to queue) + // Error: 0 (write error ; socket disconnected) + // Buffer full: -Count (no bytes will be pushed to queue) + // + int Put(uint64_t StreamPos, const uchar *Data, int DataCount); + + // Drop all data (only complete frames) from buffer + void Clear(void); + + //bool Poll(int TimeoutMs); + bool Flush(int TimeoutMs); +}; + + +#endif diff --git a/tools/cxsocket.c b/tools/cxsocket.c new file mode 100644 index 00000000..a8a1953e --- /dev/null +++ b/tools/cxsocket.c @@ -0,0 +1,3 @@ +/*static*/ int cxsocket_c_dummy; + +/* #warning TODO: socket helper classes / functions */ diff --git a/tools/cxsocket.h b/tools/cxsocket.h new file mode 100644 index 00000000..f3d823e5 --- /dev/null +++ b/tools/cxsocket.h @@ -0,0 +1,192 @@ +/* + * cxsocket.h: Socket wrapper classes + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: cxsocket.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#ifndef __CXSOCKET_H +#define __CXSOCKET_H + +#define CLOSESOCKET(fd) do { if(fd>=0) { close(fd); fd=-1; } } while(0) + +// +// Connect data socket to client (take address from fd_control) +// +static inline int sock_connect(int fd_control, int port, int type) +{ + struct sockaddr_in sin; + socklen_t len = sizeof(sin); + int s; + + if(getpeername(fd_control, (struct sockaddr *)&sin, &len)) { + LOGERR("sock_connect: getpeername failed"); + return -1; + } + + uint32_t tmp = ntohl(sin.sin_addr.s_addr); + LOGMSG("Client address: %d.%d.%d.%d", + ((tmp>>24)&0xff), ((tmp>>16)&0xff), + ((tmp>>8)&0xff), ((tmp)&0xff)); + +#if 0 + if ((h = gethostbyname(tmp)) == NULL) { + LOGDBG("sock_connect: unable to resolve host name", tmp); + } +#endif + + if ((s = socket(PF_INET, type, + type==SOCK_DGRAM?IPPROTO_UDP:IPPROTO_TCP)) < 0) { + LOGERR("sock_connect: failed to create socket"); + return -1; + } + +#if 1 + // Set socket buffers: large send buffer, small receive buffer + { + int max_buf = KILOBYTE(128); + //while(max_buf) { + errno = 0; + if(setsockopt(s, SOL_SOCKET, SO_SNDBUF, &max_buf, sizeof(int))) { + LOGERR("setsockopt(SO_SNDBUF,%d) failed", max_buf); + max_buf >>= 1; + } else { + int tmp = 0; + int len = sizeof(int); + errno = 0; + if(getsockopt(s, SOL_SOCKET, SO_SNDBUF, &tmp, (socklen_t*)&len)) { + LOGERR("getsockopt(SO_SNDBUF,%d) failed", max_buf); + max_buf >>= 1; + } else if(tmp != max_buf) { + LOGDBG("setsockopt(SO_SNDBUF): got %d bytes", tmp); + max_buf >>= 1; + } + } + max_buf = 1024; + setsockopt(s, SOL_SOCKET, SO_RCVBUF, &max_buf, sizeof(int)); + } +#endif + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + if (connect(s, (struct sockaddr *)&sin, sizeof(sin))==-1 && + errno != EINPROGRESS) { + LOGERR("connect() failed"); + CLOSESOCKET(s); + } + + if (fcntl (s, F_SETFL, fcntl (s, F_GETFL) | O_NONBLOCK) == -1) { + LOGERR("can't put socket in non-blocking mode"); + CLOSESOCKET(s); + return -1; + } + + return s; +} + +static inline int timed_write(int fd, const void *buffer, size_t size, + int timeout_ms) +{ + int written = size; + const unsigned char *ptr = (const unsigned char *)buffer; + cPoller poller(fd, true); + + while (size > 0) { + + if(!poller.Poll(timeout_ms)) { + LOGERR("timed_write: poll() failed"); + return written-size; + } + + errno = 0; + int p = write(fd, ptr, size); + + if (p <= 0) { + if (errno == EINTR || errno == EAGAIN) { + LOGDBG("timed_write: EINTR during write(), retrying"); + continue; + } + LOGERR("timed_write: write() error"); + return p; + } + + ptr += p; + size -= p; + } + + return written; +} + +//#include "xine_osd_command.h" + +static inline int write_osd_command(int fd, osd_command_t *cmd) +{ + if(8 != timed_write(fd, "OSDCMD\r\n", 8, 200)) { + LOGDBG("write_osd_command: write (command) failed"); + return 0; + } + if(sizeof(osd_command_t) != + timed_write(fd, cmd, sizeof(osd_command_t), 200)) { + LOGDBG("write_osd_command: write (data) failed"); + return 0; + } + if(cmd->palette && cmd->colors && + (int)(sizeof(xine_clut_t)*ntohl(cmd->colors)) != + timed_write(fd, cmd->palette, (int)(sizeof(xine_clut_t)*ntohl(cmd->colors)), 200)) { + LOGDBG("write_osd_command: write (palette) failed"); + return 0; + } + if(cmd->data && cmd->datalen && + (int)ntohl(cmd->datalen) != timed_write(fd, cmd->data, ntohl(cmd->datalen), 1000)) { + LOGDBG("write_osd_command: write (bitmap) failed"); + return 0; + } + return 1; +} + +static inline int write_str(int fd, const char *str, int timeout_ms=-1) +{ + return timed_write(fd, str, strlen(str), timeout_ms); +} + +static inline int write_cmd(int fd, const char *str) +{ + return write_str(fd, str, 10); +} + +static inline int udp_discovery_broadcast(int fd_discovery, int m_Port) +{ + if(!xc.remote_usebcast) { + LOGDBG("UDP broadcasts (discovery) disabled in configuration"); + return -1; + } + + struct sockaddr_in sin; + + sin.sin_family = AF_INET; + sin.sin_port = htons(DISCOVERY_PORT); + sin.sin_addr.s_addr = INADDR_BROADCAST; + + char *test = NULL; + asprintf(&test, + "VDR xineliboutput DISCOVERY 1.0\r\n" + "Server port: %d\r\n" + "\r\n", + m_Port); + int testlen = strlen(test); + if(testlen != sendto(fd_discovery, test, testlen, 0, + (struct sockaddr *)&sin, sizeof(sin))) { + LOGERR("UDP broadcast send failed (discovery)"); + return -1; + } else { + LOGDBG("UDP broadcast send succeed (discovery)"); + } + return 1; +} + + +#endif // __CXSOCKET_H diff --git a/tools/future.h b/tools/future.h new file mode 100644 index 00000000..73d4bfec --- /dev/null +++ b/tools/future.h @@ -0,0 +1,84 @@ +/* + * future.h: A variable that gets its value in future. + * Used to convert asynchronous IPCs to synchronous. + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: future.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#ifndef __FUTURE_H +#define __FUTURE_H + +#include <vdr/thread.h> + +template <class T> +class cFuture { + + private: + cMutex mutex; + cCondVar cond; + bool m_Ready; + T m_Value; + + public: + + cFuture() + { + m_Ready = false; + } + + void Reset(void) + { + cMutexLock l(&mutex); + m_Ready = false; + } + + // + // Producer interface + // + + void Set(T& Value) + { + cMutexLock l(&mutex); + m_Value = Value; + m_Ready = true; + cond.Broadcast(); + } + + // + // Consumer interface + // + + bool Wait(int Timeout = -1) + { + cMutexLock l(&mutex); + + if(Timeout >= 0) + return cond.TimedWait(mutex, Timeout); + + while(!m_Ready) + cond.Wait(mutex); + + return true; + } + + bool IsReady(void) + { + cMutexLock l(&mutex); + return m_Ready; + } + + T Value(void) + { + cMutexLock l(&mutex); + while(!m_Ready) + cond.Wait(mutex); + return m_Value; + } +}; + + +#endif // __FUTURE_H diff --git a/tools/general_remote.h b/tools/general_remote.h new file mode 100644 index 00000000..aed60463 --- /dev/null +++ b/tools/general_remote.h @@ -0,0 +1,27 @@ +/* + * general_remote.h: + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: general_remote.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#ifndef __GENERAL_REMOTE_H +#define __GENERAL_REMOTE_H + + +//----------------------------- cGeneralRemote -------------------------------- + +#include <vdr/remote.h> + +class cGeneralRemote : public cRemote { + public: + cGeneralRemote(const char *Name) : cRemote(Name) {}; + bool Put(const char *Code, bool Repeat=false, bool Release=false) + { return cRemote::Put(Code, Repeat, Release); }; +}; + + +#endif diff --git a/tools/listiter.h b/tools/listiter.h new file mode 100644 index 00000000..9c88940e --- /dev/null +++ b/tools/listiter.h @@ -0,0 +1,83 @@ +/* + * listiter.h: + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: listiter.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + + +#ifndef _LISTITER_H_ +#define _LISTITER_H_ + +//------------------------------ list ---------------------------------------- + +template <class LIST,class ITEM, class RESULT> +void ForEach(LIST& List, RESULT (ITEM::*f)()) +{ + for(ITEM *it = List.First(); it; it = List.Next(it)) + (*it.*f)(); +} + +template <class LIST,class ITEM, class ARG1, class RESULT> +void ForEach(LIST& List, RESULT (ITEM::*f)(ARG1), ARG1 arg1) +{ + for(ITEM *it = List.First(); it; it = List.Next(it)) + (*it.*f)(arg1); +} + +template <class LIST,class ITEM, class ARG1, class ARG2> +void ForEach(LIST& List, void (ITEM::*f)(ARG1,ARG2), ARG1 arg1, ARG2 arg2) +{ + for(ITEM *it = List.First(); it; it = List.Next(it)) + (*it.*f)(arg1,arg2); +} + +template <class LIST,class ITEM, class ARG1, class RESULT> +RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1), ARG1 arg1, + RESULT (*combiner)(RESULT,RESULT), RESULT def) +{ + RESULT result = def; + for(ITEM *it = List.First(); it; it = List.Next(it)) + result = (*combiner)((*it.*f)(arg1),result); + return result; +} + +template <class LIST,class ITEM, class ARG1, class ARG2, class RESULT> +RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1,ARG2), + ARG1 arg1, ARG2 arg2, + RESULT (*combiner)(RESULT,RESULT), RESULT def) +{ + RESULT result = def; + for(ITEM *it = List.First(); it; it = List.Next(it)) + result = (*combiner)((*it.*f)(arg1,arg2),result); + return result; +} + +template <class LIST,class ITEM, class ARG1, class ARG2, class ARG3, + class RESULT> +RESULT ForEach(LIST& List, RESULT (ITEM::*f)(ARG1,ARG2,ARG3), + ARG1 arg1, ARG2 arg2, ARG3 arg3, + RESULT (*combiner)(RESULT,RESULT), RESULT def) +{ + RESULT result = def; + for(ITEM *it = List.First(); it; it = List.Next(it)) + result = (*combiner)((*it.*f)(arg1,arg2,arg3),result); + return result; +} + +template<class T> +T mmin(T a, T b) {return a<b ? a : b;} + +template<class T> +T mmax(T a, T b) {return a>b ? a : b;} + +template<class T> +T mand(T a, T b) {return a&&b;} + +template<class T> +T mor(T a, T b) {return a||b;} + +#endif diff --git a/tools/pes.h b/tools/pes.h new file mode 100644 index 00000000..c01b98db --- /dev/null +++ b/tools/pes.h @@ -0,0 +1,258 @@ +/* + * pes.h: PES header definitions + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: pes.h,v 1.1 2006-06-03 10:04:27 phintuka Exp $ + * + */ + +#ifndef _PES_H_ +#define _PES_H_ + + +#define PES_CHUNK_SIZE 2048 + +#define MAX_SCR ((int64_t)0x1ffffffffLL) + +// PES PIDs +#define PRIVATE_STREAM1 0xBD +#define PADDING_STREAM 0xBE +#define PRIVATE_STREAM2 0xBF +#define AUDIO_STREAM_S 0xC0 /* 1100 0000 */ +#define AUDIO_STREAM_E 0xDF /* 1101 1111 */ +#define VIDEO_STREAM_S 0xE0 /* 1110 0000 */ +#define VIDEO_STREAM_E 0xEF /* 1110 1111 */ + +#define AUDIO_STREAM_MASK 0x1F /* 0001 1111 */ +#define VIDEO_STREAM_MASK 0x0F /* 0000 1111 */ +#define AUDIO_STREAM 0xC0 /* 1100 0000 */ +#define VIDEO_STREAM 0xE0 /* 1110 0000 */ + +#define ECM_STREAM 0xF0 +#define EMM_STREAM 0xF1 +#define DSM_CC_STREAM 0xF2 +#define ISO13522_STREAM 0xF3 +#define PROG_STREAM_DIR 0xFF + +// "picture header" +#define SC_PICTURE 0x00 + +// Picture types +#define NO_PICTURE 0 +#define I_FRAME 1 +#define P_FRAME 2 +#define B_FRAME 3 + +static inline int64_t pes_extract_pts(const uchar *Data, int Length, + bool& Audio, bool& Video) +{ + /* assume mpeg2 pes header ... */ + Audio = Video = false; + + if((VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK) && (Video=true)) || + (AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK) && (Audio=true)) || + (PRIVATE_STREAM1 == Data[3] && (Audio=true))) { + + if ((Data[6] & 0xC0) != 0x80) + return -1; + if ((Data[6] & 0x30) != 0) + return -1; + + if((Length > 14) && (Data[7] & 0x80)) { /* pts avail */ + int64_t pts; + pts = ((int64_t)(Data[ 9] & 0x0E)) << 29 ; + pts |= ((int64_t) Data[10]) << 22 ; + pts |= ((int64_t)(Data[11] & 0xFE)) << 14 ; + pts |= ((int64_t) Data[12]) << 7 ; + pts |= ((int64_t)(Data[13] & 0xFE)) >> 1 ; + return pts; + } + } + return -1ULL; +} + +static inline void pes_change_pts(uchar *Data, int Length) +{ + /* assume mpeg2 pes header ... Assume header already HAS pts */ + if((VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK)) || + (AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK)) || + (PRIVATE_STREAM1 == Data[3])) { + + if ((Data[6] & 0xC0) != 0x80) + return; + if ((Data[6] & 0x30) != 0) + return; + + if((Length > 14) && (Data[7] & 0x80)) { /* pts avail */ + int64_t pts; + //pts = ((int64_t)(Data[ 9] & 0x0E)) << 29 ; + Data[ 9] |= ((pts >> 29) & 0x0E); + //pts |= ((int64_t) Data[10]) << 22 ; + Data[10] |= ((pts >> 22) & 0xFF); + //pts |= ((int64_t)(Data[11] & 0xFE)) << 14 ; + Data[11] |= ((pts >> 14) & 0xFE); + //pts |= ((int64_t) Data[12]) << 7 ; + Data[12] |= ((pts >> 7 ) & 0xFF); + //pts |= ((int64_t)(Data[13] & 0xFE)) >> 1 ; + Data[13] |= ((pts << 1 ) & 0xFE); + } + } +} + +// Remove pts from PES packet (zero it) +static inline void pes_strip_pts(uchar *Data, int Len) +{ + if(VIDEO_STREAM == (Data[3] & ~VIDEO_STREAM_MASK) || + AUDIO_STREAM == (Data[3] & ~AUDIO_STREAM_MASK) || + PRIVATE_STREAM1 == Data[3]) { + + // MPEG1 PES + if ((Data[6] & 0xC0) != 0x80) { + Data += 6; + Len -= 6; + + // skip stuffing + while ((Data[0] & 0x80) == 0x80) { + Data++; + Len--; + } + if ((Data[0] & 0xc0) == 0x40) { + // STD_buffer_scale, STD_buffer_size + Data += 2; + Len -= 2; + } + + if(Len<5) return; + if ((Data[0] & 0xf0) == 0x20) { + // zero PTS + Data[0] &= ~0x0E; + Data[1] = 0; + Data[2] &= ~0xFE; + Data[3] = 0; + Data[4] &= ~0xFE; + return; + } + if(Len<10) return; + if ((Data[0] & 0xf0) == 0x30) { + // zero PTS & DTS +//((uint32*)Data)[0] &= 0x0E00FE00; + Data[0] &= ~0x0E; + Data[1] = 0; + Data[2] &= ~0xFE; + Data[3] = 0; +//((uint32*)Data)[1] &= 0xFE0E00FE; + Data[4] &= ~0xFE; + Data[5] &= ~0x0E; + Data[6] = 0; + Data[7] &= ~0xFE; +//((uint32*)Data)[2] &= 0x00FEFFFF; + Data[8] = 0; + Data[9] &= ~0xFE; + return; + } + + // MPEG2 PES + } else { + if ((Data[6] & 0xC0) != 0x80) + return; + if ((Data[6] & 0x30) != 0) + return; + + if(Len<14) return; + if (Data[7] & 0x80) { + // PTS + if(Data[8]<5) return; + Data[ 9] &= ~0x0E; + Data[10] = 0; + Data[11] &= ~0xFE; + Data[12] = 0; + Data[13] &= ~0xFE; + } + if(Len<19) return; + if (Data[7] & 0x40) { + // DTS + if(Data[8]<10) return; + Data[14] &= ~0x0E; + Data[15] = 0; + Data[16] &= ~0xFE; + Data[17] = 0; + Data[18] &= ~0xFE; + } + } + } +} + +static inline int pes_packet_len(const uchar *header, const int maxlen, bool &isMpeg1) +{ + if(VIDEO_STREAM == (header[3] & ~VIDEO_STREAM_MASK) || + AUDIO_STREAM == (header[3] & ~AUDIO_STREAM_MASK) || + PRIVATE_STREAM1 == header[3]) { + isMpeg1 = ((header[6] & 0xC0) != 0x80); + return 6 + (header[4] << 8 | header[5]); + } else if (header[3] == PADDING_STREAM) { + isMpeg1 = false; + return (6 + (header[4] << 8 | header[5])); + } else if (header[3] == 0xBA) { + if ((header[4] & 0x40) == 0) { /* mpeg1 */ + isMpeg1 = true; + return 12; + } else { /* mpeg 2 */ + isMpeg1 = false; + return 14 + (header[0xD] & 0x07); + } + } else if (header[3] <= 0xB9) { + int len=3; + return -3; + isMpeg1 = false; + while(len+2<maxlen) { + if(!header[len] && !header[len+1] && header[len+2] == 1) + return len; + len++; + } + return -len; + } + isMpeg1 = false; + return -(6 + (header[4] << 8 | header[5])); +} + +// from vdr/remux.c: +static inline int ScanVideoPacket(const uchar *Data, int Count, /*int Offset,*/ + uchar &PictureType) +{ + // Scans the video packet starting at Offset and returns its length. + // If the return value is -1 the packet was not completely in the buffer. + int Offset = 0; + int Length = Count; + if (Length > 0 && Offset + Length <= Count) { + int i = Offset + 8; // the minimum length of the video packet header + i += Data[i] + 1; // possible additional header bytes + for (; i < Offset + Length; i++) { + if (Data[i] == 0 && Data[i + 1] == 0 && Data[i + 2] == 1) { + switch (Data[i + 3]) { + case SC_PICTURE: + PictureType = (Data[i + 5] >> 3) & 0x07; + return Length; + } + } + } + PictureType = NO_PICTURE; + return Length; + } + return -1; +} + +static inline const char *PictureTypeStr(int Type) +{ + switch(Type) { + case I_FRAME: return "I-Frame"; break; + case B_FRAME: return "B-Frame"; break; + case P_FRAME: return "P-Frame"; break; + case NO_PICTURE: return "(none)"; break; + default: break; + } + return "UNKNOWN"; +} + +#endif diff --git a/tools/timer.c b/tools/timer.c new file mode 100644 index 00000000..1730edab --- /dev/null +++ b/tools/timer.c @@ -0,0 +1,292 @@ +/* + * timer.c: Threaded timer class + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: timer.c,v 1.1 2006-06-03 10:04:28 phintuka Exp $ + * + */ + +#include <sys/time.h> + +#include <vdr/config.h> +#include <vdr/tools.h> +#include <vdr/thread.h> + +#include "timer.h" + +//#define XINELIBOUTPUT_DEBUG +//#define XINELIBOUTPUT_DEBUG_STDOUT +#ifdef XINELIBOUTPUT_DEBUG +# include "logdefs.h" +#else +# define TRACE(x) +# define TRACEF(x) +#endif + +#if VDRVERSNUM>10317 + #define time_ms() cTimeMs::Now() +#endif + +// ---------------------------- cTimerThreadEvent ---------------------------- + +class cTimerThreadEvent : public cListObject { + public: + cTimerThreadEvent(cTimerCallback *Handler, unsigned int TimeoutMs, + bool DeleteOnCancel = false) : + m_Handler(Handler), + m_DeleteOnCancel(DeleteOnCancel), + m_TimeoutMs(TimeoutMs) + { + m_NextEventTime = time_ms(); + UpdateEventTime(); + } + + ~cTimerThreadEvent() + { + if(m_DeleteOnCancel && m_Handler) + delete m_Handler; + } + + void UpdateEventTime() + { + m_NextEventTime += m_TimeoutMs; + } + + int TimeToNextEvent(void) + { + return m_NextEventTime - time_ms(); + } + + virtual bool operator< (const cListObject &ListObject) + { + const cTimerThreadEvent *o = (cTimerThreadEvent *)&ListObject; + return m_NextEventTime<o->m_NextEventTime; + } + + virtual int Compare(const cListObject &ListObject) const + { + const cTimerThreadEvent *o = (cTimerThreadEvent *)&ListObject; + if(m_NextEventTime<o->m_NextEventTime) + return -1; + else if(m_NextEventTime>o->m_NextEventTime) + return 1; + return 0; + } + + cTimerCallback *m_Handler; + + protected: + bool m_DeleteOnCancel; + unsigned int m_TimeoutMs; + int64_t m_NextEventTime; +}; + +// ------------------------------- cTimerThread ------------------------------ + +class cTimerThread : public cThread { + private: + cTimerThread(cTimerThread&); // copy not allowed + + static cMutex m_InstanceLock; + static cTimerThread *m_Instance; // singleton + + cMutex m_Lock; + cCondVar m_Signal; + cList<cTimerThreadEvent> m_Events; + cTimerThreadEvent *m_RunningEvent; + bool m_Finished; + bool m_HandlerRunning; + + cTimerThread() : + m_RunningEvent(NULL), + m_Finished(false), + m_HandlerRunning(false) + { + } + + virtual ~cTimerThread() + { + m_Lock.Lock(); + cTimerThreadEvent *ev; + while(NULL != (ev = m_Events.First())) { + m_Events.Del(ev,true); + } + m_Lock.Unlock(); + m_Signal.Broadcast(); + Cancel(1); + } + + protected: + + virtual void Action() + { + TRACEF("cTimerThread::Action"); + m_Lock.Lock(); + while(m_Events.First()) { + m_Signal.TimedWait(m_Lock, + max(1, m_Events.First()->TimeToNextEvent())); + TRACE("cTimerThread::Action waked up"); + while(NULL != (m_RunningEvent = m_Events.First()) && + m_RunningEvent->TimeToNextEvent() <= 0) { + TRACE("cTimerThread::Action calling handler"); + m_HandlerRunning=true; +// m_Lock.Unlock(); +// - can't unlock or running timer handler may be deleted while +// executing (or thread may be killed by Delete) + bool result = m_RunningEvent->m_Handler->TimerEvent(); +// m_Lock.Lock(); + m_HandlerRunning=false; + if(!result) { + if(m_RunningEvent) { // check if event was cancelled in handler... + TRACE("cTimerThread::Action handler cancelled timer"); + m_Events.Del(m_RunningEvent, true); + } + } else { + if(m_RunningEvent) { + TRACE("cTimerThread::Action timer re-scheduled"); + m_RunningEvent->UpdateEventTime(); + m_Events.Sort(); + } + } + m_RunningEvent = NULL; + } + } + m_Finished = true; + m_Lock.Unlock(); + } + + void Add(cTimerThreadEvent *Event) + { + TRACEF("cTimerThread::Add"); + //m_Events.Del(Event, false); + Event->Unlink(); + Del(Event->m_Handler); + m_Events.Add(Event); + m_Events.Sort(); + } + + bool Del(cTimerCallback *Handler, void *TargetId=NULL, + bool inDestructor=false) + { + TRACEF("cTimerThread::Del"); + cTimerThreadEvent *ev = m_Events.First(); + while(ev) { + if(ev->m_Handler == Handler || + (TargetId && ev->m_Handler->TargetId() == TargetId) || + (Handler && ev->m_Handler->is(Handler,Handler->size()))) { + cTimerThreadEvent *nev = m_Events.Next(ev); + if(inDestructor) ev->m_Handler=NULL; + m_Events.Del(ev, true); + ev = nev; + } else + ev = m_Events.Next(ev); + } + if(m_RunningEvent && + (m_RunningEvent->m_Handler == Handler || + m_RunningEvent->m_Handler->TargetId() == TargetId)) + m_RunningEvent = NULL; + return !m_HandlerRunning && !m_RunningEvent && !m_Events.First(); + } + + public: + + static void AddEvent(cTimerCallback *Handler, unsigned int TimeoutMs, + bool DeleteOnCancel=false) + { + TRACEF("cTimerThread::AddEvent"); + m_InstanceLock.Lock(); + if(m_Instance && m_Instance->m_Finished) { + delete m_Instance; + m_Instance = NULL; + } + if(!m_Instance) { + m_Instance = new cTimerThread; + m_Instance->m_Lock.Lock(); + m_Instance->Start(); + } else { + m_Instance->m_Lock.Lock(); + m_Instance->m_Signal.Broadcast(); + } + m_Instance->Add(new cTimerThreadEvent(Handler, max(1U,TimeoutMs), + DeleteOnCancel)); + m_Instance->m_Lock.Unlock(); + m_InstanceLock.Unlock(); + } + + static void CancelEvent(cTimerCallback *Handler, void *TargetId = NULL, + bool inDestructor=false) + { + TRACEF("cTimerThread::CancelEvent"); + m_InstanceLock.Lock(); + if(m_Instance && !m_Instance->m_Finished) { + m_Instance->m_Lock.Lock(); + if(m_Instance->Del(Handler, TargetId, inDestructor) && !inDestructor) { + m_Instance->m_Lock.Unlock(); + delete m_Instance; + m_Instance = NULL; + } else + m_Instance->m_Lock.Unlock(); + } + m_InstanceLock.Unlock(); + } + +}; + +cMutex cTimerThread::m_InstanceLock; +cTimerThread *cTimerThread::m_Instance = NULL; + +// ------------------------------ cTimerCallback ----------------------------- + +cTimerCallback::~cTimerCallback() +{ + TRACEF("cTimerCallback::~cTimerCallback"); + cTimerThread::CancelEvent(this, NULL, true); +} + +void cTimerCallback::Set(cTimerCallback *handler, unsigned int TimeoutMs) +{ + TRACEF("cTimerCallback::Set"); + cTimerThread::AddEvent(handler, TimeoutMs); +} + +void cTimerCallback::Cancel(cTimerCallback *handler) +{ + TRACEF("cTimerCallback::Cancel"); + cTimerThread::CancelEvent(handler); +} + +// ------------------------------- cTimerEvent ------------------------------- + +//cTimerEvent::cTimerEvent(unsigned int TimeoutMs) +//{ +// TRACEF("cTimerEvent::cTimerEvent"); +//// cTimerThread::AddEvent(this, TimeoutMs, true); +//} + +void cTimerEvent::AddEvent(unsigned int TimeoutMs) +{ + TRACEF("cTimerEvent::AddEvent"); + cTimerThread::AddEvent(this, TimeoutMs, true); +} + +void cTimerEvent::Cancel(cTimerEvent *&event) +{ + TRACEF("cTimerEvent::Cancel"); + cTimerThread::CancelEvent(event); + event = NULL; +} + +void cTimerEvent::CancelAll(void *Target) +{ + TRACEF("cTimerEvent::CancelAll"); + cTimerThread::CancelEvent(NULL, Target); +} + + + + + + + diff --git a/tools/timer.h b/tools/timer.h new file mode 100644 index 00000000..2ee8724b --- /dev/null +++ b/tools/timer.h @@ -0,0 +1,296 @@ +/* + * timer.h: Threaded timer class + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: timer.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $ + * + */ + +#ifndef __XINELIBOUTPUT_TIMER_H +#define __XINELIBOUTPUT_TIMER_H + +// +// cTimerCallback : timer callback handler interface +// +class cTimerCallback { + protected: + virtual bool TimerEvent() = 0; // return false to cancel timer + + virtual void *TargetId() { return (void*)this; } + virtual int size() { return sizeof(*this); } + virtual bool is(void *data, int len) + { + return len==sizeof(*this) && TargetId()==data; + } + + friend class cTimerThread; + + public: + static void Set(cTimerCallback *, unsigned int TimeoutMs); + static void Cancel(cTimerCallback *); + + virtual ~cTimerCallback(); +}; + +// +// cTimerEvent : base class for timer events +// +class cTimerEvent : protected cTimerCallback { + private: + cTimerEvent(cTimerEvent&); + + protected: + cTimerEvent() {}; + + virtual void AddEvent(unsigned int TimeoutMs); + + static void CancelAll(void *Target); + + template<class TCLASS> friend void CancelTimerEvents(TCLASS*); + friend class cTimerThread; + + public: + static void Cancel(cTimerEvent *&); +}; + +// +// make gcc 3.4.5 happy +// +template<class TCLASS, class TRESULT> +cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(void), + unsigned int TimeoutMs); +template<class TCLASS, class TRESULT, class TARG1> +cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(TARG1), + TARG1 arg1, + unsigned int TimeoutMs); +template<class TCLASS> +cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(void), + unsigned int TimeoutMs, bool runOnce = true); +template<class TCLASS, class TARG1> +cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(TARG1), + TARG1 arg1, + unsigned int TimeoutMs, bool runOnce = true); + +// +// Timer event templates +// + +template <class TCLASS, class TRESULT> +class cTimerFunctorR0 : public cTimerEvent { + + public: + + protected: + typedef TRESULT (TCLASS::*TFUNC)(void); + + cTimerFunctorR0(TCLASS *obj, TFUNC f, unsigned int TimeoutMs) : + m_obj(obj), m_f(f) + { + AddEvent(TimeoutMs); + } + + virtual ~cTimerFunctorR0() {}; + + virtual bool TimerEvent(void) + { + return (*m_obj.*m_f)(); + } + + virtual void *TargetId() { return (void*)m_obj; } + virtual int size() { return sizeof(*this); } + virtual bool is(void *data, int len) + { + return sizeof(*this)==len && !memcmp(this,data,len); + } + + private: + TCLASS *m_obj; + TFUNC m_f; + + friend cTimerEvent *CreateTimerEvent<TCLASS,TRESULT>(TCLASS*,TFUNC,unsigned int); +}; + +template <class TCLASS, class TRESULT, class TARG1> +class cTimerFunctorR1 : public cTimerEvent { + + public: + + protected: + typedef TRESULT (TCLASS::*TFUNC)(TARG1); + + cTimerFunctorR1(TCLASS *obj, TFUNC f, TARG1 arg1, unsigned int TimeoutMs) : + m_obj(obj), m_f(f), m_arg1(arg1) + { + AddEvent(TimeoutMs); + } + + virtual ~cTimerFunctorR1() {}; + + virtual bool TimerEvent(void) + { + return (*m_obj.*m_f)(m_arg1); + } + + virtual void *TargetId() { return (void*)m_obj; } + virtual int size() { return sizeof(*this); } + virtual bool is(void *data, int len) + { + return sizeof(*this)==len && !memcmp(this,data,len); + } + + private: + TCLASS *m_obj; + TFUNC m_f; + TARG1 m_arg1; + + friend cTimerEvent *CreateTimerEvent<TCLASS,TRESULT,TARG1>(TCLASS*,TFUNC,TARG1,unsigned int); +}; + +template <class TCLASS> +class cTimerFunctor0 : public cTimerEvent { + + public: + + protected: + typedef void (TCLASS::*TFUNC)(void); + + cTimerFunctor0(TCLASS *obj, TFUNC f, + unsigned int TimeoutMs, bool runOnce) : + m_obj(obj), m_f(f), m_runAgain(!runOnce) + { + AddEvent(TimeoutMs); + } + + virtual ~cTimerFunctor0() {}; + + virtual bool TimerEvent(void) + { + (*m_obj.*m_f)(); + return m_runAgain; + } + + virtual void *TargetId() { return (void*)m_obj; } + virtual int size() { return sizeof(*this); } + virtual bool is(void *data, int len) + { + return sizeof(*this)==len && !memcmp(this,data,len); + } + + private: + TCLASS *m_obj; + TFUNC m_f; + bool m_runAgain; + + friend cTimerEvent *CreateTimerEvent<TCLASS>(TCLASS*,TFUNC,unsigned int,bool); +}; + +template <class TCLASS, class TARG1> +class cTimerFunctor1 : public cTimerEvent { + + public: + + protected: + typedef void (TCLASS::*TFUNC)(TARG1); + + cTimerFunctor1(TCLASS *obj, TFUNC f, TARG1 arg1, + unsigned int TimeoutMs, bool runOnce) : + m_obj(obj), m_f(f), m_arg1(arg1), m_runAgain(!runOnce) + { + AddEvent(TimeoutMs); + } + + virtual ~cTimerFunctor1() {}; + + virtual bool TimerEvent(void) + { + (*m_obj.*m_f)(m_arg1); + return m_runAgain; + } + + virtual void *TargetId() { return (void*)m_obj; } + virtual int size() { return sizeof(*this); } + virtual bool is(void *data, int len) + { + return sizeof(*this)==len && !memcmp(this,data,len); + } + + private: + TCLASS *m_obj; + TFUNC m_f; + TARG1 m_arg1; + bool m_runAgain; + + friend cTimerEvent *CreateTimerEvent<TCLASS,TARG1>(TCLASS*,TFUNC,TARG1,unsigned int,bool); +}; + +// +// Function templates for timer event creation and cancellation +// + +template<class TCLASS, class TRESULT> +cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(void), + unsigned int TimeoutMs) +{ + return new cTimerFunctorR0<TCLASS,TRESULT>(c,fp,TimeoutMs); +} + +template<class TCLASS, class TRESULT, class TARG1> +cTimerEvent *CreateTimerEvent(TCLASS *c, TRESULT (TCLASS::*fp)(TARG1), + TARG1 arg1, + unsigned int TimeoutMs) +{ + return new cTimerFunctorR1<TCLASS,TRESULT,TARG1>(c,fp,arg1,TimeoutMs); +} + +template<class TCLASS> +cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(void), + unsigned int TimeoutMs, bool runOnce = true) +{ + return new cTimerFunctor0<TCLASS>(c,fp,TimeoutMs,runOnce); +} + +template<class TCLASS, class TARG1> +cTimerEvent *CreateTimerEvent(TCLASS *c, void (TCLASS::*fp)(TARG1), + TARG1 arg1, + unsigned int TimeoutMs, bool runOnce = true) +{ + return new cTimerFunctor1<TCLASS,TARG1>(c,fp,arg1,TimeoutMs,runOnce); +} + +template<class TCLASS> +void CancelTimerEvents(TCLASS *c) +{ + cTimerEvent::CancelAll((void*)c); +} + + +// usage: +// +// 'this' derived from cTimerHandler: +// Set timer: +// cTimerCallback::Set(this, TimeoutMs); +// Cancel timer: +// - return false from handler or +// - call cTimerCallback::Cancel(this); or +// - delete 'this' object +// +// any function of any class: +// Set timer: +// - cTimerEvent *event = CreateTimerEvent(...); +// example: +// CreateTimerEvent(this, &cXinelibDevice::TimerEvent, 1, 1000); +// -> calls this->cXinelibDevice::TimerEvent(1) every second until stopped. +// Cancel timer: +// - if handler returns bool: return false from handler +// - handler is type of void: timer runs only once +// - call cTimerEvent::Cancel(event) +// Cancel all timers for object: +// - Call CancelTimerEvents(object) +// - Call CancelTimerEvents(this) + + +#endif // __XINELIBOUTPUT_TIMER_H + + diff --git a/tools/udp_buffer.h b/tools/udp_buffer.h new file mode 100644 index 00000000..d3a5313d --- /dev/null +++ b/tools/udp_buffer.h @@ -0,0 +1,115 @@ +/* + * udp_buffer.h: Ring buffer for UDP/RTP streams + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: udp_buffer.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $ + * + */ + +#ifndef __UDP_BUFFER_H +#define __UDP_BUFFER_H + +#include <stdint.h> + +#include "../xine_input_vdr_net.h" // frame headers + + +#define UDP_BUFFER_SIZE 0x100 // 2^n +#define UDP_BUFFER_MASK 0xff // 2^n - 1 + +#if UDP_BUFFER_MASK != UDP_SEQ_MASK +# error Buffer handling error !!! +#endif + + +class cUdpBackLog +{ + friend class cUdpScheduler; + + private: + + cUdpBackLog(cUdpBackLog&); + + stream_udp_header_t *m_UdpBuffer[UDP_BUFFER_SIZE]; + int m_UdpBufLen[UDP_BUFFER_SIZE]; /* size of allocated memory, not frame */ + int m_PayloadSize[UDP_BUFFER_SIZE]; /* size of frame */ + unsigned int m_SeqNo; /* next (outgoing) sequence number */ + + protected: + + cUdpBackLog() + { + memset(m_UdpBuffer, 0, sizeof(stream_udp_header_t *)*UDP_BUFFER_SIZE); + memset(m_UdpBufLen, 0, sizeof(int) * UDP_BUFFER_SIZE); + memset(m_PayloadSize, 0, sizeof(int) * UDP_BUFFER_SIZE); + m_SeqNo = 0; + } + + void Clear(int HowManyFrames) + { + // Clear n last frames from buffer. + // (called to adjust sequence numbering when some + // already allocated frames won't be sent) + // + // Note: Nothing is freed. + // To completely reset buffer it must be deleted and re-created. + // + m_SeqNo = (m_SeqNo + UDP_BUFFER_SIZE - HowManyFrames) & UDP_BUFFER_MASK; + } + + virtual ~cUdpBackLog() + { + for(int i=0; i<UDP_BUFFER_SIZE; i++) + if(m_UdpBuffer[i]) { + //m_UdpBufLen[i] = 0; + DELETENULL(m_UdpBuffer[i]); + } + } + + stream_udp_header_t *Get(int UdpSeqNo) + { + int BufIndex = UdpSeqNo & UDP_BUFFER_MASK; + return m_UdpBuffer[BufIndex]; + } + + int PayloadSize(int UdpSeqNo) + { + int BufIndex = UdpSeqNo & UDP_BUFFER_MASK; + return m_UdpBuffer[BufIndex] ? m_PayloadSize[BufIndex] : 0; + } + + stream_udp_header_t *MakeFrame(uint64_t StreamPos, + const uchar *Data, int DataLen) + { + int UdpPacketLen = DataLen + sizeof(stream_udp_header_t); + int BufIndex = m_SeqNo & UDP_BUFFER_MASK; + + // old buffer too small ? free it + if(m_UdpBuffer[BufIndex] && m_UdpBufLen[BufIndex] < UdpPacketLen) + DELETENULL(m_UdpBuffer[BufIndex]); + + // no buffer ? alloc it + if(!m_UdpBuffer[BufIndex]) { + m_UdpBuffer[BufIndex] = (stream_udp_header_t*)new uchar[UdpPacketLen]; + m_UdpBufLen[BufIndex] = UdpPacketLen; + } + m_PayloadSize[BufIndex] = DataLen; + + // Fill frame to buffer + stream_udp_header_t *header = m_UdpBuffer[BufIndex]; + uchar *Payload = UDP_PAYLOAD(header); + + memcpy(Payload, Data, DataLen); + header->pos = htonll(StreamPos); + header->seq = htons(m_SeqNo); + + m_SeqNo = (m_SeqNo + 1) & UDP_SEQ_MASK; + + return header; + } +}; + + +#endif diff --git a/tools/udp_pes_scheduler.c b/tools/udp_pes_scheduler.c new file mode 100644 index 00000000..5da4be74 --- /dev/null +++ b/tools/udp_pes_scheduler.c @@ -0,0 +1,595 @@ +/* + * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: udp_pes_scheduler.c,v 1.1 2006-06-03 10:04:28 phintuka Exp $ + * + */ + +#include <stdint.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <sys/time.h> + +#include <vdr/config.h> +#include <vdr/tools.h> + +#include "../logdefs.h" + +#include "udp_buffer.h" +#include "pes.h" + +#include "udp_pes_scheduler.h" + +#include "../xine_input_vdr_net.h" // frame headers + + +//----------------------- cTimePts ------------------------------------------ + +cTimePts::cTimePts(void) +{ + Set(); +} + +int64_t cTimePts::Now(void) +{ + struct timeval t; + + if (gettimeofday(&t, NULL) == 0) { + t.tv_sec -= tbegin.tv_sec; + if(t.tv_usec < tbegin.tv_usec) { + t.tv_sec--; + t.tv_usec += 1000000; + } + t.tv_usec -= tbegin.tv_usec; + + return (uint64(t.tv_sec)) * 90000LL + + (uint64(t.tv_usec)) * 90LL / 1000LL + + begin; + } + + return 0; +} + +void cTimePts::Set(int64_t Pts) +{ + gettimeofday(&tbegin, NULL); + begin = Pts; +} + +//----------------------- cUdpScheduler ------------------------------------- + +//#define LOG_RESEND +//#define LOG_SCR + +const int MAX_QUEUE_SIZE = 64; // ~ 65 ms with typical DVB stream +const int MAX_LIVE_QUEUE_SIZE = (64+32); // ~ 100 ms with typical DVB stream +const int HARD_LIMIT = (4*1024); // ~ 40 Mbit/s === 4 Mb/s + +// initial burst length after seek (500ms = ~13 video frames) +const int64_t INITIAL_BURST_TIME = (int64_t)(45000); // pts units (90kHz) + +// assume seek when when pts difference between two frames exceeds this (1.5 seconds) +const int64_t JUMP_LIMIT_TIME = (int64_t)(3*90000/2); // pts units (90kHz) + +cUdpScheduler::cUdpScheduler() +{ + + // Scheduler data + + current_audio_vtime = 0; + current_video_vtime = 0; + +#ifdef LOG_SCR + data_sent = 0; + frames_sent = 0; + frame_rate = 2000; + prev_frames = 200; +#endif + + last_delay_time = 0; + + // queuing + + int i; + for(i=0; i<MAX_UDP_HANDLES; i++) + m_Handles[i] = -1; + + m_BackLog = new cUdpBackLog; + + m_QueueNextSeq = 0; + m_QueuePending = 0; + + // Thread + + m_Running = 1; + + Start(); +} + +cUdpScheduler::~cUdpScheduler() +{ + m_Lock.Lock(); + m_Running = 0; + m_Cond.Broadcast(); + m_Lock.Unlock(); + + Cancel(3); + + delete m_BackLog; +} + +bool cUdpScheduler::AddHandle(int fd) +{ + cMutexLock ml(&m_Lock); + + int i; + + for(i=0; i<MAX_UDP_HANDLES; i++) + if(m_Handles[i] < 0 || m_Handles[i] == fd) { + m_Handles[i] = fd; + return true; + } + + return false; +} + +void cUdpScheduler::RemoveHandle(int fd) +{ + cMutexLock ml(&m_Lock); + + int i; + for(i=0; i<MAX_UDP_HANDLES; i++) + if(m_Handles[i] == fd) + break; + + for(; i<MAX_UDP_HANDLES-1; i++) + m_Handles[i] = m_Handles[i+1]; + + m_Handles[MAX_UDP_HANDLES-1] = -1; + + if(m_Handles[0] < 0) { + // No clients left ... + + // Flush all buffers + m_QueueNextSeq = 0; + m_QueuePending = 0; + delete m_BackLog; + m_BackLog = new cUdpBackLog; + } +} + +bool cUdpScheduler::Poll(int TimeoutMs, bool Master) +{ + cMutexLock ml(&m_Lock); + + m_Master = Master; + + if(m_Handles[0] < 0) { + // no clients, so we can eat all data we are given ... + return true; + } + + uint64_t WaitEnd = cTimeMs::Now(); + if(TimeoutMs >= 0) + WaitEnd += (uint64_t)TimeoutMs; + + int limit = m_Master ? MAX_QUEUE_SIZE : MAX_LIVE_QUEUE_SIZE; + while(cTimeMs::Now() < WaitEnd && + m_Running && + m_QueuePending >= limit) + m_Cond.TimedWait(m_Lock, 5); + + return m_QueuePending < limit; +} + +bool cUdpScheduler::Flush(int TimeoutMs) +{ + uint64_t WaitEnd = cTimeMs::Now(); + if(TimeoutMs >= 0) + WaitEnd += (uint64_t)TimeoutMs; + + cMutexLock ml(&m_Lock); + + if(m_Handles[0] < 0) + return true; + + while(cTimeMs::Now() < WaitEnd && + m_Running && + m_QueuePending > 0) + m_Cond.TimedWait(m_Lock, 25); + + return m_QueuePending == 0; +} + +void cUdpScheduler::Clear(void) +{ + cMutexLock ml(&m_Lock); + + m_BackLog->Clear(m_QueuePending); + m_QueuePending = 0; + + m_Cond.Broadcast(); +} + +bool cUdpScheduler::Queue(uint64_t StreamPos, const uchar *Data, int Length) +{ + cMutexLock ml(&m_Lock); + + if(m_Handles[0] < 0) + return true; + + if(m_QueuePending >= MAX_QUEUE_SIZE) + return false; + + m_BackLog->MakeFrame(StreamPos, Data, Length); + m_QueuePending++; + + m_Cond.Broadcast(); + + return true; +} + +int cUdpScheduler::calc_elapsed_vtime(int64_t pts, bool Audio) +{ + int64_t diff = 0; + + if(!Audio /*Video*/) { + /* #warning TODO: should be possible to use video pts too (if ac3 or muted ...) */ + diff = pts - current_video_vtime; + if(diff < 0) diff = -diff; + if(diff > JUMP_LIMIT_TIME) { // 1 s (must be > GOP) + // RESET +#ifdef LOG_SCR + LOGDBG("cUdpScheduler RESET (Video jump %lld->%lld)", + current_audio_vtime, pts); + data_sent = frames_sent = 0; +#endif + //diff = 0; + current_video_vtime = pts; + return -1; + } + current_video_vtime = pts; + + } else if(Audio) { + diff = pts - current_audio_vtime; + if(diff < 0) diff = -diff; + if(diff > JUMP_LIMIT_TIME) { // 1 sec + // RESET +#ifdef LOG_SCR + LOGDBG("cUdpScheduler RESET (Audio jump %lld->%lld)", + current_audio_vtime, pts); + data_sent = frames_sent = 0; +#endif + //diff = 0; + current_audio_vtime = pts; + + // Use audio pts for sync (audio has constant and increasing intervals) + MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME); + + return -1; + } + current_audio_vtime = pts; + } + +#ifdef LOG_SCR + if(diff && Audio) { + frame_rate = (int)(90000*frames_sent/(int)diff); + LOGDBG("rate %d kbit/s (%d frames/s)", + (int)(90*data_sent/((int)diff)*8), frame_rate); + prev_frames = frames_sent; + data_sent = frames_sent = 0; + } +#endif + + return (int) diff; +} + +void cUdpScheduler::Schedule(const uchar *Data, int Length) +{ + bool Audio=false, Video=false; + int64_t pts = pes_extract_pts(Data, Length, Audio, Video); + int elapsed = pts>0 ? calc_elapsed_vtime(pts, Audio) : 0; + +#ifdef LOG_SCR + if(elapsed > 0) + LOGMSG("PTS: %lld (%s) elapsed %d ms", + pts, Video?"Video":Audio?"Audio":"?", elapsed/90); +#endif + + if(elapsed > 0 && Audio/*Video*/) { + int64_t now = MasterClock.Now(); + if(now > current_audio_vtime && (now - current_audio_vtime)>JUMP_LIMIT_TIME) { +#ifdef LOG_SCR + LOGMSG("cUdpScheduler MasterClock init (was in past)"); + elapsed = -1; +#endif + MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME); + } else if(now < current_audio_vtime && (current_audio_vtime-now)>JUMP_LIMIT_TIME) { +#ifdef LOG_SCR + LOGMSG("cUdpScheduler MasterClock init (was in future)"); + elapsed = -1; +#endif + MasterClock.Set(current_audio_vtime + INITIAL_BURST_TIME); + } else if(!last_delay_time) { + // first burst done, no delay yet ??? + // (queue up to xxx bytes first) + } else { + if(current_audio_vtime > now) { + int delay_ms = (int)(current_audio_vtime - now)/90; +#ifdef LOG_SCR + LOGDBG("cUdpScheduler sleeping %d ms " + "(time reference: %s, beat interval %d ms)", + delay_ms, (Audio?"Audio PTS":"Video PTS"), elapsed); +#endif + if(delay_ms > 3) { + //LOGMSG("sleep %d ms (%d f)", delay_ms, prev_frames); + CondWait.Wait(delay_ms); + } + } + } + last_delay_time = now; + } + +#if 0 + static int win = 0; + static int64_t prev; + + if(data_sent == 0 || elapsed < 0) { + win = 0; + prev = MasterClock.Now(); + } + win ++; + int mrate = 3*frame_rate/2; + if(mrate < 100) mrate = 100; + if(mrate > 2000) mrate = 2000; + if(MasterClock.Now() - prev >= win*90000 / frame_rate) { + LOGMSG("sleep:3"); + CondWait.Wait(3); + } +#endif + +#ifdef LOG_SCR + data_sent += Length; + frames_sent ++; +#endif +} + +void cUdpScheduler::Action(void) +{ +#if 0 + { + // Request real-time scheduling + sched_param temp; + temp.sched_priority = 2; + + if (!pthread_setschedparam(pthread_self(), SCHED_RR, &temp)) { + LOGMSG("cUdpScheduler priority set successful SCHED_RR %d [%d,%d]", + temp.sched_priority, + sched_get_priority_min(SCHED_RR), + sched_get_priority_max(SCHED_RR)); + } else { + LOGMSG("cUdpScheduer: Can't set priority to SCHED_RR %d [%d,%d]", + temp.sched_priority, + sched_get_priority_min(SCHED_RR), + sched_get_priority_max(SCHED_RR)); + } + + /* UDP Scheduler needs high priority */ + SetPriority(0); + SetPriority(-1); + SetPriority(-2); + SetPriority(-3); + SetPriority(-4); + SetPriority(-5); + } +#endif + + m_Lock.Lock(); + + while(m_Running) { + + if(m_Handles[0] < 0) { + m_Cond.TimedWait(m_Lock, 5000); + continue; + } + + // Wait until we have outgoing data in queue + if(m_QueuePending <= 0) { + m_Cond.TimedWait(m_Lock, 100); + if(m_QueuePending <= 0) { + static unsigned char padding[] = {0x00,0x00,0x01,0xBE,0x00,0x02,0xff,0xff}; + int prevseq = (m_QueueNextSeq + UDP_BUFFER_SIZE - 1) & UDP_BUFFER_MASK; + stream_udp_header_t *frame = m_BackLog->Get(prevseq); + if(frame) + m_BackLog->MakeFrame(ntohll(frame->pos), padding, 8); + else + m_BackLog->MakeFrame(0, padding, 8); + m_QueuePending++; + } + continue; // to check m_Running + } + + // Take next frame from queue + stream_udp_header_t *frame = m_BackLog->Get(m_QueueNextSeq); + int PayloadSize = m_BackLog->PayloadSize(m_QueueNextSeq); + int UdpPacketLen = PayloadSize + sizeof(stream_udp_header_t); + m_QueueNextSeq = (m_QueueNextSeq + 1) & UDP_BUFFER_MASK; + m_QueuePending--; + + m_Cond.Broadcast(); + + m_Lock.Unlock(); + +#if 0 /* debugging checks */ + { + if(!frame) + LOGMSG("frame == NULL !"); + uint8_t *p = UDP_PAYLOAD(frame); + + if(p[0] || p[1] || p[2]!=1) + LOGMSG("cUdpScheduler: invalid content"); + + int n = sizeof(stream_udp_header_t) + (p[4]<<8) + p[5] + 6; + if(n != UdpPacketLen) + LOGMSG("cUdpScheduler: length error -- %d != %d", n, UdpPacketLen); + + static int seq = 0; + if(seq != ntohs(frame->seq)) + LOGMSG("cUdpScheduler: SEQ jump %d -> %d !", seq, ntohs(frame->seq)); + seq = (ntohs(frame->seq) + 1) & UDP_BUFFER_MASK; + + if(PayloadSize != 8) { + static uint64_t pos = 0; + if(pos != ntohull(frame->pos)) + LOGMSG("cUdpScheduler: POS jump %lld -> %lld !", pos, ntohull(frame->pos)); + pos = ntohull(frame->pos) + PayloadSize; + } + } +#endif + + // Schedule frame + if(m_Master) + Schedule(UDP_PAYLOAD(frame), PayloadSize); + + /* need some limit here for ex. sequence of stills when moving cutting marks very fast + (no audio or PTS available) */ +#if 1 + // hard limit for used bandwidth: + // - ~1 frames/ms & 8kb/ms -> 8mb/s -> ~ 80 Mbit/s ( / client) + // - max burst 15 frames or 30kb + static int cnt = 0, bytes = 0; + static uint64_t dbg_timer = cTimeMs::Now(); + static int dbg_bytes = 0; + cnt++; + bytes += PayloadSize; + if(cnt>=15 && bytes >= 30000) { + CondWait.Wait(4); + dbg_bytes += bytes; + cnt = 0; + bytes = 0; + if(dbg_timer+60000 <= cTimeMs::Now()) { + LOGDBG("UDP rate: %4d Kbps (queue %d)", dbg_bytes/(60*1024/8), + m_QueuePending); + dbg_bytes = 0; + dbg_timer = cTimeMs::Now(); + } + } +#endif + + for(int i=0; i<MAX_UDP_HANDLES && m_Handles[i]>=0; i++) { + + // + // use TIOCOUTQ ioctl instead of poll/select. + // - poll/select for UDP/RTP may return true even when queue + // is (almost) full + // - kernel silently drops frames it cant send + // -> poll() + send() just causes frames to be dropped + // + int size = 0; + if(!ioctl(m_Handles[i], TIOCOUTQ, &size)) + if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer + int wmem=0; + socklen_t l = sizeof(int); + if(!getsockopt(m_Handles[i], SOL_SOCKET, SO_SNDBUF, &wmem, &l)) { +#if 0 +// Large bursts cause client to loose data :( + if(size >= (wmem/2 - 8128)) { + LOGMSG("cUdpScheduler: kernel transmit queue > ~%dkb ! (master=%d)", + (wmem/2-8128)/1024, m_Master); + CondWait.Wait(2); + } + else +#endif + { + if(m_QueuePending > (MAX_QUEUE_SIZE-5)) + LOGMSG("cUdpScheduler: kernel transmit queue > ~30kb ! (master=%d ; Queue=%d)", + m_Master, m_QueuePending); + CondWait.Wait(2); + } + } + } + + if(send(m_Handles[i], frame, UdpPacketLen, 0) <= 0) + LOGERR("cUdpScheduler: UDP send() failed !"); + } + + m_Lock.Lock(); + } + + m_Lock.Unlock(); +} + +void cUdpScheduler::ReSend(int fd, uint64_t Pos, int Seq1, int Seq2) +{ + cMutexLock ml(&m_Lock); // keeps also scheduler thread suspended ... + + // Handle buffer wrap + if(Seq1 > Seq2) + Seq2 += UDP_BUFFER_SIZE; + + if(Seq2-Seq1 > 64) { + LOGDBG("cUdpScheduler::ReSend: requested range too large (%d-%d)", + Seq1, Seq2); + return; + } + + // re-send whole range + for(; Seq1 <= Seq2; Seq1++) { + + // Wait if kernel queue is full + int size=0; + if(!ioctl(fd, TIOCOUTQ, &size)) + if(size > ((0x10000)/2 - 2048)) { // assume 64k kernel buffer + LOGDBG("cUdpScheduler::ReSend: kernel transmit queue > ~30kb !"); + cCondWait::SleepMs(2); + } + + stream_udp_header_t *frame = m_BackLog->Get(Seq1); + + if(frame) { + if(ntohull(frame->pos) == Pos) { + send(fd, + frame, + m_BackLog->PayloadSize(Seq1) + sizeof(stream_udp_header_t), + 0); +#ifdef LOG_RESEND + LOGDBG("cUdpScheduler::ReSend: %d (%d bytes) @%lld sent", + Seq1, m_BackLog->PayloadSize(Seq1), Pos); +#endif + Pos += m_BackLog->PayloadSize(Seq1); + continue; + } else { + // buffer has been lost long time ago... +#ifdef LOG_RESEND + LOGDBG("cUdpScheduler::ReSend: Requested position does not match " + "(%lld ; has %lld)", Pos, ntohll(frame->pos)); +#endif + } + } else { +#ifdef LOG_RESEND + LOGDBG("cUdpScheduler::ReSend: %d @%lld missing", Seq1, Pos); +#endif + } + // buffer has been lost + // send packet missing info + char udp_ctrl[64]; + ((stream_udp_header_t *)udp_ctrl)->seq = (uint16_t)(-1); + ((stream_udp_header_t *)udp_ctrl)->pos = (uint64_t)(-1); + +#ifdef LOG_RESEND + LOGDBG("cUdpScheduler::ReSend: missing %d-%d @%d (hdr 0x%llx 0x%x)", + Seq1, Seq1, Pos, + ((stream_udp_header_t *)udp_ctrl)->pos, + ((stream_udp_header_t *)udp_ctrl)->seq); +#endif + sprintf((udp_ctrl+sizeof(stream_udp_header_t)), + "UDP MISSING %d-%d %lld", + Seq1, Seq1, Pos); + + send(fd, udp_ctrl, 64, 0); + } +} diff --git a/tools/udp_pes_scheduler.h b/tools/udp_pes_scheduler.h new file mode 100644 index 00000000..57fce688 --- /dev/null +++ b/tools/udp_pes_scheduler.h @@ -0,0 +1,103 @@ +/* + * udp_pes_scheduler.h: PES scheduler for UDP/RTP streams + * + * See the main source file 'xineliboutput.c' for copyright information and + * how to reach the author. + * + * $Id: udp_pes_scheduler.h,v 1.1 2006-06-03 10:04:28 phintuka Exp $ + * + */ + +#ifndef __UDP_PES_SCHEDULER_H +#define __UDP_PES_SCHEDULER_H + +#include <stdint.h> + +#include <vdr/tools.h> // uchar +#include <vdr/thread.h> + +//----------------------- cTimePts ------------------------------------------ + +class cTimePts +{ + private: + int64_t begin; + struct timeval tbegin; + + public: + cTimePts(void); + + int64_t Now(void); + void Set(int64_t Pts = 0LL); +}; + +//----------------------- cUdpPesScheduler ---------------------------------- + +#define MAX_UDP_HANDLES 16 + +class cUdpBackLog; + +class cUdpScheduler : public cThread +{ + public: + + cUdpScheduler(); + virtual ~cUdpScheduler(); + + // fd should be binded & connected to IP:PORT (local+remote) pair ! + bool AddHandle(int fd); + void RemoveHandle(int fd); + + bool Poll(int TimeoutMs, bool Master); + bool Queue(uint64_t StreamPos, const uchar *Data, int Length); + void ReSend(int fd, uint64_t Pos, int Seq1, int Seq2); + + void Clear(void); + bool Flush(int TimeoutMs); + + protected: + + // Data for payload handling & buffering + + // Signalling + cCondVar m_Cond; + cMutex m_Lock; + + // Clients + int m_Handles[MAX_UDP_HANDLES]; + + // Queue + int m_QueueNextSeq; /* next outgoing */ + int m_QueuePending; /* outgoing queue size */ + cUdpBackLog *m_BackLog; /* queue for incoming data (not yet send) and retransmissions */ + + // Data for scheduling algorithm + + cTimePts RtpScr; // 90 kHz monotonic time source for RTP packets + cTimePts MasterClock; // Current MPEG PTS (synchronized with current stream) + cCondWait CondWait; + + int64_t current_audio_vtime; + int64_t current_video_vtime; + +#if 0 + int data_sent; /* in current time interval, bytes */ + int frames_sent; /* in current time interval */ + int frame_rate; /* pes frames / second */ + int prev_frames; +#endif + + int64_t last_delay_time; + + bool m_Master; /* if true, we are master metronom for playback */ + + // Scheduling + + int calc_elapsed_vtime(int64_t pts, bool Audio); + void Schedule(const uchar *Data, int Length); + + bool m_Running; + virtual void Action(void); +}; + +#endif |