summaryrefslogtreecommitdiff
path: root/src/input/input_rtp.c
diff options
context:
space:
mode:
authorDaniel Caujolle-Bert <f1rmb@users.sourceforge.net>2001-05-06 02:37:59 +0000
committerDaniel Caujolle-Bert <f1rmb@users.sourceforge.net>2001-05-06 02:37:59 +0000
commit8eb4cb861c548b58f229abf6008e4cc681e711e7 (patch)
tree317705cd28512251b24fc61c709730476e829676 /src/input/input_rtp.c
parentf2181d0641c5ebd684f8e9113d14bb49bb390eef (diff)
downloadxine-lib-8eb4cb861c548b58f229abf6008e4cc681e711e7.tar.gz
xine-lib-8eb4cb861c548b58f229abf6008e4cc681e711e7.tar.bz2
Re-enable and sync input plugins. RTP one should be broken.
CVS patchset: 63 CVS date: 2001/05/06 02:37:59
Diffstat (limited to 'src/input/input_rtp.c')
-rw-r--r--src/input/input_rtp.c552
1 files changed, 329 insertions, 223 deletions
diff --git a/src/input/input_rtp.c b/src/input/input_rtp.c
index 88fa5784d..ec01a1d26 100644
--- a/src/input/input_rtp.c
+++ b/src/input/input_rtp.c
@@ -38,13 +38,13 @@
* fd is open for read on mpeg stream, sock for write on a multicast socket.
*
* while (1) {
- * /* read pack */
+ * \/\* read pack *\/
* read(fd, buf, 2048)
- * /* end of stream */
+ * \/\* end of stream *\/
* if (buf[3] == 0xb9)
* return 0;
*
- * /* extract the system reference clock, srcb, from the pack */
+ * \/\* extract the system reference clock, srcb, from the pack *\/
*
* send_at = srcb/90000.0;
* while (time_now < send_at) {
@@ -80,116 +80,191 @@
#include <sys/time.h>
#include <stdlib.h>
+#include "xine_internal.h"
+#include "monitor.h"
#include "input_plugin.h"
-static int last_input_error;
-static int input_eof;
+#define RTP_BLOCKSIZE 2048
+
static uint32_t xine_debug;
typedef struct _input_buffer {
- struct _input_buffer *next;
- unsigned char *buf;
-} input_buffer;
-
+ struct _input_buffer *next;
+ unsigned char *buf;
+} input_buffer_t;
+
#define N_BUFFERS 128
#define IBUFFER_SIZE 2048
-static int input_file_handle = -1;
+typedef struct {
+ input_plugin_t input_plugin;
+
+ char *mrl;
+ config_values_t *config;
-input_buffer *free_buffers;
-input_buffer **fifo_head;
-input_buffer fifo_tail;
+ int fh;
+
+ input_buffer_t *free_buffers;
+ input_buffer_t **fifo_head;
+ input_buffer_t fifo_tail;
+
+ pthread_mutex_t buffer_mutex;
+ pthread_cond_t buffer_notempty;
+
+ int last_input_error;
+ int input_eof;
-pthread_mutex_t buffer_mutex;
-pthread_cond_t buffer_notempty;
+ pthread_t reader_thread;
-static pthread_t reader_thread;
+ int curpos;
-static void * input_plugin_read_loop(void *);
+} rtp_input_plugin_t;
+/* ***************************************************************** */
+/* Private functions */
+/* ***************************************************************** */
+
+/*
+ *
+ */
static int host_connect_attempt(struct in_addr ia, int port) {
- int s=socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
- struct sockaddr_in sin;
+ int s=socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ struct sockaddr_in sin;
+
+ if(s==-1) {
+ perror("socket");
+ return -1;
+ }
+
+ sin.sin_family = AF_INET;
+ sin.sin_addr = ia;
+ sin.sin_port = htons(port);
+
+ /* datagram socket */
+ if (bind(s, (struct sockaddr *)&sin, sizeof(sin))) {
+ perror("bind failed");
+ exit(1);
+ }
+ /* multicast ? */
+ if ((ntohl(sin.sin_addr.s_addr) >> 28) == 0xe) {
+ struct ip_mreqn mreqn;
- if(s==-1) {
- perror("socket");
- return -1;
- }
-
- sin.sin_family = AF_INET;
- sin.sin_addr = ia;
- sin.sin_port = htons(port);
-
- /* datagram socket */
- if (bind(s, (struct sockaddr *)&sin, sizeof(sin))) {
- perror("bind failed");
- exit(1);
- }
- /* multicast ? */
- if ((ntohl(sin.sin_addr.s_addr) >> 28) == 0xe) {
- struct ip_mreqn mreqn;
-
- mreqn.imr_multiaddr.s_addr = sin.sin_addr.s_addr;
- mreqn.imr_address.s_addr = INADDR_ANY;
- mreqn.imr_ifindex = 0;
- if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreqn,sizeof(mreqn))) {
- perror("setsockopt IP_ADD_MEMBERSHIP failed (multicast kernel?)");
- exit(1);
- }
+ mreqn.imr_multiaddr.s_addr = sin.sin_addr.s_addr;
+ mreqn.imr_address.s_addr = INADDR_ANY;
+ mreqn.imr_ifindex = 0;
+ if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreqn,sizeof(mreqn))) {
+ perror("setsockopt IP_ADD_MEMBERSHIP failed (multicast kernel?)");
+ exit(1);
}
-
- return s;
+ }
+
+ return s;
}
+/*
+ *
+ */
static int host_connect(const char *host, int port) {
- struct hostent *h;
- int i;
- int s;
-
- h=gethostbyname(host);
- if(h==NULL)
- {
- fprintf(stderr,"unable to resolve '%s'.\n", host);
- return -1;
- }
-
-
- for(i=0; h->h_addr_list[i]; i++)
- {
- struct in_addr ia;
- memcpy(&ia, h->h_addr_list[i],4);
- s=host_connect_attempt(ia, port);
- if(s != -1)
- return s;
- }
- fprintf(stderr, "unable to connect to '%s'.\n", host);
- return -1;
+ struct hostent *h;
+ int i;
+ int s;
+
+ h=gethostbyname(host);
+ if(h==NULL)
+ {
+ fprintf(stderr,"unable to resolve '%s'.\n", host);
+ return -1;
+ }
+
+
+ for(i=0; h->h_addr_list[i]; i++)
+ {
+ struct in_addr ia;
+ memcpy(&ia, h->h_addr_list[i],4);
+ s=host_connect_attempt(ia, port);
+ if(s != -1)
+ return s;
+ }
+ fprintf(stderr, "unable to connect to '%s'.\n", host);
+ return -1;
}
-static void input_plugin_init (void) {
- int bufn;
-
- for (bufn = 0; bufn < N_BUFFERS; bufn++) {
- input_buffer *buf = malloc(sizeof(input_buffer));
- if (!buf) {
- fprintf(stderr, "unable to allocate input buffer.\n");
- exit(1);
- }
- buf->buf = malloc(IBUFFER_SIZE);
- if (!buf->buf) {
- fprintf(stderr, "unable to allocate input buffer.\n");
- exit(1);
- }
- buf->next = free_buffers;
- free_buffers = buf;
+/*
+ *
+ */
+static void * input_plugin_read_loop(void *arg) {
+ rtp_input_plugin_t **this = (rtp_input_plugin_t **) arg;
+ input_buffer_t *buf;
+ int r;
+ unsigned short seq = 0;
+ static int warned = 0;
+ /* char whirly[] = "/-\\|"; */
+ /* int gig = 0; */
+
+ while (1) {
+ pthread_mutex_lock (&(*this)->buffer_mutex);
+ /* we expect to be able to get a free buffer - possibly we
+ could be a bit more reasonable but this will do for now. */
+ if (!(*this)->free_buffers) {
+ (*this)->input_eof = 1;
+ if (!warned) {
+ printf("OUCH - ran out of buffers\n");
+ warned = 1;
+ }
+ pthread_cond_signal(&(*this)->buffer_notempty);
+ continue;
+ }
+ warned = 0;
+ buf = (*this)->free_buffers;
+ (*this)->free_buffers = (*this)->free_buffers->next;
+ pthread_mutex_unlock (&(*this)->buffer_mutex);
+
+ /* printf("%c\r", whirly[(gig++ % 4)]); */
+ /* fflush(stdout); */
+ r = read((*this)->fh, buf->buf, IBUFFER_SIZE);
+ if (r < 0) {
+ /* descriptor may be closed by main thread */
+ if (r != EBADF)
+ (*this)->last_input_error = r;
+ (*this)->curpos = 0;
+ return 0;
+ }
+ if (r == 0) {
+ (*this)->input_eof = 1;
+ (*this)->curpos = 0;
+ return 0;
}
+ (*this)->curpos += r;
+
+ /* For now - check whether we're dropping input */
+ if (++seq != *(unsigned short *)buf->buf) {
+ printf("OUCH - dropped input packet %d %d\n", seq, *(unsigned short *)buf->buf);
+ seq = *(unsigned short *)buf->buf;
+ }
+ buf->buf[1] = buf->buf[0] = 0;
+ pthread_mutex_lock (&(*this)->buffer_mutex);
+ buf->next = *(*this)->fifo_head;
+ *(*this)->fifo_head = buf;
+ (*this)->fifo_head = &buf->next;
+ pthread_cond_signal(&(*this)->buffer_notempty);
+ pthread_mutex_unlock (&(*this)->buffer_mutex);
+ }
}
+/* ***************************************************************** */
+/* END OF PRIVATES */
+/* ***************************************************************** */
+
+/*
+ *
+ */
+static int rtp_plugin_open (input_plugin_t *this_gen, char *mrl ) {
+ rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen;
+ char *filename;
+ char *pptr;
+ int port = 7658;
+ pthread_attr_t thread_attrs;
-static int input_plugin_open (char *mrl) {
- char *filename;
- char *pptr;
- int port = 7658;
- pthread_attr_t thread_attrs;
+ this->mrl = mrl;
if (!strncmp (mrl, "rtp:",4)) {
filename = &mrl[4];
@@ -210,175 +285,206 @@ static int input_plugin_open (char *mrl) {
sscanf(pptr,"%d", &port);
}
- if (input_file_handle != -1)
- close(input_file_handle);
- input_file_handle = host_connect(filename, port);
+ if (this->fh != -1)
+ close(this->fh);
+ this->fh = host_connect(filename, port);
- if (input_file_handle == -1) {
+ if (this->fh == -1) {
return 0;
}
- last_input_error = 0;
- input_eof = 0;
- fifo_tail.next = &fifo_tail;
- fifo_head = &fifo_tail.next;
+ this->last_input_error = 0;
+ this->input_eof = 0;
+ this->fifo_tail.next = &this->fifo_tail;
+ this->fifo_head = &this->fifo_tail.next;
+ this->curpos = 0;
- pthread_cond_init(&buffer_notempty, NULL);
+ pthread_cond_init(&this->buffer_notempty, NULL);
pthread_attr_init(&thread_attrs);
pthread_attr_setdetachstate(&thread_attrs, PTHREAD_CREATE_DETACHED);
- pthread_create(&reader_thread, &thread_attrs, input_plugin_read_loop, (void *)input_file_handle);
+ pthread_create(&this->reader_thread, &thread_attrs,
+ input_plugin_read_loop, (void *)&this);
pthread_attr_destroy(&thread_attrs);
return 1;
}
-static uint32_t input_plugin_read (char *buf, uint32_t nlen) {
- input_buffer *ibuf;
-
- pthread_mutex_lock (&buffer_mutex);
- while (fifo_tail.next == &fifo_tail) {
- if (input_eof) {
- pthread_mutex_unlock (&buffer_mutex);
- return 0;
+/*
+ *
+ */
+static off_t rtp_plugin_read (input_plugin_t *this_gen,
+ char *buf, off_t nlen) {
+ rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen;
+ input_buffer_t *ibuf;
+
+ pthread_mutex_lock (&this->buffer_mutex);
+ while (this->fifo_tail.next == &this->fifo_tail) {
+ if (this->input_eof) {
+ pthread_mutex_unlock (&this->buffer_mutex);
+ return 0;
}
- if (last_input_error) {
- pthread_mutex_unlock (&buffer_mutex);
- return last_input_error;
+ if (this->last_input_error) {
+ pthread_mutex_unlock (&this->buffer_mutex);
+ return this->last_input_error;
}
- pthread_cond_wait(&buffer_notempty, &buffer_mutex);
- }
- ibuf = fifo_tail.next;
- fifo_tail.next = fifo_tail.next->next;
-
- /* Is FIFO now empty */
- if (fifo_tail.next == &fifo_tail)
- fifo_head = &fifo_tail.next;
-
- pthread_mutex_unlock (&buffer_mutex);
-
- memcpy(buf, ibuf->buf, nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE);
+ pthread_cond_wait(&this->buffer_notempty, &this->buffer_mutex);
+ }
+ ibuf = this->fifo_tail.next;
+ this->fifo_tail.next = this->fifo_tail.next->next;
+
+ /* Is FIFO now empty */
+ if (this->fifo_tail.next == &this->fifo_tail)
+ this->fifo_head = &this->fifo_tail.next;
+
+ pthread_mutex_unlock (&this->buffer_mutex);
+
+ memcpy(buf, ibuf->buf, nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE);
+
+ pthread_mutex_lock (&this->buffer_mutex);
+ ibuf->next = this->free_buffers;
+ this->free_buffers = ibuf;
+ pthread_mutex_unlock (&this->buffer_mutex);
+
+ return nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE;
+}
- pthread_mutex_lock (&buffer_mutex);
- ibuf->next = free_buffers;
- free_buffers = ibuf;
- pthread_mutex_unlock (&buffer_mutex);
+/*
+ *
+ */
+static off_t rtp_plugin_get_length (input_plugin_t *this_gen) {
- return nlen < IBUFFER_SIZE ? nlen : IBUFFER_SIZE;
+ return 0;
}
-static void * input_plugin_read_loop(void *arg) {
- int inf = (int) arg;
- input_buffer *buf;
- int r;
- unsigned short seq = 0;
- static int warned = 0;
-
- char whirly[] = "/-\\|";
- int gig = 0;
-
- while (1) {
- pthread_mutex_lock (&buffer_mutex);
- /* we expect to be able to get a free buffer - possibly we
- could be a bit more reasonable but this will do for now. */
- if (!free_buffers) {
- input_eof = 1;
- if (!warned) {
- printf("OUCH - ran out of buffers\n");
- warned = 1;
- }
- pthread_cond_signal(&buffer_notempty);
- continue;
- }
- warned = 0;
- buf = free_buffers;
- free_buffers = free_buffers->next;
- pthread_mutex_unlock (&buffer_mutex);
-
- /* printf("%c\r", whirly[(gig++ % 4)]); */
- /* fflush(stdout); */
- r = read(inf, buf->buf, IBUFFER_SIZE);
- if (r < 0) {
- /* descriptor may be closed by main thread */
- if (r != EBADF)
- last_input_error = r;
- return 0;
- }
- if (r == 0) {
- input_eof = 1;
- return 0;
- }
-
- /* For now - check whether we're dropping input */
- if (++seq != *(unsigned short *)buf->buf) {
- printf("OUCH - dropped input packet %d %d\n", seq, *(unsigned short *)buf->buf);
- seq = *(unsigned short *)buf->buf;
- }
- buf->buf[1] = buf->buf[0] = 0;
- pthread_mutex_lock (&buffer_mutex);
- buf->next = *fifo_head;
- *fifo_head = buf;
- fifo_head = &buf->next;
- pthread_cond_signal(&buffer_notempty);
- pthread_mutex_unlock (&buffer_mutex);
- }
+/*
+ *
+ */
+static off_t rtp_plugin_get_current_pos (input_plugin_t *this_gen){
+ rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen;
+
+ return (this->curpos * IBUFFER_SIZE);
}
-static off_t input_plugin_seek (off_t offset, int origin) {
+/*
+ *
+ */
+static uint32_t rtp_plugin_get_capabilities (input_plugin_t *this_gen) {
- return -1;
+ return INPUT_CAP_NOCAP;
}
-static uint32_t input_plugin_get_length (void) {
- return 0;
-}
+/*
+ *
+ */
+static uint32_t rtp_plugin_get_blocksize (input_plugin_t *this_gen) {
-static uint32_t input_plugin_get_capabilities (void) {
- return 0;
+ return RTP_BLOCKSIZE;
}
-static uint32_t input_plugin_get_blocksize (void) {
- return 2048;
-}
+/*
+ *
+ */
+static void rtp_plugin_close (input_plugin_t *this_gen) {
+ rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen;
-static void input_plugin_close (void) {
- close(input_file_handle);
- input_file_handle = -1;
+ close(this->fh);
+ this->fh = -1;
}
-static int input_plugin_eject (void) {
+/*
+ *
+ */
+static int rtp_plugin_eject_media (input_plugin_t *this_gen) {
+
return 1;
}
-static char *input_plugin_get_identifier (void) {
+/*
+ *
+ */
+static char *rtp_plugin_get_description (input_plugin_t *this_gen) {
+
+ return "rtp input plugin as shipped with xine";
+}
+
+/*
+ *
+ */
+static char *rtp_plugin_get_identifier (input_plugin_t *this_gen) {
+
return "RTP";
}
-static int input_plugin_is_branch_possible (char *next_mrl) {
- return 0;
+/*
+ *
+ */
+static char* rtp_plugin_get_mrl (input_plugin_t *this_gen) {
+ rtp_input_plugin_t *this = (rtp_input_plugin_t *) this_gen;
+
+ return this->mrl;
}
-static input_plugin_t plugin_op = {
- NULL,
- NULL,
- input_plugin_init,
- input_plugin_open,
- input_plugin_read,
- input_plugin_seek,
- input_plugin_get_length,
- input_plugin_get_capabilities,
- NULL,
- input_plugin_get_blocksize,
- input_plugin_eject,
- input_plugin_close,
- input_plugin_get_identifier,
- NULL,
- input_plugin_is_branch_possible,
- NULL
-};
-
-input_plugin_t *input_plugin_getinfo(uint32_t dbglvl) {
-
- xine_debug = dbglvl;
-
- return &plugin_op;
+/*
+ *
+ */
+input_plugin_t *init_input_plugin (int iface, config_values_t *config) {
+ rtp_input_plugin_t *this;
+
+ xine_debug = config->lookup_int (config, "xine_debug", 0);
+
+ switch (iface) {
+ case 1: {
+ int bufn;
+
+ this = (rtp_input_plugin_t *) malloc (sizeof (rtp_input_plugin_t));
+
+ for (bufn = 0; bufn < N_BUFFERS; bufn++) {
+ input_buffer_t *buf = malloc(sizeof(input_buffer_t));
+ if (!buf) {
+ fprintf(stderr, "unable to allocate input buffer.\n");
+ exit(1);
+ }
+ buf->buf = malloc(IBUFFER_SIZE);
+ if (!buf->buf) {
+ fprintf(stderr, "unable to allocate input buffer.\n");
+ exit(1);
+ }
+ buf->next = this->free_buffers;
+ this->free_buffers = buf;
+ }
+
+ this->input_plugin.interface_version = INPUT_PLUGIN_IFACE_VERSION;
+ this->input_plugin.get_capabilities = rtp_plugin_get_capabilities;
+ this->input_plugin.open = rtp_plugin_open;
+ this->input_plugin.read = rtp_plugin_read;
+ this->input_plugin.read_block = NULL;
+ this->input_plugin.seek = NULL;
+ this->input_plugin.get_current_pos = rtp_plugin_get_current_pos;
+ this->input_plugin.get_length = rtp_plugin_get_length;
+ this->input_plugin.get_blocksize = rtp_plugin_get_blocksize;
+ this->input_plugin.eject_media = rtp_plugin_eject_media;
+ this->input_plugin.close = rtp_plugin_close;
+ this->input_plugin.get_identifier = rtp_plugin_get_identifier;
+ this->input_plugin.get_description = rtp_plugin_get_description;
+ this->input_plugin.get_dir = NULL;
+ this->input_plugin.get_mrl = rtp_plugin_get_mrl;
+ this->input_plugin.get_autoplay_list = NULL;
+ this->input_plugin.get_clut = NULL;
+
+ this->fh = -1;
+ this->mrl = NULL;
+ this->config = config;
+
+ return (input_plugin_t *) this;
+ }
+ break;
+ default:
+ fprintf(stderr,
+ "Rtp input plugin doesn't support plugin API version %d.\n"
+ "PLUGIN DISABLED.\n"
+ "This means there's a version mismatch between xine and this input"
+ "plugin.\nInstalling current input plugins should help.\n",
+ iface);
+ return NULL;
+ }
}