aboutsummaryrefslogtreecommitdiff
path: root/gst/rtpmanager/gstrtpjitterbuffer.c
diff options
context:
space:
mode:
authorSebastian Dröge <slomo@circular-chaos.org>2013-09-19 12:45:59 +0200
committerSebastian Dröge <slomo@circular-chaos.org>2013-09-19 12:45:59 +0200
commitf2da7bd60b4aad21e46b1eeb4d0df356f53dee73 (patch)
tree66bbf1e7a9977b2fa0c843f77d3fb98469c0d247 /gst/rtpmanager/gstrtpjitterbuffer.c
parent745561f0bd78088de77e79be5810300cc891b35c (diff)
parent73cae22d46e765755355ae7068ab415ea751cea8 (diff)
Merge tag 'upstream/1.1.90' into debian-experimental
Upstream version 1.1.90
Diffstat (limited to 'gst/rtpmanager/gstrtpjitterbuffer.c')
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c524
1 files changed, 294 insertions, 230 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 079ae0ca..bf676e6e 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -117,17 +117,29 @@ enum
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
-
#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
-#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
-#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
- JBUF_WAIT(priv); \
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
- goto label; \
+#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
+ (priv)->waiting_timer = TRUE; \
+ g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
+ (priv)->waiting_timer = FALSE; \
+} G_STMT_END
+#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_timer)) \
+ g_cond_signal (&(priv)->jbuf_timer); \
} G_STMT_END
-#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
+#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
+ (priv)->waiting_event = TRUE; \
+ g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
+ (priv)->waiting_event = FALSE; \
+ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
+ goto label; \
+} G_STMT_END
+#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_event)) \
+ g_cond_signal (&(priv)->jbuf_event); \
+} G_STMT_END
struct _GstRtpJitterBufferPrivate
{
@@ -136,13 +148,18 @@ struct _GstRtpJitterBufferPrivate
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
- GCond jbuf_cond;
- gboolean waiting;
+ gboolean waiting_timer;
+ GCond jbuf_timer;
+ gboolean waiting_event;
+ GCond jbuf_event;
gboolean discont;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ gboolean timer_running;
+ GThread *timer_thread;
+
/* properties */
guint latency_ms;
guint64 latency_ns;
@@ -199,7 +216,6 @@ struct _GstRtpJitterBufferPrivate
GstClockID clock_id;
GstClockTime timer_timeout;
guint16 timer_seqnum;
- gboolean unscheduled;
/* the latency of the upstream peer, we have to take this into account when
* synchronizing the buffers. */
GstClockTime peer_latency;
@@ -223,8 +239,10 @@ typedef struct
{
guint idx;
guint16 seqnum;
+ guint num;
TimerType type;
GstClockTime timeout;
+ GstClockTime duration;
GstClockTime rtx_base;
GstClockTime rtx_retry;
} TimerData;
@@ -321,6 +339,8 @@ static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
+static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
+
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
@@ -596,7 +616,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
- g_cond_init (&priv->jbuf_cond);
+ g_cond_init (&priv->jbuf_timer);
+ g_cond_init (&priv->jbuf_event);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
@@ -641,7 +662,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
g_array_free (jitterbuffer->priv->timers, TRUE);
g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
- g_cond_clear (&jitterbuffer->priv->jbuf_cond);
+ g_cond_clear (&jitterbuffer->priv->jbuf_timer);
+ g_cond_clear (&jitterbuffer->priv->jbuf_event);
g_object_unref (jitterbuffer->priv->jbuf);
@@ -831,7 +853,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->out_offset));
priv->active = active;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
}
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
@@ -980,7 +1002,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
priv->srcresult = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
/* unlock clock, we just unschedule, the entry will be released by the
* locking streaming thread. */
unschedule_current_timer (jitterbuffer);
@@ -1077,13 +1099,16 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
priv->last_pt = -1;
/* block until we go to PLAYING */
priv->blocked = TRUE;
+ priv->timer_running = TRUE;
+ priv->timer_thread =
+ g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
JBUF_LOCK (priv);
/* unblock to allow streaming in PLAYING */
priv->blocked = FALSE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
JBUF_UNLOCK (priv);
break;
default:
@@ -1108,7 +1133,13 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
+ priv->timer_running = FALSE;
+ JBUF_SIGNAL_TIMER (priv);
+ JBUF_UNLOCK (priv);
+ g_thread_join (priv->timer_thread);
+ priv->timer_thread = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
@@ -1229,7 +1260,7 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
if (ret && !priv->eos) {
GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
priv->eos = TRUE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
} else if (priv->eos) {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
} else {
@@ -1418,7 +1449,6 @@ unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
if (priv->clock_id) {
GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
gst_clock_id_unschedule (priv->clock_id);
- priv->unscheduled = TRUE;
}
}
@@ -1457,7 +1487,7 @@ recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
static TimerData *
add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
- guint16 seqnum, GstClockTime timeout)
+ guint16 seqnum, guint num, GstClockTime timeout, GstClockTime duration)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
TimerData *timer;
@@ -1473,12 +1503,15 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
timer->idx = len;
timer->type = type;
timer->seqnum = seqnum;
+ timer->num = num;
timer->timeout = timeout;
+ timer->duration = duration;
if (type == TIMER_TYPE_EXPECTED) {
timer->rtx_base = timeout;
timer->rtx_retry = 0;
}
recalculate_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_TIMER (priv);
return timer;
}
@@ -1531,7 +1564,7 @@ set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
/* find the seqnum timer */
timer = find_timer (jitterbuffer, type, seqnum);
if (timer == NULL) {
- timer = add_timer (jitterbuffer, type, seqnum, timeout);
+ timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, -1);
} else {
reschedule_timer (jitterbuffer, timer, seqnum, timeout);
}
@@ -1580,9 +1613,6 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
TimerData *timer = NULL;
gint i, len;
- if (!priv->do_retransmission)
- return;
-
/* go through all timers and unschedule the ones with a large gap, also find
* the timer for the seqnum */
len = priv->timers->len;
@@ -1595,10 +1625,8 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
test->seqnum, seqnum, gap);
- if (test->type == TIMER_TYPE_DEADLINE)
- continue;
-
if (gap == 0) {
+ GST_DEBUG ("found timer for current seqnum");
/* the timer for the current seqnum */
timer = test;
} else if (gap > priv->rtx_delay_reorder) {
@@ -1609,7 +1637,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
}
}
- if (priv->packet_spacing > 0 && do_next_seqnum) {
+ if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
GstClockTime expected;
/* calculate expected arrival time of the next seqnum */
@@ -1618,12 +1646,13 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
if (timer)
reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
else
- add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum,
- expected);
- } else if (timer) {
+ add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
+ expected, priv->packet_spacing);
+ } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
}
}
@@ -1649,41 +1678,112 @@ calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
}
static void
+send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
+ guint lost_packets, GstClockTime timestamp, GstClockTime duration,
+ gboolean late)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ /* we had a gap and thus we lost some packets. Create an event for this. */
+ if (lost_packets > 1)
+ GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
+ seqnum + lost_packets - 1);
+ else
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
+
+ priv->num_late += lost_packets;
+ priv->discont = TRUE;
+
+ /* update our expected next packet but make sure the seqnum increases */
+ if (seqnum + lost_packets > priv->next_seqnum) {
+ priv->next_seqnum = (seqnum + lost_packets) & 0xffff;
+ priv->last_popped_seqnum = seqnum;
+ priv->last_out_time = timestamp;
+ }
+ if (priv->do_lost) {
+ GstEvent *event;
+
+ /* create paket lost event */
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) seqnum,
+ "timestamp", G_TYPE_UINT64, timestamp,
+ "duration", G_TYPE_UINT64, duration,
+ "late", G_TYPE_BOOLEAN, late, NULL));
+ JBUF_UNLOCK (priv);
+ gst_pad_push_event (priv->srcpad, event);
+ JBUF_LOCK (priv);
+ }
+}
+
+static void
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration, expected_dts;
+ GstClockTime total_duration, duration, expected_dts;
TimerType type;
GST_DEBUG_OBJECT (jitterbuffer,
"dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. Also make
- * sure we never go negative. */
+ /* the total duration spanned by the missing packets */
if (dts >= priv->last_in_dts)
- duration = (dts - priv->last_in_dts) / (gap + 1);
+ total_duration = dts - priv->last_in_dts;
else
- /* packet already lost, timer will timeout quickly */
- duration = 0;
+ total_duration = 0;
+
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. */
+ duration = total_duration / (gap + 1);
GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
+ if (total_duration > priv->latency_ns) {
+ GstClockTime gap_time;
+ guint lost_packets;
+
+ gap_time = total_duration - priv->latency_ns;
+
+ if (duration > 0) {
+ lost_packets = gap_time / duration;
+ gap_time = lost_packets * duration;
+ } else {
+ lost_packets = gap;
+ }
+
+ /* too many lost packets, some of the missing packets are already
+ * too late and we can generate lost packet events for them. */
+ GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT ", consider %u lost",
+ GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
+ lost_packets);
+
+ /* this timer will fire immediately and the lost event will be pushed from
+ * the timer thread */
+ add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
+ priv->last_in_dts + duration, gap_time);
+
+ expected += lost_packets;
+ priv->last_in_dts += gap_time;
+ }
+
expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) {
- expected++;
type = TIMER_TYPE_EXPECTED;
+ /* if we had a timer for the first missing packet, leave it. */
+ if (find_timer (jitterbuffer, type, expected))
+ expected++;
} else {
type = TIMER_TYPE_LOST;
}
while (expected < seqnum) {
- add_timer (jitterbuffer, type, expected, expected_dts);
+ add_timer (jitterbuffer, type, expected, 0, expected_dts, duration);
expected_dts += duration;
expected++;
}
@@ -1811,6 +1911,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */
calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+
do_next_seqnum = TRUE;
}
}
@@ -1834,6 +1935,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* while we wait */
set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
do_next_seqnum = TRUE;
+ /* take rtptime and dts to calculate packet spacing */
+ priv->ips_rtptime = rtptime;
+ priv->ips_dts = dts;
}
if (do_next_seqnum) {
priv->last_in_seqnum = seqnum;
@@ -1894,8 +1998,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
do_handle_sync (jitterbuffer);
/* signal addition of new buffer when the _loop is waiting. */
- if (priv->waiting && priv->active)
- JBUF_SIGNAL (priv);
+ if (priv->active && priv->waiting_timer)
+ JBUF_SIGNAL_EVENT (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this
* when the tail buffer changed */
@@ -2071,6 +2175,9 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
dts = GST_BUFFER_DTS (outbuf);
pts = GST_BUFFER_PTS (outbuf);
+ dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
+ pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
+
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
@@ -2191,9 +2298,9 @@ wait:
}
/* the timeout for when we expected a packet expired */
-static GstFlowReturn
+static gboolean
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- GstClockTimeDiff clock_jitter)
+ GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
@@ -2226,95 +2333,85 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
reschedule_timer (jitterbuffer, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry);
- return priv->srcresult;
+ return FALSE;
}
/* a packet is lost */
-static GstFlowReturn
+static gboolean
do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- GstClockTimeDiff clock_jitter)
+ GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration = GST_CLOCK_TIME_NONE;
- guint32 lost_packets = 1;
- gboolean lost_packets_late = FALSE;
-
-#if 0
- if (clock_jitter > 0
- && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
- GstClockTimeDiff total_duration;
- GstClockTime out_time_diff;
-
- out_time_diff =
- apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
- total_duration = MIN (out_time_diff, clock_jitter);
-
- if (duration > 0)
- lost_packets = total_duration / duration;
- else
- lost_packets = gap;
- total_duration = lost_packets * duration;
+ GstClockTime duration, timestamp;
+ guint seqnum, num;
+ gboolean late;
+
+ seqnum = timer->seqnum;
+ timestamp = apply_offset (jitterbuffer, timer->timeout);
+ duration = timer->duration;
+ if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
+ duration = priv->packet_spacing;
+ num = MAX (timer->num, 1);
+ late = timer->num > 0;
- GST_DEBUG_OBJECT (jitterbuffer,
- "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
- ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (clock_jitter),
- lost_packets, GST_TIME_ARGS (total_duration));
+ /* remove timer now */
+ remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
- duration = total_duration;
- lost_packets_late = TRUE;
- }
-#endif
+ send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
- /* we had a gap and thus we lost some packets. Create an event for this. */
- if (lost_packets > 1)
- GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
- timer->seqnum + lost_packets - 1);
- else
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
+ return TRUE;
+}
- priv->num_late += lost_packets;
- priv->discont = TRUE;
+static gboolean
+do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- /* update our expected next packet */
- priv->last_popped_seqnum = timer->seqnum;
- priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
- if (timer->seqnum + lost_packets > priv->next_seqnum)
- priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
- /* remove timer now */
+ GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
- if (priv->do_lost) {
- GstEvent *event;
+ return TRUE;
+}
- /* create paket lost event */
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
- "timestamp", G_TYPE_UINT64, priv->last_out_time,
- "duration", G_TYPE_UINT64, duration,
- "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->srcpad, event);
- JBUF_LOCK_CHECK (priv, flushing);
- }
- return GST_FLOW_OK;
+static gboolean
+do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- /* ERRORS */
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
- }
+ GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
+
+ priv->next_seqnum = timer->seqnum;
+ remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
+
+ return TRUE;
}
-static GstFlowReturn
-do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+static gboolean
+do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
{
- GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
- remove_timer (jitterbuffer, timer);
+ gboolean removed = FALSE;
- return GST_FLOW_EOS;
+ switch (timer->type) {
+ case TIMER_TYPE_EXPECTED:
+ removed = do_expected_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_LOST:
+ removed = do_lost_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_DEADLINE:
+ removed = do_deadline_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_EOS:
+ removed = do_eos_timeout (jitterbuffer, timer, now);
+ break;
+ }
+ return removed;
}
/* called when we need to wait for the next timeout.
@@ -2324,137 +2421,100 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
*
* If there are no timers, we wait on a gcond until something new happens.
*/
-static GstFlowReturn
+static void
wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstFlowReturn result = GST_FLOW_OK;
- gint i, len;
- TimerData *timer = NULL;
- GstClockTime timer_timeout = -1;
- gint timer_idx;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- GstClockTime test_timeout = get_timeout (jitterbuffer, test);
+ GstClockTime now = 0;
- GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
- i, test->seqnum, GST_TIME_ARGS (test_timeout));
-
- /* find the smallest timeout */
- if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
- timer = test;
- timer_timeout = test_timeout;
- if (timer_timeout == -1)
- break;
+ JBUF_LOCK (priv);
+ while (priv->timer_running) {
+ TimerData *timer = NULL;
+ GstClockTime timer_timeout = -1;
+ gint i, len;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (now));
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ GstClockTime test_timeout = get_timeout (jitterbuffer, test);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
+ i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
+
+ /* no timestamp, timeout immeditately */
+ if (test_timeout == -1 || test_timeout <= now) {
+ if (do_timeout (jitterbuffer, test, now))
+ len--;
+ i--;
+ } else if (timer == NULL || test_timeout < timer_timeout) {
+ /* find the smallest timeout */
+ timer = test;
+ timer_timeout = test_timeout;
+ }
}
- }
- if (timer) {
- GstClock *clock;
- GstClockTime sync_time;
- GstClockID id;
- GstClockReturn ret;
- GstClockTimeDiff clock_jitter;
+ if (timer) {
+ GstClock *clock;
+ GstClockTime sync_time;
+ GstClockID id;
+ GstClockReturn ret;
+ GstClockTimeDiff clock_jitter;
+
+ GST_OBJECT_LOCK (jitterbuffer);
+ clock = GST_ELEMENT_CLOCK (jitterbuffer);
+ if (!clock) {
+ GST_OBJECT_UNLOCK (jitterbuffer);
+ /* let's just push if there is no clock */
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
+ now = timer_timeout;
+ continue;
+ }
- /* no timestamp, timeout immeditately */
- if (timer_timeout == -1)
- goto do_timeout;
+ /* prepare for sync against clock */
+ sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ /* add latency of peer to get input time */
+ sync_time += priv->peer_latency;
- GST_OBJECT_LOCK (jitterbuffer);
- clock = GST_ELEMENT_CLOCK (jitterbuffer);
- if (!clock) {
- GST_OBJECT_UNLOCK (jitterbuffer);
- /* let's just push if there is no clock */
- GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
- goto do_timeout;
- }
+ GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
+ " with sync time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
- /* prepare for sync against clock */
- sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
- /* add latency of peer to get input time */
- sync_time += priv->peer_latency;
+ /* create an entry for the clock */
+ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
+ priv->timer_timeout = timer_timeout;
+ priv->timer_seqnum = timer->seqnum;
+ GST_OBJECT_UNLOCK (jitterbuffer);
- GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
- " with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+ /* release the lock so that the other end can push stuff or unlock */
+ JBUF_UNLOCK (priv);
- /* create an entry for the clock */
- id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
- priv->unscheduled = FALSE;
- priv->timer_timeout = timer_timeout;
- priv->timer_seqnum = timer->seqnum;
- timer_idx = timer->idx;
- GST_OBJECT_UNLOCK (jitterbuffer);
+ ret = gst_clock_id_wait (id, &clock_jitter);
- /* release the lock so that the other end can push stuff or unlock */
- JBUF_UNLOCK (priv);
+ JBUF_LOCK (priv);
+ if (!priv->timer_running)
+ break;
- ret = gst_clock_id_wait (id, &clock_jitter);
+ GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
+ ret, priv->timer_seqnum, clock_jitter);
+ /* and free the entry */
+ gst_clock_id_unref (id);
+ priv->clock_id = NULL;
- JBUF_LOCK (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
- ret, priv->timer_seqnum, clock_jitter);
- /* and free the entry */
- gst_clock_id_unref (id);
- priv->clock_id = NULL;
-
- /* at this point, the clock could have been unlocked by a timeout, a new
- * tail element was added to the queue or because we are shutting down. Check
- * for shutdown first. */
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
- goto flushing;
-
- if (priv->timers->len <= timer_idx)
- goto done;
-
- /* we released the lock, the array might have changed */
- timer = &g_array_index (priv->timers, TimerData, timer_idx);
- /* if changed to timeout immediately, do so */
- if (timer->timeout == -1)
- goto do_timeout;
-
- /* if we got unscheduled and we are not flushing, it's because a new tail
- * element became available in the queue or we flushed the queue.
- * Grab it and try to push or sync. */
- if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
- GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
- goto done;
- }
-
- do_timeout:
- switch (timer->type) {
- case TIMER_TYPE_EXPECTED:
- result = do_expected_timeout (jitterbuffer, timer, clock_jitter);
- break;
- case TIMER_TYPE_LOST:
- result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
- break;
- case TIMER_TYPE_DEADLINE:
- priv->next_seqnum = timer->seqnum;
- remove_timer (jitterbuffer, timer);
- break;
- case TIMER_TYPE_EOS:
- result = do_eos_timeout (jitterbuffer, timer);
- break;
+ if (ret != GST_CLOCK_UNSCHEDULED)
+ now = timer_timeout + MAX (clock_jitter, 0);
+ } else {
+ /* no timers, wait for activity */
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting");
+ JBUF_WAIT_TIMER (priv);
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
- } else {
- /* no timers, wait for activity */
- GST_DEBUG_OBJECT (jitterbuffer, "waiting");
- priv->waiting = TRUE;
- JBUF_WAIT_CHECK (priv, flushing);
- priv->waiting = FALSE;
- GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
+ JBUF_UNLOCK (priv);
-done:
- return result;
-
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
- }
+ GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
+ return;
}
/*
@@ -2474,9 +2534,13 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
- if (G_LIKELY (result == GST_FLOW_WAIT))
+ if (G_LIKELY (result == GST_FLOW_WAIT)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
/* now wait for the next event */
- result = wait_next_timeout (jitterbuffer);
+ JBUF_WAIT_EVENT (priv, flushing);
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
+ result = GST_FLOW_OK;
+ }
}
while (result == GST_FLOW_OK);
JBUF_UNLOCK (priv);