summaryrefslogtreecommitdiff
path: root/client/assembler.c
blob: c6f811570dd61010968ea6e17222853b865a745a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*
 *  $Id: assembler.c,v 1.3 2008/04/07 14:27:28 schmirl Exp $
 */

#include "client/assembler.h"
#include "common.h"

#include "tools/socket.h"
#include "tools/select.h"

#include <vdr/tools.h>
#include <vdr/device.h>
#include <vdr/ringbuffer.h>

#include <unistd.h>

/**
 * Sets up the assembler for the given socket.
 *
 * Creates the pipe, switches both pipe ends to non-blocking mode, locks
 * m_Mutex and starts the worker thread. If the pipe cannot be created the
 * error is logged and the object is left without a running thread.
 *
 * @param Socket  socket the worker thread reads from; stored, not copied.
 */
cStreamdevAssembler::cStreamdevAssembler(cTBSocket *Socket)
		:cThread("Streamdev: UDP-TS Assembler")
{
	m_Socket = Socket;

	// The destructor tests m_Active and closes both pipe ends unconditionally,
	// so give them defined values before anything below can fail.
	m_Active = false;
	m_Pipe[0] = m_Pipe[1] = -1;

	if (pipe(m_Pipe) != 0) {
		esyslog("streamdev-client: Couldn't open assembler pipe: %m");
		return;
	}
	fcntl(m_Pipe[0], F_SETFL, O_NONBLOCK);
	fcntl(m_Pipe[1], F_SETFL, O_NONBLOCK);

	// Held by the constructing thread until WaitForFill() waits on it; the
	// worker thread locks the same mutex at the start of Action().
	m_Mutex.Lock();
	Start();
}

/**
 * Asks a still-active worker thread to stop, then closes both pipe ends.
 */
cStreamdevAssembler::~cStreamdevAssembler() {
	const bool wasRunning = m_Active;
	if (wasRunning) {
		// Clearing the flag makes the loop in Action() terminate.
		m_Active = false;
		// NOTE: a WakeUp() call used to sit here but is disabled.
		Cancel(3);
	}

	// Close the read end first, then the write end.
	for (int end = 0; end < 2; ++end)
		close(m_Pipe[end]);
}

/**
 * Worker thread body.
 *
 * Reads data from m_Socket into a local ring buffer and writes it to
 * m_Pipe[1]. Once the buffer holds more than rbminfill bytes for the first
 * time, m_WaitFill is broadcast and m_Mutex (locked on entry) is released,
 * which lets a caller blocked in WaitForFill() continue.
 *
 * The loop ends when m_Active is cleared, on a select/read/write error, or
 * when the socket read returns 0. If the loop ends before the initial fill
 * level was reached, the condition is still broadcast and the mutex is still
 * released so that no waiter is left blocked.
 */
void cStreamdevAssembler::Action(void) {
	cTBSelect sel;
	uchar buffer[2048];
	bool fillup = true; // true until the initial fill level has been signalled

	const int rbsize = TS_SIZE * 5600;
	const int rbmargin = TS_SIZE * 2;
	const int rbminfill = rbmargin * 50;
	cRingBufferLinear ringbuf(rbsize, rbmargin, true);

	// Released again once the initial fill level is reached, or after the
	// loop if it never is.
	m_Mutex.Lock();

	m_Active = true;
	while (m_Active) {
		sel.Clear();

		// Only poll the socket while the buffer is less than 80% full.
		if (ringbuf.Available() < rbsize * 80 / 100)
			sel.Add(*m_Socket, false);
		// Only poll the pipe for writing once enough data is buffered.
		if (ringbuf.Available() > rbminfill) {
			if (fillup) {
				Dprintf("giving signal\n");
				m_WaitFill.Broadcast();
				m_Mutex.Unlock();
				fillup = false;
			}
			sel.Add(m_Pipe[1], true);
		}

		if (sel.Select(1500) < 0) {
			if (!m_Active) // Exit was requested
				break;
			esyslog("streamdev-client: Fatal error: %m");
			Dprintf("streamdev-client: select failed (%m)\n");
			m_Active = false;
			break;
		}

		if (sel.CanRead(*m_Socket)) {
			int b;
			if ((b = m_Socket->Read(buffer, sizeof(buffer))) < 0) {
				esyslog("streamdev-client: Couldn't read from server: %m");
				Dprintf("streamdev-client: read failed (%m)\n");
				m_Active = false;
				break;
			}
			if (b == 0)
				m_Active = false; // nothing read: stop after this iteration
			else
				ringbuf.Put(buffer, b);
		}

		if (sel.CanWrite(m_Pipe[1])) {
			int recvd;
			const uchar *block = ringbuf.Get(recvd);
			if (block && recvd > 0) {
				int result;
				// Never drain the buffer below the minimum fill level.
				if (recvd > ringbuf.Available() - rbminfill)
					recvd = ringbuf.Available() - rbminfill;
				if ((result = write(m_Pipe[1], block, recvd)) == -1) {
					esyslog("streamdev-client: Couldn't write to VDR: %m"); // TODO
					Dprintf("streamdev-client: write failed (%m)\n");
					m_Active = false;
					break;
				}
				// Drop only what was actually written.
				ringbuf.Del(result);
			}
		}
	}

	// The loop ended before the initial fill level was signalled: this thread
	// still holds m_Mutex and nobody has been woken. Do the hand-off now so
	// that WaitForFill() cannot block forever.
	if (fillup) {
		m_WaitFill.Broadcast();
		m_Mutex.Unlock();
	}
}

/**
 * Waits on the m_WaitFill condition using m_Mutex, then unlocks m_Mutex.
 *
 * m_Mutex is locked by the constructor before the worker thread is started;
 * Action() broadcasts m_WaitFill once its ring buffer holds more than the
 * minimum fill level.
 *
 * NOTE(review): presumably meant to be called once, from the thread that
 * constructed the object, since it releases the lock taken there — confirm
 * against callers.
 */
void cStreamdevAssembler::WaitForFill(void) {
	// Presumably releases m_Mutex while waiting so Action() can acquire it —
	// verify against VDR's cCondVar::Wait(cMutex &).
	m_WaitFill.Wait(m_Mutex);
	// Drop the lock originally taken in the constructor.
	m_Mutex.Unlock();
}