aboutsummaryrefslogtreecommitdiff
path: root/plugins/elements/gstqueue2.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/elements/gstqueue2.c')
-rw-r--r--plugins/elements/gstqueue2.c839
1 files changed, 538 insertions, 301 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c
index c89c6cd..82e2e11 100644
--- a/plugins/elements/gstqueue2.c
+++ b/plugins/elements/gstqueue2.c
@@ -63,6 +63,7 @@
#include <glib/gstdio.h>
#include "gst/gst-i18n-lib.h"
+#include "gst/glib-compat-private.h"
#include <string.h>
@@ -155,10 +156,10 @@ enum
queue->max_level.time, \
(guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
- queue->queue->length))
+ queue->queue.length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
- g_mutex_lock (q->qlock); \
+ g_mutex_lock (&q->qlock); \
} G_STMT_END
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \
@@ -168,13 +169,13 @@ enum
} G_STMT_END
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \
- g_mutex_unlock (q->qlock); \
+ g_mutex_unlock (&q->qlock); \
} G_STMT_END
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \
STATUS (queue, q->sinkpad, "wait for DEL"); \
q->waiting_del = TRUE; \
- g_cond_wait (q->item_del, queue->qlock); \
+ g_cond_wait (&q->item_del, &queue->qlock); \
q->waiting_del = FALSE; \
if (res != GST_FLOW_OK) { \
STATUS (queue, q->srcpad, "received DEL wakeup"); \
@@ -186,7 +187,7 @@ enum
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \
STATUS (queue, q->srcpad, "wait for ADD"); \
q->waiting_add = TRUE; \
- g_cond_wait (q->item_add, q->qlock); \
+ g_cond_wait (&q->item_add, &q->qlock); \
q->waiting_add = FALSE; \
if (res != GST_FLOW_OK) { \
STATUS (queue, q->srcpad, "received ADD wakeup"); \
@@ -198,14 +199,14 @@ enum
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \
if (q->waiting_del) { \
STATUS (q, q->srcpad, "signal DEL"); \
- g_cond_signal (q->item_del); \
+ g_cond_signal (&q->item_del); \
} \
} G_STMT_END
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \
if (q->waiting_add) { \
STATUS (q, q->sinkpad, "signal ADD"); \
- g_cond_signal (q->item_add); \
+ g_cond_signal (&q->item_add); \
} \
} G_STMT_END
@@ -223,26 +224,32 @@ static void gst_queue2_set_property (GObject * object,
static void gst_queue2_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
-static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
-static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
static gboolean gst_queue2_handle_query (GstElement * element,
GstQuery * query);
-static GstCaps *gst_queue2_getcaps (GstPad * pad, GstCaps * filter);
-static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
-
-static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
- guint length, GstBuffer ** buffer);
+static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
+ guint64 offset, guint length, GstBuffer ** buffer);
-static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
-static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
+static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active);
+static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
GstStateChange transition);
@@ -251,6 +258,14 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
+typedef enum
+{
+ GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+ GST_QUEUE2_ITEM_TYPE_BUFFER,
+ GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+ GST_QUEUE2_ITEM_TYPE_EVENT
+} GstQueue2ItemType;
+
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
static void
@@ -358,7 +373,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
- gst_element_class_set_details_simple (gstelement_class, "Queue 2",
+ gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
"Generic",
"Simple data queue",
"Erik Walthinsen <omega@cse.ogi.edu>, "
@@ -375,32 +390,28 @@ gst_queue2_init (GstQueue2 * queue)
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_chain));
- gst_pad_set_activatepush_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
+ gst_pad_set_chain_list_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
+ gst_pad_set_activatemode_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
- gst_pad_set_getcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
+ gst_pad_set_query_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
+ GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
- gst_pad_set_activatepull_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
- gst_pad_set_activatepush_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
+ gst_pad_set_activatemode_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
gst_pad_set_getrange_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_get_range));
- gst_pad_set_getcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
- gst_pad_set_acceptcaps_function (queue->srcpad,
- GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
gst_pad_set_event_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
gst_pad_set_query_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
+ GST_PAD_SET_PROXY_CAPS (queue->srcpad);
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
/* levels */
@@ -422,18 +433,18 @@ gst_queue2_init (GstQueue2 * queue)
queue->sink_tainted = TRUE;
queue->src_tainted = TRUE;
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
queue->is_eos = FALSE;
queue->in_timer = g_timer_new ();
queue->out_timer = g_timer_new ();
- queue->qlock = g_mutex_new ();
+ g_mutex_init (&queue->qlock);
queue->waiting_add = FALSE;
- queue->item_add = g_cond_new ();
+ g_cond_init (&queue->item_add);
queue->waiting_del = FALSE;
- queue->item_del = g_cond_new ();
- queue->queue = g_queue_new ();
+ g_cond_init (&queue->item_del);
+ g_queue_init (&queue->queue);
queue->buffering_percent = 100;
@@ -458,16 +469,16 @@ gst_queue2_finalize (GObject * object)
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
gst_mini_object_unref (data);
}
- g_queue_free (queue->queue);
- g_mutex_free (queue->qlock);
- g_cond_free (queue->item_add);
- g_cond_free (queue->item_del);
+ g_queue_clear (&queue->queue);
+ g_mutex_clear (&queue->qlock);
+ g_cond_clear (&queue->item_add);
+ g_cond_clear (&queue->item_del);
g_timer_destroy (queue->in_timer);
g_timer_destroy (queue->out_timer);
@@ -611,42 +622,6 @@ init_ranges (GstQueue2 * queue)
queue->current = add_range (queue, 0);
}
-static gboolean
-gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- gboolean result;
-
- queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_accept_caps (otherpad, caps);
-
- return result;
-}
-
-static GstCaps *
-gst_queue2_getcaps (GstPad * pad, GstCaps * filter)
-{
- GstQueue2 *queue;
- GstPad *otherpad;
- GstCaps *result;
-
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
-
- otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
- result = gst_pad_peer_get_caps (otherpad, filter);
- if (result == NULL)
- result = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
-
- gst_object_unref (queue);
-
- return result;
-}
-
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue. */
static void
@@ -686,11 +661,9 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
gst_event_copy_segment (event, segment);
if (segment->format == GST_FORMAT_BYTES) {
- if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
/* start is where we'll be getting from and as such writing next */
queue->current = add_range (queue, segment->start);
- /* update the stats for this range */
- update_cur_level (queue, queue->current);
}
}
@@ -749,6 +722,52 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
update_time_level (queue);
}
+static gboolean
+buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
+{
+ GstClockTime *timestamp = data;
+
+ GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
+ " duration %" GST_TIME_FORMAT, idx,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+ *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+ if (GST_BUFFER_DURATION_IS_VALID (*buf))
+ *timestamp += GST_BUFFER_DURATION (*buf);
+
+ GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+ return TRUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+ GstSegment * segment, gboolean is_sink)
+{
+ GstClockTime timestamp;
+
+ /* if no timestamp is set, assume it's continuous with the previous time */
+ timestamp = segment->position;
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+ GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timestamp));
+
+ segment->position = timestamp;
+
+ if (is_sink)
+ queue->sink_tainted = TRUE;
+ else
+ queue->src_tainted = TRUE;
+
+ /* calc diff with other end */
+ update_time_level (queue);
+}
+
static void
update_buffering (GstQueue2 * queue)
{
@@ -819,7 +838,7 @@ update_buffering (GstQueue2 * queue)
mode = GST_BUFFERING_DOWNLOAD;
if (queue->byte_in_rate > 0) {
- if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
&duration)) {
buffering_left =
(gdouble) ((duration -
@@ -984,6 +1003,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
GstEvent *event;
gboolean res;
+ /* until we receive the FLUSH_STOP from this seek, we skip data */
+ queue->seeking = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
@@ -1036,20 +1057,27 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
(offset + length) - range->writing_pos);
} else {
- GST_INFO_OBJECT (queue, "not found in any range");
- /* we don't have the range, see how far away we are, FIXME, find a good
- * threshold based on the incoming rate. */
+ GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
+ " len %u", offset, length);
+ /* we don't have the range, see how far away we are */
if (!queue->is_eos && queue->current) {
+ /* FIXME, find a good threshold based on the incoming rate. */
+ guint64 threshold = 1024 * 512;
+
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
- if (offset < queue->current->offset || offset >
- queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
- queue->cur_level.bytes) {
- perform_seek_to_offset (queue, offset);
- } else {
+ guint64 distance;
+
+ distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+ /* don't wait for the complete buffer to fill */
+ distance = MIN (distance, threshold);
+
+ if (offset >= queue->current->offset && offset <=
+ queue->current->writing_pos + distance) {
GST_INFO_OBJECT (queue,
"requested data is within range, wait for data");
+ return FALSE;
}
- } else if (offset < queue->current->writing_pos + 200000) {
+ } else if (offset < queue->current->writing_pos + threshold) {
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data");
return FALSE;
@@ -1122,7 +1150,7 @@ could_not_read:
eos:
{
GST_DEBUG ("non-regular file hits EOS");
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
@@ -1131,22 +1159,30 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GstBuffer ** buffer)
{
GstBuffer *buf;
+ GstMapInfo info;
guint8 *data;
guint64 file_offset;
guint block_length, remaining, read_length;
guint64 rb_size;
+ guint64 max_size;
guint64 rpos;
GstFlowReturn ret = GST_FLOW_OK;
/* allocate the output buffer of the requested size */
- buf = gst_buffer_new_allocate (NULL, length, 0);
- data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE);
+ if (*buffer == NULL)
+ buf = gst_buffer_new_allocate (NULL, length, NULL);
+ else
+ buf = *buffer;
+
+ gst_buffer_map (buf, &info, GST_MAP_WRITE);
+ data = info.data;
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
offset);
rpos = offset;
rb_size = queue->ring_buffer_max_size;
+ max_size = QUEUE_MAX_BYTES (queue);
remaining = length;
while (remaining > 0) {
@@ -1165,16 +1201,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GST_DEBUG_OBJECT (queue,
"reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
- ", level %" G_GUINT64_FORMAT,
- rpos, queue->current->writing_pos, level);
+ ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
+ rpos, queue->current->writing_pos, level, max_size);
- if (level >= rb_size) {
+ if (level >= max_size) {
/* we don't have the data but if we have a ring buffer that is full, we
* need to read */
GST_DEBUG_OBJECT (queue,
- "ring buffer full, reading ring-buffer-max-size %"
- G_GUINT64_FORMAT " bytes", rb_size);
- read_length = rb_size;
+ "ring buffer full, reading QUEUE_MAX_BYTES %"
+ G_GUINT64_FORMAT " bytes", max_size);
+ read_length = max_size;
} else if (queue->is_eos) {
/* won't get any more data so read any data we have */
if (level) {
@@ -1182,21 +1218,20 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
"EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
level);
read_length = level;
+ remaining = level;
+ length = level;
} else
goto hit_eos;
}
}
if (read_length == 0) {
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- && queue->current->max_reading_pos > rpos) {
- /* protect cached data (data between offset and max_reading_pos)
- * and update current level */
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
GST_DEBUG_OBJECT (queue,
- "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
- "]", rpos, queue->current->max_reading_pos);
- queue->current->max_reading_pos = rpos;
- update_cur_level (queue, queue->current);
+ "update current position [%" G_GUINT64_FORMAT "-%"
+ G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
+ update_cur_pos (queue, queue->current, rpos);
+ GST_QUEUE2_SIGNAL_DEL (queue);
}
GST_DEBUG_OBJECT (queue, "waiting for add");
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
@@ -1251,7 +1286,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
}
- gst_buffer_unmap (buf, data, length);
+ gst_buffer_unmap (buf, &info);
+ gst_buffer_resize (buf, 0, length);
GST_BUFFER_OFFSET (buf) = offset;
GST_BUFFER_OFFSET_END (buf) = offset + length;
@@ -1264,20 +1300,25 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
hit_eos:
{
GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
- gst_buffer_unref (buf);
- return GST_FLOW_UNEXPECTED;
+ gst_buffer_unmap (buf, &info);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
+ return GST_FLOW_EOS;
}
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
- gst_buffer_unref (buf);
- return GST_FLOW_WRONG_STATE;
+ gst_buffer_unmap (buf, &info);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
+ return GST_FLOW_FLUSHING;
}
read_error:
{
GST_DEBUG_OBJECT (queue, "we have a read error");
- gst_buffer_unmap (buf, data, 0);
- gst_buffer_unref (buf);
+ gst_buffer_unmap (buf, &info);
+ if (*buffer == NULL)
+ gst_buffer_unref (buf);
return ret;
}
}
@@ -1288,12 +1329,15 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
{
GstMiniObject *item;
- if (queue->starting_segment != NULL) {
+ if (queue->stream_start_event != NULL) {
+ item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
+ queue->stream_start_event = NULL;
+ } else if (queue->starting_segment != NULL) {
item = GST_MINI_OBJECT_CAST (queue->starting_segment);
queue->starting_segment = NULL;
} else {
GstFlowReturn ret;
- GstBuffer *buffer;
+ GstBuffer *buffer = NULL;
guint64 reading_pos;
reading_pos = queue->current->reading_pos;
@@ -1306,7 +1350,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer);
break;
- case GST_FLOW_UNEXPECTED:
+ case GST_FLOW_EOS:
item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
break;
default:
@@ -1442,8 +1486,8 @@ gst_queue2_locked_flush (GstQueue2 * queue)
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
@@ -1459,6 +1503,7 @@ gst_queue2_locked_flush (GstQueue2 * queue)
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
queue->segment_event_received = FALSE;
+ gst_event_replace (&queue->stream_start_event, NULL);
/* we deleted a lot of something */
GST_QUEUE2_SIGNAL_DEL (queue);
@@ -1502,9 +1547,9 @@ out_flushing:
static gboolean
gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
{
- guint8 *odata, *data, *ring_buffer;
+ GstMapInfo info;
+ guint8 *data, *ring_buffer;
guint size, rb_size;
- gsize osize;
guint64 writing_pos, new_writing_pos;
GstQueue2Range *range, *prev, *next;
@@ -1515,13 +1560,13 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
ring_buffer = queue->ring_buffer;
rb_size = queue->ring_buffer_max_size;
- odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ);
+ gst_buffer_map (buffer, &info, GST_MAP_READ);
- size = osize;
- data = odata;
+ size = info.size;
+ data = info.data;
GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
- GST_BUFFER_OFFSET (buffer));
+ writing_pos);
while (size > 0) {
guint to_write;
@@ -1742,7 +1787,7 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
GST_QUEUE2_SIGNAL_ADD (queue);
}
- gst_buffer_unmap (buffer, odata, osize);
+ gst_buffer_unmap (buffer, &info);
return TRUE;
@@ -1750,14 +1795,14 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
- gst_buffer_unmap (buffer, odata, osize);
- /* FIXME - GST_FLOW_UNEXPECTED ? */
+ gst_buffer_unmap (buffer, &info);
+ /* FIXME - GST_FLOW_EOS ? */
return FALSE;
}
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unmap (buffer, odata, osize);
+ gst_buffer_unmap (buffer, &info);
return FALSE;
}
handle_error:
@@ -1773,16 +1818,45 @@ handle_error:
("%s", g_strerror (errno)));
}
}
- gst_buffer_unmap (buffer, odata, osize);
+ gst_buffer_unmap (buffer, &info);
return FALSE;
}
}
+static gboolean
+buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
+{
+ GstQueue2 *queue = q;
+
+ GST_TRACE_OBJECT (queue,
+ "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
+ gst_buffer_get_size (*buf));
+
+ if (!gst_queue2_create_write (queue, *buf)) {
+ GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static gboolean
+buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
+{
+ guint *p_size = data;
+ gsize buf_size;
+
+ buf_size = gst_buffer_get_size (*buf);
+ GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
+ *p_size += buf_size;
+ return TRUE;
+}
+
/* enqueue an item an update the level stats */
static void
-gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
+ GstQueue2ItemType item_type)
{
- if (isbuffer) {
+ if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
guint size;
@@ -1805,7 +1879,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
/* FIXME - check return value? */
gst_queue2_create_write (queue, buffer);
}
- } else if (GST_IS_EVENT (item)) {
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list;
+ guint size = 0;
+
+ buffer_list = GST_BUFFER_LIST_CAST (item);
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+ /* add buffer to the statistics */
+ if (QUEUE_IS_USING_QUEUE (queue)) {
+ queue->cur_level.buffers++;
+ queue->cur_level.bytes += size;
+ }
+ queue->bytes_in += size;
+
+ /* apply new buffer to segment stats */
+ apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+ /* update the byterate stats */
+ update_in_rates (queue);
+
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
+ gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+ }
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event;
event = GST_EVENT_CAST (item);
@@ -1831,10 +1930,17 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
queue->starting_segment = event;
item = NULL;
}
- /* a new segment allows us to accept more buffers if we got UNEXPECTED
+ /* a new segment allows us to accept more buffers if we got EOS
* from downstream */
queue->unexpected = FALSE;
break;
+ case GST_EVENT_STREAM_START:
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
+ gst_event_replace (&queue->stream_start_event, event);
+ gst_event_unref (event);
+ item = NULL;
+ }
+ break;
default:
if (!QUEUE_IS_USING_QUEUE (queue))
goto unexpected_event;
@@ -1853,7 +1959,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) {
- g_queue_push_tail (queue->queue, item);
+ g_queue_push_tail (&queue->queue, item);
} else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
@@ -1877,14 +1983,14 @@ unexpected_event:
/* dequeue an item from the queue and update level stats */
static GstMiniObject *
-gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
+gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
{
GstMiniObject *item;
if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue);
else
- item = g_queue_pop_head (queue->queue);
+ item = g_queue_pop_head (&queue->queue);
if (item == NULL)
goto no_item;
@@ -1895,7 +2001,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
buffer = GST_BUFFER_CAST (item);
size = gst_buffer_get_size (buffer);
- *is_buffer = TRUE;
+ *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
@@ -1916,7 +2022,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
- *is_buffer = FALSE;
+ *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved event %p from queue", event);
@@ -1932,11 +2038,36 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
default:
break;
}
+ } else if (GST_IS_BUFFER_LIST (item)) {
+ GstBufferList *buffer_list;
+ guint size = 0;
+
+ buffer_list = GST_BUFFER_LIST_CAST (item);
+ gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "retrieved buffer list %p from queue", buffer_list);
+
+ if (QUEUE_IS_USING_QUEUE (queue)) {
+ queue->cur_level.buffers--;
+ queue->cur_level.bytes -= size;
+ }
+ queue->bytes_out += size;
+
+ apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+ /* update the byterate stats */
+ update_out_rates (queue);
+ /* update the buffering */
+ if (queue->use_buffering)
+ update_buffering (queue);
+
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue));
item = NULL;
+ *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
}
GST_QUEUE2_SIGNAL_DEL (queue);
@@ -1951,24 +2082,25 @@ no_item:
}
static gboolean
-gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE2 (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
- if (QUEUE_IS_USING_QUEUE (queue)) {
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
/* now unblock the chain function */
GST_QUEUE2_MUTEX_LOCK (queue);
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
/* unblock the loop and chain functions */
GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_SIGNAL_DEL (queue);
@@ -1981,7 +2113,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
/* flush the sink pad */
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->sinkresult = GST_FLOW_FLUSHING;
GST_QUEUE2_SIGNAL_DEL (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
@@ -1993,7 +2125,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
- if (QUEUE_IS_USING_QUEUE (queue)) {
+ if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@@ -2003,6 +2135,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
queue->sinkresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
+ queue->seeking = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
@@ -2014,6 +2147,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
queue->is_eos = FALSE;
queue->unexpected = FALSE;
queue->sinkresult = GST_FLOW_OK;
+ queue->seeking = FALSE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
gst_event_unref (event);
@@ -2027,7 +2161,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
/* refuse more events on EOS */
if (queue->is_eos)
goto out_eos;
- gst_queue2_locked_enqueue (queue, event, FALSE);
+ gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* non-serialized events are passed upstream. */
@@ -2056,6 +2190,25 @@ out_eos:
}
static gboolean
+gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query)
+{
+ gboolean res;
+
+ switch (GST_QUERY_TYPE (query)) {
+ default:
+ if (GST_QUERY_IS_SERIALIZED (query)) {
+ GST_WARNING_OBJECT (pad, "unhandled serialized query");
+ res = FALSE;
+ } else {
+ res = gst_pad_query_default (pad, parent, query);
+ }
+ break;
+ }
+ return res;
+}
+
+static gboolean
gst_queue2_is_empty (GstQueue2 * queue)
{
/* never empty on EOS */
@@ -2065,7 +2218,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
- if (queue->queue->length == 0)
+ if (queue->queue.length == 0)
return TRUE;
}
@@ -2117,18 +2270,9 @@ gst_queue2_is_filled (GstQueue2 * queue)
}
static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+ GstMiniObject * item, GstQueue2ItemType item_type)
{
- GstQueue2 *queue;
-
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
- G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
- GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
/* we have to lock the queue since we span threads */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
/* when we received EOS, we refuse more data */
@@ -2138,11 +2282,15 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
if (queue->unexpected)
goto out_unexpected;
+ /* while we didn't receive the newsegment, we're seeking and we skip data */
+ if (queue->seeking)
+ goto out_seeking;
+
if (!gst_queue2_wait_free_space (queue))
goto out_flushing;
/* put buffer in queue now */
- gst_queue2_locked_enqueue (queue, buffer, TRUE);
+ gst_queue2_locked_enqueue (queue, item, item_type);
GST_QUEUE2_MUTEX_UNLOCK (queue);
return GST_FLOW_OK;
@@ -2155,7 +2303,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
return ret;
}
@@ -2163,19 +2311,102 @@ out_eos:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
+ }
+out_seeking:
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ gst_mini_object_unref (item);
+
+ return GST_FLOW_OK;
}
out_unexpected:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "exit because we received UNEXPECTED");
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
+
+ return GST_FLOW_EOS;
+ }
+}
+
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstQueue2 *queue;
+
+ queue = GST_QUEUE2 (parent);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
+ "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list)
+{
+ GstQueue2 *queue;
- return GST_FLOW_UNEXPECTED;
+ queue = GST_QUEUE2 (parent);
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received buffer list %p", buffer_list);
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
+static GstMiniObject *
+gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
+{
+ GstMiniObject *data;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
+
+ /* stop pushing buffers, we dequeue all items until we see an item that we
+ * can push again, which is EOS or SEGMENT. If there is nothing in the
+ * queue we can push, we set a flag to make the sinkpad refuse more
+ * buffers with an EOS return value until we receive something
+ * pushable again or we get flushed. */
+ while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+ if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping EOS buffer %p", data);
+ gst_buffer_unref (GST_BUFFER_CAST (data));
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+ GstEvent *event = GST_EVENT_CAST (data);
+ GstEventType type = GST_EVENT_TYPE (event);
+
+ if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+ /* we found a pushable item in the queue, push it out */
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
+ return data;
+ }
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping EOS event %p", event);
+ gst_event_unref (event);
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping EOS buffer list %p", data);
+ gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+ }
}
+ /* no more items in the queue. Set the unexpected flag so that upstream
+ * make us refuse any more buffers on the sinkpad. Since we will still
+ * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
+ * task function does not shut down. */
+ queue->unexpected = TRUE;
+ return NULL;
}
/* dequeue an item from the queue an push it downstream. This functions returns
@@ -2185,16 +2416,16 @@ gst_queue2_push_one (GstQueue2 * queue)
{
GstFlowReturn result = GST_FLOW_OK;
GstMiniObject *data;
- gboolean is_buffer = FALSE;
+ GstQueue2ItemType item_type;
- data = gst_queue2_locked_dequeue (queue, &is_buffer);
+ data = gst_queue2_locked_dequeue (queue, &item_type);
if (data == NULL)
goto no_item;
next:
GST_QUEUE2_MUTEX_UNLOCK (queue);
- if (is_buffer) {
+ if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
#if 0
GstCaps *caps;
@@ -2216,56 +2447,59 @@ next:
/* need to check for srcresult here as well */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
- if (result == GST_FLOW_UNEXPECTED) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "got UNEXPECTED from downstream");
- /* stop pushing buffers, we dequeue all items until we see an item that we
- * can push again, which is EOS or SEGMENT. If there is nothing in the
- * queue we can push, we set a flag to make the sinkpad refuse more
- * buffers with an UNEXPECTED return value until we receive something
- * pushable again or we get flushed. */
- while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
- if (is_buffer) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED buffer %p", data);
- gst_buffer_unref (GST_BUFFER_CAST (data));
- } else if (GST_IS_EVENT (data)) {
- GstEvent *event = GST_EVENT_CAST (data);
- GstEventType type = GST_EVENT_TYPE (event);
-
- if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
- /* we found a pushable item in the queue, push it out */
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushing pushable event %s after UNEXPECTED",
- GST_EVENT_TYPE_NAME (event));
- goto next;
- }
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED event %p", event);
- gst_event_unref (event);
- }
- }
- /* no more items in the queue. Set the unexpected flag so that upstream
- * make us refuse any more buffers on the sinkpad. Since we will still
- * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
- * task function does not shut down. */
- queue->unexpected = TRUE;
+ if (result == GST_FLOW_EOS) {
+ data = gst_queue2_dequeue_on_eos (queue, &item_type);
+ if (data != NULL)
+ goto next;
+ /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
+ * to the caller so that the task function does not shut down */
result = GST_FLOW_OK;
}
- } else if (GST_IS_EVENT (data)) {
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
gst_pad_push_event (queue->srcpad, event);
- /* if we're EOS, return UNEXPECTED so that the task pauses. */
+ /* if we're EOS, return EOS so that the task pauses. */
if (type == GST_EVENT_EOS) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushed EOS event %p, return UNEXPECTED", event);
- result = GST_FLOW_UNEXPECTED;
+ "pushed EOS event %p, return EOS", event);
+ result = GST_FLOW_EOS;
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list;
+#if 0
+ GstBuffer *first_buf;
+ GstCaps *caps;
+#endif
+
+ buffer_list = GST_BUFFER_LIST_CAST (data);
+
+#if 0
+ first_buf = gst_buffer_list_get (buffer_list, 0);
+ caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+ /* set caps before pushing the buffer so that core does not try to do
+ * something fancy to check if this is possible. */
+ if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+ gst_pad_set_caps (queue->srcpad, caps);
+#endif
+
+ result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+ /* need to check for srcresult here as well */
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+ if (result == GST_FLOW_EOS) {
+ data = gst_queue2_dequeue_on_eos (queue, &item_type);
+ if (data != NULL)
+ goto next;
+ /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
+ * to the caller so that the task function does not shut down */
+ result = GST_FLOW_OK;
+ }
}
return result;
@@ -2279,7 +2513,7 @@ no_item:
out_flushing:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
- return GST_FLOW_WRONG_STATE;
+ return GST_FLOW_FLUSHING;
}
}
@@ -2337,8 +2571,8 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"pause task, reason: %s", gst_flow_get_name (queue->srcresult));
/* let app know about us giving up if upstream is not expected to do so */
- /* UNEXPECTED is already taken care of elsewhere */
- if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
+ /* EOS is already taken care of elsewhere */
+ if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
GST_ELEMENT_ERROR (queue, STREAM, FAILED,
(_("Internal data flow error.")),
("streaming task paused, reason %s (%d)",
@@ -2350,15 +2584,11 @@ out_flushing:
}
static gboolean
-gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
+gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
gboolean res = TRUE;
- GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ GstQueue2 *queue = GST_QUEUE2 (parent);
- if (G_UNLIKELY (queue == NULL)) {
- gst_event_unref (event);
- return FALSE;
- }
#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
event, GST_EVENT_TYPE_NAME (event));
@@ -2373,7 +2603,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
/* now unblock the getrange function */
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "flushing");
- queue->srcresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
@@ -2390,12 +2620,6 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
/* now unblock the getrange function */
GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
- if (queue->current) {
- /* forget the highest read offset, we'll calculate a new one when we
- * get the next getrange request. We need to do this in order to reset
- * the buffering percentage */
- queue->current->max_reading_pos = 0;
- }
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* when using a temp file, we eat the event */
@@ -2408,31 +2632,15 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
break;
}
- gst_object_unref (queue);
return res;
}
static gboolean
-gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
-{
- gboolean ret = FALSE;
- GstPad *peer;
-
- if ((peer = gst_pad_get_peer (pad))) {
- ret = gst_pad_query (peer, query);
- gst_object_unref (peer);
- }
- return ret;
-}
-
-static gboolean
-gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
+gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
- if (G_UNLIKELY (queue == NULL))
- return FALSE;
+ queue = GST_QUEUE2 (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
@@ -2440,7 +2648,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
gint64 peer_pos;
GstFormat format;
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
/* get peer position */
@@ -2467,7 +2675,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
{
GST_DEBUG_OBJECT (queue, "doing peer query");
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "peer query success");
@@ -2482,7 +2690,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
/* FIXME - is this condition correct? what should ring buffer do? */
if (QUEUE_IS_USING_QUEUE (queue)) {
/* no temp file, just forward to the peer */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_peer_query (queue->sinkpad, query))
goto peer_failed;
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
@@ -2512,7 +2720,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
duration = writing_pos;
} else {
/* get duration of upstream in bytes */
- peer_res = gst_pad_query_peer_duration (queue->sinkpad,
+ peer_res = gst_pad_peer_query_duration (queue->sinkpad,
GST_FORMAT_BYTES, &duration);
}
@@ -2597,28 +2805,32 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
case GST_QUERY_SCHEDULING:
{
gboolean pull_mode;
+ GstSchedulingFlags flags = 0;
/* we can operate in pull mode when we are using a tempfile */
pull_mode = !QUEUE_IS_USING_QUEUE (queue);
- gst_query_set_scheduling (query, pull_mode, pull_mode, FALSE, 0, -1, 1);
+ if (pull_mode)
+ flags |= GST_SCHEDULING_FLAG_SEEKABLE;
+ gst_query_set_scheduling (query, flags, 0, -1, 0);
+ if (pull_mode)
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
break;
}
default:
/* peer handled other queries */
- if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
+ if (!gst_pad_query_default (pad, parent, query))
goto peer_failed;
break;
}
- gst_object_unref (queue);
return TRUE;
/* ERRORS */
peer_failed:
{
GST_DEBUG_OBJECT (queue, "failed peer query");
- gst_object_unref (queue);
return FALSE;
}
}
@@ -2626,8 +2838,11 @@ peer_failed:
static gboolean
gst_queue2_handle_query (GstElement * element, GstQuery * query)
{
+ GstQueue2 *queue = GST_QUEUE2 (element);
+
/* simply forward to the srcpad query function */
- return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
+ return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
+ query);
}
static void
@@ -2635,7 +2850,7 @@ gst_queue2_update_upstream_size (GstQueue2 * queue)
{
gint64 upstream_size = -1;
- if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES,
+ if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
&upstream_size)) {
GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
queue->upstream_size = upstream_size;
@@ -2643,13 +2858,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue)
}
static GstFlowReturn
-gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
- GstBuffer ** buffer)
+gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
+ guint length, GstBuffer ** buffer)
{
GstQueue2 *queue;
GstFlowReturn ret;
- queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2_CAST (parent);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
@@ -2681,8 +2896,6 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
-
return ret;
/* ERRORS */
@@ -2692,60 +2905,64 @@ out_flushing:
GST_DEBUG_OBJECT (queue, "we are flushing");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
return ret;
}
out_unexpected:
{
GST_DEBUG_OBJECT (queue, "read beyond end of file");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_object_unref (queue);
- return GST_FLOW_UNEXPECTED;
+ return GST_FLOW_EOS;
}
}
/* sink currently only operates in push mode */
static gboolean
-gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
+gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
{
- gboolean result = TRUE;
+ gboolean result;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
- if (active) {
- GST_QUEUE2_MUTEX_LOCK (queue);
- GST_DEBUG_OBJECT (queue, "activating push mode");
- queue->srcresult = GST_FLOW_OK;
- queue->sinkresult = GST_FLOW_OK;
- queue->is_eos = FALSE;
- queue->unexpected = FALSE;
- reset_rate_timer (queue);
- GST_QUEUE2_MUTEX_UNLOCK (queue);
- } else {
- /* unblock chain function */
- GST_QUEUE2_MUTEX_LOCK (queue);
- GST_DEBUG_OBJECT (queue, "deactivating push mode");
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
- gst_queue2_locked_flush (queue);
- GST_QUEUE2_MUTEX_UNLOCK (queue);
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ if (active) {
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "activating push mode");
+ queue->srcresult = GST_FLOW_OK;
+ queue->sinkresult = GST_FLOW_OK;
+ queue->is_eos = FALSE;
+ queue->unexpected = FALSE;
+ reset_rate_timer (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ } else {
+ /* unblock chain function */
+ GST_QUEUE2_MUTEX_LOCK (queue);
+ GST_DEBUG_OBJECT (queue, "deactivating push mode");
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
+ gst_queue2_locked_flush (queue);
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
+ }
+ result = TRUE;
+ break;
+ default:
+ result = FALSE;
+ break;
}
-
- gst_object_unref (queue);
-
return result;
}
/* src operating in push mode, we start a task on the source pad that pushes out
* buffers from the queue */
static gboolean
-gst_queue2_src_activate_push (GstPad * pad, gboolean active)
+gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result = FALSE;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2760,8 +2977,8 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
/* unblock loop function */
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating push mode");
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
/* the item add signal will unblock */
GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_MUTEX_UNLOCK (queue);
@@ -2770,19 +2987,17 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active)
result = gst_pad_stop_task (pad);
}
- gst_object_unref (queue);
-
return result;
}
/* pull mode, downstream will call our getrange function */
static gboolean
-gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
+gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
{
gboolean result;
GstQueue2 *queue;
- queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+ queue = GST_QUEUE2 (parent);
if (active) {
GST_QUEUE2_MUTEX_LOCK (queue);
@@ -2808,26 +3023,46 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
/* this is not allowed, we cannot operate in pull mode without a temp
* file. */
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
result = FALSE;
}
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating pull mode");
- queue->srcresult = GST_FLOW_WRONG_STATE;
- queue->sinkresult = GST_FLOW_WRONG_STATE;
+ queue->srcresult = GST_FLOW_FLUSHING;
+ queue->sinkresult = GST_FLOW_FLUSHING;
/* this will unlock getrange */
GST_QUEUE2_SIGNAL_ADD (queue);
result = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
- gst_object_unref (queue);
return result;
}
+static gboolean
+gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
+ gboolean active)
+{
+ gboolean res;
+
+ switch (mode) {
+ case GST_PAD_MODE_PULL:
+ res = gst_queue2_src_activate_pull (pad, parent, active);
+ break;
+ case GST_PAD_MODE_PUSH:
+ res = gst_queue2_src_activate_push (pad, parent, active);
+ break;
+ default:
+ GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
+ res = FALSE;
+ break;
+ }
+ return res;
+}
+
static GstStateChangeReturn
gst_queue2_change_state (GstElement * element, GstStateChange transition)
{
@@ -2857,6 +3092,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
}
queue->segment_event_received = FALSE;
queue->starting_segment = NULL;
+ gst_event_replace (&queue->stream_start_event, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -2891,6 +3127,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
}
+ gst_event_replace (&queue->stream_start_event, NULL);
GST_QUEUE2_MUTEX_UNLOCK (queue);
break;
case GST_STATE_CHANGE_READY_TO_NULL: