summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThibaut Mattern <tmattern@users.sourceforge.net>2006-01-27 19:37:15 +0000
committerThibaut Mattern <tmattern@users.sourceforge.net>2006-01-27 19:37:15 +0000
commit5941590357c91b94bc0008160781b3ecb0e2acd9 (patch)
tree248a384bc5c818715c871b2c6d2e1af388ad8179
parentd2cae45649cba868d864c74c512d64cc37541096 (diff)
downloadxine-lib-5941590357c91b94bc0008160781b3ecb0e2acd9.tar.gz
xine-lib-5941590357c91b94bc0008160781b3ecb0e2acd9.tar.bz2
Fixed bugs.
Add a close function to handle EndOfStream. Tested with 1 writer thread and 2 reader threads. CVS patchset: 7851 CVS date: 2006/01/27 19:37:15
-rw-r--r--src/xine-utils/ring_buffer.c123
-rw-r--r--src/xine-utils/ring_buffer.h22
2 files changed, 115 insertions, 30 deletions
diff --git a/src/xine-utils/ring_buffer.c b/src/xine-utils/ring_buffer.c
index 6825681f0..9fe14f30f 100644
--- a/src/xine-utils/ring_buffer.c
+++ b/src/xine-utils/ring_buffer.c
@@ -17,17 +17,19 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
*
- * $Id: ring_buffer.c,v 1.1 2006/01/27 07:55:18 tmattern Exp $
+ * $Id: ring_buffer.c,v 1.2 2006/01/27 19:37:15 tmattern Exp $
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
+#include <pthread.h>
#include <stdlib.h>
#include <inttypes.h>
-#include <pthread.h>
#include <assert.h>
+#include <stdio.h>
+#include <string.h>
#include "pool.h"
#include "list.h"
#include "ring_buffer.h"
@@ -81,6 +83,8 @@ struct xine_ring_buffer_s {
uint8_t *extra_buffer;
size_t extra_buffer_size;
+ int EOS;
+
pthread_mutex_t lock;
};
@@ -89,6 +93,7 @@ xine_ring_buffer_t *xine_ring_buffer_new(size_t size) {
xine_ring_buffer_t *new_ring_buffer = malloc(sizeof(xine_ring_buffer_t));
new_ring_buffer->buffer = malloc(size);
+ new_ring_buffer->buffer_size = size;
new_ring_buffer->alloc_list = xine_list_new();
new_ring_buffer->get_list = xine_list_new();
@@ -105,18 +110,21 @@ xine_ring_buffer_t *xine_ring_buffer_new(size_t size) {
new_ring_buffer->tail_release = new_ring_buffer->buffer;
new_ring_buffer->free_size = size;
- new_ring_buffer->free_size_cond = PTHREAD_COND_INITIALIZER;
+ pthread_cond_init(&new_ring_buffer->free_size_cond, NULL);
new_ring_buffer->free_size_needed = 0;
new_ring_buffer->full_size = 0;
- new_ring_buffer->full_size_cond = PTHREAD_COND_INITIALIZER;
+ pthread_cond_init(&new_ring_buffer->full_size_cond, NULL);
new_ring_buffer->full_size_needed = 0;
- new_ring_buffer->lock = PTHREAD_MUTEX_INITIALIZER;
+
+ pthread_mutex_init(&new_ring_buffer->lock, NULL);
new_ring_buffer->buffer_end = new_ring_buffer->buffer + size;
new_ring_buffer->extra_buffer = malloc(RING_BUFFER_EXTRA_BUFFER_SIZE);
new_ring_buffer->extra_buffer_size = RING_BUFFER_EXTRA_BUFFER_SIZE;
+ new_ring_buffer->EOS = 0;
+
return new_ring_buffer;
}
@@ -130,14 +138,48 @@ void xine_ring_buffer_delete(xine_ring_buffer_t *ring_buffer) {
free (ring_buffer);
}
+static void xine_ring_buffer_display_stat(xine_ring_buffer_t *ring_buffer) {
+#if DEBUG_RING_BUFFER
+ size_t free_size, full_size;
+
+ printf("alloc: free_size1=%d full_size1=%d\n",
+ ring_buffer->free_size, ring_buffer->full_size);
+ if (ring_buffer->tail_release > ring_buffer->head_alloc) {
+ free_size = ring_buffer->tail_release-ring_buffer->head_alloc;
+ } else {
+ free_size = ring_buffer->tail_release +
+ (ring_buffer->buffer_end - ring_buffer->buffer) -
+ ring_buffer->head_alloc;
+ }
+ if (ring_buffer->head > ring_buffer->tail) {
+ full_size = ring_buffer->head - ring_buffer->tail;
+ } else {
+ full_size = ring_buffer->head +
+ (ring_buffer->buffer_end - ring_buffer->buffer) -
+ ring_buffer->tail;
+ }
+ printf("alloc: free_size2=%d full_size2=%d\n", free_size, full_size);
+ printf("alloc: head_alloc=%d, head=%d, tail=%d, tail_release=%d\n",
+ ring_buffer->head_alloc - ring_buffer->buffer,
+ ring_buffer->head - ring_buffer->buffer,
+ ring_buffer->tail_release - ring_buffer->buffer,
+ ring_buffer->tail - ring_buffer->buffer);
+#endif
+}
+
void *xine_ring_buffer_alloc(xine_ring_buffer_t *ring_buffer, size_t size) {
int ok = 0;
xine_ring_buffer_chunk_t *chunk;
+ assert(ring_buffer);
+
pthread_mutex_lock(&ring_buffer->lock);
+
+ xine_ring_buffer_display_stat(ring_buffer);
+
do {
while (size > ring_buffer->free_size) {
- // we need more free room
+ /* we need more free room */
ring_buffer->free_size_needed++;
pthread_cond_wait (&ring_buffer->free_size_cond, &ring_buffer->lock);
ring_buffer->free_size_needed--;
@@ -163,6 +205,7 @@ void *xine_ring_buffer_alloc(xine_ring_buffer_t *ring_buffer, size_t size) {
xine_list_push_back(ring_buffer->alloc_list, chunk);
ring_buffer->head_alloc += size;
+ ring_buffer->free_size -= size;
pthread_mutex_unlock(&ring_buffer->lock);
return chunk->mem;
@@ -188,12 +231,12 @@ void xine_ring_buffer_put(xine_ring_buffer_t *ring_buffer, void *buffer) {
assert(chunk);
/* found associated chunk, is it the first in the list ? */
if (!prev_chunk) {
- /* increment ringbuffer head, and full_size */
- ring_buffer->head += chunk->size;
- ring_buffer->full_size += chunk->size;
if (ring_buffer->head == ring_buffer->buffer_end) {
ring_buffer->head = ring_buffer->buffer;
}
+ /* increment ring_buffer head, and full_size */
+ ring_buffer->head += chunk->size;
+ ring_buffer->full_size += chunk->size;
/* notify */
if (ring_buffer->full_size_needed) {
@@ -203,27 +246,34 @@ void xine_ring_buffer_put(xine_ring_buffer_t *ring_buffer, void *buffer) {
/* merge with previous chunk */
prev_chunk->size += chunk->size;
}
- xine_list_remove(ring_buffer->alloc_list, chunk);
+ xine_list_remove(ring_buffer->alloc_list, ite);
xine_pool_put(ring_buffer->chunk_pool, chunk);
pthread_mutex_unlock(&ring_buffer->lock);
}
-void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size) {
+void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size, size_t *rsize) {
xine_ring_buffer_chunk_t *chunk;
size_t continuous_size;
uint8_t *mem_chunk;
+ assert(ring_buffer);
+ assert(rsize);
pthread_mutex_lock(&ring_buffer->lock);
- while (size > ring_buffer->full_size) {
- // we need more free room
+ while ((size > ring_buffer->full_size) && !ring_buffer->EOS) {
+ /* we need more free room */
ring_buffer->full_size_needed++;
pthread_cond_wait (&ring_buffer->full_size_cond, &ring_buffer->lock);
ring_buffer->full_size_needed--;
}
- continuous_size = ring_buffer->buffer_end - ring_buffer->tail_release;
+ if (size > ring_buffer->full_size) {
+ size = ring_buffer->full_size;
+ }
+
+ continuous_size = ring_buffer->buffer_end - ring_buffer->tail;
if (size > continuous_size) {
+
/* we can't get a continuous buffer of this size from the ring buffer,
we accumulate the ringbuffer content into the extra buffer */
if (ring_buffer->extra_buffer_size < size) {
@@ -231,15 +281,16 @@ void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size) {
ring_buffer->extra_buffer_size = size;
}
memcpy(ring_buffer->extra_buffer,
- ring_buffer->tail_release,
+ ring_buffer->tail,
continuous_size);
memcpy(ring_buffer->extra_buffer + continuous_size,
ring_buffer->buffer,
size - continuous_size);
- ring_buffer->tail_release = ring_buffer->buffer - continuous_size;
mem_chunk = ring_buffer->extra_buffer;
+ ring_buffer->tail = ring_buffer->buffer + size - continuous_size;
} else {
- mem_chunk = ring_buffer->tail_release;
+ mem_chunk = ring_buffer->tail;
+ ring_buffer->tail += size;
}
/* create a new chunk and add it to the allocated list */
@@ -248,7 +299,8 @@ void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size) {
chunk->size = size;
xine_list_push_back(ring_buffer->get_list, chunk);
- ring_buffer->tail_release += size;
+ *rsize = size;
+ ring_buffer->full_size -= size;
pthread_mutex_unlock(&ring_buffer->lock);
return chunk->mem;
@@ -274,13 +326,22 @@ void xine_ring_buffer_release(xine_ring_buffer_t *ring_buffer, void *buffer) {
assert(chunk);
/* found associated chunk, is it the first in the list ? */
if (!prev_chunk) {
- /* increment ringbuffer tail and free_size */
- ring_buffer->tail += chunk->size;
- ring_buffer->free_size += chunk->size;
- if (ring_buffer->tail >= ring_buffer->buffer_end) {
- ring_buffer->tail = ring_buffer->buffer +
- (ring_buffer->tail - ring_buffer->buffer_end);
+ size_t continuous_size;
+
+ /* increment ringbuffer tail_release and free_size */
+
+ continuous_size = ring_buffer->buffer_end - ring_buffer->tail_release;
+ if (chunk->size > continuous_size) {
+ ring_buffer->tail_release = ring_buffer->buffer + chunk->size - continuous_size;
+ /* place the buffer_end back to the end */
+ ring_buffer->free_size +=
+ (ring_buffer->buffer + ring_buffer->buffer_size) - ring_buffer->buffer_end;
+ ring_buffer->buffer_end = ring_buffer->buffer + ring_buffer->buffer_size;
+ } else {
+ ring_buffer->tail_release += chunk->size;
}
+ ring_buffer->free_size += chunk->size;
+
/* notify */
if (ring_buffer->free_size_needed) {
pthread_cond_broadcast(&ring_buffer->free_size_cond);
@@ -289,8 +350,20 @@ void xine_ring_buffer_release(xine_ring_buffer_t *ring_buffer, void *buffer) {
/* merge with previous chunk */
prev_chunk->size += chunk->size;
}
- xine_list_remove(ring_buffer->get_list, chunk);
+ xine_list_remove(ring_buffer->get_list, ite);
xine_pool_put(ring_buffer->chunk_pool, chunk);
pthread_mutex_unlock(&ring_buffer->lock);
}
+
+void xine_ring_buffer_close(xine_ring_buffer_t *ring_buffer) {
+ pthread_mutex_lock(&ring_buffer->lock);
+
+ ring_buffer->EOS = 1;
+
+ /* notify */
+ if (ring_buffer->full_size_needed) {
+ pthread_cond_broadcast(&ring_buffer->full_size_cond);
+ }
+ pthread_mutex_unlock(&ring_buffer->lock);
+}
diff --git a/src/xine-utils/ring_buffer.h b/src/xine-utils/ring_buffer.h
index 412c62b7a..ece5e7beb 100644
--- a/src/xine-utils/ring_buffer.h
+++ b/src/xine-utils/ring_buffer.h
@@ -17,7 +17,7 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
*
- * $Id: ring_buffer.h,v 1.1 2006/01/27 07:55:18 tmattern Exp $
+ * $Id: ring_buffer.h,v 1.2 2006/01/27 19:37:15 tmattern Exp $
*
* Fifo + Ring Buffer
*/
@@ -36,10 +36,22 @@ void *xine_ring_buffer_alloc(xine_ring_buffer_t *ring_buffer, size_t size);
/* Put a chunk into the ring */
void xine_ring_buffer_put(xine_ring_buffer_t *ring_buffer, void *chunk);
-/* Get a chunk of a specified size from the ring buffer */
-/* Might block if the ring buffer is empty */
-void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size);
+/* Get a chunk of a specified size from the ring buffer
+ * Might block if the ring buffer is empty
+ * param size: the desired size
+ * param rsize: the size of the chunk returned
+ * rsize is not equal to size at the end of stream, the caller MUST check
+ * rsize value.
+ */
+void *xine_ring_buffer_get(xine_ring_buffer_t *ring_buffer, size_t size, size_t *rsize);
-/* Release the chunk, makes memory available for the alloc function */
+/* Releases the chunk, makes memory available for the alloc function */
void xine_ring_buffer_release(xine_ring_buffer_t *ring_buffer, void *chunk);
+/* Closes the ring buffer
+ * The writer uses this function to signal the end of stream to the reader.
+ * The reader MUST check the rsize value returned by the get function.
+ */
+void xine_ring_buffer_close(xine_ring_buffer_t *ring_buffer);
+
+