summaryrefslogtreecommitdiff
path: root/tools/backgroundwriter.c
blob: 6dae8e9952859c5733386cb0ea9eecd45916ab5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/*
 * backgroundwriter.c: Buffered socket/file writing thread
 *
 * See the main source file 'xineliboutput.c' for copyright information and
 * how to reach the author.
 *
 * $Id: backgroundwriter.c,v 1.2 2006-06-11 10:19:50 phintuka Exp $
 *
 */

#include <stdint.h>
#include <unistd.h>

#include <vdr/tools.h>

#include "../logdefs.h"
#include "../xine_input_vdr_net.h" // stream_tcp_header_t

#include "backgroundwriter.h"

//#define DISABLE_DISCARD
//#define LOG_DISCARDS

#define MAX_OVERFLOWS_BEFORE_DISCONNECT 500 // ~ 1 second

// Set up a writer for file descriptor 'fd' backed by a ring buffer of
// 'Size' bytes, then launch the writer via Start().
cBackgroundWriter::cBackgroundWriter(int fd, int Size)
  : m_RingBuffer(Size, sizeof(stream_tcp_header_t))
{
  // Stream position bookkeeping: nothing queued, nothing to discard
  m_DiscardEnd   = 0;
  m_DiscardStart = 0;
  m_PutPos       = 0;

  m_BufferOverflows = 0;

  m_fd     = fd;
  m_Active = true;

  m_RingBuffer.SetTimeouts(0, 100);

  LOGDBG("cBackgroundWriter initialized (buffer %d kb)", Size/1024);

  // Start() comes last so that every member above is already set
  Start();
}

// Shut the writer down.
cBackgroundWriter::~cBackgroundWriter()
{
  // Action() loops only while m_Active is set, so drop the flag first ...
  m_Active = false;

  // ... and then cancel the thread.
  Cancel(3);
}

// Report how much free space the ring buffer currently has.
int cBackgroundWriter::Free(void)
{
  const int FreeBytes = m_RingBuffer.Free();
  return FreeBytes;
}

// Writer thread main loop.
//
// While m_Active is set, data queued by Put() is taken from the ring
// buffer and written to m_fd.  Two stream offsets are tracked locally:
//   GetPos        - offset of the first byte still held in the buffer
//   NextHeaderPos - offset at which the next stream_tcp_header_t begins
//
// Each pass does one of the following:
//   - Discard: if Clear() has moved m_DiscardEnd beyond GetPos and we
//     are at a frame boundary, up to (m_DiscardEnd - GetPos) bytes are
//     deleted from the buffer without being written.  When we are in the
//     middle of a frame, the rest of that frame is written first so the
//     receiver never sees a truncated frame.
//   - Write: at most one frame (header + payload), or the remainder of
//     the current frame, is passed to write(); the bytes actually
//     written are then removed from the buffer.
//
// The loop ends when m_Active is cleared, when write() returns 0, or on
// a write error other than EINTR/EWOULDBLOCK.  On exit the ring buffer
// is cleared and m_Active is reset.
void cBackgroundWriter::Action(void)
{
  uint64_t NextHeaderPos = 0ULL;
  uint64_t GetPos = 0ULL;
  cPoller Poller(m_fd, true);

  while(m_Active) {

    if(Poller.Poll(100)) {

      int Count = 0;
      uchar *Data = m_RingBuffer.Get(Count);

      if(Data && Count > 0) {

#ifndef DISABLE_DISCARD
	Lock(); // uint64_t m_DiscardStart can not be read atomically (IA32)
	if(m_DiscardEnd > GetPos) {

# ifdef LOG_DISCARDS
	  // The offset difference is 64 bit wide; narrow it for "%d"
	  LOGMSG("TCP: queue: discard request: queue %d bytes, "
		 "next point %d bytes forward (Count=%d)",
		 m_RingBuffer.Available(),
		 (int)(NextHeaderPos - GetPos),
		 Count);
# endif
	  if(NextHeaderPos == GetPos) {
	    // we're at frame boundary
# ifdef LOG_DISCARDS
	    // At a frame boundary Data points at a frame header
	    stream_tcp_header_t *hdr = (stream_tcp_header_t*)Data;
	    uint8_t *pkt = TCP_PAYLOAD(Data);
	    if(pkt[0] || pkt[1] || pkt[2] != 1 || ntohl(hdr->len) > 2100) {
	      LOGMSG("  -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]);
	    }
# endif
	    // Never discard beyond the position recorded by Clear()
	    Count = min(Count, (int)(m_DiscardEnd - GetPos));
# ifdef LOG_DISCARDS
	    LOGMSG("Flushing %d bytes", Count);
# endif
	    Unlock();

	    m_RingBuffer.Del(Count);
	    GetPos += Count;
	    // Keep the "at frame boundary" state so that the next pass
	    // continues discarding until m_DiscardEnd has been reached
	    NextHeaderPos = GetPos;
# ifdef LOG_DISCARDS
	    LOGMSG("Queue now %d bytes", m_RingBuffer.Available());
	    // NOTE(review): Data was fetched before Del(), so this dumps the
	    // bytes that were just discarded, not the new head of the queue.
	    pkt = TCP_PAYLOAD(Data);
	    if(pkt[0] || pkt[1] || pkt[2] != 1 || ntohl(hdr->len) > 2100) {
	      LOGMSG("  -> %x %x %x %x", pkt[0], pkt[1], pkt[2], pkt[3]);
	    }
# endif
	    continue;
	  }
	}
	Unlock();
#endif

#ifndef DISABLE_DISCARD
	if(GetPos == NextHeaderPos) {
	  // NOTE(review): a short read is only logged here; the header is
	  // still dereferenced below.
	  if(Count < (int)sizeof(stream_tcp_header_t))
	    LOGMSG("cBackgroundWriter @NextHeaderPos: Count < header size !");

	  stream_tcp_header_t *header = (stream_tcp_header_t*)Data;
	  if(Count < (int)(ntohl(header->len) + sizeof(stream_tcp_header_t)))
	    ;//LOGMSG("Count = %d < %d", Count, 
	     //   header->len + sizeof(stream_tcp_header_t));
	  else
	    // A whole frame is available: write exactly one frame
	    Count = ntohl(header->len) + sizeof(stream_tcp_header_t);
	  // The next header follows this frame, whether or not all of the
	  // frame could be fetched in this pass
	  NextHeaderPos = GetPos + ntohl(header->len) + sizeof(stream_tcp_header_t);
	} else {
	  // In the middle of a frame: do not write past its end
	  Count = min(Count, (int)(NextHeaderPos-GetPos));
	}
#endif

	errno = 0;
	int n = write(m_fd, Data, Count);

	if(n == 0) {
	  LOGERR("cBackgroundWriter: Client disconnected data stream ?");
	  break;
	  
	} else if(n < 0) {
	  
	  if (errno == EINTR || errno == EWOULDBLOCK) {
	    // Nothing was consumed; poll again and retry the same data
	    TRACE("cBackgroundWriter: EINTR while writing to file handle "
		  <<m_fd<<" - retrying");
	    continue;

	  } else {
	    LOGERR("cBackgroundWriter: TCP write error");
	    break;
	  }
	}

	// Only the bytes accepted by write() leave the buffer
	GetPos += n;
	m_RingBuffer.Del(n);
      }
    }
  }

  m_RingBuffer.Clear();
  m_Active = false;
}

// Request that everything queued so far is dropped.
//
// The buffer is not emptied here: dropping its contents directly would
// break PES frames.  Only the current put position is recorded as the
// end of the range to discard.
void cBackgroundWriter::Clear(void)
{
  // Take the thread lock so that this is serialized with Put()
  LOCK_THREAD;

#ifdef LOG_DISCARDS
  LOGMSG("cBackgroundWriter::Clear() @%lld", m_PutPos);
#endif

  m_DiscardEnd = m_PutPos;
}

// Wait until the ring buffer has drained.
//
// Polls in 3 ms steps until the buffer is empty, the writer is no longer
// active, or TimeoutMs has elapsed.  A TimeoutMs <= 0 does not wait at all.
// Returns true if the buffer is empty on return.
bool cBackgroundWriter::Flush(int TimeoutMs)
{
  uint64_t Deadline = cTimeMs::Now();
  if(TimeoutMs > 0)
    Deadline += (uint64_t)TimeoutMs;

  for(;;) {
    if(!(cTimeMs::Now() < Deadline))
      break;                              // timed out
    if(!m_Active)
      break;                              // writer is not running
    if(m_RingBuffer.Available() <= 0)
      break;                              // everything has been consumed
    cCondWait::SleepMs(3);
  }

  return m_RingBuffer.Available() <= 0;
}

int cBackgroundWriter::Put(uint64_t StreamPos, 
			   const uchar *Data, int DataCount)
{
  stream_tcp_header_t header;
  header.pos = htonull(StreamPos);
  header.len = htonl(DataCount);
  return Put((uchar*)&header, sizeof(header), Data, DataCount);
}

// Queue a header and its payload as one unit.
//
// Returns:
//   HeaderCount+DataCount     both parts were queued
//   -(HeaderCount+DataCount)  not enough free space; nothing was queued
//   0                         the writer is not active, or it has just
//                             been deactivated (too many consecutive
//                             overflows, or a short ring buffer Put)
int cBackgroundWriter::Put(const uchar *Header, int HeaderCount, 
			   const uchar *Data, int DataCount) 
{
  if(m_Active) {

    // Serialize Put access to keep Data and Header together
    LOCK_THREAD;  
    
    if(m_RingBuffer.Free() < HeaderCount+DataCount) {
      //LOGMSG("cXinelibServer: TCP buffer overflow !");
      if(m_BufferOverflows++ > MAX_OVERFLOWS_BEFORE_DISCONNECT) {
	LOGMSG("cXinelibServer: Too many TCP buffer overflows, dropping client");
	m_RingBuffer.Clear();
	m_Active = false;
	return 0;
      }
      return -HeaderCount-DataCount;
    }

    // The header must reach the buffer before the payload.  The operands
    // of '+' may be evaluated in either order, so the two Put() calls are
    // issued as separate statements instead of being summed in one
    // expression.
    int n = m_RingBuffer.Put(Header, HeaderCount);
    n += m_RingBuffer.Put(Data, DataCount);

    if(n == HeaderCount+DataCount) {
      m_BufferOverflows = 0;
      m_PutPos += n;
      return n;
    }
    
    // Free space was checked above, so a short Put is unexpected:
    // drop everything and stop the writer.
    LOGMSG("cXinelibServer: TCP buffer internal error ?!?");
    m_RingBuffer.Clear();
    m_Active = false;
  }

  return 0;
}