summaryrefslogtreecommitdiff
path: root/packetbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'packetbuffer.c')
-rw-r--r--packetbuffer.c283
1 files changed, 283 insertions, 0 deletions
diff --git a/packetbuffer.c b/packetbuffer.c
new file mode 100644
index 0000000..0e2b53d
--- /dev/null
+++ b/packetbuffer.c
@@ -0,0 +1,283 @@
+/*
+ * (c) BayCom GmbH, http://www.baycom.de, info@baycom.de
+ *
+ * See the COPYING file for copyright information and
+ * how to reach the author.
+ *
+ */
+
+#include <vdr/plugin.h>
+#include <sys/time.h>
+#include "packetbuffer.h"
+
+uint64_t Now (void)
+{
+#if 0
+ struct timeval t;
+ if (gettimeofday (&t, NULL) == 0)
+ return (uint64_t (t.tv_sec)) * 1000 + t.tv_usec / 1000;
+ return 0;
+#else
+ return clock();
+#endif
+}
+
+//--------------------------------------------------------------------------
+//--------------------------------------------------------------------------
+cMyPacketBuffer::cMyPacketBuffer (int Size, int Packets)
+{
+ if (Packets == 0)
+ Packets = Size / 2048;
+
+ // Make Packets a power of 2 to avoid expensive modulo
+ int n = 1;
+ for (int i = 0; i < 16; i++) {
+ if (n >= Packets) {
+ Packets = n;
+ break;
+ }
+ n <<= 1;
+ }
+
+ dataBuffer = (uchar *) malloc (Size);
+ memset (dataBuffer, 0, Size);
+ posBuffer = (posData *) malloc (Packets * sizeof (posData));
+ pthread_mutex_init (&m_lock, NULL);
+
+ posSize = Packets;
+ dataSize = Size;
+ memset (posBuffer, 0, Packets * sizeof (posData));
+ rp = wp = 0;
+ posRead = NULL;
+ posWrite = NULL;
+ posReadNum = 0;
+ invalidate = 0;
+ putTimeout = getTimeout = 0;
+}
+
+//--------------------------------------------------------------------------
+cMyPacketBuffer::~cMyPacketBuffer (void)
+{
+ free (dataBuffer);
+ free (posBuffer);
+}
+
+//--------------------------------------------------------------------------
+int cMyPacketBuffer::FindSpace (int size)
+{
+ int wpm = (wp - 1) & (posSize - 1);
+ posData *pr, *pw;
+
+ if (wpm < 0)
+ wpm += posSize;
+
+ if (rp == wp) {
+ if (size > dataSize)
+ return -1;
+ return 0;
+ }
+ pr = posBuffer + rp;
+ pw = posBuffer + wpm;
+
+ if (pr->offset <= pw->offset) {
+ if (pw->offset + pw->realSize + size < dataSize) {
+ return pw->offset + pw->realSize;
+ }
+ if (size < pr->offset)
+ return 0;
+ return -1;
+ } else {
+ if (pw->offset + pw->realSize + size < dataSize)
+ return pw->offset + pw->realSize;
+ return -1;
+ }
+ return -1;
+}
+
+//--------------------------------------------------------------------------
+uchar *cMyPacketBuffer::PutStart (int size)
+{
+ uint64_t starttime = 0;
+ int offset;
+ int nwp;
+ int rsize;
+ pthread_mutex_lock (&m_lock);
+// rsize= (size+15)&~15;
+ rsize = size;
+ while (true) {
+ offset = FindSpace (rsize);
+ if (offset != -1)
+ break;
+ if (putTimeout && !starttime)
+ starttime = Now ();
+ if (!putTimeout || (Now () - starttime) > (uint64_t) (putTimeout)) {
+ pthread_mutex_unlock (&m_lock);
+ return NULL;
+ }
+ usleep (5 * 1000);
+ }
+ nwp = (wp) & (posSize - 1);
+
+ posWrite = posBuffer + nwp;
+ posWrite->offset = offset;
+ posWrite->realSize = rsize;
+
+// printf("PUTSTART wp %i, start %x\n",nwp, offset);
+ return dataBuffer + offset;
+}
+
+//--------------------------------------------------------------------------
+void cMyPacketBuffer::PutEnd (int size, int flags, uint64_t timestamp)
+{
+ if (!posWrite)
+ return;
+
+ if (size > posWrite->realSize)
+ size = posWrite->realSize;
+
+ posWrite->size = size;
+ posWrite->flags = flags;
+ posWrite->timestamp = timestamp;
+ wp = (wp + 1) & (posSize - 1);
+ pthread_mutex_unlock (&m_lock);
+
+}
+
+//--------------------------------------------------------------------------
+uchar *cMyPacketBuffer::GetStartSub (int *readp, int timeout, int *size, int *flags, uint64_t * timestamp)
+{
+ uint64_t starttime = 0;
+
+ if (*readp == wp && timeout)
+ starttime = Now ();
+// printf("GET rp %i wp %i\n",readp,wp);
+ while (*readp == wp) {
+ if (!timeout || (Now () - starttime) > (uint64_t) (timeout))
+ return 0;
+ usleep (20 * 1000);
+ }
+#if 0
+ if (readp > posSize) {
+ // Fixme sync
+ return 0;
+ }
+#endif
+ posRead = posBuffer + *readp;
+
+ if (flags)
+ *flags = posRead->flags;
+ if (size)
+ *size = posRead->size;
+ if (timestamp)
+ *timestamp = posRead->timestamp;
+// printf("GET rp %i, offset %x\n",readp,posRead->offset);
+ return dataBuffer + posRead->offset;
+}
+
+//--------------------------------------------------------------------------
+uchar *cMyPacketBuffer::GetStart (int *size, int *flags, uint64_t * timestamp)
+{
+ if (posRead) {
+#if 1
+ if (flags)
+ *flags = posRead->flags;
+ if (size)
+ *size = posRead->size;
+ return dataBuffer + posRead->offset;
+#else
+ GetEnd ();
+#endif
+ }
+
+ if (invalidate) {
+ rp = wp;
+ invalidate = 0;
+ return 0;
+ }
+ posReadNum = 1;
+ return GetStartSub (&rp, getTimeout, size, flags, timestamp);
+}
+
+//--------------------------------------------------------------------------
+void cMyPacketBuffer::GetEnd (void)
+{
+ if (!posRead)
+ return;
+ rp = (rp + posReadNum) & (posSize - 1);
+ posRead = NULL;
+ posReadNum = 0;
+}
+
+//--------------------------------------------------------------------------
+// Try to get multiple PES at once
+uchar *cMyPacketBuffer::GetStartMultiple (int maxsize, int *size, int *flags, uint64_t * timestamp)
+{
+ uchar *buf, *lastbuf, *startbuf;
+ int sz, fl;
+ int readp, packets;
+ int totalsize;
+ int startflags;
+ int timeout = getTimeout;
+ uint64_t tsp, starttsp;
+#if 0
+ if (posRead)
+ GetEnd ();
+#endif
+//printf("fill %d \r", (wp>=rp) ? wp-rp : posSize-rp+wp);
+ if (invalidate) {
+ rp = wp;
+ invalidate = 0;
+ return 0;
+ }
+ readp = rp;
+ startbuf = NULL;
+ lastbuf = NULL;
+ totalsize = 0;
+ packets = 0;
+ startflags = 0;
+ starttsp = 0;
+ while (1) {
+ sz = 0;
+ buf = GetStartSub (&readp, timeout, &sz, &fl, &tsp);
+// printf("GOT %x %i\n",buf,sz);
+ if (!startbuf) {
+ if (!buf)
+ return NULL;
+ startbuf = buf;
+ startflags = fl;
+ starttsp = tsp;
+ } else {
+ if (lastbuf + sz != buf || // Buffer wraparound or no buffer
+ (totalsize + sz) > maxsize || //
+ fl != 0) { // packet start
+ if (size)
+ *size = totalsize;
+ if (flags)
+ *flags = startflags;
+ if (timestamp)
+ *timestamp = starttsp;
+ posReadNum = packets;
+ return startbuf;
+ }
+ }
+ readp = (readp + 1) & (posSize - 1);
+ packets++;
+ totalsize += sz;
+ lastbuf = buf;
+ timeout = 0;
+ }
+ return NULL;
+}
+
+//--------------------------------------------------------------------------
+void cMyPacketBuffer::SetTimeouts (int PutTimeout, int GetTimeout)
+{
+ putTimeout = PutTimeout;
+ getTimeout = GetTimeout;
+}
+
+//--------------------------------------------------------------------------
+void cMyPacketBuffer::Invalidate (void)
+{
+ invalidate = 1;
+}