From 66f6a0678b31724de407240c72f33701a386b385 Mon Sep 17 00:00:00 2001 From: Miguel Freitas Date: Mon, 8 Dec 2003 12:35:20 +0000 Subject: fix performance problems patch by Ramon van der Aar CVS patchset: 5869 CVS date: 2003/12/08 12:35:20 --- src/input/Makefile.am | 2 +- src/input/input_rtp.c | 296 ++++++++++++++++++++++++++------------------------ 2 files changed, 158 insertions(+), 140 deletions(-) (limited to 'src') diff --git a/src/input/Makefile.am b/src/input/Makefile.am index b644e6494..5d834fb22 100644 --- a/src/input/Makefile.am +++ b/src/input/Makefile.am @@ -88,7 +88,7 @@ xineplug_inp_stdin_fifo_la_SOURCES = input_stdin_fifo.c net_buf_ctrl.c xineplug_inp_stdin_fifo_la_LIBADD = $(XINE_LIB) xineplug_inp_stdin_fifo_la_LDFLAGS = -avoid-version -module @XINE_PLUGIN_MIN_SYMS@ -xineplug_inp_rtp_la_SOURCES = input_rtp.c +xineplug_inp_rtp_la_SOURCES = input_rtp.c net_buf_ctrl.c xineplug_inp_rtp_la_LIBADD = $(XINE_LIB) xineplug_inp_rtp_la_LDFLAGS = -avoid-version -module @XINE_PLUGIN_MIN_SYMS@ diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c index d67567c93..1423c5613 100644 --- a/src/input/input_rtp.c +++ b/src/input/input_rtp.c @@ -88,6 +88,7 @@ #include "xine_internal.h" #include "xineutils.h" #include "input_plugin.h" +#include "net_buf_ctrl.h" #ifdef __GNUC__ #define LOG_MSG(xine, message, args...) { \ @@ -117,16 +118,13 @@ typedef struct { int fh; - pthread_mutex_t buffer_mutex; - pthread_cond_t buffer_notempty; - pthread_cond_t buffer_empty; + unsigned char *buffer; /* circular buffer */ + unsigned char *buffer_tail; /* tail pointer used by reader */ + unsigned char *buffer_head; /* head pointer used by writer */ + long buffer_count; /* number of bytes in the buffer */ + pthread_mutex_t buffer_mutex; /* only used for locking the + * the buffer count variable */ - unsigned char *buffer; - long buffer_start; /* first data byte */ - long buffer_length; /* no data bytes */ - - struct timespec preview_timeout; - unsigned char packet_buffer[65536]; int last_input_error; @@ -134,9 +132,16 @@ typedef struct { pthread_t reader_thread; - int curpos; + int curpos; int rtp_running; + + nbc_t *nbc; + pthread_mutex_t writer_mut; + pthread_cond_t writer_cond; + + pthread_mutex_t reader_mut; + pthread_cond_t reader_cond; } rtp_input_plugin_t; typedef struct { @@ -243,7 +248,7 @@ static void * input_plugin_read_loop(void *arg) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; unsigned char *data; long length; - + while (1) { /* System calls are not a thread cancellation point in Linux @@ -314,107 +319,158 @@ static void * input_plugin_read_loop(void *arg) { } /* insert data into cyclic buffer */ - pthread_mutex_lock(&this->buffer_mutex); - while (length > 0) { - long n, pos; - - while (this->buffer_length == BUFFER_SIZE) + /* work with a copy of buffer count, while the variable can + * be updated by the reader + */ + long buffer_count = this->buffer_count; + long n; + + /* + * if the buffer is full, wait for the reader + * to signal + */ + if(buffer_count >= BUFFER_SIZE) { - pthread_cond_wait(&this->buffer_empty, - &this->buffer_mutex); + pthread_mutex_lock(&this->writer_mut); + pthread_cond_wait(&this->writer_cond, &this->writer_mut); + pthread_mutex_unlock(&this->writer_mut); + /* update the buffer count again */ + buffer_count = this->buffer_count; } - pos = (this->buffer_start + this->buffer_length) % BUFFER_SIZE; - n = length; - - /* Don't write past end of buffer */ - if (pos + n > BUFFER_SIZE) - { - n = BUFFER_SIZE - pos; + /* Now there's enough space to write some bytes into the buffer + * determine how many bytes can be written. If the buffer wraps + * around, write in two pieces: from the head pointer to the + * end of the buffer and from the base to the remaining number + * of bytes. + */ + if(length > (BUFFER_SIZE - buffer_count)) + { + n = BUFFER_SIZE - buffer_count; } + else + { + n = length; + } + + if(((this->buffer_head - this->buffer) + n) > BUFFER_SIZE) + { + n = BUFFER_SIZE - (this->buffer_head - this->buffer); + } + + /* The actual write... */ + memcpy(this->buffer_head, data, n); - /* And don't write into start of data either */ - if (pos < this->buffer_start && pos + n > this->buffer_start) - { - n = this->buffer_start - pos; - } - - memcpy(this->buffer + pos, data, n); - - data += n; - length -= n; + data += n; + length -= n; - this->buffer_length += n; - pthread_cond_signal(&this->buffer_notempty); + /* update head pointer; and check for wrap around */ + this->buffer_head += n; + if(this->buffer_head - this->buffer >= BUFFER_SIZE) + this->buffer_head = this->buffer; + + /* lock the mutex; for updating the count */ + pthread_mutex_lock(&this->buffer_mutex); + this->buffer_count += n; + pthread_mutex_unlock(&this->buffer_mutex); + + /* signal the reader that there is new data */ + pthread_mutex_lock(&this->reader_mut); + pthread_cond_signal(&this->reader_cond); + pthread_mutex_unlock(&this->reader_mut); } - - pthread_mutex_unlock(&this->buffer_mutex); } } } + /* ***************************************************************** */ /* END OF PRIVATES */ /* ***************************************************************** */ -/* - * - */ static off_t rtp_plugin_read (input_plugin_t *this_gen, char *buf, off_t length) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + struct timeval tv; struct timespec timeout; - off_t copied = 0; - - gettimeofday(&tv, NULL); - timeout.tv_nsec = tv.tv_usec * 1000; - timeout.tv_sec = tv.tv_sec + 1; - - pthread_mutex_lock(&this->buffer_mutex); + off_t copied = 0; - while (length > 0) + while(length > 0) { - long n; - - while (this->buffer_length == 0) - { - if (pthread_cond_timedwait(&this->buffer_notempty, - &this->buffer_mutex, - &timeout) != 0) - { - pthread_mutex_unlock(&this->buffer_mutex); - return copied; - } - } - - n = length; - - /* Don't read past end of buffer */ - if (this->buffer_start + n > BUFFER_SIZE) - { - n = BUFFER_SIZE - this->buffer_start; - } + off_t n; + /* work with a copy of the buffer count, while the variable can + * be updated by the writer + */ + long buffer_count = this->buffer_count; + - /* Don't copy past end of data either */ - if (n > this->buffer_length) + /* + * if nothing in the buffer, wait for data for 5 seconds. If + * no data is received within this timeout, return the number + * of bytes already received (which is likely to be 0) + */ + if(buffer_count == 0) { - n = this->buffer_length; + gettimeofday(&tv, NULL); + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + 5; + + pthread_mutex_lock(&this->reader_mut); + if(pthread_cond_timedwait(&this->reader_cond, &this->reader_mut, &timeout) != 0) + { + /* we timed out, no data available */ + pthread_mutex_unlock(&this->reader_mut); + return copied; + } + pthread_mutex_unlock(&this->reader_mut); + /* update the local buffer count variable again */ + buffer_count = this->buffer_count; } - memcpy(buf, this->buffer + this->buffer_start, n); + /* Now determine how many bytes can be read. If the buffer + * will wrap the buffer is read in two pieces, first read + * to the end of the buffer, wrap the tail pointer and + * update the buffer count. Finally read the second piece + * from the base to the remaining count + */ + if(length > buffer_count) + { + n = buffer_count; + } + else + { + n = length; + } + + if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) + { + n = BUFFER_SIZE - (this->buffer_tail - this->buffer); + } + + /* the actual read */ + memcpy(buf, this->buffer_tail, n); buf += n; copied += n; length -= n; - this->buffer_start = (this->buffer_start + n) % BUFFER_SIZE; - this->buffer_length -= n; - pthread_cond_signal(&this->buffer_empty); - } + /* update the tail pointer, watch for wrap arounds */ + this->buffer_tail += n; + if(this->buffer_tail - this->buffer >= BUFFER_SIZE) + this->buffer_tail = this->buffer; - pthread_mutex_unlock(&this->buffer_mutex); + /* lock the buffer, for updating the count */ + pthread_mutex_lock(&this->buffer_mutex); + this->buffer_count -= n; + pthread_mutex_unlock(&this->buffer_mutex); + + /* signal the writer that there's space in the buffer again */ + pthread_mutex_lock(&this->writer_mut); + pthread_cond_signal(&this->writer_cond); + pthread_mutex_unlock(&this->writer_mut); + } this->curpos += copied; @@ -452,7 +508,7 @@ static off_t rtp_plugin_get_current_pos (input_plugin_t *this_gen){ */ static uint32_t rtp_plugin_get_capabilities (input_plugin_t *this_gen) { - return INPUT_CAP_PREVIEW; + return 0; } /* @@ -478,58 +534,16 @@ static char* rtp_plugin_get_mrl (input_plugin_t *this_gen) { static int rtp_plugin_get_optional_data (input_plugin_t *this_gen, void *data, int data_type) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; - - if (data_type == INPUT_OPTIONAL_DATA_PREVIEW) - { - unsigned char *buf = (unsigned char *) data; - long pos, len, copied; - - pthread_mutex_lock(&this->buffer_mutex); - while (this->buffer_length < MAX_PREVIEW_SIZE) - { - LOG_MSG(this->stream->xine, _("RTP: waiting for preview data\n")); - - if (pthread_cond_timedwait(&this->buffer_notempty, - &this->buffer_mutex, - &this->preview_timeout) == ETIMEDOUT) - { - LOG_MSG(this->stream->xine, _("RTP: waiting for preview data: timeout\n")); - break; - } - } - - pos = this->buffer_start; - len = this->buffer_length < MAX_PREVIEW_SIZE - ? this->buffer_length : MAX_PREVIEW_SIZE; - copied = len; - - /* we really shouldn't wrap, but maybe we will. */ - if (pos + len > BUFFER_SIZE) - { - long n = BUFFER_SIZE - pos; - - memcpy(buf, this->buffer + pos, n); - - buf += n; - len -= n; - pos = 0; - } - - memcpy(buf, this->buffer + pos, len); - - pthread_mutex_unlock(&this->buffer_mutex); - - return copied; - } - else - { - return INPUT_OPTIONAL_UNSUPPORTED; - } + + /* TODO: this input plugin should support preview */ + return INPUT_OPTIONAL_UNSUPPORTED; } static void rtp_plugin_dispose (input_plugin_t *this_gen ) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + if(this->nbc) + nbc_close(this->nbc); if (this->rtp_running) { LOG_MSG(this->stream->xine, _("RTP: stopping reading thread...\n")); @@ -570,9 +584,6 @@ static int rtp_plugin_open (input_plugin_t *this_gen ) { abort(); } - this->preview_timeout.tv_sec = time(NULL) + 5; - this->preview_timeout.tv_nsec = 0; - return 1; } @@ -586,6 +597,7 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, int is_rtp = 0; int port = 7658; + mrl = strdup(data); if (!strncasecmp (mrl, "rtp://", 6)) { filename = &mrl[6]; @@ -617,16 +629,18 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, this->rtp_running = 0; pthread_mutex_init(&this->buffer_mutex, NULL); - pthread_cond_init(&this->buffer_notempty, NULL); - pthread_cond_init(&this->buffer_empty, NULL); + pthread_mutex_init(&this->reader_mut, NULL); + pthread_mutex_init(&this->writer_mut, NULL); + + pthread_cond_init(&this->reader_cond, NULL); + pthread_cond_init(&this->writer_cond, NULL); this->buffer = malloc(BUFFER_SIZE); - this->buffer_start = 0; - this->buffer_length = 0; - - this->preview_timeout.tv_sec = time(NULL) + 5; - this->preview_timeout.tv_nsec = 0; - + this->buffer_head = this->buffer; + this->buffer_tail = this->buffer; + this->buffer_count = 0; + this->curpos = 0; + this->input_plugin.open = rtp_plugin_open; this->input_plugin.get_capabilities = rtp_plugin_get_capabilities; this->input_plugin.read = rtp_plugin_read; @@ -639,7 +653,10 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, this->input_plugin.get_optional_data = rtp_plugin_get_optional_data; this->input_plugin.dispose = rtp_plugin_dispose; this->input_plugin.input_class = cls_gen; - + + this->nbc = NULL; + this->nbc = nbc_init(this->stream); + return &this->input_plugin; } @@ -666,6 +683,7 @@ static void *init_class (xine_t *xine, void *data) { rtp_input_class_t *this; + this = (rtp_input_class_t *) xine_xmalloc(sizeof(rtp_input_class_t)); this->config = xine->config; this->xine = xine; -- cgit v1.2.3