diff options
Diffstat (limited to 'plugins/elements/gstqueue.c')
-rw-r--r-- | plugins/elements/gstqueue.c | 493 |
1 files changed, 206 insertions, 287 deletions
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 19ccf8f..aa494be 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -61,6 +61,7 @@ #include "gstqueue.h" #include "../../gst/gst-i18n-lib.h" +#include "../../gst/glib-compat-private.h" static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -91,7 +92,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_dataflow); queue->cur_level.time, \ queue->min_threshold.time, \ queue->max_size.time, \ - queue->queue->length) + queue->queue.length) /* Queue signals and args */ enum @@ -127,7 +128,7 @@ enum #define DEFAULT_MAX_SIZE_TIME GST_SECOND /* 1 second */ #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ - g_mutex_lock (q->qlock); \ + g_mutex_lock (&q->qlock); \ } G_STMT_END #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \ @@ -137,13 +138,13 @@ enum } G_STMT_END #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ - g_mutex_unlock (q->qlock); \ + g_mutex_unlock (&q->qlock); \ } G_STMT_END #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \ STATUS (q, q->sinkpad, "wait for DEL"); \ q->waiting_del = TRUE; \ - g_cond_wait (q->item_del, q->qlock); \ + g_cond_wait (&q->item_del, &q->qlock); \ q->waiting_del = FALSE; \ if (q->srcresult != GST_FLOW_OK) { \ STATUS (q, q->srcpad, "received DEL wakeup"); \ @@ -155,7 +156,7 @@ enum #define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START { \ STATUS (q, q->srcpad, "wait for ADD"); \ q->waiting_add = TRUE; \ - g_cond_wait (q->item_add, q->qlock); \ + g_cond_wait (&q->item_add, &q->qlock); \ q->waiting_add = FALSE; \ if (q->srcresult != GST_FLOW_OK) { \ STATUS (q, q->srcpad, "received ADD wakeup"); \ @@ -167,14 +168,14 @@ enum #define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START { \ if (q->waiting_del) { \ STATUS (q, q->srcpad, "signal DEL"); \ - g_cond_signal (q->item_del); \ + g_cond_signal (&q->item_del); \ } \ } G_STMT_END #define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START { \ if (q->waiting_add) { \ STATUS (q, q->sinkpad, "signal ADD"); \ - g_cond_signal (q->item_add); \ + g_cond_signal (&q->item_add); \ } \ } G_STMT_END @@ -192,23 +193,27 @@ static void gst_queue_set_property (GObject * object, static void gst_queue_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer); +static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); static GstFlowReturn gst_queue_push_one (GstQueue * queue); static void gst_queue_loop (GstPad * pad); -static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query); -static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event); -static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query); +static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); -static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps); -static GstCaps *gst_queue_getcaps (GstPad * pad, GstCaps * filter); -static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer); -static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer); static void gst_queue_locked_flush (GstQueue * queue); -static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active); -static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active); +static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); +static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); static gboolean gst_queue_is_empty (GstQueue * queue); static gboolean gst_queue_is_filled (GstQueue * queue); @@ -358,7 +363,7 @@ gst_queue_class_init (GstQueueClass * klass) gobject_class->finalize = gst_queue_finalize; - gst_element_class_set_details_simple (gstelement_class, + gst_element_class_set_static_metadata (gstelement_class, "Queue", "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>"); gst_element_class_add_pad_template (gstelement_class, @@ -367,16 +372,12 @@ gst_queue_class_init (GstQueueClass * klass) gst_static_pad_template_get (&sinktemplate)); /* Registering debug symbols for function pointers */ - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_sink_activate_push); + GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode); GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_event); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_link_sink); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_getcaps); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_acceptcaps); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_push); - GST_DEBUG_REGISTER_FUNCPTR (gst_queue_link_src); + GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_query); GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event); GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query); + GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain); } static void @@ -385,23 +386,20 @@ gst_queue_init (GstQueue * queue) queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); - gst_pad_set_activatepush_function (queue->sinkpad, - gst_queue_sink_activate_push); + gst_pad_set_activatemode_function (queue->sinkpad, + gst_queue_sink_activate_mode); gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event); - gst_pad_set_link_function (queue->sinkpad, gst_queue_link_sink); - gst_pad_set_getcaps_function (queue->sinkpad, gst_queue_getcaps); - gst_pad_set_acceptcaps_function (queue->sinkpad, gst_queue_acceptcaps); + gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query); + GST_PAD_SET_PROXY_CAPS (queue->sinkpad); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); - gst_pad_set_activatepush_function (queue->srcpad, - gst_queue_src_activate_push); - gst_pad_set_link_function (queue->srcpad, gst_queue_link_src); - gst_pad_set_acceptcaps_function (queue->srcpad, gst_queue_acceptcaps); - gst_pad_set_getcaps_function (queue->srcpad, gst_queue_getcaps); + gst_pad_set_activatemode_function (queue->srcpad, + gst_queue_src_activate_mode); gst_pad_set_event_function (queue->srcpad, gst_queue_handle_src_event); gst_pad_set_query_function (queue->srcpad, gst_queue_handle_src_query); + GST_PAD_SET_PROXY_CAPS (queue->srcpad); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); GST_QUEUE_CLEAR_LEVEL (queue->cur_level); @@ -415,12 +413,13 @@ gst_queue_init (GstQueue * queue) queue->head_needs_discont = queue->tail_needs_discont = FALSE; queue->leaky = GST_QUEUE_NO_LEAK; - queue->srcresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; + + g_mutex_init (&queue->qlock); + g_cond_init (&queue->item_add); + g_cond_init (&queue->item_del); - queue->qlock = g_mutex_new (); - queue->item_add = g_cond_new (); - queue->item_del = g_cond_new (); - queue->queue = g_queue_new (); + g_queue_init (&queue->queue); queue->sinktime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE; @@ -438,98 +437,22 @@ gst_queue_init (GstQueue * queue) static void gst_queue_finalize (GObject * object) { + GstMiniObject *data; GstQueue *queue = GST_QUEUE (object); GST_DEBUG_OBJECT (queue, "finalizing queue"); - while (!g_queue_is_empty (queue->queue)) { - GstMiniObject *data = g_queue_pop_head (queue->queue); - - gst_mini_object_unref (data); - } - g_queue_free (queue->queue); - g_mutex_free (queue->qlock); - g_cond_free (queue->item_add); - g_cond_free (queue->item_del); - - G_OBJECT_CLASS (parent_class)->finalize (object); -} - -static gboolean -gst_queue_acceptcaps (GstPad * pad, GstCaps * caps) -{ - gboolean result; - GstQueue *queue; - GstPad *otherpad; - - queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (G_UNLIKELY (queue == NULL)) - return FALSE; - - otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); - result = gst_pad_peer_accept_caps (otherpad, caps); - - gst_object_unref (queue); - - return result; -} - -static GstCaps * -gst_queue_getcaps (GstPad * pad, GstCaps * filter) -{ - GstQueue *queue; - GstPad *otherpad; - GstCaps *result; - - queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (G_UNLIKELY (queue == NULL)) - return (filter ? gst_caps_ref (filter) : gst_caps_new_any ()); - - otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); - result = gst_pad_peer_get_caps (otherpad, filter); - if (result == NULL) - result = (filter ? gst_caps_ref (filter) : gst_caps_new_any ()); - - gst_object_unref (queue); - - return result; -} - -static GstPadLinkReturn -gst_queue_link_sink (GstPad * pad, GstPad * peer) -{ - return GST_PAD_LINK_OK; -} - -static GstPadLinkReturn -gst_queue_link_src (GstPad * pad, GstPad * peer) -{ - GstPadLinkReturn result = GST_PAD_LINK_OK; - GstQueue *queue; - - queue = GST_QUEUE (gst_pad_get_parent (pad)); - - GST_DEBUG_OBJECT (queue, "queue linking source pad"); - - if (GST_PAD_LINKFUNC (peer)) { - result = GST_PAD_LINKFUNC (peer) (peer, pad); + while ((data = g_queue_pop_head (&queue->queue))) { + if (!GST_IS_QUERY (data)) + gst_mini_object_unref (data); } - if (GST_PAD_LINK_SUCCESSFUL (result)) { - GST_QUEUE_MUTEX_LOCK (queue); - if (queue->srcresult == GST_FLOW_OK) { - queue->push_newsegment = TRUE; - gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - GST_DEBUG_OBJECT (queue, "starting task as pad is linked"); - } else { - GST_DEBUG_OBJECT (queue, "not starting task reason %s", - gst_flow_get_name (queue->srcresult)); - } - GST_QUEUE_MUTEX_UNLOCK (queue); - } - gst_object_unref (queue); + g_queue_clear (&queue->queue); + g_mutex_clear (&queue->qlock); + g_cond_clear (&queue->item_add); + g_cond_clear (&queue->item_del); - return result; + G_OBJECT_CLASS (parent_class)->finalize (object); } /* calculate the diff between running time on the sink and src of the queue. @@ -631,12 +554,13 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment, static void gst_queue_locked_flush (GstQueue * queue) { - while (!g_queue_is_empty (queue->queue)) { - GstMiniObject *data = g_queue_pop_head (queue->queue); + GstMiniObject *data; + while ((data = g_queue_pop_head (&queue->queue))) { /* Then lose another reference because we are supposed to destroy that data when flushing */ - gst_mini_object_unref (data); + if (!GST_IS_QUERY (data)) + gst_mini_object_unref (data); } GST_QUEUE_CLEAR_LEVEL (queue->cur_level); queue->min_threshold.buffers = queue->orig_min_threshold.buffers; @@ -664,7 +588,7 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) queue->cur_level.bytes += gst_buffer_get_size (buffer); apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE); - g_queue_push_tail (queue->queue, item); + g_queue_push_tail (&queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -685,12 +609,12 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->sink_segment, TRUE); /* if the queue is empty, apply sink segment on the source */ - if (queue->queue->length == 0) { + if (queue->queue.length == 0) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad"); apply_segment (queue, event, &queue->src_segment, FALSE); queue->newseg_applied_to_src = TRUE; } - /* a new segment allows us to accept more buffers if we got UNEXPECTED + /* a new segment allows us to accept more buffers if we got EOS * from downstream */ queue->unexpected = FALSE; break; @@ -698,17 +622,17 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) break; } - g_queue_push_tail (queue->queue, item); + g_queue_push_tail (&queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */ static GstMiniObject * -gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) +gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; - item = g_queue_pop_head (queue->queue); + item = g_queue_pop_head (&queue->queue); if (item == NULL) goto no_item; @@ -726,7 +650,6 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) if (queue->cur_level.buffers == 0) queue->cur_level.time = 0; - *is_buffer = TRUE; } else if (GST_IS_EVENT (item)) { GstEvent *event = GST_EVENT_CAST (item); @@ -749,8 +672,11 @@ gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer) default: break; } + } else if (GST_IS_QUERY (item)) { + GstQuery *query = GST_QUERY_CAST (item); - *is_buffer = FALSE; + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "retrieved query %p from queue", query); } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", @@ -770,15 +696,11 @@ no_item: } static gboolean -gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) +gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstQueue *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (G_UNLIKELY (queue == NULL)) { - gst_event_unref (event); - return FALSE; - } + queue = GST_QUEUE (parent); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: @@ -789,7 +711,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) /* now unblock the chain function */ GST_QUEUE_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_WRONG_STATE; + queue->srcresult = GST_FLOW_FLUSHING; /* unblock the loop and chain functions */ GST_QUEUE_SIGNAL_ADD (queue); GST_QUEUE_SIGNAL_DEL (queue); @@ -812,12 +734,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) queue->srcresult = GST_FLOW_OK; queue->eos = FALSE; queue->unexpected = FALSE; - if (gst_pad_is_linked (queue->srcpad)) { - gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, - queue->srcpad); - } else { - GST_INFO_OBJECT (queue, "not re-starting task as pad is not linked"); - } + gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, + queue->srcpad); GST_QUEUE_MUTEX_UNLOCK (queue); STATUS (queue, pad, "after flush"); @@ -839,7 +757,6 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) break; } done: - gst_object_unref (queue); return TRUE; /* ERRORS */ @@ -848,7 +765,6 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are flushing"); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_object_unref (queue); gst_event_unref (event); return FALSE; } @@ -856,16 +772,52 @@ out_eos: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); - gst_object_unref (queue); gst_event_unref (event); return FALSE; } } static gboolean +gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstQueue *queue = GST_QUEUE_CAST (parent); + gboolean res; + + switch (GST_QUERY_TYPE (query)) { + default: + if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) { + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, + GST_QUERY_TYPE_NAME (query)); + g_queue_push_tail (&queue->queue, query); + GST_QUEUE_SIGNAL_ADD (queue); + while (queue->queue.length != 0) { + /* for as long as the queue has items, we know the query is + * not handled yet */ + GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); + } + res = queue->last_query; + GST_QUEUE_MUTEX_UNLOCK (queue); + } else { + res = gst_pad_query_default (pad, parent, query); + } + break; + } + return res; + + /* ERRORS */ +out_flushing: + { + GST_DEBUG_OBJECT (queue, "we are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } +} + +static gboolean gst_queue_is_empty (GstQueue * queue) { - if (queue->queue->length == 0) + if (queue->queue.length == 0) return TRUE; /* It is possible that a max size is reached before all min thresholds are. @@ -896,16 +848,16 @@ gst_queue_leak_downstream (GstQueue * queue) /* for as long as the queue is filled, dequeue an item and discard it */ while (gst_queue_is_filled (queue)) { GstMiniObject *leak; - gboolean is_buffer; - leak = gst_queue_locked_dequeue (queue, &is_buffer); + leak = gst_queue_locked_dequeue (queue); /* there is nothing to dequeue and the queue is still filled.. This should * not happen */ g_assert (leak != NULL); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is full, leaking item %p on downstream end", leak); - gst_mini_object_unref (leak); + if (!GST_IS_QUERY (leak)) + gst_mini_object_unref (leak); /* last buffer needs to get a DISCONT flag */ queue->head_needs_discont = TRUE; @@ -913,12 +865,12 @@ gst_queue_leak_downstream (GstQueue * queue) } static GstFlowReturn -gst_queue_chain (GstPad * pad, GstBuffer * buffer) +gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstQueue *queue; GstClockTime duration, timestamp; - queue = (GstQueue *) GST_OBJECT_PARENT (pad); + queue = GST_QUEUE_CAST (parent); /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); @@ -1033,30 +985,19 @@ out_eos: gst_buffer_unref (buffer); - return GST_FLOW_UNEXPECTED; + return GST_FLOW_EOS; } out_unexpected: { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "exit because we received UNEXPECTED"); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (buffer); - return GST_FLOW_UNEXPECTED; + return GST_FLOW_EOS; } } -static void -gst_queue_push_newsegment (GstQueue * queue) -{ - GstEvent *event; - - event = gst_event_new_segment (&queue->src_segment); - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pushing real newsegment event"); - gst_pad_push_event (queue->srcpad, event); -} - /* dequeue an item from the queue an push it downstream. This functions returns * the result of the push. */ static GstFlowReturn @@ -1064,18 +1005,14 @@ gst_queue_push_one (GstQueue * queue) { GstFlowReturn result = GST_FLOW_OK; GstMiniObject *data; - gboolean is_buffer; - data = gst_queue_locked_dequeue (queue, &is_buffer); + data = gst_queue_locked_dequeue (queue); if (data == NULL) goto no_item; next: - if (is_buffer) { + if (GST_IS_BUFFER (data)) { GstBuffer *buffer; -#if 0 - GstCaps *caps; -#endif buffer = GST_BUFFER_CAST (data); @@ -1090,55 +1027,44 @@ next: } queue->head_needs_discont = FALSE; } -#if 0 - caps = GST_BUFFER_CAPS (buffer); -#endif GST_QUEUE_MUTEX_UNLOCK (queue); -#if 0 - /* set the right caps on the pad now. We do this before pushing the buffer - * because the pad_push call will check (using acceptcaps) if the buffer can - * be set on the pad, which might fail because this will be propagated - * upstream. Also note that if the buffer has NULL caps, it means that the - * caps did not change, so we don't have to change caps on the pad. */ - if (caps && caps != GST_PAD_CAPS (queue->srcpad)) - gst_pad_set_caps (queue->srcpad, caps); -#endif - - if (queue->push_newsegment) { - gst_queue_push_newsegment (queue); - } result = gst_pad_push (queue->srcpad, buffer); /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - if (result == GST_FLOW_UNEXPECTED) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "got UNEXPECTED from downstream"); + if (result == GST_FLOW_EOS) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream"); /* stop pushing buffers, we dequeue all items until we see an item that we * can push again, which is EOS or SEGMENT. If there is nothing in the * queue we can push, we set a flag to make the sinkpad refuse more - * buffers with an UNEXPECTED return value. */ - while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) { - if (is_buffer) { + * buffers with an EOS return value. */ + while ((data = gst_queue_locked_dequeue (queue))) { + if (GST_IS_BUFFER (data)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED buffer %p", data); + "dropping EOS buffer %p", data); gst_buffer_unref (GST_BUFFER_CAST (data)); - } else { + } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) { /* we found a pushable item in the queue, push it out */ GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushing pushable event %s after UNEXPECTED", + "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event)); goto next; } GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping UNEXPECTED event %p", event); + "dropping EOS event %p", event); gst_event_unref (event); + } else if (GST_IS_QUERY (data)) { + GstQuery *query = GST_QUERY_CAST (data); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping query %p because of EOS", query); + queue->last_query = FALSE; } } /* no more items in the queue. Set the unexpected flag so that upstream @@ -1148,24 +1074,27 @@ next: queue->unexpected = TRUE; result = GST_FLOW_OK; } - } else { + } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); GST_QUEUE_MUTEX_UNLOCK (queue); - if (queue->push_newsegment && type != GST_EVENT_SEGMENT) { - gst_queue_push_newsegment (queue); - } gst_pad_push_event (queue->srcpad, event); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); - /* if we're EOS, return UNEXPECTED so that the task pauses. */ + /* if we're EOS, return EOS so that the task pauses. */ if (type == GST_EVENT_EOS) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushed EOS event %p, return UNEXPECTED", event); - result = GST_FLOW_UNEXPECTED; + "pushed EOS event %p, return EOS", event); + result = GST_FLOW_EOS; } + } else if (GST_IS_QUERY (data)) { + GstQuery *query = GST_QUERY_CAST (data); + + queue->last_query = gst_pad_peer_query (queue->srcpad, query); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "did query %p, return %d", query, queue->last_query); } return result; @@ -1179,7 +1108,7 @@ no_item: out_flushing: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing"); - return GST_FLOW_WRONG_STATE; + return GST_FLOW_FLUSHING; } } @@ -1217,7 +1146,6 @@ gst_queue_loop (GstPad * pad) } ret = gst_queue_push_one (queue); - queue->push_newsegment = FALSE; queue->srcresult = ret; if (ret != GST_FLOW_OK) goto out_flushing; @@ -1235,11 +1163,14 @@ out_flushing: gst_pad_pause_task (queue->srcpad); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pause task, reason: %s", gst_flow_get_name (ret)); - GST_QUEUE_SIGNAL_DEL (queue); + if (ret == GST_FLOW_FLUSHING) + gst_queue_locked_flush (queue); + else + GST_QUEUE_SIGNAL_DEL (queue); GST_QUEUE_MUTEX_UNLOCK (queue); /* let app know about us giving up if upstream is not expected to do so */ - /* UNEXPECTED is already taken care of elsewhere */ - if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) { + /* EOS is already taken care of elsewhere */ + if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { GST_ELEMENT_ERROR (queue, STREAM, FAILED, (_("Internal data flow error.")), ("streaming task paused, reason %s (%d)", @@ -1251,15 +1182,11 @@ out_flushing: } static gboolean -gst_queue_handle_src_event (GstPad * pad, GstEvent * event) +gst_queue_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { gboolean res = TRUE; - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); + GstQueue *queue = GST_QUEUE (parent); - if (G_UNLIKELY (queue == NULL)) { - gst_event_unref (event); - return FALSE; - } #ifndef GST_DISABLE_GST_DEBUG GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", event, GST_EVENT_TYPE (event)); @@ -1267,31 +1194,18 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) res = gst_pad_push_event (queue->sinkpad, event); - gst_object_unref (queue); return res; } static gboolean -gst_queue_handle_src_query (GstPad * pad, GstQuery * query) +gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) { - GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); - GstPad *peer; + GstQueue *queue = GST_QUEUE (parent); gboolean res; - if (G_UNLIKELY (queue == NULL)) - return FALSE; - - if (!(peer = gst_pad_get_peer (queue->sinkpad))) { - gst_object_unref (queue); + res = gst_pad_query_default (pad, parent, query); + if (!res) return FALSE; - } - - res = gst_pad_query (peer, query); - gst_object_unref (peer); - if (!res) { - gst_object_unref (queue); - return FALSE; - } switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: @@ -1347,72 +1261,77 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) break; } - gst_object_unref (queue); return TRUE; } static gboolean -gst_queue_sink_activate_push (GstPad * pad, gboolean active) +gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) { - gboolean result = TRUE; + gboolean result; GstQueue *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); + queue = GST_QUEUE (parent); - if (active) { - GST_QUEUE_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_OK; - queue->eos = FALSE; - queue->unexpected = FALSE; - GST_QUEUE_MUTEX_UNLOCK (queue); - } else { - /* step 1, unblock chain function */ - GST_QUEUE_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_WRONG_STATE; - gst_queue_locked_flush (queue); - GST_QUEUE_MUTEX_UNLOCK (queue); + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + GST_QUEUE_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; + GST_QUEUE_MUTEX_UNLOCK (queue); + } else { + /* step 1, unblock chain function */ + GST_QUEUE_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_FLUSHING; + gst_queue_locked_flush (queue); + GST_QUEUE_MUTEX_UNLOCK (queue); + } + result = TRUE; + break; + default: + result = FALSE; + break; } - - gst_object_unref (queue); - return result; } static gboolean -gst_queue_src_activate_push (GstPad * pad, gboolean active) +gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) { - gboolean result = FALSE; + gboolean result; GstQueue *queue; - queue = GST_QUEUE (gst_pad_get_parent (pad)); - - if (active) { - GST_QUEUE_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_OK; - queue->eos = FALSE; - queue->unexpected = FALSE; - /* we do not start the task yet if the pad is not connected */ - if (gst_pad_is_linked (pad)) - result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); - else { - GST_INFO_OBJECT (queue, "not starting task as pad is not linked"); - result = TRUE; - } - GST_QUEUE_MUTEX_UNLOCK (queue); - } else { - /* step 1, unblock loop function */ - GST_QUEUE_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_WRONG_STATE; - /* the item add signal will unblock */ - g_cond_signal (queue->item_add); - GST_QUEUE_MUTEX_UNLOCK (queue); + queue = GST_QUEUE (parent); + + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + GST_QUEUE_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; + result = + gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); + GST_QUEUE_MUTEX_UNLOCK (queue); + } else { + /* step 1, unblock loop function */ + GST_QUEUE_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_FLUSHING; + /* the item add signal will unblock */ + g_cond_signal (&queue->item_add); + GST_QUEUE_MUTEX_UNLOCK (queue); - /* step 2, make sure streaming finishes */ - result = gst_pad_stop_task (pad); + /* step 2, make sure streaming finishes */ + result = gst_pad_stop_task (pad); + } + break; + default: + result = FALSE; + break; } - - gst_object_unref (queue); - return result; } |