diff options
author | Daniel Caujolle-Bert <f1rmb@users.sourceforge.net> | 2001-05-06 02:37:59 +0000 |
---|---|---|
committer | Daniel Caujolle-Bert <f1rmb@users.sourceforge.net> | 2001-05-06 02:37:59 +0000 |
commit | 8eb4cb861c548b58f229abf6008e4cc681e711e7 (patch) | |
tree | 317705cd28512251b24fc61c709730476e829676 /src/input/input_rtp.c | |
parent | f2181d0641c5ebd684f8e9113d14bb49bb390eef (diff) | |
download | xine-lib-8eb4cb861c548b58f229abf6008e4cc681e711e7.tar.gz xine-lib-8eb4cb861c548b58f229abf6008e4cc681e711e7.tar.bz2 |
Re-enable and sync input plugins. RTP one should be broken.
CVS patchset: 63
CVS date: 2001/05/06 02:37:59
Diffstat (limited to 'src/input/input_rtp.c')
-rw-r--r-- | src/input/input_rtp.c | 552 |
1 files changed, 329 insertions, 223 deletions
diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c index 88fa5784d..ec01a1d26 100644 --- a/src/input/input_rtp.c +++ b/src/input/input_rtp.c @@ -38,13 +38,13 @@ * fd is open for read on mpeg stream, sock for write on a multicast socket. * * while (1) { - * /* read pack */ + * \/\* read pack *\/ * read(fd, buf, 2048) - * /* end of stream */ + * \/\* end of stream *\/ * if (buf[3] == 0xb9) * return 0; * - * /* extract the system reference clock, srcb, from the pack */ + * \/\* extract the system reference clock, srcb, from the pack *\/ * * send_at = srcb/90000.0; * while (time_now < send_at) { @@ -80,116 +80,191 @@ #include <sys/time.h> #include <stdlib.h> +#include "xine_internal.h" +#include "monitor.h" #include "input_plugin.h" -static int last_input_error; -static int input_eof; +#define RTP_BLOCKSIZE 2048 + static uint32_t xine_debug; typedef struct _input_buffer { - struct _input_buffer *next; - unsigned char *buf; -} input_buffer; - + struct _input_buffer *next; + unsigned char *buf; +} input_buffer_t; + #define N_BUFFERS 128 #define IBUFFER_SIZE 2048 -static int input_file_handle = -1; +typedef struct { + input_plugin_t input_plugin; + + char *mrl; + config_values_t *config; -input_buffer *free_buffers; -input_buffer **fifo_head; -input_buffer fifo_tail; + int fh; + + input_buffer_t *free_buffers; + input_buffer_t **fifo_head; + input_buffer_t fifo_tail; + + pthread_mutex_t buffer_mutex; + pthread_cond_t buffer_notempty; + + int last_input_error; + int input_eof; -pthread_mutex_t buffer_mutex; -pthread_cond_t buffer_notempty; + pthread_t reader_thread; -static pthread_t reader_thread; + int curpos; -static void * input_plugin_read_loop(void *); +} rtp_input_plugin_t; +/* ***************************************************************** */ +/* Private functions */ +/* ***************************************************************** */ + +/* + * + */ static int host_connect_attempt(struct in_addr ia, int port) { - int s=socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); - struct sockaddr_in sin; + int s=socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); + struct sockaddr_in sin; + + if(s==-1) { + perror("socket"); + return -1; + } + + sin.sin_family = AF_INET; + sin.sin_addr = ia; + sin.sin_port = htons(port); + + /* datagram socket */ + if (bind(s, (struct sockaddr *)&sin, sizeof(sin))) { + perror("bind failed"); + exit(1); + } + /* multicast ? */ + if ((ntohl(sin.sin_addr.s_addr) >> 28) == 0xe) { + struct ip_mreqn mreqn; - if(s==-1) { - perror("socket"); - return -1; - } - - sin.sin_family = AF_INET; - sin.sin_addr = ia; - sin.sin_port = htons(port); - - /* datagram socket */ - if (bind(s, (struct sockaddr *)&sin, sizeof(sin))) { - perror("bind failed"); - exit(1); - } - /* multicast ? */ - if ((ntohl(sin.sin_addr.s_addr) >> 28) == 0xe) { - struct ip_mreqn mreqn; - - mreqn.imr_multiaddr.s_addr = sin.sin_addr.s_addr; - mreqn.imr_address.s_addr = INADDR_ANY; - mreqn.imr_ifindex = 0; - if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreqn,sizeof(mreqn))) { - perror("setsockopt IP_ADD_MEMBERSHIP failed (multicast kernel?)"); - exit(1); - } + mreqn.imr_multiaddr.s_addr = sin.sin_addr.s_addr; + mreqn.imr_address.s_addr = INADDR_ANY; + mreqn.imr_ifindex = 0; + if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreqn,sizeof(mreqn))) { + perror("setsockopt IP_ADD_MEMBERSHIP failed (multicast kernel?)"); + exit(1); } - - return s; + } + + return s; } +/* + * + */ static int host_connect(const char *host, int port) { - struct hostent *h; - int i; - int s; - - h=gethostbyname(host); - if(h==NULL) - { - fprintf(stderr,"unable to resolve '%s'.\n", host); - return -1; - } - - - for(i=0; h->h_addr_list[i]; i++) - { - struct in_addr ia; - memcpy(&ia, h->h_addr_list[i],4); - s=host_connect_attempt(ia, port); - if(s != -1) - return s; - } - fprintf(stderr, "unable to connect to '%s'.\n", host); - return -1; + struct hostent *h; + int i; + int s; + + h=gethostbyname(host); + if(h==NULL) + { + fprintf(stderr,"unable to resolve '%s'.\n", host); + return -1; + } + + + for(i=0; h->h_addr_list[i]; i++) + { + struct in_addr ia; + memcpy(&ia, h->h_addr_list[i],4); + s=host_connect_attempt(ia, port); + if(s != -1) + return s; + } + fprintf(stderr, "unable to connect to '%s'.\n", host); + return -1; } -static void input_plugin_init (void) { - int bufn; - - for (bufn = 0; bufn < N_BUFFERS; bufn++) { - input_buffer *buf = malloc(sizeof(input_buffer)); - if (!buf) { - fprintf(stderr, "unable to allocate input buffer.\n"); - exit(1); - } - buf->buf = malloc(IBUFFER_SIZE); - if (!buf->buf) { - fprintf(stderr, "unable to allocate input buffer.\n"); - exit(1); - } - buf->next = free_buffers; - free_buffers = buf; +/* + * + */ +static void * input_plugin_read_loop(void *arg) { + rtp_input_plugin_t **this = (rtp_input_plugin_t **) arg; + input_buffer_t *buf; + int r; + unsigned short seq = 0; + static int warned = 0; + /* char whirly[] = "/-\\|"; */ + /* int gig = 0; */ + + while (1) { + pthread_mutex_lock (&(*this)->buffer_mutex); + /* we expect to be able to get a free buffer - possibly we + could be a bit more reasonable but this will do for now. */ + if (!(*this)->free_buffers) { + (*this)->input_eof = 1; + if (!warned) { + printf("OUCH - ran out of buffers\n"); + warned = 1; + } + pthread_cond_signal(&(*this)->buffer_notempty); + continue; + } + warned = 0; + buf = (*this)->free_buffers; + (*this)->free_buffers = (*this)->free_buffers->next; + pthread_mutex_unlock (&(*this)->buffer_mutex); + + /* printf("%c\r", whirly[(gig++ % 4)]); */ + /* fflush(stdout); */ + r = read((*this)->fh, buf->buf, IBUFFER_SIZE); + if (r < 0) { + /* descriptor may be closed by main thread */ + if (r != EBADF) + (*this)->last_input_error = r; + (*this)->curpos = 0; + return 0; + } + if (r == 0) { + (*this)->input_eof = 1; + (*this)->curpos = 0; + return 0; } + (*this)->curpos += r; + + /* For now - check whether we're dropping input */ + if (++seq != *(unsigned short *)buf->buf) { + printf("OUCH - dropped input packet %d %d\n", seq, *(unsigned short *)buf->buf); + seq = *(unsigned short *)buf->buf; + } + buf->buf[1] = buf->buf[0] = 0; + pthread_mutex_lock (&(*this)->buffer_mutex); + buf->next = *(*this)->fifo_head; + *(*this)->fifo_head = buf; + (*this)->fifo_head = &buf->next; + pthread_cond_signal(&(*this)->buffer_notempty); + pthread_mutex_unlock (&(*this)->buffer_mutex); + } } +/* ***************************************************************** */ +/* END OF PRIVATES */ +/* ***************************************************************** */ + +/* + * + */ +static int rtp_plugin_open (input_plugin_t *this_gen, char *mrl ) { + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + char *filename; + char *pptr; + int port = 7658; + pthread_attr_t thread_attrs; -static int input_plugin_open (char *mrl) { - char *filename; - char *pptr; - int port = 7658; - pthread_attr_t thread_attrs; + this->mrl = mrl; if (!strncmp (mrl, "rtp:",4)) { filename = &mrl[4]; @@ -210,175 +285,206 @@ static int input_plugin_open (char *mrl) { sscanf(pptr,"%d", &port); } - if (input_file_handle != -1) - close(input_file_handle); - input_file_handle = host_connect(filename, port); + if (this->fh != -1) + close(this->fh); + this->fh = host_connect(filename, port); - if (input_file_handle == -1) { + if (this->fh == -1) { return 0; } - last_input_error = 0; - input_eof = 0; - fifo_tail.next = &fifo_tail; - fifo_head = &fifo_tail.next; + this->last_input_error = 0; + this->input_eof = 0; + this->fifo_tail.next = &this->fifo_tail; + this->fifo_head = &this->fifo_tail.next; + this->curpos = 0; - pthread_cond_init(&buffer_notempty, NULL); + pthread_cond_init(&this->buffer_notempty, NULL); pthread_attr_init(&thread_attrs); pthread_attr_setdetachstate(&thread_attrs, PTHREAD_CREATE_DETACHED); - pthread_create(&reader_thread, &thread_attrs, input_plugin_read_loop, (void *)input_file_handle); + pthread_create(&this->reader_thread, &thread_attrs, + input_plugin_read_loop, (void *)&this); pthread_attr_destroy(&thread_attrs); return 1; } -static uint32_t input_plugin_read (char *buf, uint32_t nlen) { - input_buffer *ibuf; - - pthread_mutex_lock (&buffer_mutex); - while (fifo_tail.next == &fifo_tail) { - if (input_eof) { - pthread_mutex_unlock (&buffer_mutex); - return 0; +/* + * + */ +static off_t rtp_plugin_read (input_plugin_t *this_gen, + char *buf, off_t nlen) { + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + input_buffer_t *ibuf; + + pthread_mutex_lock (&this->buffer_mutex); + while (this->fifo_tail.next == &this->fifo_tail) { + if (this->input_eof) { + pthread_mutex_unlock (&this->buffer_mutex); + return 0; } - if (last_input_error) { - pthread_mutex_unlock (&buffer_mutex); - return last_input_error; + if (this->last_input_error) { + pthread_mutex_unlock (&this->buffer_mutex); + return this->last_input_error; } - pthread_cond_wait(&buffer_notempty, &buffer_mutex); - } - ibuf = fifo_tail.next; - fifo_tail.next = fifo_tail.next->next; - - /* Is FIFO now empty */ - if (fifo_tail.next == &fifo_tail) - fifo_head = &fifo_tail.next; - - pthread_mutex_unlock (&buffer_mutex); - - memcpy(buf, ibuf->buf, nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE); + pthread_cond_wait(&this->buffer_notempty, &this->buffer_mutex); + } + ibuf = this->fifo_tail.next; + this->fifo_tail.next = this->fifo_tail.next->next; + + /* Is FIFO now empty */ + if (this->fifo_tail.next == &this->fifo_tail) + this->fifo_head = &this->fifo_tail.next; + + pthread_mutex_unlock (&this->buffer_mutex); + + memcpy(buf, ibuf->buf, nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE); + + pthread_mutex_lock (&this->buffer_mutex); + ibuf->next = this->free_buffers; + this->free_buffers = ibuf; + pthread_mutex_unlock (&this->buffer_mutex); + + return nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE; +} - pthread_mutex_lock (&buffer_mutex); - ibuf->next = free_buffers; - free_buffers = ibuf; - pthread_mutex_unlock (&buffer_mutex); +/* + * + */ +static off_t rtp_plugin_get_length (input_plugin_t *this_gen) { - return nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE; + return 0; } -static void * input_plugin_read_loop(void *arg) { - int inf = (int) arg; - input_buffer *buf; - int r; - unsigned short seq = 0; - static int warned = 0; - - char whirly[] = "/-\\|"; - int gig = 0; - - while (1) { - pthread_mutex_lock (&buffer_mutex); - /* we expect to be able to get a free buffer - possibly we - could be a bit more reasonable but this will do for now. */ - if (!free_buffers) { - input_eof = 1; - if (!warned) { - printf("OUCH - ran out of buffers\n"); - warned = 1; - } - pthread_cond_signal(&buffer_notempty); - continue; - } - warned = 0; - buf = free_buffers; - free_buffers = free_buffers->next; - pthread_mutex_unlock (&buffer_mutex); - - /* printf("%c\r", whirly[(gig++ % 4)]); */ - /* fflush(stdout); */ - r = read(inf, buf->buf, IBUFFER_SIZE); - if (r < 0) { - /* descriptor may be closed by main thread */ - if (r != EBADF) - last_input_error = r; - return 0; - } - if (r == 0) { - input_eof = 1; - return 0; - } - - /* For now - check whether we're dropping input */ - if (++seq != *(unsigned short *)buf->buf) { - printf("OUCH - dropped input packet %d %d\n", seq, *(unsigned short *)buf->buf); - seq = *(unsigned short *)buf->buf; - } - buf->buf[1] = buf->buf[0] = 0; - pthread_mutex_lock (&buffer_mutex); - buf->next = *fifo_head; - *fifo_head = buf; - fifo_head = &buf->next; - pthread_cond_signal(&buffer_notempty); - pthread_mutex_unlock (&buffer_mutex); - } +/* + * + */ +static off_t rtp_plugin_get_current_pos (input_plugin_t *this_gen){ + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + + return (this->curpos * IBUFFER_SIZE); } -static off_t input_plugin_seek (off_t offset, int origin) { +/* + * + */ +static uint32_t rtp_plugin_get_capabilities (input_plugin_t *this_gen) { - return -1; + return INPUT_CAP_NOCAP; } -static uint32_t input_plugin_get_length (void) { - return 0; -} +/* + * + */ +static uint32_t rtp_plugin_get_blocksize (input_plugin_t *this_gen) { -static uint32_t input_plugin_get_capabilities (void) { - return 0; + return RTP_BLOCKSIZE; } -static uint32_t input_plugin_get_blocksize (void) { - return 2048; -} +/* + * + */ +static void rtp_plugin_close (input_plugin_t *this_gen) { + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; -static void input_plugin_close (void) { - close(input_file_handle); - input_file_handle = -1; + close(this->fh); + this->fh = -1; } -static int input_plugin_eject (void) { +/* + * + */ +static int rtp_plugin_eject_media (input_plugin_t *this_gen) { + return 1; } -static char *input_plugin_get_identifier (void) { +/* + * + */ +static char *rtp_plugin_get_description (input_plugin_t *this_gen) { + + return "rtp input plugin as shipped with xine"; +} + +/* + * + */ +static char *rtp_plugin_get_identifier (input_plugin_t *this_gen) { + return "RTP"; } -static int input_plugin_is_branch_possible (char *next_mrl) { - return 0; +/* + * + */ +static char* rtp_plugin_get_mrl (input_plugin_t *this_gen) { + rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen; + + return this->mrl; } -static input_plugin_t plugin_op = { - NULL, - NULL, - input_plugin_init, - input_plugin_open, - input_plugin_read, - input_plugin_seek, - input_plugin_get_length, - input_plugin_get_capabilities, - NULL, - input_plugin_get_blocksize, - input_plugin_eject, - input_plugin_close, - input_plugin_get_identifier, - NULL, - input_plugin_is_branch_possible, - NULL -}; - -input_plugin_t *input_plugin_getinfo(uint32_t dbglvl) { - - xine_debug = dbglvl; - - return &plugin_op; +/* + * + */ +input_plugin_t *init_input_plugin (int iface, config_values_t *config) { + rtp_input_plugin_t *this; + + xine_debug = config->lookup_int (config, "xine_debug", 0); + + switch (iface) { + case 1: { + int bufn; + + this = (rtp_input_plugin_t *) malloc (sizeof (rtp_input_plugin_t)); + + for (bufn = 0; bufn < N_BUFFERS; bufn++) { + input_buffer_t *buf = malloc(sizeof(input_buffer_t)); + if (!buf) { + fprintf(stderr, "unable to allocate input buffer.\n"); + exit(1); + } + buf->buf = malloc(IBUFFER_SIZE); + if (!buf->buf) { + fprintf(stderr, "unable to allocate input buffer.\n"); + exit(1); + } + buf->next = this->free_buffers; + this->free_buffers = buf; + } + + this->input_plugin.interface_version = INPUT_PLUGIN_IFACE_VERSION; + this->input_plugin.get_capabilities = rtp_plugin_get_capabilities; + this->input_plugin.open = rtp_plugin_open; + this->input_plugin.read = rtp_plugin_read; + this->input_plugin.read_block = NULL; + this->input_plugin.seek = NULL; + this->input_plugin.get_current_pos = rtp_plugin_get_current_pos; + this->input_plugin.get_length = rtp_plugin_get_length; + this->input_plugin.get_blocksize = rtp_plugin_get_blocksize; + this->input_plugin.eject_media = rtp_plugin_eject_media; + this->input_plugin.close = rtp_plugin_close; + this->input_plugin.get_identifier = rtp_plugin_get_identifier; + this->input_plugin.get_description = rtp_plugin_get_description; + this->input_plugin.get_dir = NULL; + this->input_plugin.get_mrl = rtp_plugin_get_mrl; + this->input_plugin.get_autoplay_list = NULL; + this->input_plugin.get_clut = NULL; + + this->fh = -1; + this->mrl = NULL; + this->config = config; + + return (input_plugin_t *) this; + } + break; + default: + fprintf(stderr, + "Rtp input plugin doesn't support plugin API version %d.\n" + "PLUGIN DISABLED.\n" + "This means there's a version mismatch between xine and this input" + "plugin.\nInstalling current input plugins should help.\n", + iface); + return NULL; + } } |