From 51fb11cadcea9b86eed7955c7ea0b2a02f3ee358 Mon Sep 17 00:00:00 2001 From: Miguel Freitas Date: Mon, 26 Jan 2004 20:07:33 +0000 Subject: add preview to rtp plugin patch by Graham Brooks CVS patchset: 6067 CVS date: 2004/01/26 20:07:33 --- src/input/input_rtp.c | 486 +++++++++++++++++++++++++------------------------- 1 file changed, 244 insertions(+), 242 deletions(-) (limited to 'src') diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c index 6e24bcb90..411815b26 100644 --- a/src/input/input_rtp.c +++ b/src/input/input_rtp.c @@ -19,7 +19,6 @@ * * Xine input plugin for multicast video streams. * - * * This is something of an experiment - it doesn't work well yet. Originally * the intent was to read an rtp stream, from, for example, Cisco IP * Tv. That's still a long term goal but RTP doesn't fit well in an input @@ -134,7 +133,10 @@ typedef struct { int curpos; int rtp_running; - + + char preview[MAX_PREVIEW_SIZE]; + int preview_size; + nbc_t *nbc; pthread_mutex_t writer_mut; @@ -216,27 +218,24 @@ static int host_connect_attempt(struct in_addr ia, int port, xine_t *xine) { /* * */ -static int host_connect(const char *host, int port, xine_t *xine) { +static int host_connect(const char *host, int port, xine_t *xine) +{ struct hostent *h; int i; int s; h=gethostbyname(host); - if(h==NULL) - { - LOG_MSG(xine, _("unable to resolve '%s'.\n"), host); - return -1; - } - + if(h==NULL) { + LOG_MSG(xine, _("unable to resolve '%s'.\n"), host); + return -1; + } - for(i=0; h->h_addr_list[i]; i++) - { - struct in_addr ia; - memcpy(&ia, h->h_addr_list[i],4); - s = host_connect_attempt(ia, port, xine); - if(s != -1) - return s; - } + for(i=0; h->h_addr_list[i]; i++) { + struct in_addr ia; + memcpy(&ia, h->h_addr_list[i],4); + s = host_connect_attempt(ia, port, xine); + if (s != -1) return s; + } LOG_MSG(xine, _("unable to bind to '%s'.\n"), host); return -1; } @@ -245,144 +244,137 @@ static int host_connect(const char *host, int port, xine_t *xine) { * */ static void * input_plugin_read_loop(void *arg) { - rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; + + rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; unsigned char *data; long length; - while (1) - { - /* System calls are not a thread cancellation point in Linux - * pthreads. However, the RT signal sent to cancel the thread - * will cause recv() to return with EINTR, and we can manually - * check cancellation. - **/ - pthread_testcancel(); - length = recv(this->fh, this->packet_buffer, - sizeof(this->packet_buffer), 0); - pthread_testcancel(); - - if (length < 0) - { - if (errno != EINTR) - { - LOG_MSG(this->stream->xine, _("recv(): %s.\n"), strerror(errno)); - return NULL; - } - } - else - { - data = this->packet_buffer; - - if (this->is_rtp) - { - int pad, ext; - int csrc; - - /* Do minimal RTP parsing to extract payload. See - * http://www.faqs.org/rfcs/rfc1889.html for header format. - * - * WARNING: wholly untested code. I don't have any RTP sender. - **/ - - if (length < 12) - continue; - - pad = data[0] & 0x20; - ext = data[0] & 0x10; - csrc = data[0] & 0x0f; + while (1) { + + /* System calls are not a thread cancellation point in Linux + * pthreads. However, the RT signal sent to cancel the thread + * will cause recv() to return with EINTR, and we can manually + * check cancellation. + */ + + pthread_testcancel(); + length = recv(this->fh, this->packet_buffer, + sizeof(this->packet_buffer), 0); + pthread_testcancel(); + + if (length < 0) { + if (errno != EINTR) { + LOG_MSG(this->stream->xine, _("recv(): %s.\n"), strerror(errno)); + return NULL; + } + } + else { + data = this->packet_buffer; + + if (this->is_rtp) { + int pad, ext; + int csrc; + + /* Do minimal RTP parsing to extract payload. See + * http://www.faqs.org/rfcs/rfc1889.html for header format. + * + * WARNING: wholly untested code. I don't have any RTP sender. + */ + + if (length < 12) continue; + + pad = data[0] & 0x20; + ext = data[0] & 0x10; + csrc = data[0] & 0x0f; + + data += 12 + csrc * 4; + length -= 12 + csrc * 4; + + if (ext) { + long hlen; + + if (length < 4) continue; - data += 12 + csrc * 4; - length -= 12 + csrc * 4; + hlen = (data[2] << 8) | data[3]; + data += hlen; + length -= hlen; + } - if (ext) - { - long hlen; - - if (length < 4) - continue; - - hlen = (data[2] << 8) | data[3]; - data += hlen; - length -= hlen; - } - - if (pad) - { - if (length < 1) - continue; - - /* FIXME: is the pad length byte included in the - * length value or not? We assume it is not. - */ - length -= data[length - 1] + 1; - } - } - - /* insert data into cyclic buffer */ - while (length > 0) - { - /* work with a copy of buffer count, while the variable can - * be updated by the reader - */ - long buffer_count = this->buffer_count; - long n; - - /* - * if the buffer is full, wait for the reader - * to signal - */ - if(buffer_count >= BUFFER_SIZE) - { - pthread_mutex_lock(&this->writer_mut); - pthread_cond_wait(&this->writer_cond, &this->writer_mut); - pthread_mutex_unlock(&this->writer_mut); - /* update the buffer count again */ - buffer_count = this->buffer_count; - } - - /* Now there's enough space to write some bytes into the buffer - * determine how many bytes can be written. If the buffer wraps - * around, write in two pieces: from the head pointer to the - * end of the buffer and from the base to the remaining number - * of bytes. - */ - if(length > (BUFFER_SIZE - buffer_count)) - { - n = BUFFER_SIZE - buffer_count; - } - else - { - n = length; - } - - if(((this->buffer_head - this->buffer) + n) > BUFFER_SIZE) - { - n = BUFFER_SIZE - (this->buffer_head - this->buffer); - } - - /* The actual write... */ - memcpy(this->buffer_head, data, n); + if (pad) { + if (length < 1) + continue; - data += n; - length -= n; + /* FIXME: is the pad length byte included in the + * length value or not? We assume it is not. + */ + length -= data[length - 1] + 1; + } + } - /* update head pointer; and check for wrap around */ - this->buffer_head += n; - if(this->buffer_head - this->buffer >= BUFFER_SIZE) - this->buffer_head = this->buffer; + /* insert data into cyclic buffer */ + while (length > 0) { + + /* work with a copy of buffer count, while the variable can + * be updated by the reader + */ + + long buffer_count = this->buffer_count; + long n; + + /* + * if the buffer is full, wait for the reader + * to signal + */ + + if(buffer_count >= BUFFER_SIZE) { + pthread_mutex_lock(&this->writer_mut); + pthread_cond_wait(&this->writer_cond, &this->writer_mut); + pthread_mutex_unlock(&this->writer_mut); + /* update the buffer count again */ + buffer_count = this->buffer_count; + } - /* lock the mutex; for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); - this->buffer_count += n; - pthread_mutex_unlock(&this->buffer_mutex); - - /* signal the reader that there is new data */ - pthread_mutex_lock(&this->reader_mut); - pthread_cond_signal(&this->reader_cond); - pthread_mutex_unlock(&this->reader_mut); - } + /* Now there's enough space to write some bytes into the buffer + * determine how many bytes can be written. If the buffer wraps + * around, write in two pieces: from the head pointer to the + * end of the buffer and from the base to the remaining number + * of bytes. + */ + + if(length > (BUFFER_SIZE - buffer_count)) { + n = BUFFER_SIZE - buffer_count; + } + else { + n = length; } + + if(((this->buffer_head - this->buffer) + n) > BUFFER_SIZE) { + n = BUFFER_SIZE - (this->buffer_head - this->buffer); + } + + /* The actual write... */ + memcpy(this->buffer_head, data, n); + + data += n; + length -= n; + + /* update head pointer; and check for wrap around */ + this->buffer_head += n; + if(this->buffer_head - this->buffer >= BUFFER_SIZE) + this->buffer_head = this->buffer; + + /* lock the mutex; for updating the count */ + pthread_mutex_lock(&this->buffer_mutex); + this->buffer_count += n; + pthread_mutex_unlock(&this->buffer_mutex); + + /* signal the reader that there is new data */ + pthread_mutex_lock(&this->reader_mut); + pthread_cond_signal(&this->reader_cond); + pthread_mutex_unlock(&this->reader_mut); + } } + } } /* ***************************************************************** */ @@ -396,81 +388,79 @@ static off_t rtp_plugin_read (input_plugin_t *this_gen, struct timeval tv; struct timespec timeout; off_t copied = 0; + + while(length > 0) { - while(length > 0) - { - off_t n; - /* work with a copy of the buffer count, while the variable can - * be updated by the writer - */ - long buffer_count = this->buffer_count; - + off_t n; - /* - * if nothing in the buffer, wait for data for 5 seconds. If - * no data is received within this timeout, return the number - * of bytes already received (which is likely to be 0) - */ - if(buffer_count == 0) + /* work with a copy of the buffer count, while the variable can + * be updated by the writer + */ + + long buffer_count = this->buffer_count; + + /* + * if nothing in the buffer, wait for data for 5 seconds. If + * no data is received within this timeout, return the number + * of bytes already received (which is likely to be 0) + */ + + if(buffer_count == 0) { + gettimeofday(&tv, NULL); + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + 5; + + pthread_mutex_lock(&this->reader_mut); + if(pthread_cond_timedwait(&this->reader_cond, &this->reader_mut, &timeout) != 0) { - gettimeofday(&tv, NULL); - timeout.tv_nsec = tv.tv_usec * 1000; - timeout.tv_sec = tv.tv_sec + 5; - - pthread_mutex_lock(&this->reader_mut); - if(pthread_cond_timedwait(&this->reader_cond, &this->reader_mut, &timeout) != 0) - { - /* we timed out, no data available */ - pthread_mutex_unlock(&this->reader_mut); - return copied; - } + /* we timed out, no data available */ pthread_mutex_unlock(&this->reader_mut); - /* update the local buffer count variable again */ - buffer_count = this->buffer_count; + return copied; } + pthread_mutex_unlock(&this->reader_mut); + /* update the local buffer count variable again */ + buffer_count = this->buffer_count; + } - /* Now determine how many bytes can be read. If the buffer - * will wrap the buffer is read in two pieces, first read - * to the end of the buffer, wrap the tail pointer and - * update the buffer count. Finally read the second piece - * from the base to the remaining count - */ - if(length > buffer_count) - { - n = buffer_count; - } - else - { - n = length; - } - - if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) - { - n = BUFFER_SIZE - (this->buffer_tail - this->buffer); - } + /* Now determine how many bytes can be read. If the buffer + * will wrap the buffer is read in two pieces, first read + * to the end of the buffer, wrap the tail pointer and + * update the buffer count. Finally read the second piece + * from the base to the remaining count + */ + if(length > buffer_count) { + n = buffer_count; + } + else { + n = length; + } - /* the actual read */ - memcpy(buf, this->buffer_tail, n); - - buf += n; - copied += n; - length -= n; - - /* update the tail pointer, watch for wrap arounds */ - this->buffer_tail += n; - if(this->buffer_tail - this->buffer >= BUFFER_SIZE) - this->buffer_tail = this->buffer; - - /* lock the buffer, for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); - this->buffer_count -= n; - pthread_mutex_unlock(&this->buffer_mutex); - - /* signal the writer that there's space in the buffer again */ - pthread_mutex_lock(&this->writer_mut); - pthread_cond_signal(&this->writer_cond); - pthread_mutex_unlock(&this->writer_mut); + if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) { + n = BUFFER_SIZE - (this->buffer_tail - this->buffer); } + + /* the actual read */ + memcpy(buf, this->buffer_tail, n); + + buf += n; + copied += n; + length -= n; + + /* update the tail pointer, watch for wrap arounds */ + this->buffer_tail += n; + if(this->buffer_tail - this->buffer >= BUFFER_SIZE) + this->buffer_tail = this->buffer; + + /* lock the buffer, for updating the count */ + pthread_mutex_lock(&this->buffer_mutex); + this->buffer_count -= n; + pthread_mutex_unlock(&this->buffer_mutex); + + /* signal the writer that there's space in the buffer again */ + pthread_mutex_lock(&this->writer_mut); + pthread_cond_signal(&this->writer_cond); + pthread_mutex_unlock(&this->writer_mut); + } this->curpos += copied; @@ -489,8 +479,8 @@ static off_t rtp_plugin_seek (input_plugin_t *this_gen, /* * */ -static off_t rtp_plugin_get_length (input_plugin_t *this_gen) { +static off_t rtp_plugin_get_length (input_plugin_t *this_gen) { return -1; } @@ -507,15 +497,15 @@ static off_t rtp_plugin_get_current_pos (input_plugin_t *this_gen){ * */ static uint32_t rtp_plugin_get_capabilities (input_plugin_t *this_gen) { - - return 0; + + return INPUT_CAP_PREVIEW; } /* * */ static uint32_t rtp_plugin_get_blocksize (input_plugin_t *this_gen) { - + return 0; } @@ -524,7 +514,7 @@ static uint32_t rtp_plugin_get_blocksize (input_plugin_t *this_gen) { */ static char* rtp_plugin_get_mrl (input_plugin_t *this_gen) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; - + return this->mrl; } @@ -533,17 +523,33 @@ static char* rtp_plugin_get_mrl (input_plugin_t *this_gen) { */ static int rtp_plugin_get_optional_data (input_plugin_t *this_gen, void *data, int data_type) { - /* rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; */ - - /* TODO: this input plugin should support preview */ - return INPUT_OPTIONAL_UNSUPPORTED; + + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + + /* Since this input plugin deals with stream data, we + * are not going to worry about retaining the data packet + * retrieved for review purposes. Hence, the first non-preview + * packet read made will return the 2nd packet from the UDP/RTP stream. + * The first packet is only used for the preview. + */ + + if (data_type == INPUT_OPTIONAL_DATA_PREVIEW) { + if (this->preview_size == 0) { + this->preview_size = rtp_plugin_read(this_gen, this->preview, MAX_PREVIEW_SIZE); + lprintf("Preview data length = %d\n", this->preview_size); + } + memcpy(data, this->preview, this->preview_size); + return this->preview_size; + } + else { + return INPUT_OPTIONAL_UNSUPPORTED; + } } static void rtp_plugin_dispose (input_plugin_t *this_gen ) { rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; - if(this->nbc) - nbc_close(this->nbc); + if (this->nbc) nbc_close(this->nbc); if (this->rtp_running) { LOG_MSG(this->stream->xine, _("RTP: stopping reading thread...\n")); @@ -552,9 +558,7 @@ static void rtp_plugin_dispose (input_plugin_t *this_gen ) { LOG_MSG(this->stream->xine, _("RTP: reading thread terminated\n")); } - - if (this->fh != -1) - close(this->fh); + if (this->fh != -1) close(this->fh); free(this->buffer); free(this->mrl); @@ -569,9 +573,7 @@ static int rtp_plugin_open (input_plugin_t *this_gen ) { this->fh = host_connect(this->filename, this->port, this->stream->xine); - if (this->fh == -1) { - return 0; - } + if (this->fh == -1) return 0; this->last_input_error = 0; this->input_eof = 0; @@ -583,20 +585,20 @@ static int rtp_plugin_open (input_plugin_t *this_gen ) { LOG_MSG(this->stream->xine, _("input_rtp: can't create new thread (%s)\n"), strerror(err)); abort(); } - + return 1; } static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, - xine_stream_t *stream, - const char *data) { + xine_stream_t *stream, + const char *data) { rtp_input_plugin_t *this; char *filename = NULL; char *pptr; char *mrl; int is_rtp = 0; int port = 7658; - + mrl = strdup(data); if (!strncasecmp (mrl, "rtp://", 6)) { @@ -620,13 +622,14 @@ static input_plugin_t *rtp_class_get_instance (input_class_t *cls_gen, } this = (rtp_input_plugin_t *) malloc(sizeof(rtp_input_plugin_t)); - this->stream = stream; - this->mrl = mrl; - this->filename = filename; - this->port = port; - this->is_rtp = is_rtp; - this->fh = -1; - this->rtp_running = 0; + this->stream = stream; + this->mrl = mrl; + this->filename = filename; + this->port = port; + this->is_rtp = is_rtp; + this->fh = -1; + this->rtp_running = 0; + this->preview_size = 0; pthread_mutex_init(&this->buffer_mutex, NULL); pthread_mutex_init(&this->reader_mut, NULL); @@ -695,11 +698,10 @@ static void *init_class (xine_t *xine, void *data) { this->input_class.get_autoplay_list = NULL; this->input_class.dispose = rtp_class_dispose; this->input_class.eject_media = NULL; - + return this; } - /* * exported plugin catalog entry */ -- cgit v1.2.3