aboutsummaryrefslogtreecommitdiff
path: root/libs/gst/base/gstbaseparse.c
diff options
context:
space:
mode:
Diffstat (limited to 'libs/gst/base/gstbaseparse.c')
-rw-r--r--libs/gst/base/gstbaseparse.c1704
1 files changed, 948 insertions, 756 deletions
diff --git a/libs/gst/base/gstbaseparse.c b/libs/gst/base/gstbaseparse.c
index ed4c890..8067e53 100644
--- a/libs/gst/base/gstbaseparse.c
+++ b/libs/gst/base/gstbaseparse.c
@@ -2,6 +2,8 @@
* Copyright (C) 2008 Nokia Corporation. All rights reserved.
* Contact: Stefan Kost <stefan.kost@nokia.com>
* Copyright (C) 2008 Sebastian Dröge <sebastian.droege@collabora.co.uk>.
+ * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@@ -48,14 +50,16 @@
* <listitem>
* <itemizedlist><title>Set-up phase</title>
* <listitem><para>
- * GstBaseParse class calls @set_sink_caps to inform the subclass about
- * incoming sinkpad caps. Subclass should set the srcpad caps accordingly.
- * </para></listitem>
- * <listitem><para>
* GstBaseParse calls @start to inform subclass that data processing is
* about to start now.
* </para></listitem>
* <listitem><para>
+ * GstBaseParse class calls @set_sink_caps to inform the subclass about
+ * incoming sinkpad caps. Subclass could already set the srcpad caps
+ * accordingly, but this might be delayed until calling
+ * gst_base_parse_finish_frame() with a non-queued frame.
+ * </para></listitem>
+ * <listitem><para>
* At least at this point subclass needs to tell the GstBaseParse class
* how big data chunks it wants to receive (min_frame_size). It can do
* this with gst_base_parse_set_min_frame_size().
@@ -76,35 +80,40 @@
* </para></listitem>
* <listitem><para>
* A buffer of (at least) min_frame_size bytes is passed to subclass with
- * @check_valid_frame. Subclass checks the contents and returns TRUE
- * if the buffer contains a valid frame. It also needs to set the
- * @framesize according to the detected frame size. If buffer didn't
- * contain a valid frame, this call must return FALSE and optionally
- * set the @skipsize value to inform base class that how many bytes
- * it needs to skip in order to find a valid frame. @framesize can always
- * indicate a new minimum for current frame parsing. Indicating G_MAXUINT
- * for requested amount means subclass simply needs best available
- * subsequent data. In push mode this amounts to an additional input buffer
- * (thus minimal additional latency), in pull mode this amounts to some
- * arbitrary reasonable buffer size increase. The passed buffer
- * is read-only. Note that @check_valid_frame might receive any small
- * amount of input data when leftover data is being drained (e.g. at EOS).
- * </para></listitem>
- * <listitem><para>
- * After valid frame is found, it will be passed again to subclass with
- * @parse_frame call. Now subclass is responsible for parsing the
- * frame contents and setting the caps, and buffer metadata (e.g.
- * buffer timestamp and duration, or keyframe if applicable).
+ * @handle_frame. Subclass checks the contents and can optionally
+ * return GST_FLOW_OK along with an amount of data to be skipped to find
+ * a valid frame (which will result in a subsequent DISCONT).
+ * If, otherwise, the buffer does not hold a complete frame,
+ * @handle_frame can merely return and will be called again when additional
+ * data is available. In push mode this amounts to an
+ * additional input buffer (thus minimal additional latency), in pull mode
+ * this amounts to some arbitrary reasonable buffer size increase.
+ * Of course, gst_base_parse_set_min_size() could also be used if a very
+ * specific known amount of additional data is required.
+ * If, however, the buffer holds a complete valid frame, it can pass
+ * the size of this frame to gst_base_parse_finish_frame().
+ * If acting as a converter, it can also merely indicate consumed input data
+ * while simultaneously providing custom output data.
+ * Note that baseclass performs some processing (such as tracking
+ * overall consumed data rate versus duration) for each finished frame,
+ * but other state is only updated upon each call to @handle_frame
+ * (such as tracking upstream input timestamp).
+ * </para><para>
+ * Subclass is also responsible for setting the buffer metadata
+ * (e.g. buffer timestamp and duration, or keyframe if applicable).
* (although the latter can also be done by GstBaseParse if it is
* appropriately configured, see below). Frame is provided with
* timestamp derived from upstream (as much as generally possible),
* duration obtained from configuration (see below), and offset
* if meaningful (in pull mode).
+ * </para><para>
+ * Note that @check_valid_frame might receive any small
+ * amount of input data when leftover data is being drained (e.g. at EOS).
* </para></listitem>
* <listitem><para>
- * Finally the buffer can be pushed downstream and the parsing loop starts
- * over again. Just prior to actually pushing the buffer in question,
- * it is passed to @pre_push_buffer which gives subclass yet one
+ * As part of finish frame processing,
+ * just prior to actually pushing the buffer in question,
+ * it is passed to @pre_push_frame which gives subclass yet one
* last chance to examine buffer metadata, or to send some custom (tag)
* events, or to perform custom (segment) filtering.
* </para></listitem>
@@ -152,12 +161,9 @@
* done with gst_base_parse_set_min_frame_size() function.
* </para></listitem>
* <listitem><para>
- * Examine data chunks passed to subclass with @check_valid_frame
- * and tell if they contain a valid frame
- * </para></listitem>
- * <listitem><para>
- * Set the caps and timestamp to frame that is passed to subclass with
- * @parse_frame function.
+ * Examine data chunks passed to subclass with @handle_frame and pass
+ * proper frame(s) to gst_base_parse_finish_frame(), and setting src pad
+ * caps and timestamps on frame.
* </para></listitem>
* <listitem><para>Provide conversion functions</para></listitem>
* <listitem><para>
@@ -203,6 +209,11 @@
#include "gstbaseparse.h"
+/* FIXME: get rid of old GstIndex code */
+#include "gstindex.h"
+#include "gstindex.c"
+#include "gstmemindex.c"
+
#define GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC (1 << 0)
#define MIN_FRAMES_TO_POST_BITRATE 10
@@ -224,13 +235,14 @@ static const GstFormat fmtlist[] = {
struct _GstBaseParsePrivate
{
- GstActivateMode pad_mode;
+ GstPadMode pad_mode;
GstAdapter *adapter;
gint64 duration;
GstFormat duration_fmt;
gint64 estimated_duration;
+ gint64 estimated_drift;
guint min_frame_size;
gboolean passthrough;
@@ -254,6 +266,7 @@ struct _GstBaseParsePrivate
GstClockTime frame_duration;
gboolean seen_keyframe;
gboolean is_video;
+ gint flushed;
guint64 framecount;
guint64 bytecount;
@@ -270,8 +283,6 @@ struct _GstBaseParsePrivate
guint max_bitrate;
guint posted_avg_bitrate;
- GList *pending_events;
-
/* frames/buffers that are queued and ready to go on OK */
GQueue queued_frames;
@@ -281,7 +292,7 @@ struct _GstBaseParsePrivate
GstIndex *index;
gint index_id;
gboolean own_index;
- GStaticMutex index_lock;
+ GMutex index_lock;
/* seek table entries only maintained if upstream is BYTE seekable */
gboolean upstream_seekable;
@@ -301,19 +312,31 @@ struct _GstBaseParsePrivate
/* reverse playback */
GSList *buffers_pending;
+ GSList *buffers_head;
GSList *buffers_queued;
GSList *buffers_send;
GstClockTime last_ts;
gint64 last_offset;
+ /* Pending serialized events */
+ GList *pending_events;
/* Newsegment event to be sent after SEEK */
- GstEvent *pending_segment;
-
- /* Segment event that closes the running segment prior to SEEK */
- GstEvent *close_segment;
-
- /* push mode helper frame */
- GstBaseParseFrame frame;
+ gboolean pending_segment;
+
+ /* offset of last parsed frame/data */
+ gint64 prev_offset;
+ /* force a new frame, regardless of offset */
+ gboolean new_frame;
+ /* whether we are merely scanning for a frame */
+ gboolean scanning;
+ /* ... and resulting frame, if any */
+ GstBaseParseFrame *scanned_frame;
+
+ /* TRUE if we're still detecting the format, i.e.
+ * if ::detect() is still called for future buffers */
+ gboolean detecting;
+ GList *detect_buffers;
+ guint detect_buffers_size;
};
typedef struct _GstBaseParseSeek
@@ -324,6 +347,11 @@ typedef struct _GstBaseParseSeek
GstClockTime start_ts;
} GstBaseParseSeek;
+#define GST_BASE_PARSE_INDEX_LOCK(parse) \
+ g_mutex_lock (&parse->priv->index_lock);
+#define GST_BASE_PARSE_INDEX_UNLOCK(parse) \
+ g_mutex_unlock (&parse->priv->index_lock);
+
static GstElementClass *parent_class = NULL;
static void gst_base_parse_class_init (GstBaseParseClass * klass);
@@ -362,28 +390,33 @@ static GstStateChangeReturn gst_base_parse_change_state (GstElement * element,
GstStateChange transition);
static void gst_base_parse_reset (GstBaseParse * parse);
+#if 0
static void gst_base_parse_set_index (GstElement * element, GstIndex * index);
static GstIndex *gst_base_parse_get_index (GstElement * element);
+#endif
-static gboolean gst_base_parse_sink_activate (GstPad * sinkpad);
-static gboolean gst_base_parse_sink_activate_push (GstPad * pad,
- gboolean active);
-static gboolean gst_base_parse_sink_activate_pull (GstPad * pad,
- gboolean active);
+static gboolean gst_base_parse_sink_activate (GstPad * sinkpad,
+ GstObject * parent);
+static gboolean gst_base_parse_sink_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
static gboolean gst_base_parse_handle_seek (GstBaseParse * parse,
GstEvent * event);
static void gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event);
-static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event);
-static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event);
-static gboolean gst_base_parse_query (GstPad * pad, GstQuery * query);
-static const GstQueryType *gst_base_parse_get_querytypes (GstPad * pad);
+static gboolean gst_base_parse_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_base_parse_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
-static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer);
+static gboolean gst_base_parse_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_base_parse_sink_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
+
+static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
static void gst_base_parse_loop (GstPad * pad);
-static gboolean gst_base_parse_check_frame (GstBaseParse * parse,
- GstBaseParseFrame * frame, guint * framesize, gint * skipsize);
static GstFlowReturn gst_base_parse_parse_frame (GstBaseParse * parse,
GstBaseParseFrame * frame);
@@ -403,8 +436,11 @@ static gint64 gst_base_parse_find_offset (GstBaseParse * parse,
static GstFlowReturn gst_base_parse_locate_time (GstBaseParse * parse,
GstClockTime * _time, gint64 * _offset);
-static GstFlowReturn gst_base_parse_process_fragment (GstBaseParse * parse,
- gboolean push_only);
+static GstFlowReturn gst_base_parse_start_fragment (GstBaseParse * parse);
+static GstFlowReturn gst_base_parse_finish_fragment (GstBaseParse * parse,
+ gboolean prev_head);
+
+static inline GstFlowReturn gst_base_parse_check_sync (GstBaseParse * parse);
static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
@@ -418,28 +454,37 @@ gst_base_parse_clear_queues (GstBaseParse * parse)
NULL);
g_slist_free (parse->priv->buffers_pending);
parse->priv->buffers_pending = NULL;
+ g_slist_foreach (parse->priv->buffers_head, (GFunc) gst_buffer_unref, NULL);
+ g_slist_free (parse->priv->buffers_head);
+ parse->priv->buffers_head = NULL;
g_slist_foreach (parse->priv->buffers_send, (GFunc) gst_buffer_unref, NULL);
g_slist_free (parse->priv->buffers_send);
parse->priv->buffers_send = NULL;
+
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+
+ g_queue_foreach (&parse->priv->queued_frames,
+ (GFunc) gst_base_parse_frame_free, NULL);
+ g_queue_clear (&parse->priv->queued_frames);
+
+ gst_buffer_replace (&parse->priv->cache, NULL);
+
+ g_list_foreach (parse->priv->pending_events, (GFunc) gst_event_unref, NULL);
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
static void
gst_base_parse_finalize (GObject * object)
{
GstBaseParse *parse = GST_BASE_PARSE (object);
- GstEvent **p_ev;
g_object_unref (parse->priv->adapter);
- if (parse->priv->pending_segment) {
- p_ev = &parse->priv->pending_segment;
- gst_event_replace (p_ev, NULL);
- }
- if (parse->priv->close_segment) {
- p_ev = &parse->priv->close_segment;
- gst_event_replace (p_ev, NULL);
- }
-
if (parse->priv->cache) {
gst_buffer_unref (parse->priv->cache);
parse->priv->cache = NULL;
@@ -449,17 +494,13 @@ gst_base_parse_finalize (GObject * object)
NULL);
g_list_free (parse->priv->pending_events);
parse->priv->pending_events = NULL;
-
- g_queue_foreach (&parse->priv->queued_frames,
- (GFunc) gst_base_parse_frame_free, NULL);
- g_queue_clear (&parse->priv->queued_frames);
+ parse->priv->pending_segment = FALSE;
if (parse->priv->index) {
gst_object_unref (parse->priv->index);
parse->priv->index = NULL;
}
-
- g_static_mutex_free (&parse->priv->index_lock);
+ g_mutex_clear (&parse->priv->index_lock);
gst_base_parse_clear_queues (parse);
@@ -480,12 +521,14 @@ gst_base_parse_class_init (GstBaseParseClass * klass)
gstelement_class = (GstElementClass *) klass;
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_base_parse_change_state);
+
+#if 0
gstelement_class->set_index = GST_DEBUG_FUNCPTR (gst_base_parse_set_index);
gstelement_class->get_index = GST_DEBUG_FUNCPTR (gst_base_parse_get_index);
+#endif
/* Default handlers */
- klass->check_valid_frame = gst_base_parse_check_frame;
- klass->parse_frame = gst_base_parse_parse_frame;
+ klass->sink_event = gst_base_parse_sink_eventfunc;
klass->src_event = gst_base_parse_src_eventfunc;
klass->convert = gst_base_parse_convert_default;
@@ -508,14 +551,15 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass)
parse->sinkpad = gst_pad_new_from_template (pad_template, "sink");
gst_pad_set_event_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_sink_event));
+ gst_pad_set_query_function (parse->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_base_parse_sink_query));
gst_pad_set_chain_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_chain));
gst_pad_set_activate_function (parse->sinkpad,
GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate));
- gst_pad_set_activatepush_function (parse->sinkpad,
- GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_push));
- gst_pad_set_activatepull_function (parse->sinkpad,
- GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_pull));
+ gst_pad_set_activatemode_function (parse->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_mode));
+ GST_PAD_SET_PROXY_ALLOCATION (parse->sinkpad);
gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
GST_DEBUG_OBJECT (parse, "sinkpad created");
@@ -526,10 +570,8 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass)
parse->srcpad = gst_pad_new_from_template (pad_template, "src");
gst_pad_set_event_function (parse->srcpad,
GST_DEBUG_FUNCPTR (gst_base_parse_src_event));
- gst_pad_set_query_type_function (parse->srcpad,
- GST_DEBUG_FUNCPTR (gst_base_parse_get_querytypes));
gst_pad_set_query_function (parse->srcpad,
- GST_DEBUG_FUNCPTR (gst_base_parse_query));
+ GST_DEBUG_FUNCPTR (gst_base_parse_src_query));
gst_pad_use_fixed_caps (parse->srcpad);
gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
GST_DEBUG_OBJECT (parse, "src created");
@@ -538,13 +580,15 @@ gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass)
parse->priv->adapter = gst_adapter_new ();
- parse->priv->pad_mode = GST_ACTIVATE_NONE;
+ parse->priv->pad_mode = GST_PAD_MODE_NONE;
- g_static_mutex_init (&parse->priv->index_lock);
+ g_mutex_init (&parse->priv->index_lock);
/* init state */
gst_base_parse_reset (parse);
GST_DEBUG_OBJECT (parse, "init ok");
+
+ GST_OBJECT_FLAG_SET (parse, GST_ELEMENT_FLAG_INDEXABLE);
}
static GstBaseParseFrame *
@@ -578,21 +622,9 @@ gst_base_parse_frame_free (GstBaseParseFrame * frame)
}
}
-GType
-gst_base_parse_frame_get_type (void)
-{
- static volatile gsize frame_type = 0;
-
- if (g_once_init_enter (&frame_type)) {
- GType _type;
-
- _type = g_boxed_type_register_static ("GstBaseParseFrame",
- (GBoxedCopyFunc) gst_base_parse_frame_copy,
- (GBoxedFreeFunc) gst_base_parse_frame_free);
- g_once_init_leave (&frame_type, _type);
- }
- return (GType) frame_type;
-}
+G_DEFINE_BOXED_TYPE (GstBaseParseFrame, gst_base_parse_frame,
+ (GBoxedCopyFunc) gst_base_parse_frame_copy,
+ (GBoxedFreeFunc) gst_base_parse_frame_free);
/**
* gst_base_parse_frame_init:
@@ -627,8 +659,7 @@ gst_base_parse_frame_init (GstBaseParseFrame * frame)
* then use gst_base_parse_frame_init() to initialise it.
*
* Returns: a newly-allocated #GstBaseParseFrame. Free with
- * gst_base_parse_frame_free() when no longer needed, unless you gave
- * away ownership to gst_base_parse_push_frame().
+ * gst_base_parse_frame_free() when no longer needed.
*
* Since: 0.10.33
*/
@@ -685,6 +716,7 @@ gst_base_parse_reset (GstBaseParse * parse)
parse->priv->first_frame_ts = GST_CLOCK_TIME_NONE;
parse->priv->first_frame_offset = -1;
parse->priv->estimated_duration = -1;
+ parse->priv->estimated_drift = 0;
parse->priv->next_ts = 0;
parse->priv->syncable = TRUE;
parse->priv->passthrough = FALSE;
@@ -710,15 +742,11 @@ gst_base_parse_reset (GstBaseParse * parse)
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
parse->priv->last_offset = 0;
- if (parse->priv->pending_segment) {
- gst_event_unref (parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
- }
-
g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
NULL);
g_list_free (parse->priv->pending_events);
parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
if (parse->priv->cache) {
gst_buffer_unref (parse->priv->cache);
@@ -729,33 +757,17 @@ gst_base_parse_reset (GstBaseParse * parse)
g_slist_free (parse->priv->pending_seeks);
parse->priv->pending_seeks = NULL;
- /* we know it is not alloc'ed, but maybe other stuff to free, some day ... */
- parse->priv->frame._private_flags |=
- GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
- gst_base_parse_frame_free (&parse->priv->frame);
- GST_OBJECT_UNLOCK (parse);
-}
+ if (parse->priv->adapter)
+ gst_adapter_clear (parse->priv->adapter);
-/* gst_base_parse_check_frame:
- * @parse: #GstBaseParse.
- * @buffer: GstBuffer.
- * @framesize: This will be set to tell the found frame size in bytes.
- * @skipsize: Output parameter that tells how much data needs to be skipped
- * in order to find the following frame header.
- *
- * Default callback for check_valid_frame.
- *
- * Returns: Always TRUE.
- */
-static gboolean
-gst_base_parse_check_frame (GstBaseParse * parse,
- GstBaseParseFrame * frame, guint * framesize, gint * skipsize)
-{
- *framesize = gst_buffer_get_size (frame->buffer);
- *skipsize = 0;
- return TRUE;
-}
+ parse->priv->new_frame = TRUE;
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref, NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+ GST_OBJECT_UNLOCK (parse);
+}
/* gst_base_parse_parse_frame:
* @parse: #GstBaseParse.
@@ -842,25 +854,26 @@ gst_base_parse_convert (GstBaseParse * parse,
* Returns: TRUE if the event was handled.
*/
static gboolean
-gst_base_parse_sink_event (GstPad * pad, GstEvent * event)
+gst_base_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstBaseParse *parse;
GstBaseParseClass *bclass;
- gboolean handled = FALSE;
- gboolean ret = TRUE;
+ gboolean ret;
- parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
+ parse = GST_BASE_PARSE (parent);
bclass = GST_BASE_PARSE_GET_CLASS (parse);
GST_DEBUG_OBJECT (parse, "handling event %d, %s", GST_EVENT_TYPE (event),
GST_EVENT_TYPE_NAME (event));
- /* Cache all events except EOS, SEGMENT and FLUSH_STOP if we have a
+ /* Cache all serialized events except EOS, SEGMENT and FLUSH_STOP if we have a
* pending segment */
- if (parse->priv->pending_segment && GST_EVENT_TYPE (event) != GST_EVENT_EOS
+ if (parse->priv->pending_segment && GST_EVENT_IS_SERIALIZED (event)
+ && GST_EVENT_TYPE (event) != GST_EVENT_EOS
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_START
- && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
+ && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP
+ && GST_EVENT_TYPE (event) != GST_EVENT_CAPS) {
if (GST_EVENT_TYPE (event) == GST_EVENT_TAG)
/* See if any bitrate tags were posted */
@@ -870,24 +883,21 @@ gst_base_parse_sink_event (GstPad * pad, GstEvent * event)
g_list_append (parse->priv->pending_events, event);
ret = TRUE;
} else {
-
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
parse->priv->framecount < MIN_FRAMES_TO_POST_BITRATE)
/* We've not posted bitrate tags yet - do so now */
gst_base_parse_post_bitrates (parse, TRUE, TRUE, TRUE);
- if (bclass->event)
- handled = bclass->event (parse, event);
-
- if (!handled)
- handled = gst_base_parse_sink_eventfunc (parse, event);
-
- if (!handled)
- ret = gst_pad_event_default (pad, event);
+ if (bclass->sink_event)
+ ret = bclass->sink_event (parse, event);
+ else {
+ gst_event_unref (event);
+ ret = FALSE;
+ }
}
- gst_object_unref (parse);
GST_DEBUG_OBJECT (parse, "event handled");
+
return ret;
}
@@ -906,8 +916,7 @@ gst_base_parse_sink_event (GstPad * pad, GstEvent * event)
static gboolean
gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
{
- gboolean handled = FALSE;
- GstEvent **eventp;
+ gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
@@ -921,11 +930,12 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
if (klass->set_sink_caps)
- klass->set_sink_caps (parse, caps);
+ ret = klass->set_sink_caps (parse, caps);
+ else
+ ret = TRUE;
/* will send our own caps downstream */
gst_event_unref (event);
- handled = TRUE;
break;
}
case GST_EVENT_SEGMENT:
@@ -1005,8 +1015,8 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
gst_event_unref (event);
out_segment.start = 0;
- out_segment.stop = GST_CLOCK_TIME_NONE;;
- out_segment.time = 0;;
+ out_segment.stop = GST_CLOCK_TIME_NONE;
+ out_segment.time = 0;
event = gst_event_new_segment (&out_segment);
@@ -1028,17 +1038,17 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
/* save the segment for later, right before we push a new buffer so that
* the caps are fixed and the next linked element can receive
* the segment. */
- eventp = &parse->priv->pending_segment;
- gst_event_replace (eventp, event);
- gst_event_unref (event);
- handled = TRUE;
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events, event);
+ parse->priv->pending_segment = TRUE;
+ ret = TRUE;
/* but finish the current segment */
GST_DEBUG_OBJECT (parse, "draining current segment");
if (in_segment->rate > 0.0)
gst_base_parse_drain (parse);
else
- gst_base_parse_process_fragment (parse, FALSE);
+ gst_base_parse_finish_fragment (parse, FALSE);
gst_adapter_clear (parse->priv->adapter);
parse->priv->offset = offset;
@@ -1052,49 +1062,108 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
case GST_EVENT_FLUSH_START:
parse->priv->flushing = TRUE;
- handled = gst_pad_push_event (parse->srcpad, gst_event_ref (event));
- if (handled)
- gst_event_unref (event);
+ ret = gst_pad_push_event (parse->srcpad, event);
/* Wait for _chain() to exit by taking the srcpad STREAM_LOCK */
GST_PAD_STREAM_LOCK (parse->srcpad);
GST_PAD_STREAM_UNLOCK (parse->srcpad);
-
break;
case GST_EVENT_FLUSH_STOP:
+ ret = gst_pad_push_event (parse->srcpad, event);
gst_adapter_clear (parse->priv->adapter);
gst_base_parse_clear_queues (parse);
parse->priv->flushing = FALSE;
parse->priv->discont = TRUE;
parse->priv->last_ts = GST_CLOCK_TIME_NONE;
- parse->priv->frame._private_flags |=
- GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
- gst_base_parse_frame_free (&parse->priv->frame);
+ parse->priv->new_frame = TRUE;
break;
case GST_EVENT_EOS:
if (parse->segment.rate > 0.0)
gst_base_parse_drain (parse);
else
- gst_base_parse_process_fragment (parse, FALSE);
+ gst_base_parse_finish_fragment (parse, TRUE);
/* If we STILL have zero frames processed, fire an error */
if (parse->priv->framecount == 0) {
GST_ELEMENT_ERROR (parse, STREAM, WRONG_TYPE,
("No valid frames found before end of stream"), (NULL));
}
- /* newsegment before eos */
- if (parse->priv->pending_segment) {
- gst_pad_push_event (parse->srcpad, parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
+ /* newsegment and other serialized events before eos */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
+
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
+ }
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
+ ret = gst_pad_push_event (parse->srcpad, event);
break;
default:
+ ret =
+ gst_pad_event_default (parse->sinkpad, GST_OBJECT_CAST (parse),
+ event);
break;
}
+ return ret;
+}
+
+static gboolean
+gst_base_parse_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+ GstBaseParse *parse;
+ GstBaseParseClass *bclass;
+ gboolean res;
+
+ parse = GST_BASE_PARSE (parent);
+ bclass = GST_BASE_PARSE_GET_CLASS (parse);
+
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_CAPS:
+ {
+ if (bclass->get_sink_caps) {
+ GstCaps *caps, *filter;
+
+ gst_query_parse_caps (query, &filter);
+ caps = bclass->get_sink_caps (parse, filter);
+ GST_LOG_OBJECT (parse, "sink getcaps returning caps %" GST_PTR_FORMAT,
+ caps);
+ gst_query_set_caps_result (query, caps);
+ gst_caps_unref (caps);
+
+ res = TRUE;
+ } else {
+ GstCaps *caps, *template_caps, *filter;
+
+ gst_query_parse_caps (query, &filter);
+ template_caps = gst_pad_get_pad_template_caps (pad);
+ if (filter != NULL) {
+ caps =
+ gst_caps_intersect_full (filter, template_caps,
+ GST_CAPS_INTERSECT_FIRST);
+ gst_caps_unref (template_caps);
+ } else {
+ caps = template_caps;
+ }
+ gst_query_set_caps_result (query, caps);
+ gst_caps_unref (caps);
- return handled;
+ res = TRUE;
+ }
+ break;
+ }
+ default:
+ {
+ res = gst_pad_query_default (pad, parent, query);
+ break;
+ }
+ }
+
+ return res;
}
@@ -1107,26 +1176,23 @@ gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
* Returns: TRUE if the event was handled.
*/
static gboolean
-gst_base_parse_src_event (GstPad * pad, GstEvent * event)
+gst_base_parse_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstBaseParse *parse;
GstBaseParseClass *bclass;
- gboolean handled = FALSE;
gboolean ret = TRUE;
- parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
+ parse = GST_BASE_PARSE (parent);
bclass = GST_BASE_PARSE_GET_CLASS (parse);
GST_DEBUG_OBJECT (parse, "event %d, %s", GST_EVENT_TYPE (event),
GST_EVENT_TYPE_NAME (event));
if (bclass->src_event)
- handled = bclass->src_event (parse, event);
-
- if (!handled)
- ret = gst_pad_event_default (pad, event);
+ ret = bclass->src_event (parse, event);
+ else
+ gst_event_unref (event);
- gst_object_unref (parse);
return ret;
}
@@ -1149,20 +1215,19 @@ gst_base_parse_is_seekable (GstBaseParse * parse)
static gboolean
gst_base_parse_src_eventfunc (GstBaseParse * parse, GstEvent * event)
{
- gboolean handled = FALSE;
+ gboolean res = FALSE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
- {
- if (gst_base_parse_is_seekable (parse)) {
- handled = gst_base_parse_handle_seek (parse, event);
- }
+ if (gst_base_parse_is_seekable (parse))
+ res = gst_base_parse_handle_seek (parse, event);
break;
- }
default:
+ res = gst_pad_event_default (parse->srcpad, GST_OBJECT_CAST (parse),
+ event);
break;
}
- return handled;
+ return res;
}
@@ -1205,13 +1270,13 @@ gst_base_parse_convert_default (GstBaseParse * parse,
/* need at least some frames */
if (!parse->priv->framecount)
- return FALSE;
+ goto no_framecount;
duration = parse->priv->acc_duration / GST_MSECOND;
bytes = parse->priv->bytecount;
if (G_UNLIKELY (!duration || !bytes))
- return FALSE;
+ goto no_duration_bytes;
if (src_format == GST_FORMAT_BYTES) {
if (dest_format == GST_FORMAT_TIME) {
@@ -1222,6 +1287,8 @@ gst_base_parse_convert_default (GstBaseParse * parse,
GST_DEBUG_OBJECT (parse, "conversion result: %" G_GINT64_FORMAT " ms",
*dest_value / GST_MSECOND);
ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting bytes -> other not implemented");
}
} else if (src_format == GST_FORMAT_TIME) {
if (dest_format == GST_FORMAT_BYTES) {
@@ -1232,20 +1299,39 @@ gst_base_parse_convert_default (GstBaseParse * parse,
"time %" G_GINT64_FORMAT " ms in bytes = %" G_GINT64_FORMAT,
src_value / GST_MSECOND, *dest_value);
ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting time -> other not implemented");
}
} else if (src_format == GST_FORMAT_DEFAULT) {
/* DEFAULT == frame-based */
if (dest_format == GST_FORMAT_TIME) {
+ GST_DEBUG_OBJECT (parse, "converting default -> time");
if (parse->priv->fps_den) {
*dest_value = gst_util_uint64_scale (src_value,
GST_SECOND * parse->priv->fps_den, parse->priv->fps_num);
ret = TRUE;
}
- } else if (dest_format == GST_FORMAT_BYTES) {
+ } else {
+ GST_DEBUG_OBJECT (parse, "converting default -> other not implemented");
}
+ } else {
+ GST_DEBUG_OBJECT (parse, "conversion not implemented");
}
-
return ret;
+
+ /* ERRORS */
+no_framecount:
+ {
+ GST_DEBUG_OBJECT (parse, "no framecount");
+ return FALSE;
+ }
+no_duration_bytes:
+ {
+ GST_DEBUG_OBJECT (parse, "no duration %" G_GUINT64_FORMAT ", bytes %"
+ G_GUINT64_FORMAT, duration, bytes);
+ return FALSE;
+ }
+
}
static void
@@ -1266,6 +1352,17 @@ gst_base_parse_update_duration (GstBaseParse * baseparse)
if (qres) {
if (gst_base_parse_convert (parse, GST_FORMAT_BYTES, ptot,
GST_FORMAT_TIME, &dest_value)) {
+
+ /* inform if duration changed, but try to avoid spamming */
+ parse->priv->estimated_drift +=
+ dest_value - parse->priv->estimated_duration;
+ if (parse->priv->estimated_drift > GST_SECOND ||
+ parse->priv->estimated_drift < -GST_SECOND) {
+ gst_element_post_message (GST_ELEMENT (parse),
+ gst_message_new_duration (GST_OBJECT (parse),
+ GST_FORMAT_TIME, dest_value));
+ parse->priv->estimated_drift = 0;
+ }
parse->priv->estimated_duration = dest_value;
GST_LOG_OBJECT (parse,
"updated estimated duration to %" GST_TIME_FORMAT,
@@ -1282,7 +1379,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min,
GstTagList *taglist = NULL;
if (post_min && parse->priv->post_min_bitrate) {
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
GST_TAG_MINIMUM_BITRATE, parse->priv->min_bitrate, NULL);
@@ -1290,7 +1387,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min,
if (post_avg && parse->priv->post_avg_bitrate) {
if (taglist == NULL)
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
parse->priv->posted_avg_bitrate = parse->priv->avg_bitrate;
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_BITRATE,
@@ -1299,7 +1396,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min,
if (post_max && parse->priv->post_max_bitrate) {
if (taglist == NULL)
- taglist = gst_tag_list_new ();
+ taglist = gst_tag_list_new_empty ();
gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
GST_TAG_MAXIMUM_BITRATE, parse->priv->max_bitrate, NULL);
@@ -1310,8 +1407,7 @@ gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min,
parse->priv->max_bitrate);
if (taglist != NULL) {
- gst_element_found_tags_for_pad (GST_ELEMENT_CAST (parse), parse->srcpad,
- taglist);
+ gst_pad_push_event (parse->srcpad, gst_event_new_tag (taglist));
}
}
@@ -1397,14 +1493,6 @@ gst_base_parse_update_bitrates (GstBaseParse * parse, GstBaseParseFrame * frame)
if ((update_min || update_avg || update_max))
gst_base_parse_post_bitrates (parse, update_min, update_avg, update_max);
- /* If average bitrate changes that much and no valid (time) duration provided,
- * then post a new duration message so applications can update their cached
- * values */
- if (update_avg && !(parse->priv->duration_fmt == GST_FORMAT_TIME &&
- GST_CLOCK_TIME_IS_VALID (parse->priv->duration)))
- gst_element_post_message (GST_ELEMENT (parse),
- gst_message_new_duration (GST_OBJECT (parse), GST_FORMAT_TIME, -1));
-
exit:
return;
}
@@ -1481,11 +1569,12 @@ gst_base_parse_add_index_entry (GstBaseParse * parse, guint64 offset,
associations[1].value = offset;
/* index might change on-the-fly, although that would be nutty app ... */
- g_static_mutex_lock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_LOCK (parse);
gst_index_add_associationv (parse->priv->index, parse->priv->index_id,
- (key) ? GST_ASSOCIATION_FLAG_KEY_UNIT : GST_ASSOCIATION_FLAG_DELTA_UNIT,
- 2, (const GstIndexAssociation *) &associations);
- g_static_mutex_unlock (&parse->priv->index_lock);
+ (key) ? GST_INDEX_ASSOCIATION_FLAG_KEY_UNIT :
+ GST_INDEX_ASSOCIATION_FLAG_DELTA_UNIT, 2,
+ (const GstIndexAssociation *) &associations);
+ GST_BASE_PARSE_INDEX_UNLOCK (parse);
if (key) {
parse->priv->index_last_offset = offset;
@@ -1518,7 +1607,7 @@ gst_base_parse_check_seekability (GstBaseParse * parse)
/* try harder to query upstream size if we didn't get it the first time */
if (seekable && stop == -1) {
GST_DEBUG_OBJECT (parse, "doing duration query to fix up unset stop");
- gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop);
+ gst_pad_peer_query_duration (parse->sinkpad, GST_FORMAT_BYTES, &stop);
}
/* if upstream doesn't know the size, it's likely that it's not seekable in
@@ -1556,7 +1645,7 @@ gst_base_parse_check_upstream (GstBaseParse * parse)
{
gint64 stop;
- if (gst_pad_query_peer_duration (parse->sinkpad, GST_FORMAT_TIME, &stop))
+ if (gst_pad_peer_query_duration (parse->sinkpad, GST_FORMAT_TIME, &stop))
if (GST_CLOCK_TIME_IS_VALID (stop) && stop) {
/* upstream has one, accept it also, and no further updates */
gst_base_parse_set_duration (parse, GST_FORMAT_TIME, stop, 0);
@@ -1608,6 +1697,136 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
}
}
+/* makes sure that @buf is properly prepared and decorated for passing
+ * to baseclass, and an equally setup frame is returned setup with @buf.
+ * Takes ownership of @buf. */
+static GstBaseParseFrame *
+gst_base_parse_prepare_frame (GstBaseParse * parse, GstBuffer * buffer)
+{
+ GstBaseParseFrame *frame = NULL;
+
+ buffer = gst_buffer_make_writable (buffer);
+
+ GST_LOG_OBJECT (parse,
+ "preparing frame at offset %" G_GUINT64_FORMAT
+ " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT,
+ GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer),
+ gst_buffer_get_size (buffer));
+
+ if (parse->priv->discont) {
+ GST_DEBUG_OBJECT (parse, "marking DISCONT");
+ GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
+ parse->priv->discont = FALSE;
+ }
+
+ GST_BUFFER_OFFSET (buffer) = parse->priv->offset;
+
+ frame = gst_base_parse_frame_new (buffer, 0, 0);
+
+ /* also ensure to update state flags */
+ gst_base_parse_frame_update (parse, frame, buffer);
+ gst_buffer_unref (buffer);
+
+ if (parse->priv->prev_offset != parse->priv->offset || parse->priv->new_frame) {
+ GST_LOG_OBJECT (parse, "marking as new frame");
+ parse->priv->new_frame = FALSE;
+ frame->flags |= GST_BASE_PARSE_FRAME_FLAG_NEW_FRAME;
+ }
+
+ frame->offset = parse->priv->prev_offset = parse->priv->offset;
+
+ /* use default handler to provide initial (upstream) metadata */
+ gst_base_parse_parse_frame (parse, frame);
+
+ return frame;
+}
+
+/* Wraps buffer in a frame and dispatches to subclass.
+ * Also manages data skipping and offset handling (including adapter flushing).
+ * Takes ownership of @buffer */
+static GstFlowReturn
+gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
+ gint * skip, gint * flushed)
+{
+ GstBaseParseClass *klass = GST_BASE_PARSE_GET_CLASS (parse);
+ GstBaseParseFrame *frame;
+ GstFlowReturn ret;
+
+ g_return_val_if_fail (skip != NULL || flushed != NULL, GST_FLOW_ERROR);
+
+ GST_LOG_OBJECT (parse,
+ "handling buffer of size %" G_GSIZE_FORMAT " with ts %" GST_TIME_FORMAT
+ ", duration %" GST_TIME_FORMAT, gst_buffer_get_size (buffer),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ /* track what is being flushed during this single round of frame processing */
+ parse->priv->flushed = 0;
+ *skip = 0;
+
+ /* make it easy for _finish_frame to pick up input data */
+ if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
+ gst_buffer_ref (buffer);
+ gst_adapter_push (parse->priv->adapter, buffer);
+ }
+
+ frame = gst_base_parse_prepare_frame (parse, buffer);
+ ret = klass->handle_frame (parse, frame, skip);
+
+ *flushed = parse->priv->flushed;
+
+ GST_LOG_OBJECT (parse, "handle_frame skipped %d, flushed %d",
+ *skip, *flushed);
+
+ if (ret != GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (parse, "handle_frame returned %d", ret);
+ goto exit;
+ }
+
+ /* subclass can only do one of these, or semantics are too unclear */
+ g_assert (*skip == 0 || *flushed == 0);
+
+ /* track skipping */
+ if (*skip > 0) {
+ GstClockTime timestamp;
+ GstBuffer *outbuf;
+
+ GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", *skip);
+ if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
+ /* reverse playback, and no frames found yet, so we are skipping
+ * the leading part of a fragment, which may form the tail of
+ * fragment coming later, hopefully subclass skips efficiently ... */
+ timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
+ outbuf = gst_adapter_take_buffer (parse->priv->adapter, *skip);
+ outbuf = gst_buffer_make_writable (outbuf);
+ GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
+ parse->priv->buffers_head =
+ g_slist_prepend (parse->priv->buffers_head, outbuf);
+ outbuf = NULL;
+ } else {
+ gst_adapter_flush (parse->priv->adapter, *skip);
+ }
+ if (!parse->priv->discont)
+ parse->priv->sync_offset = parse->priv->offset;
+ parse->priv->offset += *skip;
+ parse->priv->discont = TRUE;
+ /* check for indefinite skipping */
+ if (ret == GST_FLOW_OK)
+ ret = gst_base_parse_check_sync (parse);
+ }
+
+ parse->priv->offset += *flushed;
+
+exit:
+ if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
+ gst_adapter_clear (parse->priv->adapter);
+ }
+
+ gst_base_parse_frame_free (frame);
+
+ return ret;
+}
+
/* gst_base_parse_handle_and_push_buffer:
* @parse: #GstBaseParse.
* @klass: #GstBaseParseClass.
@@ -1622,50 +1841,27 @@ gst_base_parse_queue_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
*/
static GstFlowReturn
gst_base_parse_handle_and_push_frame (GstBaseParse * parse,
- GstBaseParseClass * klass, GstBaseParseFrame * frame)
+ GstBaseParseFrame * frame)
{
- GstFlowReturn ret;
gint64 offset;
GstBuffer *buffer;
g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR);
- buffer = frame->buffer;
-
- if (parse->priv->discont) {
- GST_DEBUG_OBJECT (parse, "marking DISCONT");
- GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
- parse->priv->discont = FALSE;
- }
-
/* some one-time start-up */
if (G_UNLIKELY (!parse->priv->framecount)) {
gst_base_parse_check_seekability (parse);
gst_base_parse_check_upstream (parse);
}
- GST_LOG_OBJECT (parse,
- "parsing frame at offset %" G_GUINT64_FORMAT
- " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT,
- GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer),
- gst_buffer_get_size (buffer));
-
- /* use default handler to provide initial (upstream) metadata */
- gst_base_parse_parse_frame (parse, frame);
-
- /* store offset as it might get overwritten */
- offset = GST_BUFFER_OFFSET (buffer);
- ret = klass->parse_frame (parse, frame);
- /* sync */
buffer = frame->buffer;
- /* subclass must play nice */
- g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+ offset = frame->offset;
/* check if subclass/format can provide ts.
* If so, that allows and enables extra seek and duration determining options */
- if (G_UNLIKELY (parse->priv->first_frame_offset < 0 && ret == GST_FLOW_OK)) {
+ if (G_UNLIKELY (parse->priv->first_frame_offset < 0)) {
if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) && parse->priv->has_timing_info
- && parse->priv->pad_mode == GST_ACTIVATE_PULL) {
+ && parse->priv->pad_mode == GST_PAD_MODE_PULL) {
parse->priv->first_frame_offset = offset;
parse->priv->first_frame_ts = GST_BUFFER_TIMESTAMP (buffer);
GST_DEBUG_OBJECT (parse, "subclass provided ts %" GST_TIME_FORMAT
@@ -1707,26 +1903,12 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse,
GST_BUFFER_TIMESTAMP (buffer),
!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT), FALSE);
- /* First buffers are dropped, this means that the subclass needs more
- * frames to decide on the format and queues them internally */
- /* convert internal flow to OK and mark discont for the next buffer. */
- if (ret == GST_BASE_PARSE_FLOW_DROPPED) {
- gst_base_parse_frame_free (frame);
- return GST_FLOW_OK;
- } else if (ret == GST_BASE_PARSE_FLOW_QUEUED) {
- gst_base_parse_queue_frame (parse, frame);
- return GST_FLOW_OK;
- } else if (ret != GST_FLOW_OK) {
- return ret;
- }
-
/* All OK, push queued frames if there are any */
if (G_UNLIKELY (!g_queue_is_empty (&parse->priv->queued_frames))) {
GstBaseParseFrame *queued_frame;
while ((queued_frame = g_queue_pop_head (&parse->priv->queued_frames))) {
gst_base_parse_push_frame (parse, queued_frame);
- gst_base_parse_frame_free (queued_frame);
}
}
@@ -1736,12 +1918,11 @@ gst_base_parse_handle_and_push_frame (GstBaseParse * parse,
/**
* gst_base_parse_push_frame:
* @parse: #GstBaseParse.
- * @frame: (transfer full): a #GstBaseParseFrame
+ * @frame: (transfer none): a #GstBaseParseFrame
*
- * Pushes the frame downstream, sends any pending events and
- * does some timestamp and segment handling. Takes ownership
- * of @frame and will clear it (if it was initialised with
- * gst_base_parse_frame_init()) or free it.
+ * Pushes the frame's buffer downstream, sends any pending events and
+ * does some timestamp and segment handling. Takes ownership of
+ * frame's buffer, though caller retains ownership of @frame.
*
* This must be called with sinkpad STREAM_LOCK held.
*
@@ -1773,8 +1954,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
/* update stats */
- size = gst_buffer_get_size (buffer);
- parse->priv->bytecount += size;
+ parse->priv->bytecount += frame->size;
if (G_LIKELY (!(frame->flags & GST_BASE_PARSE_FRAME_FLAG_NO_FRAME))) {
parse->priv->framecount++;
if (GST_BUFFER_DURATION_IS_VALID (buffer)) {
@@ -1795,29 +1975,31 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
last_stop = last_start + GST_BUFFER_DURATION (buffer);
/* should have caps by now */
- g_return_val_if_fail (gst_pad_has_current_caps (parse->srcpad),
- GST_FLOW_ERROR);
+ if (!gst_pad_has_current_caps (parse->srcpad))
+ goto no_caps;
- /* segment adjustment magic; only if we are running the whole show */
- if (!parse->priv->passthrough && parse->segment.rate > 0.0 &&
- (parse->priv->pad_mode == GST_ACTIVATE_PULL ||
- parse->priv->upstream_seekable)) {
- /* segment times are typically estimates,
- * actual frame data might lead subclass to different timestamps,
- * so override segment start from what is supplied there */
- if (G_UNLIKELY (parse->priv->pending_segment && !parse->priv->exact_position
- && GST_CLOCK_TIME_IS_VALID (last_start))) {
- gst_event_unref (parse->priv->pending_segment);
- parse->segment.start =
- MIN ((guint64) last_start, (guint64) parse->segment.stop);
+ if (G_UNLIKELY (parse->priv->pending_segment)) {
+ /* have caps; check identity */
+ gst_base_parse_check_media (parse);
+ }
- GST_DEBUG_OBJECT (parse,
- "adjusting pending segment start to %" GST_TIME_FORMAT,
- GST_TIME_ARGS (parse->segment.start));
+ /* Push pending events, including NEWSEGMENT events */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
- parse->priv->pending_segment = gst_event_new_segment (&parse->segment);
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
}
- /* handle gaps, e.g. non-zero start-time, in as much not handled by above */
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
+ }
+
+ /* segment adjustment magic; only if we are running the whole show */
+ if (!parse->priv->passthrough && parse->segment.rate > 0.0 &&
+ (parse->priv->pad_mode == GST_PAD_MODE_PULL ||
+ parse->priv->upstream_seekable)) {
+ /* handle gaps */
if (GST_CLOCK_TIME_IS_VALID (parse->segment.position) &&
GST_CLOCK_TIME_IS_VALID (last_start)) {
GstClockTimeDiff diff;
@@ -1838,57 +2020,19 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
GST_TIME_ARGS (parse->segment.position),
GST_TIME_ARGS (last_start));
- if (G_UNLIKELY (parse->priv->pending_segment)) {
- gst_event_unref (parse->priv->pending_segment);
- parse->segment.start = last_start;
- parse->segment.time = last_start;
- parse->priv->pending_segment =
- gst_event_new_segment (&parse->segment);
- } else {
- /* skip gap FIXME */
- gst_pad_push_event (parse->srcpad,
- gst_event_new_segment (&parse->segment));
- }
+ /* skip gap FIXME */
+ gst_pad_push_event (parse->srcpad,
+ gst_event_new_segment (&parse->segment));
+
parse->segment.position = last_start;
}
}
}
- /* and should then also be linked downstream, so safe to send some events */
- if (G_UNLIKELY (parse->priv->close_segment)) {
- /* only set up by loop */
- GST_DEBUG_OBJECT (parse, "loop sending close segment");
- gst_pad_push_event (parse->srcpad, parse->priv->close_segment);
- parse->priv->close_segment = NULL;
- }
- if (G_UNLIKELY (parse->priv->pending_segment)) {
- GstEvent *pending_segment;
-
- pending_segment = parse->priv->pending_segment;
- parse->priv->pending_segment = NULL;
-
- GST_DEBUG_OBJECT (parse, "%s push pending segment",
- parse->priv->pad_mode == GST_ACTIVATE_PULL ? "loop" : "chain");
- gst_pad_push_event (parse->srcpad, pending_segment);
-
- /* have caps; check identity */
- gst_base_parse_check_media (parse);
- }
-
/* update bitrates and optionally post corresponding tags
* (following newsegment) */
gst_base_parse_update_bitrates (parse, frame);
- if (G_UNLIKELY (parse->priv->pending_events)) {
- GList *l;
-
- for (l = parse->priv->pending_events; l != NULL; l = l->next) {
- gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
- }
- g_list_free (parse->priv->pending_events);
- parse->priv->pending_events = NULL;
- }
-
if (klass->pre_push_frame) {
ret = klass->pre_push_frame (parse, frame);
} else {
@@ -1896,12 +2040,20 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
}
/* take final ownership of frame buffer */
- buffer = frame->buffer;
- frame->buffer = NULL;
+ if (frame->out_buffer) {
+ buffer = frame->out_buffer;
+ frame->out_buffer = NULL;
+ gst_buffer_replace (&frame->buffer, NULL);
+ } else {
+ buffer = frame->buffer;
+ frame->buffer = NULL;
+ }
/* subclass must play nice */
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+ size = gst_buffer_get_size (buffer);
+
parse->priv->seen_keyframe |= parse->priv->is_video &&
!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
@@ -1911,7 +2063,7 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
GST_BUFFER_TIMESTAMP (buffer) >
parse->segment.stop + parse->priv->lead_out_ts) {
GST_LOG_OBJECT (parse, "Dropped frame, after segment");
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
} else if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
GST_BUFFER_DURATION_IS_VALID (buffer) &&
GST_CLOCK_TIME_IS_VALID (parse->segment.start) &&
@@ -1951,9 +2103,9 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
size, gst_flow_get_name (ret));
gst_buffer_unref (buffer);
/* if we are not sufficiently in control, let upstream decide on EOS */
- if (ret == GST_FLOW_UNEXPECTED &&
+ if (ret == GST_FLOW_EOS &&
(parse->priv->passthrough ||
- (parse->priv->pad_mode == GST_ACTIVATE_PUSH &&
+ (parse->priv->pad_mode == GST_PAD_MODE_PUSH &&
!parse->priv->upstream_seekable)))
ret = GST_FLOW_OK;
}
@@ -1963,11 +2115,107 @@ gst_base_parse_push_frame (GstBaseParse * parse, GstBaseParseFrame * frame)
parse->segment.position < last_stop)
parse->segment.position = last_stop;
- gst_base_parse_frame_free (frame);
-
return ret;
+
+ /* ERRORS */
+no_caps:
+ {
+ GST_ELEMENT_ERROR (parse, STREAM, DECODE, ("No caps set"), (NULL));
+ return GST_FLOW_ERROR;
+ }
}
+/**
+ * gst_base_parse_finish_frame:
+ * @parse: a #GstBaseParse
+ * @frame: a #GstBaseParseFrame
+ * @size: consumed input data represented by frame
+ *
+ * Collects parsed data and pushes this downstream.
+ * Source pad caps must be set when this is called.
+ *
+ * If @frame's out_buffer is set, that will be used as subsequent frame data.
+ * Otherwise, @size samples will be taken from the input and used for output,
+ * and the output's metadata (timestamps etc) will be taken as (optionally)
+ * set by the subclass on @frame's (input) buffer (which is otherwise
+ * ignored for any but the above purpose/information).
+ *
+ * Note that the latter buffer is invalidated by this call, whereas the
+ * caller retains ownership of @frame.
+ *
+ * Returns: a #GstFlowReturn that should be escalated to caller (of caller)
+ *
+ * Since: 0.11.1
+ */
+GstFlowReturn
+gst_base_parse_finish_frame (GstBaseParse * parse, GstBaseParseFrame * frame,
+ gint size)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (frame->buffer != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (size > 0 || frame->out_buffer, GST_FLOW_ERROR);
+ g_return_val_if_fail (gst_adapter_available (parse->priv->adapter) >= size,
+ GST_FLOW_ERROR);
+
+ GST_LOG_OBJECT (parse, "finished frame at offset %" G_GUINT64_FORMAT ", "
+ "flushing size %d", frame->offset, size);
+
+ if (parse->priv->scanning && frame->buffer) {
+ if (!parse->priv->scanned_frame) {
+ parse->priv->scanned_frame = gst_base_parse_frame_copy (frame);
+ }
+ goto exit;
+ }
+
+ parse->priv->flushed += size;
+
+ /* either PUSH or PULL mode arranges for adapter data */
+ /* ensure output buffer */
+ if (!frame->out_buffer) {
+ GstBuffer *src, *dest;
+
+ frame->out_buffer = gst_adapter_take_buffer (parse->priv->adapter, size);
+ dest = frame->out_buffer;
+ src = frame->buffer;
+ GST_BUFFER_PTS (dest) = GST_BUFFER_PTS (src);
+ GST_BUFFER_DTS (dest) = GST_BUFFER_DTS (src);
+ GST_BUFFER_OFFSET (dest) = GST_BUFFER_OFFSET (src);
+ GST_BUFFER_DURATION (dest) = GST_BUFFER_DURATION (src);
+ GST_BUFFER_OFFSET_END (dest) = GST_BUFFER_OFFSET_END (src);
+ GST_MINI_OBJECT_FLAGS (dest) = GST_MINI_OBJECT_FLAGS (src);
+ } else {
+ gst_adapter_flush (parse->priv->adapter, size);
+ }
+
+ /* use as input for subsequent processing */
+ gst_buffer_replace (&frame->buffer, frame->out_buffer);
+ gst_buffer_unref (frame->out_buffer);
+ frame->out_buffer = NULL;
+
+ /* mark input size consumed */
+ frame->size = size;
+
+ /* subclass might queue frames/data internally if it needs more
+ * frames to decide on the format, or might request us to queue here. */
+ if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_DROP) {
+ gst_buffer_replace (&frame->buffer, NULL);
+ goto exit;
+ } else if (frame->flags & GST_BASE_PARSE_FRAME_FLAG_QUEUE) {
+ GstBaseParseFrame *copy;
+
+ copy = gst_base_parse_frame_copy (frame);
+ copy->flags &= ~GST_BASE_PARSE_FRAME_FLAG_QUEUE;
+ gst_base_parse_queue_frame (parse, copy);
+ goto exit;
+ }
+
+ ret = gst_base_parse_handle_and_push_frame (parse, frame);
+
+exit:
+ return ret;
+}
/* gst_base_parse_drain:
*
@@ -1988,7 +2236,8 @@ gst_base_parse_drain (GstBaseParse * parse)
if (!avail)
break;
- if (gst_base_parse_chain (parse->sinkpad, NULL) != GST_FLOW_OK) {
+ if (gst_base_parse_chain (parse->sinkpad, GST_OBJECT_CAST (parse),
+ NULL) != GST_FLOW_OK) {
break;
}
@@ -2044,7 +2293,34 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
return ret;
}
-/* gst_base_parse_process_fragment:
+/* gst_base_parse_start_fragment:
+ *
+ * Prepares for processing a reverse playback (forward) fragment
+ * by (re)setting proper state variables.
+ */
+static GstFlowReturn
+gst_base_parse_start_fragment (GstBaseParse * parse)
+{
+ GST_LOG_OBJECT (parse, "starting fragment");
+
+ /* invalidate so no fall-back timestamping is performed;
+ * ok if taken from subclass or upstream */
+ parse->priv->next_ts = GST_CLOCK_TIME_NONE;
+ parse->priv->prev_ts = GST_CLOCK_TIME_NONE;
+ /* prevent it hanging around stop all the time */
+ parse->segment.position = GST_CLOCK_TIME_NONE;
+ /* mark next run */
+ parse->priv->discont = TRUE;
+
+ /* head of previous fragment is now pending tail of current fragment */
+ parse->priv->buffers_pending = parse->priv->buffers_head;
+ parse->priv->buffers_head = NULL;
+
+ return GST_FLOW_OK;
+}
+
+
+/* gst_base_parse_finish_fragment:
*
* Processes a reverse playback (forward) fragment:
* - append head of last fragment that was skipped to current fragment data
@@ -2053,40 +2329,35 @@ gst_base_parse_send_buffers (GstBaseParse * parse)
* - push queued data
*/
static GstFlowReturn
-gst_base_parse_process_fragment (GstBaseParse * parse, gboolean push_only)
+gst_base_parse_finish_fragment (GstBaseParse * parse, gboolean prev_head)
{
GstBuffer *buf;
GstFlowReturn ret = GST_FLOW_OK;
gboolean seen_key = FALSE, seen_delta = FALSE;
- if (push_only)
- goto push;
+ GST_LOG_OBJECT (parse, "finishing fragment");
/* restore order */
parse->priv->buffers_pending = g_slist_reverse (parse->priv->buffers_pending);
while (parse->priv->buffers_pending) {
buf = GST_BUFFER_CAST (parse->priv->buffers_pending->data);
- GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
- gst_buffer_get_size (buf));
- gst_adapter_push (parse->priv->adapter, buf);
+ if (prev_head) {
+ GST_LOG_OBJECT (parse, "adding pending buffer (size %" G_GSIZE_FORMAT ")",
+ gst_buffer_get_size (buf));
+ gst_adapter_push (parse->priv->adapter, buf);
+ } else {
+ GST_LOG_OBJECT (parse, "discarding head buffer");
+ gst_buffer_unref (buf);
+ }
parse->priv->buffers_pending =
g_slist_delete_link (parse->priv->buffers_pending,
parse->priv->buffers_pending);
}
- /* invalidate so no fall-back timestamping is performed;
- * ok if taken from subclass or upstream */
- parse->priv->next_ts = GST_CLOCK_TIME_NONE;
- /* prevent it hanging around stop all the time */
- parse->segment.position = GST_CLOCK_TIME_NONE;
- /* mark next run */
- parse->priv->discont = TRUE;
-
/* chain looks for frames and queues resulting ones (in stead of pushing) */
/* initial skipped data is added to buffers_pending */
gst_base_parse_drain (parse);
-push:
if (parse->priv->buffers_send) {
buf = GST_BUFFER_CAST (parse->priv->buffers_send->data);
seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
@@ -2133,12 +2404,11 @@ push:
}
seen_key = FALSE;
}
- } else {
seen_delta = TRUE;
+ } else {
+ seen_key = TRUE;
}
- seen_key |= !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
-
parse->priv->buffers_send =
g_slist_prepend (parse->priv->buffers_send, buf);
parse->priv->buffers_queued =
@@ -2175,174 +2445,143 @@ gst_base_parse_check_sync (GstBaseParse * parse)
}
static GstFlowReturn
-gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
+gst_base_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstBaseParseClass *bclass;
GstBaseParse *parse;
GstFlowReturn ret = GST_FLOW_OK;
- GstBuffer *outbuf = NULL;
GstBuffer *tmpbuf = NULL;
guint fsize = 1;
gint skip = -1;
const guint8 *data;
- guint old_min_size = 0, min_size, av;
+ guint min_size, av;
GstClockTime timestamp;
- GstBaseParseFrame *frame;
- parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad));
+ parse = GST_BASE_PARSE (parent);
bclass = GST_BASE_PARSE_GET_CLASS (parse);
- frame = &parse->priv->frame;
+
+ if (parse->priv->detecting) {
+ GstBuffer *detect_buf;
+
+ if (parse->priv->detect_buffers_size == 0) {
+ detect_buf = gst_buffer_ref (buffer);
+ } else {
+ GList *l;
+ guint offset = 0;
+
+ detect_buf = gst_buffer_new ();
+
+ for (l = parse->priv->detect_buffers; l; l = l->next) {
+ gsize tmpsize = gst_buffer_get_size (l->data);
+
+ gst_buffer_copy_into (detect_buf, GST_BUFFER_CAST (l->data),
+ GST_BUFFER_COPY_MEMORY, offset, tmpsize);
+ offset += tmpsize;
+ }
+ if (buffer)
+ gst_buffer_copy_into (detect_buf, buffer, GST_BUFFER_COPY_MEMORY,
+ offset, gst_buffer_get_size (buffer));
+ }
+
+ ret = bclass->detect (parse, detect_buf);
+ gst_buffer_unref (detect_buf);
+
+ if (ret == GST_FLOW_OK) {
+ GList *l;
+
+ /* Detected something */
+ parse->priv->detecting = FALSE;
+
+ for (l = parse->priv->detect_buffers; l; l = l->next) {
+ if (ret == GST_FLOW_OK && !parse->priv->flushing)
+ ret =
+ gst_base_parse_chain (GST_BASE_PARSE_SINK_PAD (parse),
+ parent, GST_BUFFER_CAST (l->data));
+ else
+ gst_buffer_unref (GST_BUFFER_CAST (l->data));
+ }
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+
+ if (ret != GST_FLOW_OK) {
+ return ret;
+ }
+
+ /* Handle the current buffer */
+ } else if (ret == GST_FLOW_NOT_NEGOTIATED) {
+ /* Still detecting, append buffer or error out if draining */
+
+ if (parse->priv->drain) {
+ GST_DEBUG_OBJECT (parse, "Draining but did not detect format yet");
+ return GST_FLOW_ERROR;
+ } else if (parse->priv->flushing) {
+ g_list_foreach (parse->priv->detect_buffers, (GFunc) gst_buffer_unref,
+ NULL);
+ g_list_free (parse->priv->detect_buffers);
+ parse->priv->detect_buffers = NULL;
+ parse->priv->detect_buffers_size = 0;
+ } else {
+ parse->priv->detect_buffers =
+ g_list_append (parse->priv->detect_buffers, buffer);
+ parse->priv->detect_buffers_size += gst_buffer_get_size (buffer);
+ return GST_FLOW_OK;
+ }
+ } else {
+ /* Something went wrong, subclass responsible for error reporting */
+ return ret;
+ }
+
+ /* And now handle the current buffer if detection worked */
+ }
if (G_LIKELY (buffer)) {
GST_LOG_OBJECT (parse,
"buffer size: %" G_GSIZE_FORMAT ", offset = %" G_GINT64_FORMAT,
gst_buffer_get_size (buffer), GST_BUFFER_OFFSET (buffer));
if (G_UNLIKELY (parse->priv->passthrough)) {
- gst_base_parse_frame_init (frame);
- frame->buffer = gst_buffer_make_writable (buffer);
- return gst_base_parse_push_frame (parse, frame);
+ GstBaseParseFrame frame;
+
+ gst_base_parse_frame_init (&frame);
+ frame.buffer = gst_buffer_make_writable (buffer);
+ ret = gst_base_parse_push_frame (parse, &frame);
+ gst_base_parse_frame_free (&frame);
+ return ret;
}
/* upstream feeding us in reverse playback;
- * gather each fragment, then process it in single run */
+ * finish previous fragment and start new upon DISCONT */
if (parse->segment.rate < 0.0) {
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
GST_DEBUG_OBJECT (parse, "buffer starts new reverse playback fragment");
- ret = gst_base_parse_process_fragment (parse, FALSE);
+ ret = gst_base_parse_finish_fragment (parse, TRUE);
+ gst_base_parse_start_fragment (parse);
}
- gst_adapter_push (parse->priv->adapter, buffer);
- return ret;
}
gst_adapter_push (parse->priv->adapter, buffer);
}
- if (G_UNLIKELY (buffer &&
- GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
- frame->_private_flags |= GST_BASE_PARSE_FRAME_PRIVATE_FLAG_NOALLOC;
- gst_base_parse_frame_free (frame);
- }
-
/* Parse and push as many frames as possible */
/* Stop either when adapter is empty or we are flushing */
while (!parse->priv->flushing) {
- gboolean res;
-
- /* maintain frame state for a single frame parsing round across _chain calls,
- * so only init when needed */
- if (!frame->_private_flags)
- gst_base_parse_frame_init (frame);
-
- tmpbuf = gst_buffer_new ();
-
- old_min_size = 0;
- /* Synchronization loop */
- for (;;) {
- /* note: if subclass indicates MAX fsize,
- * this will not likely be available anyway ... */
- min_size = MAX (parse->priv->min_frame_size, fsize);
- av = gst_adapter_available (parse->priv->adapter);
-
- /* loop safety check */
- if (G_UNLIKELY (old_min_size >= min_size))
- goto invalid_min;
- old_min_size = min_size;
-
- if (G_UNLIKELY (parse->priv->drain)) {
- min_size = av;
- GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
- if (G_UNLIKELY (!min_size)) {
- gst_buffer_unref (tmpbuf);
- goto done;
- }
- }
-
- /* Collect at least min_frame_size bytes */
- if (av < min_size) {
- GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)",
- av);
- gst_buffer_unref (tmpbuf);
- goto done;
- }
-
- /* always pass all available data */
- data = gst_adapter_map (parse->priv->adapter, av);
- gst_buffer_take_memory (tmpbuf, -1,
- gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
- (gpointer) data, NULL, av, 0, av));
- GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset;
+ gint flush = 0;
- if (parse->priv->discont) {
- GST_DEBUG_OBJECT (parse, "marking DISCONT");
- GST_BUFFER_FLAG_SET (tmpbuf, GST_BUFFER_FLAG_DISCONT);
- }
+ /* note: if subclass indicates MAX fsize,
+ * this will not likely be available anyway ... */
+ min_size = MAX (parse->priv->min_frame_size, fsize);
+ av = gst_adapter_available (parse->priv->adapter);
- skip = -1;
- gst_base_parse_frame_update (parse, frame, tmpbuf);
- res = bclass->check_valid_frame (parse, frame, &fsize, &skip);
- gst_adapter_unmap (parse->priv->adapter, 0);
- gst_buffer_replace (&frame->buffer, NULL);
- if (res) {
- if (gst_adapter_available (parse->priv->adapter) < fsize) {
- GST_DEBUG_OBJECT (parse, "found valid frame but not enough data"
- " available (only %" G_GSIZE_FORMAT " bytes)",
- gst_adapter_available (parse->priv->adapter));
- gst_buffer_unref (tmpbuf);
- goto done;
- }
- GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip);
- break;
- }
- if (skip == -1) {
- /* subclass didn't touch this value. By default we skip 1 byte */
- skip = 1;
- }
- if (skip > 0) {
- GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
- if (parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
- /* reverse playback, and no frames found yet, so we are skipping
- * the leading part of a fragment, which may form the tail of
- * fragment coming later, hopefully subclass skips efficiently ... */
- timestamp = gst_adapter_prev_timestamp (parse->priv->adapter, NULL);
- outbuf = gst_adapter_take_buffer (parse->priv->adapter, skip);
- outbuf = gst_buffer_make_writable (outbuf);
- GST_BUFFER_TIMESTAMP (outbuf) = timestamp;
- parse->priv->buffers_pending =
- g_slist_prepend (parse->priv->buffers_pending, outbuf);
- outbuf = NULL;
- } else {
- gst_adapter_flush (parse->priv->adapter, skip);
- }
- parse->priv->offset += skip;
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->discont = TRUE;
- /* something changed least; nullify loop check */
- old_min_size = 0;
- }
- /* skip == 0 should imply subclass set min_size to need more data;
- * we check this shortly */
- if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
- gst_buffer_unref (tmpbuf);
+ if (G_UNLIKELY (parse->priv->drain)) {
+ min_size = av;
+ GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
+ if (G_UNLIKELY (!min_size)) {
goto done;
}
}
- gst_buffer_unref (tmpbuf);
- tmpbuf = NULL;
-
- if (skip > 0) {
- /* Subclass found the sync, but still wants to skip some data */
- GST_LOG_OBJECT (parse, "skipping %d bytes", skip);
- gst_adapter_flush (parse->priv->adapter, skip);
- parse->priv->offset += skip;
- }
- /* Grab lock to prevent a race with FLUSH_START handler */
- GST_PAD_STREAM_LOCK (parse->srcpad);
-
- /* FLUSH_START event causes the "flushing" flag to be set. In this
- * case we can leave the frame pushing loop */
- if (parse->priv->flushing) {
- GST_PAD_STREAM_UNLOCK (parse->srcpad);
- break;
+ /* Collect at least min_frame_size bytes */
+ if (av < min_size) {
+ GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)", av);
+ goto done;
}
/* move along with upstream timestamp (if any),
@@ -2353,38 +2592,33 @@ gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
parse->priv->prev_ts = parse->priv->next_ts = timestamp;
}
- /* FIXME: Would it be more efficient to make a subbuffer instead? */
- outbuf = gst_adapter_take_buffer (parse->priv->adapter, fsize);
- outbuf = gst_buffer_make_writable (outbuf);
-
- /* Subclass may want to know the data offset */
- GST_BUFFER_OFFSET (outbuf) = parse->priv->offset;
- parse->priv->offset += fsize;
- GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
- GST_BUFFER_DURATION (outbuf) = GST_CLOCK_TIME_NONE;
+ /* always pass all available data */
+ data = gst_adapter_map (parse->priv->adapter, av);
+ /* arrange for actual data to be copied if subclass tries to,
+ * since what is passed is tied to the adapter */
+ tmpbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY |
+ GST_MEMORY_FLAG_NO_SHARE, (gpointer) data, av, 0, av, NULL, NULL);
- frame->buffer = outbuf;
- ret = gst_base_parse_handle_and_push_frame (parse, bclass, frame);
- GST_PAD_STREAM_UNLOCK (parse->srcpad);
+ /* keep the adapter mapped, so keep track of what has to be flushed */
+ ret = gst_base_parse_handle_buffer (parse, tmpbuf, &skip, &flush);
+ tmpbuf = NULL;
- if (ret != GST_FLOW_OK) {
- GST_LOG_OBJECT (parse, "push returned %d", ret);
- break;
+ /* probably already implicitly unmapped due to adapter operation,
+ * but for good measure ... */
+ gst_adapter_unmap (parse->priv->adapter);
+ if (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED) {
+ goto done;
+ }
+ if (skip == 0 && flush == 0) {
+ GST_LOG_OBJECT (parse, "nothing skipped and no frames finished, "
+ "breaking to get more data");
+ goto done;
}
}
done:
GST_LOG_OBJECT (parse, "chain leaving");
return ret;
-
- /* ERRORS */
-invalid_min:
- {
- GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
- ("min_size evolution %d -> %d; breaking to avoid looping",
- old_min_size, min_size));
- return GST_FLOW_ERROR;
- }
}
/* pull @size bytes at current offset,
@@ -2479,7 +2713,7 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
if (!parse->priv->last_offset || parse->priv->last_ts <= parse->segment.start) {
GST_DEBUG_OBJECT (parse, "past start of segment %" GST_TIME_FORMAT,
GST_TIME_ARGS (parse->segment.start));
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
goto exit;
}
@@ -2491,7 +2725,7 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
if (parse->priv->exact_position) {
offset = gst_base_parse_find_offset (parse, ts, TRUE, NULL);
} else {
- if (!gst_pad_query_convert (parse->srcpad, GST_FORMAT_TIME, ts,
+ if (!gst_base_parse_convert (parse, GST_FORMAT_TIME, ts,
GST_FORMAT_BYTES, &offset)) {
GST_DEBUG_OBJECT (parse, "conversion failed, only BYTE based");
}
@@ -2512,8 +2746,9 @@ gst_base_parse_handle_previous_fragment (GstBaseParse * parse)
/* offset will increase again as fragment is processed/parsed */
parse->priv->last_offset = offset;
+ gst_base_parse_start_fragment (parse);
gst_adapter_push (parse->priv->adapter, buffer);
- ret = gst_base_parse_process_fragment (parse, FALSE);
+ ret = gst_base_parse_finish_fragment (parse, TRUE);
if (ret != GST_FLOW_OK)
goto exit;
@@ -2529,15 +2764,14 @@ exit:
* ajusts sync, drain and offset going along */
static GstFlowReturn
gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
- GstBaseParseFrame * frame, gboolean full)
+ gboolean full)
{
- GstBuffer *buffer, *outbuf;
+ GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK;
- guint fsize = 1, min_size, old_min_size = 0;
+ guint fsize, min_size;
+ gint flushed = 0;
gint skip = 0;
- g_return_val_if_fail (frame != NULL, GST_FLOW_ERROR);
-
GST_LOG_OBJECT (parse, "scanning for frame at offset %" G_GUINT64_FORMAT
" (%#" G_GINT64_MODIFIER "x)", parse->priv->offset, parse->priv->offset);
@@ -2548,106 +2782,74 @@ gst_base_parse_scan_frame (GstBaseParse * parse, GstBaseParseClass * klass,
fsize = 64 * 1024;
while (TRUE) {
- gboolean res;
-
min_size = MAX (parse->priv->min_frame_size, fsize);
- /* loop safety check */
- if (G_UNLIKELY (old_min_size >= min_size))
- goto invalid_min;
- old_min_size = min_size;
+
+ GST_LOG_OBJECT (parse, "reading buffer size %u", min_size);
ret = gst_base_parse_pull_range (parse, min_size, &buffer);
if (ret != GST_FLOW_OK)
goto done;
- if (parse->priv->discont) {
- GST_DEBUG_OBJECT (parse, "marking DISCONT");
- GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
- }
-
/* if we got a short read, inform subclass we are draining leftover
* and no more is to be expected */
- if (gst_buffer_get_size (buffer) < min_size)
+ if (gst_buffer_get_size (buffer) < min_size) {
+ GST_LOG_OBJECT (parse, "... but did not get that; marked draining");
parse->priv->drain = TRUE;
-
- skip = -1;
- gst_base_parse_frame_update (parse, frame, buffer);
- res = klass->check_valid_frame (parse, frame, &fsize, &skip);
- gst_buffer_replace (&frame->buffer, NULL);
- if (res) {
- parse->priv->drain = FALSE;
- GST_LOG_OBJECT (parse, "valid frame of size %d at pos %d", fsize, skip);
- break;
}
- parse->priv->drain = FALSE;
- if (skip == -1)
- skip = 1;
- if (skip > 0) {
- GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
- if (full && parse->segment.rate < 0.0 && !parse->priv->buffers_queued) {
- /* reverse playback, and no frames found yet, so we are skipping
- * the leading part of a fragment, which may form the tail of
- * fragment coming later, hopefully subclass skips efficiently ... */
- outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0, skip);
- parse->priv->buffers_pending =
- g_slist_prepend (parse->priv->buffers_pending, outbuf);
- outbuf = NULL;
+
+ if (parse->priv->detecting) {
+ ret = klass->detect (parse, buffer);
+ if (ret == GST_FLOW_NOT_NEGOTIATED) {
+ /* If draining we error out, otherwise request a buffer
+ * with 64kb more */
+ if (parse->priv->drain) {
+ gst_buffer_unref (buffer);
+ GST_ERROR_OBJECT (parse, "Failed to detect format but draining");
+ return GST_FLOW_ERROR;
+ } else {
+ fsize += 64 * 1024;
+ gst_buffer_unref (buffer);
+ continue;
+ }
+ } else if (ret != GST_FLOW_OK) {
+ gst_buffer_unref (buffer);
+ GST_ERROR_OBJECT (parse, "detect() returned %s",
+ gst_flow_get_name (ret));
+ return ret;
}
- parse->priv->offset += skip;
- if (!parse->priv->discont)
- parse->priv->sync_offset = parse->priv->offset;
- parse->priv->discont = TRUE;
- /* something changed at least; nullify loop check */
- if (fsize == G_MAXUINT)
- fsize = old_min_size + 64 * 1024;
- old_min_size = 0;
- }
- /* skip == 0 should imply subclass set min_size to need more data;
- * we check this shortly */
- GST_DEBUG_OBJECT (parse, "finding sync...");
- gst_buffer_unref (buffer);
- if ((ret = gst_base_parse_check_sync (parse)) != GST_FLOW_OK) {
- goto done;
- }
- }
- /* Does the subclass want to skip too? */
- if (skip > 0)
- parse->priv->offset += skip;
- else if (skip < 0)
- skip = 0;
+ /* Else handle this buffer normally */
+ }
- if (fsize + skip <= gst_buffer_get_size (buffer)) {
- outbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, skip, fsize);
- GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer) + skip;
- GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
- gst_buffer_unref (buffer);
- } else {
- gst_buffer_unref (buffer);
- ret = gst_base_parse_pull_range (parse, fsize, &outbuf);
+ ret = gst_base_parse_handle_buffer (parse, buffer, &skip, &flushed);
if (ret != GST_FLOW_OK)
- goto done;
- if (gst_buffer_get_size (outbuf) < fsize) {
- gst_buffer_unref (outbuf);
- ret = GST_FLOW_UNEXPECTED;
+ break;
+
+ /* something flushed means something happened,
+ * and we should bail out of this loop so as not to occupy
+ * the task thread indefinitely */
+ if (flushed) {
+ GST_LOG_OBJECT (parse, "frame finished, breaking loop");
+ break;
}
+ /* nothing flushed, no skip and draining, so nothing left to do */
+ if (!skip && parse->priv->drain) {
+ GST_LOG_OBJECT (parse, "no activity or result when draining; "
+ "breaking loop and marking EOS");
+ ret = GST_FLOW_EOS;
+ break;
+ }
+ /* otherwise, get some more data
+ * note that is checked this does not happen indefinitely */
+ if (!skip) {
+ GST_LOG_OBJECT (parse, "getting some more data");
+ fsize += 64 * 1024;
+ }
+ parse->priv->drain = FALSE;
}
- parse->priv->offset += fsize;
-
- frame->buffer = outbuf;
-
done:
return ret;
-
- /* ERRORS */
-invalid_min:
- {
- GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
- ("min_size evolution %d -> %d; breaking to avoid looping",
- old_min_size, min_size));
- return GST_FLOW_ERROR;
- }
}
/* Loop that is used in pull mode to retrieve data from upstream */
@@ -2657,7 +2859,6 @@ gst_base_parse_loop (GstPad * pad)
GstBaseParse *parse;
GstBaseParseClass *klass;
GstFlowReturn ret = GST_FLOW_OK;
- GstBaseParseFrame frame;
parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
klass = GST_BASE_PARSE_GET_CLASS (parse);
@@ -2674,27 +2875,23 @@ gst_base_parse_loop (GstPad * pad)
}
}
- gst_base_parse_frame_init (&frame);
- ret = gst_base_parse_scan_frame (parse, klass, &frame, TRUE);
+ ret = gst_base_parse_scan_frame (parse, klass, TRUE);
if (ret != GST_FLOW_OK)
goto done;
- /* This always cleans up frame, even if error occurs */
- ret = gst_base_parse_handle_and_push_frame (parse, klass, &frame);
-
/* eat expected eos signalling past segment in reverse playback */
- if (parse->segment.rate < 0.0 && ret == GST_FLOW_UNEXPECTED &&
+ if (parse->segment.rate < 0.0 && ret == GST_FLOW_EOS &&
parse->segment.position >= parse->segment.stop) {
GST_DEBUG_OBJECT (parse, "downstream has reached end of segment");
/* push what was accumulated during loop run */
- gst_base_parse_process_fragment (parse, TRUE);
+ gst_base_parse_finish_fragment (parse, FALSE);
/* force previous fragment */
parse->priv->offset = -1;
ret = GST_FLOW_OK;
}
done:
- if (ret == GST_FLOW_UNEXPECTED)
+ if (ret == GST_FLOW_EOS)
goto eos;
else if (ret != GST_FLOW_OK)
goto pause;
@@ -2705,7 +2902,7 @@ done:
/* ERRORS */
eos:
{
- ret = GST_FLOW_UNEXPECTED;
+ ret = GST_FLOW_EOS;
GST_DEBUG_OBJECT (parse, "eos");
/* fall-through */
}
@@ -2717,7 +2914,7 @@ pause:
gst_flow_get_name (ret));
gst_pad_pause_task (parse->sinkpad);
- if (ret == GST_FLOW_UNEXPECTED) {
+ if (ret == GST_FLOW_EOS) {
/* handle end-of-stream/segment */
if (parse->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
@@ -2739,7 +2936,7 @@ pause:
}
push_eos = TRUE;
}
- } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
+ } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
/* for fatal errors we post an error message, wrong-state is
* not fatal because it happens due to flushes and only means
* that we should stop now. */
@@ -2748,11 +2945,18 @@ pause:
push_eos = TRUE;
}
if (push_eos) {
- /* newsegment before eos */
- if (parse->priv->pending_segment) {
- gst_pad_push_event (parse->srcpad, parse->priv->pending_segment);
- parse->priv->pending_segment = NULL;
+ /* Push pending events, including NEWSEGMENT events */
+ if (G_UNLIKELY (parse->priv->pending_events)) {
+ GList *l;
+
+ for (l = parse->priv->pending_events; l != NULL; l = l->next) {
+ gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
+ }
+ g_list_free (parse->priv->pending_events);
+ parse->priv->pending_events = NULL;
+ parse->priv->pending_segment = FALSE;
}
+
gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
}
gst_object_unref (parse);
@@ -2760,122 +2964,119 @@ pause:
}
static gboolean
-gst_base_parse_sink_activate (GstPad * sinkpad)
+gst_base_parse_sink_activate (GstPad * sinkpad, GstObject * parent)
{
GstBaseParse *parse;
- gboolean result = TRUE;
GstQuery *query;
gboolean pull_mode;
- parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
+ parse = GST_BASE_PARSE (parent);
GST_DEBUG_OBJECT (parse, "sink activate");
query = gst_query_new_scheduling ();
- result = gst_pad_peer_query (sinkpad, query);
- if (result) {
- gst_query_parse_scheduling (query, &pull_mode, NULL, NULL, NULL, NULL,
- NULL);
- } else {
- pull_mode = FALSE;
+ if (!gst_pad_peer_query (sinkpad, query)) {
+ gst_query_unref (query);
+ goto baseparse_push;
}
+
+ pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
gst_query_unref (query);
- if (pull_mode) {
- GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
- result = gst_pad_activate_pull (sinkpad, TRUE);
- } else {
+ if (!pull_mode)
+ goto baseparse_push;
+
+ GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
+ if (!gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, TRUE))
+ goto baseparse_push;
+
+ return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop,
+ sinkpad);
+ /* fallback */
+baseparse_push:
+ {
GST_DEBUG_OBJECT (parse, "trying to activate in push mode");
- result = gst_pad_activate_push (sinkpad, TRUE);
+ return gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PUSH, TRUE);
}
-
- GST_DEBUG_OBJECT (parse, "sink activate return %d", result);
- gst_object_unref (parse);
- return result;
}
static gboolean
gst_base_parse_activate (GstBaseParse * parse, gboolean active)
{
GstBaseParseClass *klass;
- gboolean result = FALSE;
+ gboolean result = TRUE;
GST_DEBUG_OBJECT (parse, "activate %d", active);
klass = GST_BASE_PARSE_GET_CLASS (parse);
if (active) {
- if (parse->priv->pad_mode == GST_ACTIVATE_NONE && klass->start)
+ if (parse->priv->pad_mode == GST_PAD_MODE_NONE && klass->start)
result = klass->start (parse);
+
+ /* If the subclass implements ::detect we want to
+ * call it for the first buffers now */
+ parse->priv->detecting = (klass->detect != NULL);
} else {
/* We must make sure streaming has finished before resetting things
* and calling the ::stop vfunc */
GST_PAD_STREAM_LOCK (parse->sinkpad);
GST_PAD_STREAM_UNLOCK (parse->sinkpad);
- if (parse->priv->pad_mode != GST_ACTIVATE_NONE && klass->stop)
+ if (parse->priv->pad_mode != GST_PAD_MODE_NONE && klass->stop)
result = klass->stop (parse);
- parse->priv->pad_mode = GST_ACTIVATE_NONE;
+ parse->priv->pad_mode = GST_PAD_MODE_NONE;
}
GST_DEBUG_OBJECT (parse, "activate return: %d", result);
return result;
}
static gboolean
-gst_base_parse_sink_activate_push (GstPad * pad, gboolean active)
+gst_base_parse_sink_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
{
- gboolean result = TRUE;
+ gboolean result;
GstBaseParse *parse;
- parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
+ parse = GST_BASE_PARSE (parent);
- GST_DEBUG_OBJECT (parse, "sink activate push %d", active);
+ GST_DEBUG_OBJECT (parse, "sink activate mode %d, %d", mode, active);
- result = gst_base_parse_activate (parse, active);
+ if (!gst_base_parse_activate (parse, active))
+ goto activate_failed;
+ switch (mode) {
+ case GST_PAD_MODE_PULL:
+ if (active) {
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events,
+ gst_event_new_segment (&parse->segment));
+ parse->priv->pending_segment = TRUE;
+ result = TRUE;
+ } else {
+ result = gst_pad_stop_task (pad);
+ }
+ break;
+ default:
+ result = TRUE;
+ break;
+ }
if (result)
- parse->priv->pad_mode = active ? GST_ACTIVATE_PUSH : GST_ACTIVATE_NONE;
+ parse->priv->pad_mode = active ? mode : GST_PAD_MODE_NONE;
- GST_DEBUG_OBJECT (parse, "sink activate push return: %d", result);
+ GST_DEBUG_OBJECT (parse, "sink activate return: %d", result);
- gst_object_unref (parse);
return result;
-}
-static gboolean
-gst_base_parse_sink_activate_pull (GstPad * sinkpad, gboolean active)
-{
- gboolean result = FALSE;
- GstBaseParse *parse;
-
- parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
-
- GST_DEBUG_OBJECT (parse, "activate pull %d", active);
-
- result = gst_base_parse_activate (parse, active);
-
- if (result) {
- if (active) {
- parse->priv->pending_segment = gst_event_new_segment (&parse->segment);
- result &=
- gst_pad_start_task (sinkpad, (GstTaskFunction) gst_base_parse_loop,
- sinkpad);
- } else {
- result &= gst_pad_stop_task (sinkpad);
- }
+ /* ERRORS */
+activate_failed:
+ {
+ GST_DEBUG_OBJECT (parse, "activate failed");
+ return FALSE;
}
-
- if (result)
- parse->priv->pad_mode = active ? GST_ACTIVATE_PULL : GST_ACTIVATE_NONE;
-
- GST_DEBUG_OBJECT (parse, "sink activate pull return: %d", result);
-
- gst_object_unref (parse);
- return result;
}
-
/**
* gst_base_parse_set_duration:
* @parse: #GstBaseParse.
@@ -3067,9 +3268,9 @@ gst_base_parse_set_syncable (GstBaseParse * parse, gboolean syncable)
* parsing, and the parser should operate in passthrough mode (which only
* applies when operating in push mode). That is, incoming buffers are
* pushed through unmodified, i.e. no @check_valid_frame or @parse_frame
- * callbacks will be invoked, but @pre_push_buffer will still be invoked,
+ * callbacks will be invoked, but @pre_push_frame will still be invoked,
* so subclass can perform as much or as little is appropriate for
- * passthrough semantics in @pre_push_buffer.
+ * passthrough semantics in @pre_push_frame.
*
* Since: 0.10.33
*/
@@ -3090,7 +3291,7 @@ gst_base_parse_set_passthrough (GstBaseParse * parse, gboolean passthrough)
* by the parsing process. If there is such a latency, which depends on the
* particular parsing of the format, it typically corresponds to 1 frame duration.
*
- * Since: 0.10.34
+ * Since: 0.10.36
*/
void
gst_base_parse_set_latency (GstBaseParse * parse, GstClockTime min_latency,
@@ -3126,6 +3327,8 @@ gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format,
GST_LOG_OBJECT (parse, "using estimated duration");
*duration = parse->priv->estimated_duration;
res = TRUE;
+ } else {
+ GST_LOG_OBJECT (parse, "cannot estimate duration");
}
GST_LOG_OBJECT (parse, "res: %d, duration %" GST_TIME_FORMAT, res,
@@ -3133,28 +3336,13 @@ gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format,
return res;
}
-static const GstQueryType *
-gst_base_parse_get_querytypes (GstPad * pad)
-{
- static const GstQueryType list[] = {
- GST_QUERY_POSITION,
- GST_QUERY_DURATION,
- GST_QUERY_FORMATS,
- GST_QUERY_SEEKING,
- GST_QUERY_CONVERT,
- GST_QUERY_NONE
- };
-
- return list;
-}
-
static gboolean
-gst_base_parse_query (GstPad * pad, GstQuery * query)
+gst_base_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstBaseParse *parse;
gboolean res = FALSE;
- parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
+ parse = GST_BASE_PARSE (parent);
GST_LOG_OBJECT (parse, "handling query: %" GST_PTR_FORMAT, query);
@@ -3167,27 +3355,29 @@ gst_base_parse_query (GstPad * pad, GstQuery * query)
GST_DEBUG_OBJECT (parse, "position query");
gst_query_parse_position (query, &format, NULL);
- GST_OBJECT_LOCK (parse);
- if (format == GST_FORMAT_BYTES) {
- dest_value = parse->priv->offset;
- res = TRUE;
- } else if (format == parse->segment.format &&
- GST_CLOCK_TIME_IS_VALID (parse->segment.position)) {
- dest_value = parse->segment.position;
- res = TRUE;
- }
- GST_OBJECT_UNLOCK (parse);
-
- if (res)
- gst_query_set_position (query, format, dest_value);
- else {
- res = gst_pad_query_default (pad, query);
+ /* try upstream first */
+ res = gst_pad_query_default (pad, parent, query);
+ if (!res) {
+ /* Fall back on interpreting segment */
+ GST_OBJECT_LOCK (parse);
+ if (format == GST_FORMAT_BYTES) {
+ dest_value = parse->priv->offset;
+ res = TRUE;
+ } else if (format == parse->segment.format &&
+ GST_CLOCK_TIME_IS_VALID (parse->segment.position)) {
+ dest_value = gst_segment_to_stream_time (&parse->segment,
+ parse->segment.format, parse->segment.position);
+ res = TRUE;
+ }
+ GST_OBJECT_UNLOCK (parse);
if (!res) {
/* no precise result, upstream no idea either, then best estimate */
/* priv->offset is updated in both PUSH/PULL modes */
res = gst_base_parse_convert (parse,
GST_FORMAT_BYTES, parse->priv->offset, format, &dest_value);
}
+ if (res)
+ gst_query_set_position (query, format, dest_value);
}
break;
}
@@ -3200,7 +3390,7 @@ gst_base_parse_query (GstPad * pad, GstQuery * query)
gst_query_parse_duration (query, &format, NULL);
/* consult upstream */
- res = gst_pad_query_default (pad, query);
+ res = gst_pad_query_default (pad, parent, query);
/* otherwise best estimate from us */
if (!res) {
@@ -3220,7 +3410,7 @@ gst_base_parse_query (GstPad * pad, GstQuery * query)
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
/* consult upstream */
- res = gst_pad_query_default (pad, query);
+ res = gst_pad_query_default (pad, parent, query);
/* we may be able to help if in TIME */
if (fmt == GST_FORMAT_TIME && gst_base_parse_is_seekable (parse)) {
@@ -3289,7 +3479,7 @@ gst_base_parse_query (GstPad * pad, GstQuery * query)
break;
}
default:
- res = gst_pad_query_default (pad, query);
+ res = gst_pad_query_default (pad, parent, query);
break;
}
return res;
@@ -3306,7 +3496,7 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos,
gboolean orig_drain, orig_discont;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf = NULL;
- GstBaseParseFrame frame;
+ GstBaseParseFrame *sframe = NULL;
g_return_val_if_fail (pos != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (time != NULL, GST_FLOW_ERROR);
@@ -3325,37 +3515,36 @@ gst_base_parse_find_frame (GstBaseParse * parse, gint64 * pos,
GST_DEBUG_OBJECT (parse, "scanning for frame starting at %" G_GINT64_FORMAT
" (%#" G_GINT64_MODIFIER "x)", *pos, *pos);
- gst_base_parse_frame_init (&frame);
-
/* jump elsewhere and locate next frame */
parse->priv->offset = *pos;
- ret = gst_base_parse_scan_frame (parse, klass, &frame, FALSE);
- if (ret != GST_FLOW_OK)
+ /* mark as scanning so frames don't get processed all the way */
+ parse->priv->scanning = TRUE;
+ ret = gst_base_parse_scan_frame (parse, klass, FALSE);
+ parse->priv->scanning = FALSE;
+ /* retrieve frame found during scan */
+ sframe = parse->priv->scanned_frame;
+ parse->priv->scanned_frame = NULL;
+
+ if (ret != GST_FLOW_OK || !sframe)
goto done;
- buf = frame.buffer;
- GST_LOG_OBJECT (parse,
- "peek parsing frame at offset %" G_GUINT64_FORMAT
- " (%#" G_GINT64_MODIFIER "x) of size %" G_GSIZE_FORMAT,
- GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET (buf),
- gst_buffer_get_size (buf));
-
/* get offset first, subclass parsing might dump other stuff in there */
- *pos = GST_BUFFER_OFFSET (buf);
- ret = klass->parse_frame (parse, &frame);
- buf = frame.buffer;
+ *pos = sframe->offset;
+ buf = sframe->buffer;
+ g_assert (buf);
/* but it should provide proper time */
*time = GST_BUFFER_TIMESTAMP (buf);
*duration = GST_BUFFER_DURATION (buf);
- gst_base_parse_frame_free (&frame);
-
GST_LOG_OBJECT (parse,
"frame with time %" GST_TIME_FORMAT " at offset %" G_GINT64_FORMAT,
GST_TIME_ARGS (*time), *pos);
done:
+ if (sframe)
+ gst_base_parse_frame_free (sframe);
+
/* restore state */
parse->priv->offset = orig_offset;
parse->priv->discont = orig_discont;
@@ -3381,6 +3570,9 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time,
g_return_val_if_fail (_time != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (_offset != NULL, GST_FLOW_ERROR);
+ GST_DEBUG_OBJECT (parse, "Bisecting for time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (*_time));
+
/* TODO also make keyframe aware if useful some day */
time = *_time;
@@ -3403,13 +3595,21 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time,
/* need initial positions; start and end */
lpos = parse->priv->first_frame_offset;
ltime = parse->priv->first_frame_ts;
- htime = parse->priv->duration;
+ if (!gst_base_parse_get_duration (parse, GST_FORMAT_TIME, &htime)) {
+ GST_DEBUG_OBJECT (parse, "Unknown time duration, cannot bisect");
+ return GST_FLOW_ERROR;
+ }
hpos = parse->priv->upstream_size;
+ GST_DEBUG_OBJECT (parse,
+ "Bisection initial bounds: bytes %" G_GINT64_FORMAT " %" G_GINT64_FORMAT
+ ", times %" GST_TIME_FORMAT " %" GST_TIME_FORMAT, lpos, htime,
+ GST_TIME_ARGS (ltime), GST_TIME_ARGS (htime));
+
/* check preconditions are satisfied;
* start and end are needed, except for special case where we scan for
* last frame to determine duration */
- if (parse->priv->pad_mode != GST_ACTIVATE_PULL || !hpos ||
+ if (parse->priv->pad_mode != GST_PAD_MODE_PULL || !hpos ||
!GST_CLOCK_TIME_IS_VALID (ltime) ||
(!GST_CLOCK_TIME_IS_VALID (htime) && time != G_MAXINT64)) {
return GST_FLOW_OK;
@@ -3455,7 +3655,7 @@ gst_base_parse_locate_time (GstBaseParse * parse, GstClockTime * _time,
GST_TIME_ARGS (time), newpos);
ret = gst_base_parse_find_frame (parse, &newpos, &newtime, &dur);
- if (ret == GST_FLOW_UNEXPECTED) {
+ if (ret == GST_FLOW_EOS) {
/* heuristic HACK */
hpos = MAX (lpos, hpos - chunk);
continue;
@@ -3514,13 +3714,13 @@ gst_base_parse_find_offset (GstBaseParse * parse, GstClockTime time,
goto exit;
}
- g_static_mutex_lock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_LOCK (parse);
if (parse->priv->index) {
/* Let's check if we have an index entry for that time */
entry = gst_index_get_assoc_entry (parse->priv->index,
parse->priv->index_id,
before ? GST_INDEX_LOOKUP_BEFORE : GST_INDEX_LOOKUP_AFTER,
- GST_ASSOCIATION_FLAG_KEY_UNIT, GST_FORMAT_TIME, time);
+ GST_INDEX_ASSOCIATION_FLAG_KEY_UNIT, GST_FORMAT_TIME, time);
}
if (entry) {
@@ -3538,7 +3738,7 @@ gst_base_parse_find_offset (GstBaseParse * parse, GstClockTime time,
ts = GST_CLOCK_TIME_NONE;
}
}
- g_static_mutex_unlock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_UNLOCK (parse);
exit:
if (_ts)
@@ -3569,7 +3769,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop));
/* no negative rates in push mode */
- if (rate < 0.0 && parse->priv->pad_mode == GST_ACTIVATE_PUSH)
+ if (rate < 0.0 && parse->priv->pad_mode == GST_PAD_MODE_PUSH)
goto negative_rate;
if (cur_type != GST_SEEK_TYPE_SET ||
@@ -3579,16 +3779,9 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
/* For any format other than TIME, see if upstream handles
* it directly or fail. For TIME, try upstream, but do it ourselves if
* it fails upstream */
- if (format != GST_FORMAT_TIME) {
- /* default action delegates to upstream */
- res = FALSE;
+ res = gst_pad_push_event (parse->sinkpad, event);
+ if (format != GST_FORMAT_TIME || res)
goto done;
- } else {
- gst_event_ref (event);
- if ((res = gst_pad_push_event (parse->sinkpad, event))) {
- goto done;
- }
- }
/* get flush flag */
flush = flags & GST_SEEK_FLAG_FLUSH;
@@ -3624,10 +3817,10 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
NULL);
} else {
start_ts = seeksegment.position;
- if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.position,
+ if (!gst_base_parse_convert (parse, format, seeksegment.position,
GST_FORMAT_BYTES, &seekpos))
goto convert_failed;
- if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.stop,
+ if (!gst_base_parse_convert (parse, format, seeksegment.stop,
GST_FORMAT_BYTES, &seekstop))
goto convert_failed;
}
@@ -3639,7 +3832,7 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
"seek stop %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT,
seeksegment.stop, seekstop);
- if (parse->priv->pad_mode == GST_ACTIVATE_PULL) {
+ if (parse->priv->pad_mode == GST_PAD_MODE_PULL) {
gint64 last_stop;
GST_DEBUG_OBJECT (parse, "seek in PULL mode");
@@ -3681,11 +3874,11 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
memcpy (&parse->segment, &seeksegment, sizeof (GstSegment));
/* store the newsegment event so it can be sent from the streaming thread. */
- if (parse->priv->pending_segment)
- gst_event_unref (parse->priv->pending_segment);
-
/* This will be sent later in _loop() */
- parse->priv->pending_segment = gst_event_new_segment (&parse->segment);
+ parse->priv->pending_segment = TRUE;
+ parse->priv->pending_events =
+ g_list_append (parse->priv->pending_events,
+ gst_event_new_segment (&parse->segment));
GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
"start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
@@ -3775,9 +3968,6 @@ gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
}
done:
- /* handled event is ours to free */
- if (res)
- gst_event_unref (event);
return res;
/* ERRORS */
@@ -3826,12 +4016,13 @@ gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event)
}
}
+#if 0
static void
gst_base_parse_set_index (GstElement * element, GstIndex * index)
{
GstBaseParse *parse = GST_BASE_PARSE (element);
- g_static_mutex_lock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_LOCK (parse);
if (parse->priv->index)
gst_object_unref (parse->priv->index);
if (index) {
@@ -3842,7 +4033,7 @@ gst_base_parse_set_index (GstElement * element, GstIndex * index)
} else {
parse->priv->index = NULL;
}
- g_static_mutex_unlock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_UNLOCK (parse);
}
static GstIndex *
@@ -3851,13 +4042,14 @@ gst_base_parse_get_index (GstElement * element)
GstBaseParse *parse = GST_BASE_PARSE (element);
GstIndex *result = NULL;
- g_static_mutex_lock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_LOCK (parse);
if (parse->priv->index)
result = gst_object_ref (parse->priv->index);
- g_static_mutex_unlock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_UNLOCK (parse);
return result;
}
+#endif
static GstStateChangeReturn
gst_base_parse_change_state (GstElement * element, GstStateChange transition)
@@ -3871,7 +4063,7 @@ gst_base_parse_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_READY_TO_PAUSED:
/* If this is our own index destroy it as the
* old entries might be wrong for the new stream */
- g_static_mutex_lock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_LOCK (parse);
if (parse->priv->own_index) {
gst_object_unref (parse->priv->index);
parse->priv->index = NULL;
@@ -3882,12 +4074,12 @@ gst_base_parse_change_state (GstElement * element, GstStateChange transition)
if (G_UNLIKELY (!parse->priv->index)) {
GST_DEBUG_OBJECT (parse, "no index provided creating our own");
- parse->priv->index = gst_index_factory_make ("memindex");
+ parse->priv->index = g_object_new (gst_mem_index_get_type (), NULL);
gst_index_get_writer_id (parse->priv->index, GST_OBJECT (parse),
&parse->priv->index_id);
parse->priv->own_index = TRUE;
}
- g_static_mutex_unlock (&parse->priv->index_lock);
+ GST_BASE_PARSE_INDEX_UNLOCK (parse);
break;
default:
break;