diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/demuxers/demux_ts.c | 381 |
1 files changed, 278 insertions, 103 deletions
diff --git a/src/demuxers/demux_ts.c b/src/demuxers/demux_ts.c index aa401e419..148ca3ade 100644 --- a/src/demuxers/demux_ts.c +++ b/src/demuxers/demux_ts.c @@ -17,7 +17,7 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA * - * $Id: demux_ts.c,v 1.46 2002/05/25 19:19:17 siggi Exp $ + * $Id: demux_ts.c,v 1.47 2002/05/29 20:57:29 miguelfreitas Exp $ * * Demultiplexer for MPEG2 Transport Streams. * @@ -34,6 +34,14 @@ * * Date Author * ---- ------ + * + * 27-May-2002 Giovanni Baronetti and Mauro Borghi <mauro.borghi@tilab.com> + * - force PMT reparsing when PMT PID changes + * - accept non seekable input plugins -- FIX? + * - accept dvb as input plugin + * - optimised read operations + * - modified resync code + * * 16-May-2002 Thibaut Mattern <tmattern@noos.fr> * Fix demux loop * 07-Jan-2002 Andr Draszik <andid@gmx.net> @@ -55,7 +63,7 @@ * Compiles! * - * TODO: do without memcpys, seeking (if possible), preview buffers + * TODO: do without memcpys, preview buffers */ #ifdef HAVE_CONFIG_H @@ -73,7 +81,7 @@ #include "xineutils.h" #include "demux.h" -#define VALID_MRLS "fifo,stdin" +#define VALID_MRLS "fifo,stdin,dvb" #define VALID_ENDS "m2t,ts,trp" #ifdef __GNUC__ @@ -99,19 +107,25 @@ /* #define TS_LOG #define TS_PMT_LOG +#define TS_READ_STATS // activates read statistics generation */ /* - * The maximum number of PIDs we are prepared to handle in a single program is the - * number that fits in a single-packet PMT. + * The maximum number of PIDs we are prepared to handle in a single program + * is the number that fits in a single-packet PMT. */ #define PKT_SIZE 188 #define BODY_SIZE (188 - 4) #define MAX_PIDS ((BODY_SIZE - 1 - 13) / 4) #define MAX_PMTS ((BODY_SIZE - 1 - 13) / 4) #define SYNC_BYTE 0x47 -#define MIN_SYNCS 5 -#define BUF_SIZE ((MIN_SYNCS+1) * PKT_SIZE) + +#define MIN_SYNCS 3 +#define NPKT_PER_READ 100 + +#define BUF_SIZE (NPKT_PER_READ * PKT_SIZE) + +#define MAX_PES_BUF_SIZE 2048 #define NULL_PID 0x1fff #define INVALID_PID ((unsigned int)(-1)) @@ -138,11 +152,9 @@ typedef struct { uint32_t type; int64_t pts; buf_element_t *buf; - int pes_buf_next; - int pes_len; - int pes_len_zero; unsigned int counter; - int broken_pes; + int corrupted_pes; + uint32_t buffered_bytes; } demux_ts_media; @@ -193,6 +205,14 @@ typedef struct { int send_end_buffers; int ignore_scr_discont; int send_newpts; + + unsigned int scrambled_pids[MAX_PIDS]; + unsigned int scrambled_npids; + +#ifdef TS_READ_STATS + uint32_t rstat[NPKT_PER_READ + 1]; +#endif + } demux_ts; static void demux_ts_build_crc32_table(demux_ts *this) { @@ -366,6 +386,14 @@ static void demux_ts_parse_pat (demux_ts *this, unsigned char *original_pkt, program_count++; } this->program_number[program_count] = program_number; + + /* force PMT reparsing when pmt_pid changes */ + if (this->pmt_pid[program_count] != pmt_pid) { + this->pmt_pid[program_count] = pmt_pid; + this->audioPid = INVALID_PID; + this->videoPid = INVALID_PID; + } + this->pmt_pid[program_count] = pmt_pid; this->pmt[program_count] = NULL; this->pmt_write_ptr[program_count] = NULL; @@ -511,87 +539,77 @@ static int demux_ts_parse_pes_header (demux_ts_media *m, return 0 ; } + /* - * buffer arriving pes data - * Input is 188 bytes of Transport stream - * Build a PES packet. PES packets can get as big as 65536 - * If PES packet length was empty(zero) work it out based on seeing the next PUS. - * Once we have a complete PES packet, give PES packet a valid length field. - * then queue it. The queuing routine might have to cut it up to make bits < 4096. FIXME: implement cut up. - * Currently if PES packets are >4096, corruption occurs. + * buffer arriving pes data */ - static void demux_ts_buffer_pes(demux_ts *this, unsigned char *ts, unsigned int mediaIndex, unsigned int pus, unsigned int cc, unsigned int len) { - buf_element_t *buf; - demux_ts_media *m = &this->media[mediaIndex]; + if (!m->fifo) { #ifdef TS_LOG LOG_MSG(this->xine, _("fifo unavailable (%d)\n"), mediaIndex); #endif - return; /* To avoid segfault if video out or audio out plugin not loaded */ - } - /* - * By checking the CC here, we avoid the need to check for the no-payload - * case (i.e. adaptation field only) when it does not get bumped. - */ + /* By checking the CC here, we avoid the need to check for the no-payload + case (i.e. adaptation field only) when it does not get bumped. */ if (m->counter != INVALID_CC) { if ((m->counter & 0x0f) != cc) { - LOG_MSG (this->xine, _("demux_ts: dropped input packet cc: %d " - "expected: %d\n"), - cc, m->counter); + LOG_MSG (this->xine, _("demux_ts: unexpected cc %d (expected %d)\n"), + cc, m->counter); } } - m->counter = cc; m->counter++; - if (pus) { - - /* new PES packet */ - - if (ts[0] || ts[1] || ts[2] != 1) { - LOG_MSG_STDERR (this->xine, _("demux_ts: PUSI set but no PES header " - "(corrupt stream?)\n")); - return; + if (pus) { /* new PES packet */ + + if (m->buffered_bytes) { + m->buf->content = m->buf->mem; + m->buf->size = m->buffered_bytes; + m->buf->type = m->type; + m->buf->pts = m->pts; + m->buf->decoder_info[0] = 1; + m->buf->input_pos = this->input->get_current_pos(this->input); + m->fifo->put(m->fifo, m->buf); + m->buffered_bytes = 0; } - - if (!demux_ts_parse_pes_header (m, ts, len, this->xine)) { - m->broken_pes = 1; - LOG_MSG (this->xine, _("demux_ts: broken pes encountered\n")); - } else { - check_newpts( this, m->pts ); - - m->broken_pes = 0; - buf = m->fifo->buffer_pool_alloc(m->fifo); - memcpy (buf->mem, ts+len-m->size, m->size); /* FIXME: reconstruct parser to do without memcpys */ - buf->content = buf->mem; - buf->size = m->size; - buf->type = m->type; - buf->pts = m->pts; - buf->decoder_info[0] = 1; - m->fifo->put (m->fifo, buf); + if (!demux_ts_parse_pes_header(m, ts, len, this->xine)) { + m->corrupted_pes = 1; + LOG_MSG (this->xine, _("demux_ts: corrupted pes encountered\n")); } + else { + check_newpts(this, m->pts); + m->corrupted_pes = 0; + m->buf = m->fifo->buffer_pool_alloc(m->fifo); + memcpy(m->buf->mem, ts+len-m->size, m->size); + m->buffered_bytes = m->size; + } + } - } else if (!m->broken_pes) { - buf = m->fifo->buffer_pool_alloc(m->fifo); - memcpy (buf->mem, ts, len); /* FIXME: reconstruct parser to do without memcpys */ - buf->content = buf->mem; - buf->size = len; - buf->type = m->type; - buf->pts = 0; - buf->input_pos = this->input->get_current_pos(this->input); - buf->decoder_info[0] = 1; - m->fifo->put (m->fifo, buf); + else if (!m->corrupted_pes) { /* no pus -- PES packet continuation */ + + if ((m->buffered_bytes + len) > MAX_PES_BUF_SIZE) { + m->buf->content = m->buf->mem; + m->buf->size = m->buffered_bytes; + m->buf->type = m->type; + m->buf->pts = m->pts; + m->buf->decoder_info[0] = 1; + m->buf->input_pos = this->input->get_current_pos(this->input); + m->fifo->put(m->fifo, m->buf); + m->buffered_bytes = 0; + m->buf = m->fifo->buffer_pool_alloc(m->fifo); + } + memcpy(m->buf->mem + m->buffered_bytes, ts, len); + m->buffered_bytes += len; } } @@ -608,11 +626,11 @@ static void demux_ts_pes_new(demux_ts *this, /* new PID seen - initialise stuff */ m->pid = pid; m->fifo = fifo; - m->buf = 0; - m->pes_buf_next = 0; - m->pes_len = 0; + if (m->buf != NULL) m->buf->free_buffer(m->buf); + m->buf = NULL; m->counter = INVALID_CC; - m->broken_pes = 1; + m->corrupted_pes = 1; + m->buffered_bytes = 0; } /* @@ -909,52 +927,139 @@ static void demux_ts_parse_pmt (demux_ts *this, } } -void correct_for_sync(demux_ts *this, uint8_t *buf) { - int32_t n, read_length; - if((buf[0] == SYNC_BYTE) && (buf[PKT_SIZE] == SYNC_BYTE) && - (buf[PKT_SIZE*2] == SYNC_BYTE) && (buf[PKT_SIZE*3] == SYNC_BYTE)) { - return; + +/* + * + */ +static int sync_correct(demux_ts *this, uint8_t *buf, int32_t npkt_read) { + + int p = 0; + int n = 0; + int i = 0; + int sync_ok = 0; + int read_length; + + fprintf(stderr, "demux_ts: about to resync!\n"); + + for (p=0; p < npkt_read; p++) { + for(n=0; n < PKT_SIZE; n++) { + sync_ok = 1; + for (i=0; i < MIN(MIN_SYNCS, npkt_read - p); i++) { + if (buf[n + ((i+p) * PKT_SIZE)] != SYNC_BYTE) { + sync_ok = 0; + break; + } + } + if (sync_ok) break; + } + if (sync_ok) break; } - for(n=1;n<PKT_SIZE;n++) { - if((buf[n] == SYNC_BYTE) && (buf[n+PKT_SIZE] == SYNC_BYTE) && - (buf[n+(PKT_SIZE*2)] == SYNC_BYTE) && (buf[n+(PKT_SIZE*3)] == SYNC_BYTE)) { - /* Found sync, fill in */ - memmove(&buf[0],&buf[n],((PKT_SIZE*MIN_SYNCS)-n)); - read_length = this->input->read(this->input, &buf[(PKT_SIZE*MIN_SYNCS)-n], n); - return; + + if (sync_ok) { + /* Found sync, fill in */ + memmove(&buf[0], &buf[n + p * PKT_SIZE], + ((PKT_SIZE * (npkt_read - p)) - n)); + read_length = this->input->read(this->input, + &buf[(PKT_SIZE * (npkt_read - p)) - n], + n + p * PKT_SIZE); + /* FIXME: when read_length is not as required... we now stop demuxing */ + if (read_length != (n + p * PKT_SIZE)) { + fprintf(stderr, "demux_ts sync_correct: sync found, but read failed\n"); + return 0; } } - LOG_MSG(this->xine, _("RE-Sync failed\n")); /* Sync up here */ - return; + else { + fprintf(stderr, "demux_ts sync_correct: sync not found! Stop demuxing\n"); + return 0; + } + fprintf(stderr, "demux_ts: resync successful!\n"); + return 1; +} + +/* + * + */ +static int sync_detect(demux_ts *this, uint8_t *buf, int32_t npkt_read) { + + int i, sync_ok; + + sync_ok = 1; + for (i=0; i < MIN(MIN_SYNCS, npkt_read); i++) { + if (buf[i * PKT_SIZE] != SYNC_BYTE) { + sync_ok = 0; + break; + } + } + if (!sync_ok) return sync_correct(this, buf, npkt_read); + return sync_ok; } -/* Main synchronisation routine. +/* + * Main synchronisation routine. */ - static unsigned char * demux_synchronise(demux_ts * this) { - static int32_t packet_number=MIN_SYNCS; + + static int32_t packet_number = 0; + // NEW: var to keep track of number of last read packets + static int32_t npkt_read = 0; + static int32_t read_zero = 0; + static uint8_t buf[BUF_SIZE]; /* This should change to a malloc. */ - uint8_t *return_pointer = NULL; - int32_t n, read_length; - - if (packet_number == MIN_SYNCS) { - for(n=0;n<MIN_SYNCS;n++) { - read_length = this->input->read(this->input, &buf[n*PKT_SIZE], PKT_SIZE); - if(read_length != PKT_SIZE) { - this->status = DEMUX_FINISHED; - return NULL; + uint8_t *return_pointer = NULL; + int32_t read_length; + if (packet_number >= npkt_read) { + + /* NEW: handle read returning less packets than NPKT_PER_READ... */ + do { + read_length = this->input->read(this->input, buf, + PKT_SIZE * NPKT_PER_READ); + if (read_length % PKT_SIZE) { + fprintf(stderr, + "demux_ts: read returned %d bytes (not a multiple of %d!)\n", + read_length, PKT_SIZE); + this->status = DEMUX_FINISHED; + return NULL; + } + npkt_read = read_length / PKT_SIZE; + +#ifdef TS_READ_STATS + this->rstat[npkt_read]++; +#endif + // what if npkt_read < 5 ? --> ok in sync_detect + + // NEW: stop demuxing if read returns 0 a few times... (200) + + if (npkt_read == 0) { + // fprintf(stderr, "demux_ts: read 0 packets! (%d)\n", read_zero); + read_zero++; } + else read_zero = 0; + + if (read_zero > 200) { + fprintf(stderr, "demux_ts: read 0 packets too many times!\n"); + this->status = DEMUX_FINISHED; + return NULL; + } + + } while (! read_length); + + packet_number = 0; + + if (!sync_detect(this, &buf[0], npkt_read)) { + fprintf(stderr, "demux_ts: sync error.\n"); + this->status = DEMUX_FINISHED; + return NULL; } - packet_number=0; - correct_for_sync(this,&buf[0]); + } - return_pointer=&buf[PKT_SIZE*packet_number]; + return_pointer = &buf[PKT_SIZE * packet_number]; packet_number++; return return_pointer; } + static uint32_t demux_ts_adaptation_field_parse( uint8_t *data, uint32_t adaptation_field_length) { uint32_t discontinuity_indicator=0; uint32_t random_access_indicator=0; @@ -1053,6 +1158,7 @@ static void demux_ts_parse_packet (demux_ts *this) { unsigned int data_offset; unsigned int data_len; uint32_t program_count; + int i; /* get next synchronised packet, or NULL */ originalPkt = demux_synchronise(this); @@ -1080,6 +1186,20 @@ static void demux_ts_parse_packet (demux_ts *this) { LOG_MSG_STDERR(this->xine, _("demux error! transport error\n")); return; } + if (transport_scrambling_control) { + if (this->videoPid == pid) { + fprintf(stderr, "demux_ts: selected videoPid is scrambled. Stopping.\n"); + this->status = DEMUX_FINISHED; + } + for (i=0; i < this->scrambled_npids; i++) { + if (this->scrambled_pids[i] == pid) return; + } + this->scrambled_pids[this->scrambled_npids] = pid; + this->scrambled_npids++; + // LOG_MSG_STDERR(this->xine, _("demux_ts: PID %d is scrambled!\n"), pid); + fprintf(stderr, "demux_ts: PID %d is scrambled!\n", pid); + return; + } data_offset = 4; if (adaptation_field_control & 0x1) { @@ -1183,6 +1303,7 @@ static void demux_ts_parse_packet (demux_ts *this) { } } + /* * Sit in a loop eating data. */ @@ -1190,7 +1311,9 @@ static void *demux_ts_loop(void *gen_this) { demux_ts *this = (demux_ts *)gen_this; buf_element_t *buf; + int i; + pthread_mutex_lock( &this->mutex ); /* do-while needed to seek after demux finished */ do { @@ -1211,14 +1334,21 @@ static void *demux_ts_loop(void *gen_this) { xine_usec_sleep(100000); pthread_mutex_lock( &this->mutex ); } - + } while( this->status == DEMUX_OK ); #ifdef TS_LOG printf ("demux_ts: demux loop finished (status: %d)\n", this->status); #endif - + + for (i = 0; i < MAX_PIDS; i++) { + if (this->media[i].buf != NULL) { + this->media[i].buf->free_buffer(this->media[i].buf); + this->media[i].buf = NULL; + } + } + this->status = DEMUX_FINISHED; if (this->send_end_buffers) { buf = this->video_fifo->buffer_pool_alloc (this->video_fifo); @@ -1241,7 +1371,7 @@ static void *demux_ts_loop(void *gen_this) { } static void demux_ts_close(demux_plugin_t *gen_this) { - free (gen_this); + free(gen_this); } static char *demux_ts_get_id(void) { @@ -1272,6 +1402,13 @@ static int demux_ts_open(demux_plugin_t *this_gen, input_plugin_t *input, case STAGE_BY_CONTENT: { uint8_t buf[4096]; + // Mauro -- Say DEMUX_CAN_HANDLE with dvb input plugin, even if + // non-seekable and non-previewable. + if (strncasecmp(input->get_identifier(input), "dvb", 4) == 0) { + this->input = input; + return DEMUX_CAN_HANDLE; + } + if((input->get_capabilities(input) & INPUT_CAP_SEEKABLE) != 0) { input->seek(input, 0, SEEK_SET); @@ -1372,6 +1509,9 @@ static int demux_ts_start(demux_plugin_t *this_gen, pthread_mutex_lock( &this->mutex ); + this->videoPid = INVALID_PID; + this->audioPid = INVALID_PID; + if( !this->thread_running ) { this->video_fifo = video_fifo; this->audio_fifo = audio_fifo; @@ -1404,14 +1544,15 @@ static int demux_ts_start(demux_plugin_t *this_gen, demux_ts_build_crc32_table(this); + this->status = DEMUX_OK ; if( !this->thread_running ) { /* * Now start demuxing. */ - this->status = DEMUX_OK ; this->send_end_buffers = 1; this->last_PCR = 0; this->thread_running = 1; + this->scrambled_npids = 0; if ((err = pthread_create(&this->thread, NULL, demux_ts_loop, this)) != 0) { LOG_MSG_STDERR(this->xine, _("demux_ts: can't create new thread (%s)\n"), strerror(err)); @@ -1444,6 +1585,31 @@ static void demux_ts_stop(demux_plugin_t *this_gen) buf_element_t *buf; void *p; +#ifdef TS_READ_STATS + int i, prev_zero; + uint32_t pkt_total; + + pkt_total = 0; + for (i=0; i<=NPKT_PER_READ; i++) { + pkt_total += this->rstat[i]; + } + prev_zero = 1; + for (i=0; i<=NPKT_PER_READ; i++) { + if (this->rstat[i]) { + fprintf(stderr, "%3d pkts --> %6.3f%% [ %d ]\n", i, + (this->rstat[i] * 100.0 / pkt_total), this->rstat[i]); + prev_zero = 0; + } + else if (prev_zero) { + + } + else { + fprintf(stderr, "...\n"); + prev_zero = 1; + } + } +#endif + pthread_mutex_lock( &this->mutex ); if (!this->thread_running) { @@ -1524,8 +1690,10 @@ demux_plugin_t *init_demuxer_plugin(int iface, xine_t *xine) { /* * Initialise our specialised data. */ - for (i = 0; i < MAX_PIDS; i++) + for (i = 0; i < MAX_PIDS; i++) { this->media[i].pid = INVALID_PID; + this->media[i].buf = NULL; + } for (i = 0; i < MAX_PMTS; i++) { this->program_number[i] = INVALID_PROGRAM; @@ -1537,6 +1705,7 @@ demux_plugin_t *init_demuxer_plugin(int iface, xine_t *xine) { this->programNumber = INVALID_PROGRAM; this->pcrPid = INVALID_PID; this->PCR = 0; + this->scrambled_npids = 0; this->videoPid = INVALID_PID; this->audioPid = INVALID_PID; @@ -1545,6 +1714,12 @@ demux_plugin_t *init_demuxer_plugin(int iface, xine_t *xine) { this->status = DEMUX_FINISHED; pthread_mutex_init( &this->mutex, NULL ); +#ifdef TS_READ_STATS + for (i=0; i<=NPKT_PER_READ; i++) { + this->rstat[i] = 0; + } +#endif + return (demux_plugin_t *)this; } |