diff options
Diffstat (limited to 'plugins/elements/gstqueue.c')
-rw-r--r-- | plugins/elements/gstqueue.c | 51 |
1 files changed, 38 insertions, 13 deletions
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index aa494be..2989823 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -353,8 +353,6 @@ gst_queue_class_init (GstQueueClass * klass) * * Don't emit queue signals. Makes queues more lightweight if no signals are * needed. - * - * Since: 0.10.31 */ g_object_class_install_property (gobject_class, PROP_SILENT, g_param_spec_boolean ("silent", "Silent", @@ -419,7 +417,7 @@ gst_queue_init (GstQueue * queue) g_cond_init (&queue->item_add); g_cond_init (&queue->item_del); - g_queue_init (&queue->queue); + gst_queue_array_init (&queue->queue, DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); queue->sinktime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE; @@ -442,12 +440,14 @@ gst_queue_finalize (GObject * object) GST_DEBUG_OBJECT (queue, "finalizing queue"); - while ((data = g_queue_pop_head (&queue->queue))) { + while (!gst_queue_array_is_empty (&queue->queue)) { + data = gst_queue_array_pop_head (&queue->queue); + /* FIXME: if it's a query, shouldn't we unref that too? */ if (!GST_IS_QUERY (data)) gst_mini_object_unref (data); } + gst_queue_array_clear (&queue->queue); - g_queue_clear (&queue->queue); g_mutex_clear (&queue->qlock); g_cond_clear (&queue->item_add); g_cond_clear (&queue->item_del); @@ -556,7 +556,8 @@ gst_queue_locked_flush (GstQueue * queue) { GstMiniObject *data; - while ((data = g_queue_pop_head (&queue->queue))) { + while (!gst_queue_array_is_empty (&queue->queue)) { + data = gst_queue_array_pop_head (&queue->queue); /* Then lose another reference because we are supposed to destroy that data when flushing */ if (!GST_IS_QUERY (data)) @@ -588,7 +589,8 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) queue->cur_level.bytes += gst_buffer_get_size (buffer); apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE); - g_queue_push_tail (&queue->queue, item); + if (item) + gst_queue_array_push_tail (&queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -622,7 +624,8 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) break; } - g_queue_push_tail (&queue->queue, item); + if (item) + gst_queue_array_push_tail (&queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -632,7 +635,7 @@ gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; - item = g_queue_pop_head (&queue->queue); + item = gst_queue_array_pop_head (&queue->queue); if (item == NULL) goto no_item; @@ -735,7 +738,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) queue->eos = FALSE; queue->unexpected = FALSE; gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, - queue->srcpad); + queue->srcpad, NULL); GST_QUEUE_MUTEX_UNLOCK (queue); STATUS (queue, pad, "after flush"); @@ -789,7 +792,7 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, GST_QUERY_TYPE_NAME (query)); - g_queue_push_tail (&queue->queue, query); + gst_queue_array_push_tail (&queue->queue, query); GST_QUEUE_SIGNAL_ADD (queue); while (queue->queue.length != 0) { /* for as long as the queue has items, we know the query is @@ -817,9 +820,19 @@ out_flushing: static gboolean gst_queue_is_empty (GstQueue * queue) { + GstMiniObject *head; + if (queue->queue.length == 0) return TRUE; + /* Only consider the queue empty if the minimum thresholds + * are not reached and data is at the queue head. Otherwise + * we would block forever on serialized queries. + */ + head = queue->queue.array[queue->queue.head]; + if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head)) + return FALSE; + /* It is possible that a max size is reached before all min thresholds are. * Therefore, only consider it empty if it is not filled. */ return ((queue->min_threshold.buffers > 0 && @@ -1203,10 +1216,21 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query) GstQueue *queue = GST_QUEUE (parent); gboolean res; - res = gst_pad_query_default (pad, parent, query); + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_SCHEDULING:{ + gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH); + res = TRUE; + break; + } + default: + res = gst_pad_query_default (pad, parent, query); + break; + } + if (!res) return FALSE; + /* Adjust peer response for data contained in queue */ switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: { @@ -1314,7 +1338,8 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, queue->eos = FALSE; queue->unexpected = FALSE; result = - gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); + gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, + NULL); GST_QUEUE_MUTEX_UNLOCK (queue); } else { /* step 1, unblock loop function */ |