diff options
Diffstat (limited to 'plugins/elements/gstqueue2.c')
-rw-r--r-- | plugins/elements/gstqueue2.c | 839 |
1 files changed, 538 insertions, 301 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index c89c6cd..82e2e11 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -63,6 +63,7 @@ #include <glib/gstdio.h> #include "gst/gst-i18n-lib.h" +#include "gst/glib-compat-private.h" #include <string.h> @@ -155,10 +156,10 @@ enum queue->max_level.time, \ (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \ queue->current->writing_pos - queue->current->max_reading_pos : \ - queue->queue->length)) + queue->queue.length)) #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \ - g_mutex_lock (q->qlock); \ + g_mutex_lock (&q->qlock); \ } G_STMT_END #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \ @@ -168,13 +169,13 @@ enum } G_STMT_END #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \ - g_mutex_unlock (q->qlock); \ + g_mutex_unlock (&q->qlock); \ } G_STMT_END #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \ STATUS (queue, q->sinkpad, "wait for DEL"); \ q->waiting_del = TRUE; \ - g_cond_wait (q->item_del, queue->qlock); \ + g_cond_wait (&q->item_del, &queue->qlock); \ q->waiting_del = FALSE; \ if (res != GST_FLOW_OK) { \ STATUS (queue, q->srcpad, "received DEL wakeup"); \ @@ -186,7 +187,7 @@ enum #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \ STATUS (queue, q->srcpad, "wait for ADD"); \ q->waiting_add = TRUE; \ - g_cond_wait (q->item_add, q->qlock); \ + g_cond_wait (&q->item_add, &q->qlock); \ q->waiting_add = FALSE; \ if (res != GST_FLOW_OK) { \ STATUS (queue, q->srcpad, "received ADD wakeup"); \ @@ -198,14 +199,14 @@ enum #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \ if (q->waiting_del) { \ STATUS (q, q->srcpad, "signal DEL"); \ - g_cond_signal (q->item_del); \ + g_cond_signal (&q->item_del); \ } \ } G_STMT_END #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \ if (q->waiting_add) { \ STATUS (q, q->sinkpad, "signal ADD"); \ - g_cond_signal (q->item_add); \ + g_cond_signal (&q->item_add); \ } \ } G_STMT_END @@ -223,26 +224,32 @@ static void gst_queue2_set_property (GObject * object, static void gst_queue2_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer); +static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); +static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list); static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue); static void gst_queue2_loop (GstPad * pad); -static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query); -static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event); -static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); static gboolean gst_queue2_handle_query (GstElement * element, GstQuery * query); -static GstCaps *gst_queue2_getcaps (GstPad * pad, GstCaps * filter); -static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps); - -static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset, - guint length, GstBuffer ** buffer); +static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent, + guint64 offset, guint length, GstBuffer ** buffer); -static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active); -static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active); -static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active); +static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); +static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); static GstStateChangeReturn gst_queue2_change_state (GstElement * element, GstStateChange transition); @@ -251,6 +258,14 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue); static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range); +typedef enum +{ + GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0, + GST_QUEUE2_ITEM_TYPE_BUFFER, + GST_QUEUE2_ITEM_TYPE_BUFFER_LIST, + GST_QUEUE2_ITEM_TYPE_EVENT +} GstQueue2ItemType; + /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */ static void @@ -358,7 +373,7 @@ gst_queue2_class_init (GstQueue2Class * klass) gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); - gst_element_class_set_details_simple (gstelement_class, "Queue 2", + gst_element_class_set_static_metadata (gstelement_class, "Queue 2", "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>, " @@ -375,32 +390,28 @@ gst_queue2_init (GstQueue2 * queue) gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_chain)); - gst_pad_set_activatepush_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push)); + gst_pad_set_chain_list_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue2_chain_list)); + gst_pad_set_activatemode_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode)); gst_pad_set_event_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event)); - gst_pad_set_getcaps_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue2_getcaps)); - gst_pad_set_acceptcaps_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps)); + gst_pad_set_query_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query)); + GST_PAD_SET_PROXY_CAPS (queue->sinkpad); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); - gst_pad_set_activatepull_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull)); - gst_pad_set_activatepush_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push)); + gst_pad_set_activatemode_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode)); gst_pad_set_getrange_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue2_get_range)); - gst_pad_set_getcaps_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue2_getcaps)); - gst_pad_set_acceptcaps_function (queue->srcpad, - GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps)); gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event)); gst_pad_set_query_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query)); + GST_PAD_SET_PROXY_CAPS (queue->srcpad); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); /* levels */ @@ -422,18 +433,18 @@ gst_queue2_init (GstQueue2 * queue) queue->sink_tainted = TRUE; queue->src_tainted = TRUE; - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; queue->is_eos = FALSE; queue->in_timer = g_timer_new (); queue->out_timer = g_timer_new (); - queue->qlock = g_mutex_new (); + g_mutex_init (&queue->qlock); queue->waiting_add = FALSE; - queue->item_add = g_cond_new (); + g_cond_init (&queue->item_add); queue->waiting_del = FALSE; - queue->item_del = g_cond_new (); - queue->queue = g_queue_new (); + g_cond_init (&queue->item_del); + g_queue_init (&queue->queue); queue->buffering_percent = 100; @@ -458,16 +469,16 @@ gst_queue2_finalize (GObject * object) GST_DEBUG_OBJECT (queue, "finalizing queue"); - while (!g_queue_is_empty (queue->queue)) { - GstMiniObject *data = g_queue_pop_head (queue->queue); + while (!g_queue_is_empty (&queue->queue)) { + GstMiniObject *data = g_queue_pop_head (&queue->queue); gst_mini_object_unref (data); } - g_queue_free (queue->queue); - g_mutex_free (queue->qlock); - g_cond_free (queue->item_add); - g_cond_free (queue->item_del); + g_queue_clear (&queue->queue); + g_mutex_clear (&queue->qlock); + g_cond_clear (&queue->item_add); + g_cond_clear (&queue->item_del); g_timer_destroy (queue->in_timer); g_timer_destroy (queue->out_timer); @@ -611,42 +622,6 @@ init_ranges (GstQueue2 * queue) queue->current = add_range (queue, 0); } -static gboolean -gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps) -{ - GstQueue2 *queue; - GstPad *otherpad; - gboolean result; - - queue = GST_QUEUE2 (GST_PAD_PARENT (pad)); - - otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); - result = gst_pad_peer_accept_caps (otherpad, caps); - - return result; -} - -static GstCaps * -gst_queue2_getcaps (GstPad * pad, GstCaps * filter) -{ - GstQueue2 *queue; - GstPad *otherpad; - GstCaps *result; - - queue = GST_QUEUE2 (gst_pad_get_parent (pad)); - if (G_UNLIKELY (queue == NULL)) - return (filter ? gst_caps_ref (filter) : gst_caps_new_any ()); - - otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); - result = gst_pad_peer_get_caps (otherpad, filter); - if (result == NULL) - result = (filter ? gst_caps_ref (filter) : gst_caps_new_any ()); - - gst_object_unref (queue); - - return result; -} - /* calculate the diff between running time on the sink and src of the queue. * This is the total amount of time in the queue. */ static void @@ -686,11 +661,9 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment, gst_event_copy_segment (event, segment); if (segment->format == GST_FORMAT_BYTES) { - if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) { /* start is where we'll be getting from and as such writing next */ queue->current = add_range (queue, segment->start); - /* update the stats for this range */ - update_cur_level (queue, queue->current); } } @@ -749,6 +722,52 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, update_time_level (queue); } +static gboolean +buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data) +{ + GstClockTime *timestamp = data; + + GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT + " duration %" GST_TIME_FORMAT, idx, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); + + if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf)) + *timestamp = GST_BUFFER_TIMESTAMP (*buf); + + if (GST_BUFFER_DURATION_IS_VALID (*buf)) + *timestamp += GST_BUFFER_DURATION (*buf); + + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); + return TRUE; +} + +/* take a buffer list and update segment, updating the time level of the queue */ +static void +apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, + GstSegment * segment, gboolean is_sink) +{ + GstClockTime timestamp; + + /* if no timestamp is set, assume it's continuous with the previous time */ + timestamp = segment->position; + + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp); + + GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); + + segment->position = timestamp; + + if (is_sink) + queue->sink_tainted = TRUE; + else + queue->src_tainted = TRUE; + + /* calc diff with other end */ + update_time_level (queue); +} + static void update_buffering (GstQueue2 * queue) { @@ -819,7 +838,7 @@ update_buffering (GstQueue2 * queue) mode = GST_BUFFERING_DOWNLOAD; if (queue->byte_in_rate > 0) { - if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES, + if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES, &duration)) { buffering_left = (gdouble) ((duration - @@ -984,6 +1003,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset) GstEvent *event; gboolean res; + /* until we receive the FLUSH_STOP from this seek, we skip data */ + queue->seeking = TRUE; GST_QUEUE2_MUTEX_UNLOCK (queue); GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset); @@ -1036,20 +1057,27 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) (offset + length) - range->writing_pos); } else { - GST_INFO_OBJECT (queue, "not found in any range"); - /* we don't have the range, see how far away we are, FIXME, find a good - * threshold based on the incoming rate. */ + GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT + " len %u", offset, length); + /* we don't have the range, see how far away we are */ if (!queue->is_eos && queue->current) { + /* FIXME, find a good threshold based on the incoming rate. */ + guint64 threshold = 1024 * 512; + if (QUEUE_IS_USING_RING_BUFFER (queue)) { - if (offset < queue->current->offset || offset > - queue->current->writing_pos + QUEUE_MAX_BYTES (queue) - - queue->cur_level.bytes) { - perform_seek_to_offset (queue, offset); - } else { + guint64 distance; + + distance = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes; + /* don't wait for the complete buffer to fill */ + distance = MIN (distance, threshold); + + if (offset >= queue->current->offset && offset <= + queue->current->writing_pos + distance) { GST_INFO_OBJECT (queue, "requested data is within range, wait for data"); + return FALSE; } - } else if (offset < queue->current->writing_pos + 200000) { + } else if (offset < queue->current->writing_pos + threshold) { update_cur_pos (queue, queue->current, offset + length); GST_INFO_OBJECT (queue, "wait for data"); return FALSE; @@ -1122,7 +1150,7 @@ could_not_read: eos: { GST_DEBUG ("non-regular file hits EOS"); - return GST_FLOW_UNEXPECTED; + return GST_FLOW_EOS; } } @@ -1131,22 +1159,30 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GstBuffer ** buffer) { GstBuffer *buf; + GstMapInfo info; guint8 *data; guint64 file_offset; guint block_length, remaining, read_length; guint64 rb_size; + guint64 max_size; guint64 rpos; GstFlowReturn ret = GST_FLOW_OK; /* allocate the output buffer of the requested size */ - buf = gst_buffer_new_allocate (NULL, length, 0); - data = gst_buffer_map (buf, NULL, NULL, GST_MAP_WRITE); + if (*buffer == NULL) + buf = gst_buffer_new_allocate (NULL, length, NULL); + else + buf = *buffer; + + gst_buffer_map (buf, &info, GST_MAP_WRITE); + data = info.data; GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length, offset); rpos = offset; rb_size = queue->ring_buffer_max_size; + max_size = QUEUE_MAX_BYTES (queue); remaining = length; while (remaining > 0) { @@ -1165,16 +1201,16 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GST_DEBUG_OBJECT (queue, "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT - ", level %" G_GUINT64_FORMAT, - rpos, queue->current->writing_pos, level); + ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT, + rpos, queue->current->writing_pos, level, max_size); - if (level >= rb_size) { + if (level >= max_size) { /* we don't have the data but if we have a ring buffer that is full, we * need to read */ GST_DEBUG_OBJECT (queue, - "ring buffer full, reading ring-buffer-max-size %" - G_GUINT64_FORMAT " bytes", rb_size); - read_length = rb_size; + "ring buffer full, reading QUEUE_MAX_BYTES %" + G_GUINT64_FORMAT " bytes", max_size); + read_length = max_size; } else if (queue->is_eos) { /* won't get any more data so read any data we have */ if (level) { @@ -1182,21 +1218,20 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have", level); read_length = level; + remaining = level; + length = level; } else goto hit_eos; } } if (read_length == 0) { - if (QUEUE_IS_USING_RING_BUFFER (queue) - && queue->current->max_reading_pos > rpos) { - /* protect cached data (data between offset and max_reading_pos) - * and update current level */ + if (QUEUE_IS_USING_RING_BUFFER (queue)) { GST_DEBUG_OBJECT (queue, - "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT - "]", rpos, queue->current->max_reading_pos); - queue->current->max_reading_pos = rpos; - update_cur_level (queue, queue->current); + "update current position [%" G_GUINT64_FORMAT "-%" + G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos); + update_cur_pos (queue, queue->current, rpos); + GST_QUEUE2_SIGNAL_DEL (queue); } GST_DEBUG_OBJECT (queue, "waiting for add"); GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); @@ -1251,7 +1286,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining); } - gst_buffer_unmap (buf, data, length); + gst_buffer_unmap (buf, &info); + gst_buffer_resize (buf, 0, length); GST_BUFFER_OFFSET (buf) = offset; GST_BUFFER_OFFSET_END (buf) = offset + length; @@ -1264,20 +1300,25 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, hit_eos: { GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data"); - gst_buffer_unref (buf); - return GST_FLOW_UNEXPECTED; + gst_buffer_unmap (buf, &info); + if (*buffer == NULL) + gst_buffer_unref (buf); + return GST_FLOW_EOS; } out_flushing: { GST_DEBUG_OBJECT (queue, "we are flushing"); - gst_buffer_unref (buf); - return GST_FLOW_WRONG_STATE; + gst_buffer_unmap (buf, &info); + if (*buffer == NULL) + gst_buffer_unref (buf); + return GST_FLOW_FLUSHING; } read_error: { GST_DEBUG_OBJECT (queue, "we have a read error"); - gst_buffer_unmap (buf, data, 0); - gst_buffer_unref (buf); + gst_buffer_unmap (buf, &info); + if (*buffer == NULL) + gst_buffer_unref (buf); return ret; } } @@ -1288,12 +1329,15 @@ gst_queue2_read_item_from_file (GstQueue2 * queue) { GstMiniObject *item; - if (queue->starting_segment != NULL) { + if (queue->stream_start_event != NULL) { + item = GST_MINI_OBJECT_CAST (queue->stream_start_event); + queue->stream_start_event = NULL; + } else if (queue->starting_segment != NULL) { item = GST_MINI_OBJECT_CAST (queue->starting_segment); queue->starting_segment = NULL; } else { GstFlowReturn ret; - GstBuffer *buffer; + GstBuffer *buffer = NULL; guint64 reading_pos; reading_pos = queue->current->reading_pos; @@ -1306,7 +1350,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue) case GST_FLOW_OK: item = GST_MINI_OBJECT_CAST (buffer); break; - case GST_FLOW_UNEXPECTED: + case GST_FLOW_EOS: item = GST_MINI_OBJECT_CAST (gst_event_new_eos ()); break; default: @@ -1442,8 +1486,8 @@ gst_queue2_locked_flush (GstQueue2 * queue) gst_queue2_flush_temp_file (queue); init_ranges (queue); } else { - while (!g_queue_is_empty (queue->queue)) { - GstMiniObject *data = g_queue_pop_head (queue->queue); + while (!g_queue_is_empty (&queue->queue)) { + GstMiniObject *data = g_queue_pop_head (&queue->queue); /* Then lose another reference because we are supposed to destroy that data when flushing */ @@ -1459,6 +1503,7 @@ gst_queue2_locked_flush (GstQueue2 * queue) gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; queue->segment_event_received = FALSE; + gst_event_replace (&queue->stream_start_event, NULL); /* we deleted a lot of something */ GST_QUEUE2_SIGNAL_DEL (queue); @@ -1502,9 +1547,9 @@ out_flushing: static gboolean gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) { - guint8 *odata, *data, *ring_buffer; + GstMapInfo info; + guint8 *data, *ring_buffer; guint size, rb_size; - gsize osize; guint64 writing_pos, new_writing_pos; GstQueue2Range *range, *prev, *next; @@ -1515,13 +1560,13 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) ring_buffer = queue->ring_buffer; rb_size = queue->ring_buffer_max_size; - odata = gst_buffer_map (buffer, &osize, NULL, GST_MAP_READ); + gst_buffer_map (buffer, &info, GST_MAP_READ); - size = osize; - data = odata; + size = info.size; + data = info.data; GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size, - GST_BUFFER_OFFSET (buffer)); + writing_pos); while (size > 0) { guint to_write; @@ -1742,7 +1787,7 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) GST_QUEUE2_SIGNAL_ADD (queue); } - gst_buffer_unmap (buffer, odata, osize); + gst_buffer_unmap (buffer, &info); return TRUE; @@ -1750,14 +1795,14 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) out_flushing: { GST_DEBUG_OBJECT (queue, "we are flushing"); - gst_buffer_unmap (buffer, odata, osize); - /* FIXME - GST_FLOW_UNEXPECTED ? */ + gst_buffer_unmap (buffer, &info); + /* FIXME - GST_FLOW_EOS ? */ return FALSE; } seek_failed: { GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unmap (buffer, odata, osize); + gst_buffer_unmap (buffer, &info); return FALSE; } handle_error: @@ -1773,16 +1818,45 @@ handle_error: ("%s", g_strerror (errno))); } } - gst_buffer_unmap (buffer, odata, osize); + gst_buffer_unmap (buffer, &info); return FALSE; } } +static gboolean +buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q) +{ + GstQueue2 *queue = q; + + GST_TRACE_OBJECT (queue, + "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx, + gst_buffer_get_size (*buf)); + + if (!gst_queue2_create_write (queue, *buf)) { + GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out"); + return FALSE; + } + return TRUE; +} + +static gboolean +buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data) +{ + guint *p_size = data; + gsize buf_size; + + buf_size = gst_buffer_get_size (*buf); + GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size); + *p_size += buf_size; + return TRUE; +} + /* enqueue an item an update the level stats */ static void -gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) +gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, + GstQueue2ItemType item_type) { - if (isbuffer) { + if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { GstBuffer *buffer; guint size; @@ -1805,7 +1879,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) /* FIXME - check return value? */ gst_queue2_create_write (queue, buffer); } - } else if (GST_IS_EVENT (item)) { + } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list; + guint size = 0; + + buffer_list = GST_BUFFER_LIST_CAST (item); + + gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size); + GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size); + + /* add buffer to the statistics */ + if (QUEUE_IS_USING_QUEUE (queue)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += size; + } + queue->bytes_in += size; + + /* apply new buffer to segment stats */ + apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE); + + /* update the byterate stats */ + update_in_rates (queue); + + if (!QUEUE_IS_USING_QUEUE (queue)) { + gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue); + } + } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { GstEvent *event; event = GST_EVENT_CAST (item); @@ -1831,10 +1930,17 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) queue->starting_segment = event; item = NULL; } - /* a new segment allows us to accept more buffers if we got UNEXPECTED + /* a new segment allows us to accept more buffers if we got EOS * from downstream */ queue->unexpected = FALSE; break; + case GST_EVENT_STREAM_START: + if (!QUEUE_IS_USING_QUEUE (queue)) { + gst_event_replace (&queue->stream_start_event, event); + gst_event_unref (event); + item = NULL; + } + break; default: if (!QUEUE_IS_USING_QUEUE (queue)) goto unexpected_event; @@ -1853,7 +1959,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) update_buffering (queue); if (QUEUE_IS_USING_QUEUE (queue)) { - g_queue_push_tail (queue->queue, item); + g_queue_push_tail (&queue->queue, item); } else { gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); } @@ -1877,14 +1983,14 @@ unexpected_event: /* dequeue an item from the queue and update level stats */ static GstMiniObject * -gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) +gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) { GstMiniObject *item; if (!QUEUE_IS_USING_QUEUE (queue)) item = gst_queue2_read_item_from_file (queue); else - item = g_queue_pop_head (queue->queue); + item = g_queue_pop_head (&queue->queue); if (item == NULL) goto no_item; @@ -1895,7 +2001,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) buffer = GST_BUFFER_CAST (item); size = gst_buffer_get_size (buffer); - *is_buffer = TRUE; + *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER; GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved buffer %p from queue", buffer); @@ -1916,7 +2022,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) } else if (GST_IS_EVENT (item)) { GstEvent *event = GST_EVENT_CAST (item); - *is_buffer = FALSE; + *item_type = GST_QUEUE2_ITEM_TYPE_EVENT; GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved event %p from queue", event); @@ -1932,11 +2038,36 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) default: break; } + } else if (GST_IS_BUFFER_LIST (item)) { + GstBufferList *buffer_list; + guint size = 0; + + buffer_list = GST_BUFFER_LIST_CAST (item); + gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size); + *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "retrieved buffer list %p from queue", buffer_list); + + if (QUEUE_IS_USING_QUEUE (queue)) { + queue->cur_level.buffers--; + queue->cur_level.bytes -= size; + } + queue->bytes_out += size; + + apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE); + /* update the byterate stats */ + update_out_rates (queue); + /* update the buffering */ + if (queue->use_buffering) + update_buffering (queue); + } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", item, GST_OBJECT_NAME (queue)); item = NULL; + *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN; } GST_QUEUE2_SIGNAL_DEL (queue); @@ -1951,24 +2082,25 @@ no_item: } static gboolean -gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) +gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event) { GstQueue2 *queue; - queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad)); + queue = GST_QUEUE2 (parent); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); - if (QUEUE_IS_USING_QUEUE (queue)) { + if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) { /* forward event */ gst_pad_push_event (queue->srcpad, event); /* now unblock the chain function */ GST_QUEUE2_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; /* unblock the loop and chain functions */ GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_SIGNAL_DEL (queue); @@ -1981,7 +2113,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) } else { GST_QUEUE2_MUTEX_LOCK (queue); /* flush the sink pad */ - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->sinkresult = GST_FLOW_FLUSHING; GST_QUEUE2_SIGNAL_DEL (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); @@ -1993,7 +2125,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); - if (QUEUE_IS_USING_QUEUE (queue)) { + if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) { /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -2003,6 +2135,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) queue->sinkresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; + queue->seeking = FALSE; /* reset rate counters */ reset_rate_timer (queue); gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, @@ -2014,6 +2147,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) queue->is_eos = FALSE; queue->unexpected = FALSE; queue->sinkresult = GST_FLOW_OK; + queue->seeking = FALSE; GST_QUEUE2_MUTEX_UNLOCK (queue); gst_event_unref (event); @@ -2027,7 +2161,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) /* refuse more events on EOS */ if (queue->is_eos) goto out_eos; - gst_queue2_locked_enqueue (queue, event, FALSE); + gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT); GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* non-serialized events are passed upstream. */ @@ -2056,6 +2190,25 @@ out_eos: } static gboolean +gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query) +{ + gboolean res; + + switch (GST_QUERY_TYPE (query)) { + default: + if (GST_QUERY_IS_SERIALIZED (query)) { + GST_WARNING_OBJECT (pad, "unhandled serialized query"); + res = FALSE; + } else { + res = gst_pad_query_default (pad, parent, query); + } + break; + } + return res; +} + +static gboolean gst_queue2_is_empty (GstQueue2 * queue) { /* never empty on EOS */ @@ -2065,7 +2218,7 @@ gst_queue2_is_empty (GstQueue2 * queue) if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) { return queue->current->writing_pos <= queue->current->max_reading_pos; } else { - if (queue->queue->length == 0) + if (queue->queue.length == 0) return TRUE; } @@ -2117,18 +2270,9 @@ gst_queue2_is_filled (GstQueue2 * queue) } static GstFlowReturn -gst_queue2_chain (GstPad * pad, GstBuffer * buffer) +gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue, + GstMiniObject * item, GstQueue2ItemType item_type) { - GstQueue2 *queue; - - queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad)); - - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %" - G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" - GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); - /* we have to lock the queue since we span threads */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); /* when we received EOS, we refuse more data */ @@ -2138,11 +2282,15 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer) if (queue->unexpected) goto out_unexpected; + /* while we didn't receive the newsegment, we're seeking and we skip data */ + if (queue->seeking) + goto out_seeking; + if (!gst_queue2_wait_free_space (queue)) goto out_flushing; /* put buffer in queue now */ - gst_queue2_locked_enqueue (queue, buffer, TRUE); + gst_queue2_locked_enqueue (queue, item, item_type); GST_QUEUE2_MUTEX_UNLOCK (queue); return GST_FLOW_OK; @@ -2155,7 +2303,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); return ret; } @@ -2163,19 +2311,102 @@ out_eos: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); - return GST_FLOW_UNEXPECTED; + return GST_FLOW_EOS; + } +out_seeking: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking"); + GST_QUEUE2_MUTEX_UNLOCK (queue); + gst_mini_object_unref (item); + + return GST_FLOW_OK; } out_unexpected: { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "exit because we received UNEXPECTED"); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); + + return GST_FLOW_EOS; + } +} + +static GstFlowReturn +gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstQueue2 *queue; + + queue = GST_QUEUE2 (parent); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of " + "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" + GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + + return gst_queue2_chain_buffer_or_buffer_list (queue, + GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER); +} + +static GstFlowReturn +gst_queue2_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list) +{ + GstQueue2 *queue; - return GST_FLOW_UNEXPECTED; + queue = GST_QUEUE2 (parent); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "received buffer list %p", buffer_list); + + return gst_queue2_chain_buffer_or_buffer_list (queue, + GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST); +} + +static GstMiniObject * +gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type) +{ + GstMiniObject *data; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream"); + + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or SEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an EOS return value until we receive something + * pushable again or we get flushed. */ + while ((data = gst_queue2_locked_dequeue (queue, item_type))) { + if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + + if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event)); + return data; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS event %p", event); + gst_event_unref (event); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS buffer list %p", data); + gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data)); + } } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + return NULL; } /* dequeue an item from the queue an push it downstream. This functions returns @@ -2185,16 +2416,16 @@ gst_queue2_push_one (GstQueue2 * queue) { GstFlowReturn result = GST_FLOW_OK; GstMiniObject *data; - gboolean is_buffer = FALSE; + GstQueue2ItemType item_type; - data = gst_queue2_locked_dequeue (queue, &is_buffer); + data = gst_queue2_locked_dequeue (queue, &item_type); if (data == NULL) goto no_item; next: GST_QUEUE2_MUTEX_UNLOCK (queue); - if (is_buffer) { + if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { GstBuffer *buffer; #if 0 GstCaps *caps; @@ -2216,56 +2447,59 @@ next: /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); - if (result == GST_FLOW_UNEXPECTED) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "got UNEXPECTED from downstream"); - /* stop pushing buffers, we dequeue all items until we see an item that we - * can push again, which is EOS or SEGMENT. If there is nothing in the - * queue we can push, we set a flag to make the sinkpad refuse more - * buffers with an UNEXPECTED return value until we receive something - * pushable again or we get flushed. */ - while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) { - if (is_buffer) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED buffer %p", data); - gst_buffer_unref (GST_BUFFER_CAST (data)); - } else if (GST_IS_EVENT (data)) { - GstEvent *event = GST_EVENT_CAST (data); - GstEventType type = GST_EVENT_TYPE (event); - - if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) { - /* we found a pushable item in the queue, push it out */ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushing pushable event %s after UNEXPECTED", - GST_EVENT_TYPE_NAME (event)); - goto next; - } - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED event %p", event); - gst_event_unref (event); - } - } - /* no more items in the queue. Set the unexpected flag so that upstream - * make us refuse any more buffers on the sinkpad. Since we will still - * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the - * task function does not shut down. */ - queue->unexpected = TRUE; + if (result == GST_FLOW_EOS) { + data = gst_queue2_dequeue_on_eos (queue, &item_type); + if (data != NULL) + goto next; + /* Since we will still accept EOS and SEGMENT we return _FLOW_OK + * to the caller so that the task function does not shut down */ result = GST_FLOW_OK; } - } else if (GST_IS_EVENT (data)) { + } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); gst_pad_push_event (queue->srcpad, event); - /* if we're EOS, return UNEXPECTED so that the task pauses. */ + /* if we're EOS, return EOS so that the task pauses. */ if (type == GST_EVENT_EOS) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushed EOS event %p, return UNEXPECTED", event); - result = GST_FLOW_UNEXPECTED; + "pushed EOS event %p, return EOS", event); + result = GST_FLOW_EOS; } GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); + } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list; +#if 0 + GstBuffer *first_buf; + GstCaps *caps; +#endif + + buffer_list = GST_BUFFER_LIST_CAST (data); + +#if 0 + first_buf = gst_buffer_list_get (buffer_list, 0); + caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL; + + /* set caps before pushing the buffer so that core does not try to do + * something fancy to check if this is possible. */ + if (caps && caps != GST_PAD_CAPS (queue->srcpad)) + gst_pad_set_caps (queue->srcpad, caps); +#endif + + result = gst_pad_push_list (queue->srcpad, buffer_list); + + /* need to check for srcresult here as well */ + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); + if (result == GST_FLOW_EOS) { + data = gst_queue2_dequeue_on_eos (queue, &item_type); + if (data != NULL) + goto next; + /* Since we will still accept EOS and SEGMENT we return _FLOW_OK + * to the caller so that the task function does not shut down */ + result = GST_FLOW_OK; + } } return result; @@ -2279,7 +2513,7 @@ no_item: out_flushing: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing"); - return GST_FLOW_WRONG_STATE; + return GST_FLOW_FLUSHING; } } @@ -2337,8 +2571,8 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (queue->srcresult)); /* let app know about us giving up if upstream is not expected to do so */ - /* UNEXPECTED is already taken care of elsewhere */ - if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) { + /* EOS is already taken care of elsewhere */ + if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { GST_ELEMENT_ERROR (queue, STREAM, FAILED, (_("Internal data flow error.")), ("streaming task paused, reason %s (%d)", @@ -2350,15 +2584,11 @@ out_flushing: } static gboolean -gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) +gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { gboolean res = TRUE; - GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad)); + GstQueue2 *queue = GST_QUEUE2 (parent); - if (G_UNLIKELY (queue == NULL)) { - gst_event_unref (event); - return FALSE; - } #ifndef GST_DISABLE_GST_DEBUG GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", event, GST_EVENT_TYPE_NAME (event)); @@ -2373,7 +2603,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) /* now unblock the getrange function */ GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "flushing"); - queue->srcresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); @@ -2390,12 +2620,6 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) /* now unblock the getrange function */ GST_QUEUE2_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; - if (queue->current) { - /* forget the highest read offset, we'll calculate a new one when we - * get the next getrange request. We need to do this in order to reset - * the buffering percentage */ - queue->current->max_reading_pos = 0; - } GST_QUEUE2_MUTEX_UNLOCK (queue); /* when using a temp file, we eat the event */ @@ -2408,31 +2632,15 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) break; } - gst_object_unref (queue); return res; } static gboolean -gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query) -{ - gboolean ret = FALSE; - GstPad *peer; - - if ((peer = gst_pad_get_peer (pad))) { - ret = gst_pad_query (peer, query); - gst_object_unref (peer); - } - return ret; -} - -static gboolean -gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) +gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) { GstQueue2 *queue; - queue = GST_QUEUE2 (gst_pad_get_parent (pad)); - if (G_UNLIKELY (queue == NULL)) - return FALSE; + queue = GST_QUEUE2 (parent); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: @@ -2440,7 +2648,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) gint64 peer_pos; GstFormat format; - if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) + if (!gst_pad_peer_query (queue->sinkpad, query)) goto peer_failed; /* get peer position */ @@ -2467,7 +2675,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) { GST_DEBUG_OBJECT (queue, "doing peer query"); - if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) + if (!gst_pad_peer_query (queue->sinkpad, query)) goto peer_failed; GST_DEBUG_OBJECT (queue, "peer query success"); @@ -2482,7 +2690,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) /* FIXME - is this condition correct? what should ring buffer do? */ if (QUEUE_IS_USING_QUEUE (queue)) { /* no temp file, just forward to the peer */ - if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) + if (!gst_pad_peer_query (queue->sinkpad, query)) goto peer_failed; GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); } else { @@ -2512,7 +2720,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) duration = writing_pos; } else { /* get duration of upstream in bytes */ - peer_res = gst_pad_query_peer_duration (queue->sinkpad, + peer_res = gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES, &duration); } @@ -2597,28 +2805,32 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) case GST_QUERY_SCHEDULING: { gboolean pull_mode; + GstSchedulingFlags flags = 0; /* we can operate in pull mode when we are using a tempfile */ pull_mode = !QUEUE_IS_USING_QUEUE (queue); - gst_query_set_scheduling (query, pull_mode, pull_mode, FALSE, 0, -1, 1); + if (pull_mode) + flags |= GST_SCHEDULING_FLAG_SEEKABLE; + gst_query_set_scheduling (query, flags, 0, -1, 0); + if (pull_mode) + gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL); + gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH); break; } default: /* peer handled other queries */ - if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) + if (!gst_pad_query_default (pad, parent, query)) goto peer_failed; break; } - gst_object_unref (queue); return TRUE; /* ERRORS */ peer_failed: { GST_DEBUG_OBJECT (queue, "failed peer query"); - gst_object_unref (queue); return FALSE; } } @@ -2626,8 +2838,11 @@ peer_failed: static gboolean gst_queue2_handle_query (GstElement * element, GstQuery * query) { + GstQueue2 *queue = GST_QUEUE2 (element); + /* simply forward to the srcpad query function */ - return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query); + return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element), + query); } static void @@ -2635,7 +2850,7 @@ gst_queue2_update_upstream_size (GstQueue2 * queue) { gint64 upstream_size = -1; - if (gst_pad_query_peer_duration (queue->sinkpad, GST_FORMAT_BYTES, + if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES, &upstream_size)) { GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size); queue->upstream_size = upstream_size; @@ -2643,13 +2858,13 @@ gst_queue2_update_upstream_size (GstQueue2 * queue) } static GstFlowReturn -gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, - GstBuffer ** buffer) +gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset, + guint length, GstBuffer ** buffer) { GstQueue2 *queue; GstFlowReturn ret; - queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad)); + queue = GST_QUEUE2_CAST (parent); length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); @@ -2681,8 +2896,6 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, ret = gst_queue2_create_read (queue, offset, length, buffer); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_object_unref (queue); - return ret; /* ERRORS */ @@ -2692,60 +2905,64 @@ out_flushing: GST_DEBUG_OBJECT (queue, "we are flushing"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_object_unref (queue); return ret; } out_unexpected: { GST_DEBUG_OBJECT (queue, "read beyond end of file"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_object_unref (queue); - return GST_FLOW_UNEXPECTED; + return GST_FLOW_EOS; } } /* sink currently only operates in push mode */ static gboolean -gst_queue2_sink_activate_push (GstPad * pad, gboolean active) +gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) { - gboolean result = TRUE; + gboolean result; GstQueue2 *queue; - queue = GST_QUEUE2 (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (parent); - if (active) { - GST_QUEUE2_MUTEX_LOCK (queue); - GST_DEBUG_OBJECT (queue, "activating push mode"); - queue->srcresult = GST_FLOW_OK; - queue->sinkresult = GST_FLOW_OK; - queue->is_eos = FALSE; - queue->unexpected = FALSE; - reset_rate_timer (queue); - GST_QUEUE2_MUTEX_UNLOCK (queue); - } else { - /* unblock chain function */ - GST_QUEUE2_MUTEX_LOCK (queue); - GST_DEBUG_OBJECT (queue, "deactivating push mode"); - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; - gst_queue2_locked_flush (queue); - GST_QUEUE2_MUTEX_UNLOCK (queue); + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + GST_QUEUE2_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "activating push mode"); + queue->srcresult = GST_FLOW_OK; + queue->sinkresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; + reset_rate_timer (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); + } else { + /* unblock chain function */ + GST_QUEUE2_MUTEX_LOCK (queue); + GST_DEBUG_OBJECT (queue, "deactivating push mode"); + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; + gst_queue2_locked_flush (queue); + GST_QUEUE2_MUTEX_UNLOCK (queue); + } + result = TRUE; + break; + default: + result = FALSE; + break; } - - gst_object_unref (queue); - return result; } /* src operating in push mode, we start a task on the source pad that pushes out * buffers from the queue */ static gboolean -gst_queue2_src_activate_push (GstPad * pad, gboolean active) +gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) { gboolean result = FALSE; GstQueue2 *queue; - queue = GST_QUEUE2 (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (parent); if (active) { GST_QUEUE2_MUTEX_LOCK (queue); @@ -2760,8 +2977,8 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active) /* unblock loop function */ GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating push mode"); - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; /* the item add signal will unblock */ GST_QUEUE2_SIGNAL_ADD (queue); GST_QUEUE2_MUTEX_UNLOCK (queue); @@ -2770,19 +2987,17 @@ gst_queue2_src_activate_push (GstPad * pad, gboolean active) result = gst_pad_stop_task (pad); } - gst_object_unref (queue); - return result; } /* pull mode, downstream will call our getrange function */ static gboolean -gst_queue2_src_activate_pull (GstPad * pad, gboolean active) +gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active) { gboolean result; GstQueue2 *queue; - queue = GST_QUEUE2 (gst_pad_get_parent (pad)); + queue = GST_QUEUE2 (parent); if (active) { GST_QUEUE2_MUTEX_LOCK (queue); @@ -2808,26 +3023,46 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode"); /* this is not allowed, we cannot operate in pull mode without a temp * file. */ - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; result = FALSE; } GST_QUEUE2_MUTEX_UNLOCK (queue); } else { GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "deactivating pull mode"); - queue->srcresult = GST_FLOW_WRONG_STATE; - queue->sinkresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + queue->sinkresult = GST_FLOW_FLUSHING; /* this will unlock getrange */ GST_QUEUE2_SIGNAL_ADD (queue); result = TRUE; GST_QUEUE2_MUTEX_UNLOCK (queue); } - gst_object_unref (queue); return result; } +static gboolean +gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) +{ + gboolean res; + + switch (mode) { + case GST_PAD_MODE_PULL: + res = gst_queue2_src_activate_pull (pad, parent, active); + break; + case GST_PAD_MODE_PUSH: + res = gst_queue2_src_activate_push (pad, parent, active); + break; + default: + GST_LOG_OBJECT (pad, "unknown activation mode %d", mode); + res = FALSE; + break; + } + return res; +} + static GstStateChangeReturn gst_queue2_change_state (GstElement * element, GstStateChange transition) { @@ -2857,6 +3092,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) } queue->segment_event_received = FALSE; queue->starting_segment = NULL; + gst_event_replace (&queue->stream_start_event, NULL); GST_QUEUE2_MUTEX_UNLOCK (queue); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: @@ -2891,6 +3127,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; } + gst_event_replace (&queue->stream_start_event, NULL); GST_QUEUE2_MUTEX_UNLOCK (queue); break; case GST_STATE_CHANGE_READY_TO_NULL: |