diff options
Diffstat (limited to 'plugins/streamdev/streamdev-cvs/server/streamer.c')
| -rw-r--r-- | plugins/streamdev/streamdev-cvs/server/streamer.c | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/plugins/streamdev/streamdev-cvs/server/streamer.c b/plugins/streamdev/streamdev-cvs/server/streamer.c new file mode 100644 index 0000000..42e7efa --- /dev/null +++ b/plugins/streamdev/streamdev-cvs/server/streamer.c @@ -0,0 +1,162 @@ +/* + * $Id: streamer.c,v 1.19 2009/06/19 06:32:45 schmirl Exp $ + */ + +#include <vdr/ringbuffer.h> +#include <vdr/device.h> +#include <sys/types.h> +#include <unistd.h> + +#include "server/streamer.h" +#include "server/suspend.h" +#include "server/setup.h" +#include "tools/socket.h" +#include "tools/select.h" +#include "common.h" + +// --- cStreamdevBuffer ------------------------------------------------------- + +cStreamdevBuffer::cStreamdevBuffer(int Size, int Margin, bool Statistics, const char *Description): + cRingBufferLinear(Size, Margin, Statistics, Description) +{ +} + +// --- cStreamdevWriter ------------------------------------------------------- + +cStreamdevWriter::cStreamdevWriter(cTBSocket *Socket, + cStreamdevStreamer *Streamer): + cThread("streamdev-writer"), + m_Streamer(Streamer), + m_Socket(Socket) +{ +} + +cStreamdevWriter::~cStreamdevWriter() +{ + Dprintf("destructing writer\n"); + if (Running()) + Cancel(3); +} + +void cStreamdevWriter::Action(void) +{ + cTBSelect sel; + Dprintf("Writer start\n"); + int max = 0; + uchar *block = NULL; + int count, offset = 0; + + sel.Clear(); + sel.Add(*m_Socket, true); + while (Running()) { + if (block == NULL) { + block = m_Streamer->Get(count); + offset = 0; + } + + if (block != NULL) { + if (sel.Select(15000) == -1) { + esyslog("ERROR: streamdev-server: couldn't send data: %m"); + break; + } + + if (sel.CanWrite(*m_Socket)) { + int written; + int pkgsize = count; + // SOCK_DGRAM indicates multicast + if (m_Socket->Type() == SOCK_DGRAM) { + // don't fragment multicast packets + // max. payload on standard local ethernet is 1416 to 1456 bytes + // and some STBs expect complete TS packets + // so let's always limit to 7 * TS_SIZE = 1316 + if (pkgsize > 7 * TS_SIZE) + pkgsize = 7 * TS_SIZE; + else + pkgsize -= pkgsize % TS_SIZE; + } + if ((written = m_Socket->Write(block + offset, pkgsize)) == -1) { + esyslog("ERROR: streamdev-server: couldn't send %d bytes: %m", pkgsize); + break; + } + + // statistics + if (count > max) + max = count; + + offset += written; + count -= written; + + // less than one TS packet left: + // delete what we've written so far and get next chunk + if (count < TS_SIZE) { + m_Streamer->Del(offset); + block = NULL; + } + } + } + } + Dprintf("Max. Transmit Blocksize was: %d\n", max); +} + +// --- cStreamdevStreamer ----------------------------------------------------- + +cStreamdevStreamer::cStreamdevStreamer(const char *Name): + cThread(Name), + m_Writer(NULL), + m_RingBuffer(new cStreamdevBuffer(STREAMERBUFSIZE, TS_SIZE * 2, + true, "streamdev-streamer")), + m_SendBuffer(new cStreamdevBuffer(WRITERBUFSIZE, TS_SIZE * 2)) +{ + m_RingBuffer->SetTimeouts(0, 100); + m_SendBuffer->SetTimeouts(100, 100); +} + +cStreamdevStreamer::~cStreamdevStreamer() +{ + Dprintf("Desctructing streamer\n"); + delete m_RingBuffer; + delete m_SendBuffer; +} + +void cStreamdevStreamer::Start(cTBSocket *Socket) +{ + Dprintf("start streamer\n"); + m_Writer = new cStreamdevWriter(Socket, this); + Attach(); +} + +void cStreamdevStreamer::Activate(bool On) +{ + if (On && !Active()) { + Dprintf("activate streamer\n"); + m_Writer->Start(); + cThread::Start(); + } +} + +void cStreamdevStreamer::Stop(void) +{ + if (Running()) { + Dprintf("stopping streamer\n"); + Cancel(3); + } + if (m_Writer) { + Detach(); + DELETENULL(m_Writer); + } +} + +void cStreamdevStreamer::Action(void) +{ + while (Running()) { + int got; + uchar *block = m_RingBuffer->Get(got); + + if (block) { + int count = Put(block, got); + if (count) + m_RingBuffer->Del(count); + } + } +} + |
