summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorphintuka <phintuka>2009-07-30 11:38:45 +0000
committerphintuka <phintuka>2009-07-30 11:38:45 +0000
commite6435ec6e1a04825fa53a0604111d19bd1dea50d (patch)
tree711b4b3d5d55927cf4ce51ce4f20db71af0fdf22
parent6b56daf4bdd705c06cf4cbfb0042b0586125885b (diff)
downloadxineliboutput-e6435ec6e1a04825fa53a0604111d19bd1dea50d.tar.gz
xineliboutput-e6435ec6e1a04825fa53a0604111d19bd1dea50d.tar.bz2
Removed data thread.
-rw-r--r--xine_input_vdr.c111
1 files changed, 42 insertions, 69 deletions
diff --git a/xine_input_vdr.c b/xine_input_vdr.c
index a0cbd847..ef07b3a9 100644
--- a/xine_input_vdr.c
+++ b/xine_input_vdr.c
@@ -4,7 +4,7 @@
* See the main source file 'xineliboutput.c' for copyright information and
* how to reach the author.
*
- * $Id: xine_input_vdr.c,v 1.276 2009-07-30 10:48:34 phintuka Exp $
+ * $Id: xine_input_vdr.c,v 1.277 2009-07-30 11:38:45 phintuka Exp $
*
*/
@@ -325,7 +325,6 @@ typedef struct vdr_input_plugin_s {
/* Network */
pthread_t control_thread;
- pthread_t data_thread;
pthread_mutex_t fd_control_lock;
buf_element_t *read_buffer;
int threads_initialized;
@@ -3533,7 +3532,7 @@ static int wait_stream_sync(vdr_input_plugin_t *this)
while(this->control_running &&
this->discard_index < this->discard_index_ds &&
- /*!this->stream->demux_action_pending &&*/
+ !this->stream->demux_action_pending &&
--counter > 0) {
struct timespec abstime;
create_timeout_time(&abstime, 10);
@@ -3614,16 +3613,23 @@ static buf_element_t *vdr_plugin_read_block_tcp(vdr_input_plugin_t *this)
int warnings = 0;
int result, n;
+ if (this->discard_index < this->discard_index_ds &&
+ wait_stream_sync(this))
+ return NULL;
+
if (read_buffer && read_buffer->size >= sizeof(stream_tcp_header_t))
todo += ((stream_tcp_header_t *)read_buffer->content)->len;
while (XIO_READY == (result = _x_io_select(this->stream, this->fd_data, XIO_READ_READY, 100))) {
- pthread_testcancel();
if (!this->control_running || this->fd_data < 0) {
errno = ENOTCONN;
return NULL;
}
+ if (this->stream->demux_action_pending) {
+ errno = EINTR;
+ return NULL;
+ }
/* Allocate buffer */
if (!read_buffer) {
@@ -3705,18 +3711,6 @@ static buf_element_t *vdr_plugin_read_block_tcp(vdr_input_plugin_t *this)
return NULL;
}
-static int vdr_plugin_read_net_tcp(vdr_input_plugin_t *this)
-{
- buf_element_t *buf = vdr_plugin_read_block_tcp(this);
- if (buf) {
- this->block_buffer->put(this->block_buffer, buf);
- return XIO_READY;
- }
- if (errno == EAGAIN || errno == EINTR)
- return XIO_TIMEOUT;
- return XIO_ERROR;
-}
-
/*
* read_socket_udp()
*
@@ -3738,6 +3732,10 @@ static buf_element_t *read_socket_udp(vdr_input_plugin_t *this)
errno = ENOTCONN;
return NULL;
}
+ if (this->stream->demux_action_pending) {
+ errno = EINTR;
+ return NULL;
+ }
if (result != XIO_READY) {
if (result == XIO_ERROR)
LOGERR("read_socket_udp(): select() failed");
@@ -3748,8 +3746,6 @@ static buf_element_t *read_socket_udp(vdr_input_plugin_t *this)
return NULL;
}
- pthread_testcancel();
-
/*
* allocate buffer
*/
@@ -4066,12 +4062,26 @@ static buf_element_t *vdr_plugin_read_block_udp(vdr_input_plugin_t *this)
while (this->control_running && this->fd_data >= 0) {
+ if (this->discard_index < this->discard_index_ds &&
+ wait_stream_sync(this))
+ return NULL;
+
+ if (!this->control_running || this->fd_data < 0) {
+ errno = ENOTCONN;
+ return NULL;
+ }
+
buf_element_t *read_buffer;
/* return next packet from reordering queue (if any) */
if (NULL != (read_buffer = udp_process_queue(this)))
return read_buffer;
+ if (this->stream->demux_action_pending) {
+ errno = EINTR;
+ return NULL;
+ }
+
/* poll + read socket */
if ( ! (read_buffer = read_socket_udp(this)))
return NULL;
@@ -4159,48 +4169,6 @@ static buf_element_t *vdr_plugin_read_block_udp(vdr_input_plugin_t *this)
return NULL;
}
-static int vdr_plugin_read_net_udp(vdr_input_plugin_t *this)
-{
- buf_element_t *buf = vdr_plugin_read_block_udp(this);
- if (buf) {
- this->block_buffer->put(this->block_buffer, buf);
- return XIO_READY;
- }
- if (errno == EAGAIN || errno == EINTR)
- return XIO_TIMEOUT;
- return XIO_ERROR;
-}
-
-static void *vdr_data_thread(void *this_gen)
-{
- vdr_input_plugin_t *this = (vdr_input_plugin_t *) this_gen;
-
- LOGDBG("Data thread started");
-
- const int priority = -1;
- errno = 0;
- if((nice(priority) == -1) && errno)
- LOGDBG("Data thread: Can't nice to value: %d", priority);
-
- if(this->udp || this->rtp) {
- while(this->control_running) {
- if(vdr_plugin_read_net_udp(this) == XIO_ERROR)
- break;
- pthread_testcancel();
- }
- } else {
- while(this->control_running) {
- if(vdr_plugin_read_net_tcp(this) == XIO_ERROR)
- break;
- pthread_testcancel();
- }
- }
-
- this->control_running = 0;
- LOGDBG("Data thread terminated");
- pthread_exit(NULL);
-}
-
#ifdef TEST_PIP
static int write_slave_stream(vdr_input_plugin_t *this, const char *data, int len)
{
@@ -4556,7 +4524,20 @@ static buf_element_t *vdr_plugin_read_block (input_plugin_t *this_gen,
need_pause = adjust_scr_speed(this);
/* get next buffer */
- buf = fifo_buffer_timed_get(this->block_buffer, 100);
+ if (this->funcs.push_input_write /* local mode */) {
+ buf = fifo_buffer_timed_get(this->block_buffer, 100);
+ } else {
+ if ( !(buf= fifo_buffer_try_get(this->block_buffer))) {
+ if (this->udp || this->rtp)
+ buf = vdr_plugin_read_block_udp(this);
+ else
+ buf = vdr_plugin_read_block_tcp(this);
+ if (!buf && errno != EAGAIN && errno != EINTR) {
+ handle_disconnect(this);
+ return NULL;
+ }
+ }
+ }
if (!buf) {
if (!this->is_paused &&
@@ -4683,9 +4664,6 @@ static void vdr_plugin_dispose (input_plugin_t *this_gen)
LOGDBG("Cancel control thread ...");
/*pthread_cancel(this->control_thread);*/
pthread_join (this->control_thread, &p);
- LOGDBG("Cancel data thread ...");
- /*pthread_cancel(this->data_thread);*/
- pthread_join (this->data_thread, &p);
LOGDBG("Threads joined");
}
@@ -5426,11 +5404,6 @@ static int vdr_plugin_open_net (input_plugin_t *this_gen)
LOGERR("Can't create new thread");
return 0;
}
- if ((err = pthread_create (&this->data_thread,
- NULL, vdr_data_thread, (void*)this)) != 0) {
- LOGERR("Can't create new thread");
- return 0;
- }
this->class->xine->port_ticket->acquire(this->class->xine->port_ticket, 1);
if(!(this->stream->video_out->get_capabilities(this->stream->video_out) &