diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/input/input_rtp.c | 193 |
1 files changed, 108 insertions, 85 deletions
diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c index d4ba804c6..681bced0f 100644 --- a/src/input/input_rtp.c +++ b/src/input/input_rtp.c @@ -79,6 +79,7 @@ #include <sys/time.h> #include <stdlib.h> #include <net/if.h> +#include <sys/select.h> #if defined (__SVR4) && defined (__sun) # include <sys/sockio.h> @@ -125,11 +126,9 @@ typedef struct { int fh; unsigned char *buffer; /* circular buffer */ - unsigned char *buffer_tail; /* tail pointer used by reader */ - unsigned char *buffer_head; /* head pointer used by writer */ + unsigned char *buffer_get_ptr; /* get pointer used by reader */ + unsigned char *buffer_put_ptr; /* put pointer used by writer */ long buffer_count; /* number of bytes in the buffer */ - pthread_mutex_t buffer_mutex; /* only used for locking the - * the buffer count variable */ unsigned char packet_buffer[65536]; @@ -143,13 +142,12 @@ typedef struct { char preview[MAX_PREVIEW_SIZE]; int preview_size; + int preview_read_done; /* boolean true after attempt to read input stream for preview */ nbc_t *nbc; - pthread_mutex_t writer_mut; + pthread_mutex_t buffer_ring_mut; pthread_cond_t writer_cond; - - pthread_mutex_t reader_mut; pthread_cond_t reader_cond; } rtp_input_plugin_t; @@ -198,7 +196,7 @@ static int host_connect_attempt(struct in_addr ia, int port, /* Try to increase receive buffer to 1MB to avoid dropping packets */ - optval = 1024 * 1024; + optval = BUFFER_SIZE; if ((setsockopt(s, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval))) < 0) { LOG_MSG(xine, _("setsockopt(SO_RCVBUF): %s.\n"), strerror(errno)); @@ -298,6 +296,7 @@ static void * input_plugin_read_loop(void *arg) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; unsigned char *data; long length; + fd_set read_fds; while (1) { @@ -308,8 +307,28 @@ static void * input_plugin_read_loop(void *arg) { */ pthread_testcancel(); - length = recv(this->fh, this->packet_buffer, - sizeof(this->packet_buffer), 0); + { + struct timeval recv_timeout; + int rc; + + recv_timeout.tv_sec = 2; + recv_timeout.tv_usec = 0; + + FD_ZERO( &read_fds ); + FD_SET( this->fh, &read_fds ); + + /* wait for a packet to arrive - but do not hang! */ + rc = select( this->fh+1, &read_fds, NULL, NULL, &recv_timeout ); + if( rc > 0 ) + { + length = recv(this->fh, this->packet_buffer, + sizeof(this->packet_buffer), 0); + } + else if( rc == 0 ) + length = 0; + else + length = -1; + } pthread_testcancel(); if (length < 0) { @@ -362,28 +381,31 @@ static void * input_plugin_read_loop(void *arg) { } /* insert data into cyclic buffer */ - while (length > 0) { - - /* work with a copy of buffer count, while the variable can - * be updated by the reader - */ - - long buffer_count = this->buffer_count; - long n; + if (length > 0) { /* * if the buffer is full, wait for the reader * to signal */ - if(buffer_count >= BUFFER_SIZE) { - pthread_mutex_lock(&this->writer_mut); - pthread_cond_wait(&this->writer_cond, &this->writer_mut); - pthread_mutex_unlock(&this->writer_mut); - /* update the buffer count again */ - buffer_count = this->buffer_count; - } - + pthread_mutex_lock(&this->buffer_ring_mut); + /* wait for enough space to write the whole of the recv'ed data */ + while( (BUFFER_SIZE - this->buffer_count) < length ) + { + struct timeval tv; + struct timespec timeout; + + gettimeofday(&tv, NULL); + + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + 2; + + if( pthread_cond_timedwait(&this->writer_cond, &this->buffer_ring_mut, &timeout) != 0 ) + { + fprintf( stdout, "input_rtp: buffer ring not read within 2 seconds!\n" ); + } + } + /* Now there's enough space to write some bytes into the buffer * determine how many bytes can be written. If the buffer wraps * around, write in two pieces: from the head pointer to the @@ -391,37 +413,29 @@ static void * input_plugin_read_loop(void *arg) { * of bytes. */ - if(length > (BUFFER_SIZE - buffer_count)) { - n = BUFFER_SIZE - buffer_count; - } - else { - n = length; - } - - if(((this->buffer_head - this->buffer) + n) > BUFFER_SIZE) { - n = BUFFER_SIZE - (this->buffer_head - this->buffer); - } - - /* The actual write... */ - memcpy(this->buffer_head, data, n); + { + long buffer_space_remaining = BUFFER_SIZE - (this->buffer_put_ptr - this->buffer); + + if( buffer_space_remaining >= length ) + { + /* data fits inside the buffer */ + memcpy(this->buffer_put_ptr, data, length); + this->buffer_put_ptr += length; + } + else + { + /* data wrapped around the end of the buffer */ + memcpy(this->buffer_put_ptr, data, buffer_space_remaining); + memcpy(this->buffer, &data[buffer_space_remaining], length - buffer_space_remaining); + this->buffer_put_ptr = &this->buffer[ length - buffer_space_remaining ]; + } + } - data += n; - length -= n; - - /* update head pointer; and check for wrap around */ - this->buffer_head += n; - if(this->buffer_head - this->buffer >= BUFFER_SIZE) - this->buffer_head = this->buffer; - - /* lock the mutex; for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); - this->buffer_count += n; - pthread_mutex_unlock(&this->buffer_mutex); + this->buffer_count += length; /* signal the reader that there is new data */ - pthread_mutex_lock(&this->reader_mut); pthread_cond_signal(&this->reader_cond); - pthread_mutex_unlock(&this->reader_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); } } } @@ -443,33 +457,25 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, off_t n; - /* work with a copy of the buffer count, while the variable can - * be updated by the writer - */ + pthread_mutex_lock(&this->buffer_ring_mut); - long buffer_count = this->buffer_count; - /* * if nothing in the buffer, wait for data for 5 seconds. If * no data is received within this timeout, return the number * of bytes already received (which is likely to be 0) */ - if(buffer_count == 0) { + if(this->buffer_count == 0) { gettimeofday(&tv, NULL); timeout.tv_nsec = tv.tv_usec * 1000; timeout.tv_sec = tv.tv_sec + 5; - pthread_mutex_lock(&this->reader_mut); - if(pthread_cond_timedwait(&this->reader_cond, &this->reader_mut, &timeout) != 0) + if(pthread_cond_timedwait(&this->reader_cond, &this->buffer_ring_mut, &timeout) != 0) { /* we timed out, no data available */ - pthread_mutex_unlock(&this->reader_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); return copied; } - pthread_mutex_unlock(&this->reader_mut); - /* update the local buffer count variable again */ - buffer_count = this->buffer_count; } /* Now determine how many bytes can be read. If the buffer @@ -478,38 +484,34 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, * update the buffer count. Finally read the second piece * from the base to the remaining count */ - if(length > buffer_count) { - n = buffer_count; + if(length > this->buffer_count) { + n = this->buffer_count; } else { n = length; } - if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) { - n = BUFFER_SIZE - (this->buffer_tail - this->buffer); + if(((this->buffer_get_ptr - this->buffer) + n) > BUFFER_SIZE) { + n = BUFFER_SIZE - (this->buffer_get_ptr - this->buffer); } /* the actual read */ - memcpy(buf, this->buffer_tail, n); + memcpy(buf, this->buffer_get_ptr, n); buf += n; copied += n; length -= n; /* update the tail pointer, watch for wrap arounds */ - this->buffer_tail += n; - if(this->buffer_tail - this->buffer >= BUFFER_SIZE) - this->buffer_tail = this->buffer; + this->buffer_get_ptr += n; + if(this->buffer_get_ptr - this->buffer >= BUFFER_SIZE) + this->buffer_get_ptr = this->buffer; - /* lock the buffer, for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); this->buffer_count -= n; - pthread_mutex_unlock(&this->buffer_mutex); /* signal the writer that there's space in the buffer again */ - pthread_mutex_lock(&this->writer_mut); pthread_cond_signal(&this->writer_cond); - pthread_mutex_unlock(&this->writer_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); } this->curpos += copied; @@ -517,6 +519,27 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, return copied; } +static buf_element_t *rtp_plugin_read_block (input_plugin_t *this_gen, + fifo_buffer_t *fifo, off_t todo) { + buf_element_t *buf = fifo->buffer_pool_alloc (fifo); + int total_bytes; + + + buf->content = buf->mem; + buf->type = BUF_DEMUX_BLOCK; + + total_bytes = rtp_plugin_read (this_gen, buf->content, todo); + + if (total_bytes != todo) { + buf->free_buffer (buf); + return NULL; + } + + buf->size = total_bytes; + + return buf; +} + /* * */ @@ -584,9 +607,11 @@ static int rtp_plugin_get_optional_data (input_plugin_t *this_gen, */ if (data_type == INPUT_OPTIONAL_DATA_PREVIEW) { - if (this->preview_size == 0) { + if (!this->preview_read_done) { this->preview_size = rtp_plugin_read(this_gen, this->preview, MAX_PREVIEW_SIZE); lprintf("Preview data length = %d\n", this->preview_size); + + this->preview_read_done = 1; } memcpy(data, this->preview, this->preview_size); return this->preview_size; @@ -705,23 +730,21 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, if (iptr) this->interface = iptr; - pthread_mutex_init(&this->buffer_mutex, NULL); - pthread_mutex_init(&this->reader_mut, NULL); - pthread_mutex_init(&this->writer_mut, NULL); + pthread_mutex_init(&this->buffer_ring_mut, NULL); pthread_cond_init(&this->reader_cond, NULL); pthread_cond_init(&this->writer_cond, NULL); this->buffer = malloc(BUFFER_SIZE); - this->buffer_head = this->buffer; - this->buffer_tail = this->buffer; + this->buffer_put_ptr = this->buffer; + this->buffer_get_ptr = this->buffer; this->buffer_count = 0; this->curpos = 0; this->input_plugin.open = rtp_plugin_open; this->input_plugin.get_capabilities = rtp_plugin_get_capabilities; this->input_plugin.read = rtp_plugin_read; - this->input_plugin.read_block = NULL; + this->input_plugin.read_block = rtp_plugin_read_block; this->input_plugin.seek = rtp_plugin_seek; this->input_plugin.get_current_pos = rtp_plugin_get_current_pos; this->input_plugin.get_length = rtp_plugin_get_length; |