aboutsummaryrefslogtreecommitdiff
path: root/plugins/elements/gstqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/elements/gstqueue.c')
-rw-r--r--plugins/elements/gstqueue.c51
1 files changed, 38 insertions, 13 deletions
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index aa494be..2989823 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -353,8 +353,6 @@ gst_queue_class_init (GstQueueClass * klass)
*
* Don't emit queue signals. Makes queues more lightweight if no signals are
* needed.
- *
- * Since: 0.10.31
*/
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_boolean ("silent", "Silent",
@@ -419,7 +417,7 @@ gst_queue_init (GstQueue * queue)
g_cond_init (&queue->item_add);
g_cond_init (&queue->item_del);
- g_queue_init (&queue->queue);
+ gst_queue_array_init (&queue->queue, DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
queue->sinktime = GST_CLOCK_TIME_NONE;
queue->srctime = GST_CLOCK_TIME_NONE;
@@ -442,12 +440,14 @@ gst_queue_finalize (GObject * object)
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while ((data = g_queue_pop_head (&queue->queue))) {
+ while (!gst_queue_array_is_empty (&queue->queue)) {
+ data = gst_queue_array_pop_head (&queue->queue);
+ /* FIXME: if it's a query, shouldn't we unref that too? */
if (!GST_IS_QUERY (data))
gst_mini_object_unref (data);
}
+ gst_queue_array_clear (&queue->queue);
- g_queue_clear (&queue->queue);
g_mutex_clear (&queue->qlock);
g_cond_clear (&queue->item_add);
g_cond_clear (&queue->item_del);
@@ -556,7 +556,8 @@ gst_queue_locked_flush (GstQueue * queue)
{
GstMiniObject *data;
- while ((data = g_queue_pop_head (&queue->queue))) {
+ while (!gst_queue_array_is_empty (&queue->queue)) {
+ data = gst_queue_array_pop_head (&queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
if (!GST_IS_QUERY (data))
@@ -588,7 +589,8 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
queue->cur_level.bytes += gst_buffer_get_size (buffer);
apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
- g_queue_push_tail (&queue->queue, item);
+ if (item)
+ gst_queue_array_push_tail (&queue->queue, item);
GST_QUEUE_SIGNAL_ADD (queue);
}
@@ -622,7 +624,8 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
break;
}
- g_queue_push_tail (&queue->queue, item);
+ if (item)
+ gst_queue_array_push_tail (&queue->queue, item);
GST_QUEUE_SIGNAL_ADD (queue);
}
@@ -632,7 +635,7 @@ gst_queue_locked_dequeue (GstQueue * queue)
{
GstMiniObject *item;
- item = g_queue_pop_head (&queue->queue);
+ item = gst_queue_array_pop_head (&queue->queue);
if (item == NULL)
goto no_item;
@@ -735,7 +738,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
queue->eos = FALSE;
queue->unexpected = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
- queue->srcpad);
+ queue->srcpad, NULL);
GST_QUEUE_MUTEX_UNLOCK (queue);
STATUS (queue, pad, "after flush");
@@ -789,7 +792,7 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
GST_QUERY_TYPE_NAME (query));
- g_queue_push_tail (&queue->queue, query);
+ gst_queue_array_push_tail (&queue->queue, query);
GST_QUEUE_SIGNAL_ADD (queue);
while (queue->queue.length != 0) {
/* for as long as the queue has items, we know the query is
@@ -817,9 +820,19 @@ out_flushing:
static gboolean
gst_queue_is_empty (GstQueue * queue)
{
+ GstMiniObject *head;
+
if (queue->queue.length == 0)
return TRUE;
+ /* Only consider the queue empty if the minimum thresholds
+ * are not reached and data is at the queue head. Otherwise
+ * we would block forever on serialized queries.
+ */
+ head = queue->queue.array[queue->queue.head];
+ if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head))
+ return FALSE;
+
/* It is possible that a max size is reached before all min thresholds are.
* Therefore, only consider it empty if it is not filled. */
return ((queue->min_threshold.buffers > 0 &&
@@ -1203,10 +1216,21 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
GstQueue *queue = GST_QUEUE (parent);
gboolean res;
- res = gst_pad_query_default (pad, parent, query);
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_SCHEDULING:{
+ gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
+ res = TRUE;
+ break;
+ }
+ default:
+ res = gst_pad_query_default (pad, parent, query);
+ break;
+ }
+
if (!res)
return FALSE;
+ /* Adjust peer response for data contained in queue */
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:
{
@@ -1314,7 +1338,8 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
queue->eos = FALSE;
queue->unexpected = FALSE;
result =
- gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
+ gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
+ NULL);
GST_QUEUE_MUTEX_UNLOCK (queue);
} else {
/* step 1, unblock loop function */