From 7aa6fcf6f8947faa67e095c8decff779673de17f Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Thu, 12 Jul 2007 11:24:27 +0100 Subject: Simplify input_rtp locking We have seen input_rtp lock up in use, and traced the problem to the separate tail/head locks on the input buffer. Reduce to a single lock, increasing lock contention between the reader and the writer, but removing the previous deadlock risk. Also use select() before recv(), to ensure that we never wait forever for packets (e.g. if we're trying to receive a multicast stream, but an administrator has blocked all multicast packets to the device - iptables -A INPUT --dst 224.0.0.0/4 -j DROP induces this failure for testing). --- src/input/input_rtp.c | 193 ++++++++++++++++++++++++++++---------------------- 1 file changed, 108 insertions(+), 85 deletions(-) diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c index d4ba804c6..681bced0f 100644 --- a/src/input/input_rtp.c +++ b/src/input/input_rtp.c @@ -79,6 +79,7 @@ #include #include #include +#include #if defined (__SVR4) && defined (__sun) # include @@ -125,11 +126,9 @@ typedef struct { int fh; unsigned char *buffer; /* circular buffer */ - unsigned char *buffer_tail; /* tail pointer used by reader */ - unsigned char *buffer_head; /* head pointer used by writer */ + unsigned char *buffer_get_ptr; /* get pointer used by reader */ + unsigned char *buffer_put_ptr; /* put pointer used by writer */ long buffer_count; /* number of bytes in the buffer */ - pthread_mutex_t buffer_mutex; /* only used for locking the - * the buffer count variable */ unsigned char packet_buffer[65536]; @@ -143,13 +142,12 @@ typedef struct { char preview[MAX_PREVIEW_SIZE]; int preview_size; + int preview_read_done; /* boolean true after attempt to read input stream for preview */ nbc_t *nbc; - pthread_mutex_t writer_mut; + pthread_mutex_t buffer_ring_mut; pthread_cond_t writer_cond; - - pthread_mutex_t reader_mut; pthread_cond_t reader_cond; } rtp_input_plugin_t; @@ -198,7 +196,7 @@ static int host_connect_attempt(struct in_addr ia, int port, /* Try to increase receive buffer to 1MB to avoid dropping packets */ - optval = 1024 * 1024; + optval = BUFFER_SIZE; if ((setsockopt(s, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval))) < 0) { LOG_MSG(xine, _("setsockopt(SO_RCVBUF): %s.\n"), strerror(errno)); @@ -298,6 +296,7 @@ static void * input_plugin_read_loop(void *arg) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; unsigned char *data; long length; + fd_set read_fds; while (1) { @@ -308,8 +307,28 @@ static void * input_plugin_read_loop(void *arg) { */ pthread_testcancel(); - length = recv(this->fh, this->packet_buffer, - sizeof(this->packet_buffer), 0); + { + struct timeval recv_timeout; + int rc; + + recv_timeout.tv_sec = 2; + recv_timeout.tv_usec = 0; + + FD_ZERO( &read_fds ); + FD_SET( this->fh, &read_fds ); + + /* wait for a packet to arrive - but do not hang! */ + rc = select( this->fh+1, &read_fds, NULL, NULL, &recv_timeout ); + if( rc > 0 ) + { + length = recv(this->fh, this->packet_buffer, + sizeof(this->packet_buffer), 0); + } + else if( rc == 0 ) + length = 0; + else + length = -1; + } pthread_testcancel(); if (length < 0) { @@ -362,28 +381,31 @@ static void * input_plugin_read_loop(void *arg) { } /* insert data into cyclic buffer */ - while (length > 0) { - - /* work with a copy of buffer count, while the variable can - * be updated by the reader - */ - - long buffer_count = this->buffer_count; - long n; + if (length > 0) { /* * if the buffer is full, wait for the reader * to signal */ - if(buffer_count >= BUFFER_SIZE) { - pthread_mutex_lock(&this->writer_mut); - pthread_cond_wait(&this->writer_cond, &this->writer_mut); - pthread_mutex_unlock(&this->writer_mut); - /* update the buffer count again */ - buffer_count = this->buffer_count; - } - + pthread_mutex_lock(&this->buffer_ring_mut); + /* wait for enough space to write the whole of the recv'ed data */ + while( (BUFFER_SIZE - this->buffer_count) < length ) + { + struct timeval tv; + struct timespec timeout; + + gettimeofday(&tv, NULL); + + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + 2; + + if( pthread_cond_timedwait(&this->writer_cond, &this->buffer_ring_mut, &timeout) != 0 ) + { + fprintf( stdout, "input_rtp: buffer ring not read within 2 seconds!\n" ); + } + } + /* Now there's enough space to write some bytes into the buffer * determine how many bytes can be written. If the buffer wraps * around, write in two pieces: from the head pointer to the @@ -391,37 +413,29 @@ static void * input_plugin_read_loop(void *arg) { * of bytes. */ - if(length > (BUFFER_SIZE - buffer_count)) { - n = BUFFER_SIZE - buffer_count; - } - else { - n = length; - } - - if(((this->buffer_head - this->buffer) + n) > BUFFER_SIZE) { - n = BUFFER_SIZE - (this->buffer_head - this->buffer); - } - - /* The actual write... */ - memcpy(this->buffer_head, data, n); + { + long buffer_space_remaining = BUFFER_SIZE - (this->buffer_put_ptr - this->buffer); + + if( buffer_space_remaining >= length ) + { + /* data fits inside the buffer */ + memcpy(this->buffer_put_ptr, data, length); + this->buffer_put_ptr += length; + } + else + { + /* data wrapped around the end of the buffer */ + memcpy(this->buffer_put_ptr, data, buffer_space_remaining); + memcpy(this->buffer, &data[buffer_space_remaining], length - buffer_space_remaining); + this->buffer_put_ptr = &this->buffer[ length - buffer_space_remaining ]; + } + } - data += n; - length -= n; - - /* update head pointer; and check for wrap around */ - this->buffer_head += n; - if(this->buffer_head - this->buffer >= BUFFER_SIZE) - this->buffer_head = this->buffer; - - /* lock the mutex; for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); - this->buffer_count += n; - pthread_mutex_unlock(&this->buffer_mutex); + this->buffer_count += length; /* signal the reader that there is new data */ - pthread_mutex_lock(&this->reader_mut); pthread_cond_signal(&this->reader_cond); - pthread_mutex_unlock(&this->reader_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); } } } @@ -443,33 +457,25 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, off_t n; - /* work with a copy of the buffer count, while the variable can - * be updated by the writer - */ + pthread_mutex_lock(&this->buffer_ring_mut); - long buffer_count = this->buffer_count; - /* * if nothing in the buffer, wait for data for 5 seconds. If * no data is received within this timeout, return the number * of bytes already received (which is likely to be 0) */ - if(buffer_count == 0) { + if(this->buffer_count == 0) { gettimeofday(&tv, NULL); timeout.tv_nsec = tv.tv_usec * 1000; timeout.tv_sec = tv.tv_sec + 5; - pthread_mutex_lock(&this->reader_mut); - if(pthread_cond_timedwait(&this->reader_cond, &this->reader_mut, &timeout) != 0) + if(pthread_cond_timedwait(&this->reader_cond, &this->buffer_ring_mut, &timeout) != 0) { /* we timed out, no data available */ - pthread_mutex_unlock(&this->reader_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); return copied; } - pthread_mutex_unlock(&this->reader_mut); - /* update the local buffer count variable again */ - buffer_count = this->buffer_count; } /* Now determine how many bytes can be read. If the buffer @@ -478,38 +484,34 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, * update the buffer count. Finally read the second piece * from the base to the remaining count */ - if(length > buffer_count) { - n = buffer_count; + if(length > this->buffer_count) { + n = this->buffer_count; } else { n = length; } - if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) { - n = BUFFER_SIZE - (this->buffer_tail - this->buffer); + if(((this->buffer_get_ptr - this->buffer) + n) > BUFFER_SIZE) { + n = BUFFER_SIZE - (this->buffer_get_ptr - this->buffer); } /* the actual read */ - memcpy(buf, this->buffer_tail, n); + memcpy(buf, this->buffer_get_ptr, n); buf += n; copied += n; length -= n; /* update the tail pointer, watch for wrap arounds */ - this->buffer_tail += n; - if(this->buffer_tail - this->buffer >= BUFFER_SIZE) - this->buffer_tail = this->buffer; + this->buffer_get_ptr += n; + if(this->buffer_get_ptr - this->buffer >= BUFFER_SIZE) + this->buffer_get_ptr = this->buffer; - /* lock the buffer, for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); this->buffer_count -= n; - pthread_mutex_unlock(&this->buffer_mutex); /* signal the writer that there's space in the buffer again */ - pthread_mutex_lock(&this->writer_mut); pthread_cond_signal(&this->writer_cond); - pthread_mutex_unlock(&this->writer_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); } this->curpos += copied; @@ -517,6 +519,27 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, return copied; } +static buf_element_t *rtp_plugin_read_block (input_plugin_t *this_gen, + fifo_buffer_t *fifo, off_t todo) { + buf_element_t *buf = fifo->buffer_pool_alloc (fifo); + int total_bytes; + + + buf->content = buf->mem; + buf->type = BUF_DEMUX_BLOCK; + + total_bytes = rtp_plugin_read (this_gen, buf->content, todo); + + if (total_bytes != todo) { + buf->free_buffer (buf); + return NULL; + } + + buf->size = total_bytes; + + return buf; +} + /* * */ @@ -584,9 +607,11 @@ static int rtp_plugin_get_optional_data (input_plugin_t *this_gen, */ if (data_type == INPUT_OPTIONAL_DATA_PREVIEW) { - if (this->preview_size == 0) { + if (!this->preview_read_done) { this->preview_size = rtp_plugin_read(this_gen, this->preview, MAX_PREVIEW_SIZE); lprintf("Preview data length = %d\n", this->preview_size); + + this->preview_read_done = 1; } memcpy(data, this->preview, this->preview_size); return this->preview_size; @@ -705,23 +730,21 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, if (iptr) this->interface = iptr; - pthread_mutex_init(&this->buffer_mutex, NULL); - pthread_mutex_init(&this->reader_mut, NULL); - pthread_mutex_init(&this->writer_mut, NULL); + pthread_mutex_init(&this->buffer_ring_mut, NULL); pthread_cond_init(&this->reader_cond, NULL); pthread_cond_init(&this->writer_cond, NULL); this->buffer = malloc(BUFFER_SIZE); - this->buffer_head = this->buffer; - this->buffer_tail = this->buffer; + this->buffer_put_ptr = this->buffer; + this->buffer_get_ptr = this->buffer; this->buffer_count = 0; this->curpos = 0; this->input_plugin.open = rtp_plugin_open; this->input_plugin.get_capabilities = rtp_plugin_get_capabilities; this->input_plugin.read = rtp_plugin_read; - this->input_plugin.read_block = NULL; + this->input_plugin.read_block = rtp_plugin_read_block; this->input_plugin.seek = rtp_plugin_seek; this->input_plugin.get_current_pos = rtp_plugin_get_current_pos; this->input_plugin.get_length = rtp_plugin_get_length; -- cgit v1.2.3