diff options
Diffstat (limited to 'plugins/elements/gstmultiqueue.c')
-rw-r--r-- | plugins/elements/gstmultiqueue.c | 178 |
1 files changed, 128 insertions, 50 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 6b89f83..9e9209c 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -18,8 +18,8 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** @@ -185,6 +185,8 @@ struct _GstMultiQueueItem GDestroyNotify destroy; guint32 posid; + + gboolean is_query; }; static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id); @@ -196,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq); static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq); +static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full); + static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST, @@ -728,13 +732,11 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition) } return result; - - - } static gboolean -gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush, + gboolean full) { gboolean result; @@ -761,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) sq->sink_tainted = sq->src_tainted = TRUE; } else { GST_MULTI_QUEUE_MUTEX_LOCK (mq); - gst_data_queue_flush (sq->queue); + gst_single_queue_flush_queue (sq, full); gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); /* All pads start off not-linked for a smooth kick-off */ @@ -1113,7 +1115,7 @@ gst_multi_queue_item_steal_object (GstMultiQueueItem * item) static void gst_multi_queue_item_destroy (GstMultiQueueItem * item) { - if (item->object) + if (!item->is_query && item->object) gst_mini_object_unref (item->object); g_slice_free (GstMultiQueueItem, item); } @@ -1128,6 +1130,7 @@ gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid) item->object = object; item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; item->posid = curid; + item->is_query = GST_IS_QUERY (object); item->size = gst_buffer_get_size (GST_BUFFER_CAST (object)); item->duration = GST_BUFFER_DURATION (object); @@ -1146,6 +1149,7 @@ gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid) item->object = object; item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; item->posid = curid; + item->is_query = GST_IS_QUERY (object); item->size = 0; item->duration = 0; @@ -1362,6 +1366,8 @@ out_flushing: compute_high_time (mq); compute_high_id (mq); wake_up_next_non_linked (mq); + sq->last_query = FALSE; + g_cond_signal (&sq->query_handled); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* upstream needs to see fatal result ASAP to shut things down, @@ -1369,7 +1375,7 @@ out_flushing: * so empty this one and trigger dynamic queue growth. At * this point the srcresult is not OK, NOT_LINKED * or EOS, i.e. a real failure */ - gst_data_queue_flush (sq->queue); + gst_single_queue_flush_queue (sq, FALSE); single_queue_underrun_cb (sq->queue, sq); gst_data_queue_set_flushing (sq->queue, TRUE); gst_pad_pause_task (sq->srcpad); @@ -1462,8 +1468,12 @@ gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent, /* All pads start off linked until they push one buffer */ sq->srcresult = GST_FLOW_OK; sq->pushed = FALSE; + gst_data_queue_set_flushing (sq->queue, FALSE); } else { sq->srcresult = GST_FLOW_FLUSHING; + sq->last_query = FALSE; + g_cond_signal (&sq->query_handled); + gst_data_queue_set_flushing (sq->queue, TRUE); gst_data_queue_flush (sq->queue); } res = TRUE; @@ -1504,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) res = gst_pad_push_event (sq->srcpad, event); - gst_single_queue_flush (mq, sq, TRUE); + gst_single_queue_flush (mq, sq, TRUE, FALSE); goto done; case GST_EVENT_FLUSH_STOP: @@ -1513,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) res = gst_pad_push_event (sq->srcpad, event); - gst_single_queue_flush (mq, sq, FALSE); + gst_single_queue_flush (mq, sq, FALSE, FALSE); goto done; case GST_EVENT_SEGMENT: /* take ref because the queue will take ownership and we need the event @@ -1600,6 +1610,10 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) guint32 curid; GstMultiQueueItem *item; + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (sq->srcresult != GST_FLOW_OK) + goto out_flushing; + /* Get an unique incrementing id. */ curid = g_atomic_int_add ((gint *) & mq->counter, 1); @@ -1608,8 +1622,6 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_DEBUG_OBJECT (mq, "SingleQueue %d : Enqueuing query %p of type %s with id %d", sq->id, query, GST_QUERY_TYPE_NAME (query), curid); - - GST_MULTI_QUEUE_MUTEX_LOCK (mq); res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item); g_cond_wait (&sq->query_handled, &mq->qlock); res = sq->last_query; @@ -1621,6 +1633,13 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) break; } return res; + +out_flushing: + { + GST_DEBUG_OBJECT (mq, "Flushing"); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + return FALSE; + } } static gboolean @@ -1639,9 +1658,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent, switch (mode) { case GST_PAD_MODE_PUSH: if (active) { - result = gst_single_queue_flush (mq, sq, FALSE); + result = gst_single_queue_flush (mq, sq, FALSE, TRUE); } else { - result = gst_single_queue_flush (mq, sq, TRUE); + result = gst_single_queue_flush (mq, sq, TRUE, TRUE); /* make sure streaming finishes */ result |= gst_pad_stop_task (pad); } @@ -1657,8 +1676,24 @@ static gboolean gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstSingleQueue *sq = gst_pad_get_element_private (pad); + GstMultiQueue *mq = sq->mqueue; + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_RECONFIGURE: + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + if (sq->srcresult == GST_FLOW_NOT_LINKED) + sq->srcresult = GST_FLOW_OK; + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - return gst_pad_push_event (sq->sinkpad, event); + ret = gst_pad_push_event (sq->sinkpad, event); + break; + default: + ret = gst_pad_push_event (sq->sinkpad, event); + break; + } + + return ret; } static gboolean @@ -1804,16 +1839,36 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GstMultiQueue *mq = sq->mqueue; GList *tmp; GstDataQueueSize size; - gboolean filled = FALSE; + gboolean filled = TRUE; gst_data_queue_get_level (sq->queue, &size); - GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id); + GST_LOG_OBJECT (mq, + "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %" + G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible, + sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time, + sq->max_size.time); GST_MULTI_QUEUE_MUTEX_LOCK (mq); + + /* check if we reached the hard time/bytes limits */ + if (sq->is_eos || IS_FILLED (sq, bytes, size.bytes) || + IS_FILLED (sq, time, sq->cur_time)) { + goto done; + } + + /* if hard limits are not reached then we allow one more buffer in the full + * queue, but only if any of the other singelqueues are empty */ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *oq = (GstSingleQueue *) tmp->data; - GstDataQueueSize ssize; + + if (oq == sq) + continue; + + if (oq->srcresult == GST_FLOW_NOT_LINKED) { + GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id); + continue; + } GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id); @@ -1822,43 +1877,22 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) if (IS_FILLED (sq, visible, size.visible)) { sq->max_size.visible = size.visible + 1; GST_DEBUG_OBJECT (mq, - "Another queue is empty, bumping single queue %d max visible to %d", - sq->id, sq->max_size.visible); + "Queue %d is empty, bumping single queue %d max visible to %d", + oq->id, sq->id, sq->max_size.visible); + filled = FALSE; + break; } } - /* check if we reached the hard time/bytes limits */ - gst_data_queue_get_level (oq->queue, &ssize); - - GST_DEBUG_OBJECT (mq, - "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" - G_GUINT64_FORMAT, oq->id, ssize.visible, oq->max_size.visible, - ssize.bytes, oq->max_size.bytes, oq->cur_time, oq->max_size.time); - - /* if this queue is filled completely we must signal overrun. - * FIXME, this seems wrong in many ways - * - we're comparing the filled level of this queue against the - * values of the other one - * - we should only do this after we found no empty queues, ie, move - * this check outside of the loop - * - the debug statement talks about a different queue than the one - * we are checking here. - */ - if (sq->is_eos || IS_FILLED (sq, bytes, ssize.bytes) || - IS_FILLED (sq, time, sq->cur_time)) { - GST_LOG_OBJECT (mq, "Queue %d is filled EOS %d", sq->id, sq->is_eos); - filled = TRUE; - } } - /* no queues were empty */ + +done: GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Overrun is always forwarded, since this is blocking the upstream element */ if (filled) { - GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun"); + GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id); g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0); } - - return; } static void @@ -1868,8 +1902,13 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GstMultiQueue *mq = sq->mqueue; GList *tmp; - GST_LOG_OBJECT (mq, - "Single Queue %d is empty, Checking other single queues", sq->id); + if (sq->srcresult == GST_FLOW_NOT_LINKED) { + GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id); + return; + } else { + GST_LOG_OBJECT (mq, + "Single Queue %d is empty, Checking other single queues", sq->id); + } GST_MULTI_QUEUE_MUTEX_LOCK (mq); for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { @@ -1925,6 +1964,45 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, } static void +gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full) +{ + GstDataQueueItem *sitem; + GstMultiQueueItem *mitem; + gboolean was_flushing = FALSE; + + while (!gst_data_queue_is_empty (sq->queue)) { + GstMiniObject *data; + + /* FIXME: If this fails here although the queue is not empty, + * we're flushing... but we want to rescue all sticky + * events nonetheless. + */ + if (!gst_data_queue_pop (sq->queue, &sitem)) { + was_flushing = TRUE; + gst_data_queue_set_flushing (sq->queue, FALSE); + continue; + } + + mitem = (GstMultiQueueItem *) sitem; + + data = sitem->object; + + if (!full && !mitem->is_query && GST_IS_EVENT (data) + && GST_EVENT_IS_STICKY (data) + && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT + && GST_EVENT_TYPE (data) != GST_EVENT_EOS) { + gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data)); + } + + sitem->destroy (sitem); + } + + gst_data_queue_flush (sq->queue); + if (was_flushing) + gst_data_queue_set_flushing (sq->queue, TRUE); +} + +static void gst_single_queue_free (GstSingleQueue * sq) { /* DRAIN QUEUE */ @@ -1983,7 +2061,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->mqueue = mqueue; sq->srcresult = GST_FLOW_FLUSHING; sq->pushed = FALSE; - sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) + sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) single_queue_check_full, (GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq); |