aboutsummaryrefslogtreecommitdiff
path: root/plugins/elements/gstmultiqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/elements/gstmultiqueue.c')
-rw-r--r--plugins/elements/gstmultiqueue.c178
1 files changed, 128 insertions, 50 deletions
diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c
index 6b89f83..9e9209c 100644
--- a/plugins/elements/gstmultiqueue.c
+++ b/plugins/elements/gstmultiqueue.c
@@ -18,8 +18,8 @@
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
@@ -185,6 +185,8 @@ struct _GstMultiQueueItem
GDestroyNotify destroy;
guint32 posid;
+
+ gboolean is_query;
};
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
@@ -196,6 +198,8 @@ static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
+static void gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full);
+
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
@@ -728,13 +732,11 @@ gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
}
return result;
-
-
-
}
static gboolean
-gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
+gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush,
+ gboolean full)
{
gboolean result;
@@ -761,7 +763,7 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->sink_tainted = sq->src_tainted = TRUE;
} else {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- gst_data_queue_flush (sq->queue);
+ gst_single_queue_flush_queue (sq, full);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */
@@ -1113,7 +1115,7 @@ gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
static void
gst_multi_queue_item_destroy (GstMultiQueueItem * item)
{
- if (item->object)
+ if (!item->is_query && item->object)
gst_mini_object_unref (item->object);
g_slice_free (GstMultiQueueItem, item);
}
@@ -1128,6 +1130,7 @@ gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
item->object = object;
item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
item->posid = curid;
+ item->is_query = GST_IS_QUERY (object);
item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
item->duration = GST_BUFFER_DURATION (object);
@@ -1146,6 +1149,7 @@ gst_multi_queue_mo_item_new (GstMiniObject * object, guint32 curid)
item->object = object;
item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
item->posid = curid;
+ item->is_query = GST_IS_QUERY (object);
item->size = 0;
item->duration = 0;
@@ -1362,6 +1366,8 @@ out_flushing:
compute_high_time (mq);
compute_high_id (mq);
wake_up_next_non_linked (mq);
+ sq->last_query = FALSE;
+ g_cond_signal (&sq->query_handled);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* upstream needs to see fatal result ASAP to shut things down,
@@ -1369,7 +1375,7 @@ out_flushing:
* so empty this one and trigger dynamic queue growth. At
* this point the srcresult is not OK, NOT_LINKED
* or EOS, i.e. a real failure */
- gst_data_queue_flush (sq->queue);
+ gst_single_queue_flush_queue (sq, FALSE);
single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad);
@@ -1462,8 +1468,12 @@ gst_multi_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
/* All pads start off linked until they push one buffer */
sq->srcresult = GST_FLOW_OK;
sq->pushed = FALSE;
+ gst_data_queue_set_flushing (sq->queue, FALSE);
} else {
sq->srcresult = GST_FLOW_FLUSHING;
+ sq->last_query = FALSE;
+ g_cond_signal (&sq->query_handled);
+ gst_data_queue_set_flushing (sq->queue, TRUE);
gst_data_queue_flush (sq->queue);
}
res = TRUE;
@@ -1504,7 +1514,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event);
- gst_single_queue_flush (mq, sq, TRUE);
+ gst_single_queue_flush (mq, sq, TRUE, FALSE);
goto done;
case GST_EVENT_FLUSH_STOP:
@@ -1513,7 +1523,7 @@ gst_multi_queue_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
res = gst_pad_push_event (sq->srcpad, event);
- gst_single_queue_flush (mq, sq, FALSE);
+ gst_single_queue_flush (mq, sq, FALSE, FALSE);
goto done;
case GST_EVENT_SEGMENT:
/* take ref because the queue will take ownership and we need the event
@@ -1600,6 +1610,10 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
guint32 curid;
GstMultiQueueItem *item;
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ if (sq->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* Get an unique incrementing id. */
curid = g_atomic_int_add ((gint *) & mq->counter, 1);
@@ -1608,8 +1622,6 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Enqueuing query %p of type %s with id %d",
sq->id, query, GST_QUERY_TYPE_NAME (query), curid);
-
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item);
g_cond_wait (&sq->query_handled, &mq->qlock);
res = sq->last_query;
@@ -1621,6 +1633,13 @@ gst_multi_queue_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
break;
}
return res;
+
+out_flushing:
+ {
+ GST_DEBUG_OBJECT (mq, "Flushing");
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ return FALSE;
+ }
}
static gboolean
@@ -1639,9 +1658,9 @@ gst_multi_queue_src_activate_mode (GstPad * pad, GstObject * parent,
switch (mode) {
case GST_PAD_MODE_PUSH:
if (active) {
- result = gst_single_queue_flush (mq, sq, FALSE);
+ result = gst_single_queue_flush (mq, sq, FALSE, TRUE);
} else {
- result = gst_single_queue_flush (mq, sq, TRUE);
+ result = gst_single_queue_flush (mq, sq, TRUE, TRUE);
/* make sure streaming finishes */
result |= gst_pad_stop_task (pad);
}
@@ -1657,8 +1676,24 @@ static gboolean
gst_multi_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstSingleQueue *sq = gst_pad_get_element_private (pad);
+ GstMultiQueue *mq = sq->mqueue;
+ gboolean ret;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_RECONFIGURE:
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ if (sq->srcresult == GST_FLOW_NOT_LINKED)
+ sq->srcresult = GST_FLOW_OK;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- return gst_pad_push_event (sq->sinkpad, event);
+ ret = gst_pad_push_event (sq->sinkpad, event);
+ break;
+ default:
+ ret = gst_pad_push_event (sq->sinkpad, event);
+ break;
+ }
+
+ return ret;
}
static gboolean
@@ -1804,16 +1839,36 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GstMultiQueue *mq = sq->mqueue;
GList *tmp;
GstDataQueueSize size;
- gboolean filled = FALSE;
+ gboolean filled = TRUE;
gst_data_queue_get_level (sq->queue, &size);
- GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
+ GST_LOG_OBJECT (mq,
+ "Single Queue %d: EOS %d, visible %u/%u, bytes %u/%u, time %"
+ G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, sq->id, sq->is_eos, size.visible,
+ sq->max_size.visible, size.bytes, sq->max_size.bytes, sq->cur_time,
+ sq->max_size.time);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+
+ /* check if we reached the hard time/bytes limits */
+ if (sq->is_eos || IS_FILLED (sq, bytes, size.bytes) ||
+ IS_FILLED (sq, time, sq->cur_time)) {
+ goto done;
+ }
+
+ /* if hard limits are not reached then we allow one more buffer in the full
+ * queue, but only if any of the other singelqueues are empty */
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
- GstDataQueueSize ssize;
+
+ if (oq == sq)
+ continue;
+
+ if (oq->srcresult == GST_FLOW_NOT_LINKED) {
+ GST_LOG_OBJECT (mq, "Queue %d is not-linked", oq->id);
+ continue;
+ }
GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
@@ -1822,43 +1877,22 @@ single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
if (IS_FILLED (sq, visible, size.visible)) {
sq->max_size.visible = size.visible + 1;
GST_DEBUG_OBJECT (mq,
- "Another queue is empty, bumping single queue %d max visible to %d",
- sq->id, sq->max_size.visible);
+ "Queue %d is empty, bumping single queue %d max visible to %d",
+ oq->id, sq->id, sq->max_size.visible);
+ filled = FALSE;
+ break;
}
}
- /* check if we reached the hard time/bytes limits */
- gst_data_queue_get_level (oq->queue, &ssize);
-
- GST_DEBUG_OBJECT (mq,
- "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
- G_GUINT64_FORMAT, oq->id, ssize.visible, oq->max_size.visible,
- ssize.bytes, oq->max_size.bytes, oq->cur_time, oq->max_size.time);
-
- /* if this queue is filled completely we must signal overrun.
- * FIXME, this seems wrong in many ways
- * - we're comparing the filled level of this queue against the
- * values of the other one
- * - we should only do this after we found no empty queues, ie, move
- * this check outside of the loop
- * - the debug statement talks about a different queue than the one
- * we are checking here.
- */
- if (sq->is_eos || IS_FILLED (sq, bytes, ssize.bytes) ||
- IS_FILLED (sq, time, sq->cur_time)) {
- GST_LOG_OBJECT (mq, "Queue %d is filled EOS %d", sq->id, sq->is_eos);
- filled = TRUE;
- }
}
- /* no queues were empty */
+
+done:
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Overrun is always forwarded, since this is blocking the upstream element */
if (filled) {
- GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
+ GST_DEBUG_OBJECT (mq, "Queue %d is filled, signalling overrun", sq->id);
g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
}
-
- return;
}
static void
@@ -1868,8 +1902,13 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GstMultiQueue *mq = sq->mqueue;
GList *tmp;
- GST_LOG_OBJECT (mq,
- "Single Queue %d is empty, Checking other single queues", sq->id);
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ GST_LOG_OBJECT (mq, "Single Queue %d is empty but not-linked", sq->id);
+ return;
+ } else {
+ GST_LOG_OBJECT (mq,
+ "Single Queue %d is empty, Checking other single queues", sq->id);
+ }
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
@@ -1925,6 +1964,45 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
}
static void
+gst_single_queue_flush_queue (GstSingleQueue * sq, gboolean full)
+{
+ GstDataQueueItem *sitem;
+ GstMultiQueueItem *mitem;
+ gboolean was_flushing = FALSE;
+
+ while (!gst_data_queue_is_empty (sq->queue)) {
+ GstMiniObject *data;
+
+ /* FIXME: If this fails here although the queue is not empty,
+ * we're flushing... but we want to rescue all sticky
+ * events nonetheless.
+ */
+ if (!gst_data_queue_pop (sq->queue, &sitem)) {
+ was_flushing = TRUE;
+ gst_data_queue_set_flushing (sq->queue, FALSE);
+ continue;
+ }
+
+ mitem = (GstMultiQueueItem *) sitem;
+
+ data = sitem->object;
+
+ if (!full && !mitem->is_query && GST_IS_EVENT (data)
+ && GST_EVENT_IS_STICKY (data)
+ && GST_EVENT_TYPE (data) != GST_EVENT_SEGMENT
+ && GST_EVENT_TYPE (data) != GST_EVENT_EOS) {
+ gst_pad_store_sticky_event (sq->srcpad, GST_EVENT_CAST (data));
+ }
+
+ sitem->destroy (sitem);
+ }
+
+ gst_data_queue_flush (sq->queue);
+ if (was_flushing)
+ gst_data_queue_set_flushing (sq->queue, TRUE);
+}
+
+static void
gst_single_queue_free (GstSingleQueue * sq)
{
/* DRAIN QUEUE */
@@ -1983,7 +2061,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id)
sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_FLUSHING;
sq->pushed = FALSE;
- sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
+ sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
single_queue_check_full,
(GstDataQueueFullCallback) single_queue_overrun_cb,
(GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);