diff options
author | Sebastian Dröge <slomo@circular-chaos.org> | 2013-09-19 12:45:59 +0200 |
---|---|---|
committer | Sebastian Dröge <slomo@circular-chaos.org> | 2013-09-19 12:45:59 +0200 |
commit | f2da7bd60b4aad21e46b1eeb4d0df356f53dee73 (patch) | |
tree | 66bbf1e7a9977b2fa0c843f77d3fb98469c0d247 /gst/rtpmanager/gstrtpjitterbuffer.c | |
parent | 745561f0bd78088de77e79be5810300cc891b35c (diff) | |
parent | 73cae22d46e765755355ae7068ab415ea751cea8 (diff) |
Merge tag 'upstream/1.1.90' into debian-experimental
Upstream version 1.1.90
Diffstat (limited to 'gst/rtpmanager/gstrtpjitterbuffer.c')
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.c | 524 |
1 files changed, 294 insertions, 230 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 079ae0ca..bf676e6e 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -117,17 +117,29 @@ enum if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ goto label; \ } G_STMT_END - #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock)) -#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock)) -#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \ - JBUF_WAIT(priv); \ - if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ - goto label; \ +#define JBUF_WAIT_TIMER(priv) G_STMT_START { \ + (priv)->waiting_timer = TRUE; \ + g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \ + (priv)->waiting_timer = FALSE; \ +} G_STMT_END +#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_timer)) \ + g_cond_signal (&(priv)->jbuf_timer); \ } G_STMT_END -#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond)) +#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \ + (priv)->waiting_event = TRUE; \ + g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \ + (priv)->waiting_event = FALSE; \ + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ + goto label; \ +} G_STMT_END +#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_event)) \ + g_cond_signal (&(priv)->jbuf_event); \ +} G_STMT_END struct _GstRtpJitterBufferPrivate { @@ -136,13 +148,18 @@ struct _GstRtpJitterBufferPrivate RTPJitterBuffer *jbuf; GMutex jbuf_lock; - GCond jbuf_cond; - gboolean waiting; + gboolean waiting_timer; + GCond jbuf_timer; + gboolean waiting_event; + GCond jbuf_event; gboolean discont; gboolean ts_discont; gboolean active; guint64 out_offset; + gboolean timer_running; + GThread *timer_thread; + /* properties */ guint latency_ms; guint64 latency_ns; @@ -199,7 +216,6 @@ struct _GstRtpJitterBufferPrivate GstClockID clock_id; GstClockTime timer_timeout; guint16 timer_seqnum; - gboolean unscheduled; /* the latency of the upstream peer, we have to take this into account when * synchronizing the buffers. */ GstClockTime peer_latency; @@ -223,8 +239,10 @@ typedef struct { guint idx; guint16 seqnum; + guint num; TimerType type; GstClockTime timeout; + GstClockTime duration; GstClockTime rtx_base; GstClockTime rtx_retry; } TimerData; @@ -321,6 +339,8 @@ static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer); static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer); static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer); +static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer); + static void gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) { @@ -596,7 +616,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData)); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); - g_cond_init (&priv->jbuf_cond); + g_cond_init (&priv->jbuf_timer); + g_cond_init (&priv->jbuf_event); /* reset skew detection initialy */ rtp_jitter_buffer_reset_skew (priv->jbuf); @@ -641,7 +662,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object) g_array_free (jitterbuffer->priv->timers, TRUE); g_mutex_clear (&jitterbuffer->priv->jbuf_lock); - g_cond_clear (&jitterbuffer->priv->jbuf_cond); + g_cond_clear (&jitterbuffer->priv->jbuf_timer); + g_cond_clear (&jitterbuffer->priv->jbuf_event); g_object_unref (jitterbuffer->priv->jbuf); @@ -831,7 +853,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->out_offset)); priv->active = active; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); } if (!active) { rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE); @@ -980,7 +1002,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer) priv->srcresult = GST_FLOW_FLUSHING; GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); /* this unblocks any waiting pops on the src pad task */ - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); /* unlock clock, we just unschedule, the entry will be released by the * locking streaming thread. */ unschedule_current_timer (jitterbuffer); @@ -1077,13 +1099,16 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, priv->last_pt = -1; /* block until we go to PLAYING */ priv->blocked = TRUE; + priv->timer_running = TRUE; + priv->timer_thread = + g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer); JBUF_UNLOCK (priv); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: JBUF_LOCK (priv); /* unblock to allow streaming in PLAYING */ priv->blocked = FALSE; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); JBUF_UNLOCK (priv); break; default: @@ -1108,7 +1133,13 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PAUSED_TO_READY: + JBUF_LOCK (priv); gst_buffer_replace (&priv->last_sr, NULL); + priv->timer_running = FALSE; + JBUF_SIGNAL_TIMER (priv); + JBUF_UNLOCK (priv); + g_thread_join (priv->timer_thread); + priv->timer_thread = NULL; break; case GST_STATE_CHANGE_READY_TO_NULL: break; @@ -1229,7 +1260,7 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, if (ret && !priv->eos) { GST_INFO_OBJECT (jitterbuffer, "queuing EOS"); priv->eos = TRUE; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); } else if (priv->eos) { GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS"); } else { @@ -1418,7 +1449,6 @@ unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer) if (priv->clock_id) { GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer"); gst_clock_id_unschedule (priv->clock_id); - priv->unscheduled = TRUE; } } @@ -1457,7 +1487,7 @@ recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) static TimerData * add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, - guint16 seqnum, GstClockTime timeout) + guint16 seqnum, guint num, GstClockTime timeout, GstClockTime duration) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; TimerData *timer; @@ -1473,12 +1503,15 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, timer->idx = len; timer->type = type; timer->seqnum = seqnum; + timer->num = num; timer->timeout = timeout; + timer->duration = duration; if (type == TIMER_TYPE_EXPECTED) { timer->rtx_base = timeout; timer->rtx_retry = 0; } recalculate_timer (jitterbuffer, timer); + JBUF_SIGNAL_TIMER (priv); return timer; } @@ -1531,7 +1564,7 @@ set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, /* find the seqnum timer */ timer = find_timer (jitterbuffer, type, seqnum); if (timer == NULL) { - timer = add_timer (jitterbuffer, type, seqnum, timeout); + timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, -1); } else { reschedule_timer (jitterbuffer, timer, seqnum, timeout); } @@ -1580,9 +1613,6 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, TimerData *timer = NULL; gint i, len; - if (!priv->do_retransmission) - return; - /* go through all timers and unschedule the ones with a large gap, also find * the timer for the seqnum */ len = priv->timers->len; @@ -1595,10 +1625,8 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i, test->seqnum, seqnum, gap); - if (test->type == TIMER_TYPE_DEADLINE) - continue; - if (gap == 0) { + GST_DEBUG ("found timer for current seqnum"); /* the timer for the current seqnum */ timer = test; } else if (gap > priv->rtx_delay_reorder) { @@ -1609,7 +1637,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, } } - if (priv->packet_spacing > 0 && do_next_seqnum) { + if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) { GstClockTime expected; /* calculate expected arrival time of the next seqnum */ @@ -1618,12 +1646,13 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, if (timer) reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected); else - add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, - expected); - } else if (timer) { + add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0, + expected, priv->packet_spacing); + } else if (timer && timer->type != TIMER_TYPE_DEADLINE) { /* if we had a timer, remove it, we don't know when to expect the next * packet. */ remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); } } @@ -1649,41 +1678,112 @@ calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime, } static void +send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum, + guint lost_packets, GstClockTime timestamp, GstClockTime duration, + gboolean late) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + + /* we had a gap and thus we lost some packets. Create an event for this. */ + if (lost_packets > 1) + GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum, + seqnum + lost_packets - 1); + else + GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum); + + priv->num_late += lost_packets; + priv->discont = TRUE; + + /* update our expected next packet but make sure the seqnum increases */ + if (seqnum + lost_packets > priv->next_seqnum) { + priv->next_seqnum = (seqnum + lost_packets) & 0xffff; + priv->last_popped_seqnum = seqnum; + priv->last_out_time = timestamp; + } + if (priv->do_lost) { + GstEvent *event; + + /* create paket lost event */ + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("GstRTPPacketLost", + "seqnum", G_TYPE_UINT, (guint) seqnum, + "timestamp", G_TYPE_UINT64, timestamp, + "duration", G_TYPE_UINT64, duration, + "late", G_TYPE_BOOLEAN, late, NULL)); + JBUF_UNLOCK (priv); + gst_pad_push_event (priv->srcpad, event); + JBUF_LOCK (priv); + } +} + +static void calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, guint16 seqnum, GstClockTime dts, gint gap) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime duration, expected_dts; + GstClockTime total_duration, duration, expected_dts; TimerType type; GST_DEBUG_OBJECT (jitterbuffer, "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts)); - /* interpolate between the current time and the last time based on - * number of packets we are missing, this is the estimated duration - * for the missing packet based on equidistant packet spacing. Also make - * sure we never go negative. */ + /* the total duration spanned by the missing packets */ if (dts >= priv->last_in_dts) - duration = (dts - priv->last_in_dts) / (gap + 1); + total_duration = dts - priv->last_in_dts; else - /* packet already lost, timer will timeout quickly */ - duration = 0; + total_duration = 0; + + /* interpolate between the current time and the last time based on + * number of packets we are missing, this is the estimated duration + * for the missing packet based on equidistant packet spacing. */ + duration = total_duration / (gap + 1); GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, GST_TIME_ARGS (duration)); + if (total_duration > priv->latency_ns) { + GstClockTime gap_time; + guint lost_packets; + + gap_time = total_duration - priv->latency_ns; + + if (duration > 0) { + lost_packets = gap_time / duration; + gap_time = lost_packets * duration; + } else { + lost_packets = gap; + } + + /* too many lost packets, some of the missing packets are already + * too late and we can generate lost packet events for them. */ + GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT ", consider %u lost", + GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns), + lost_packets); + + /* this timer will fire immediately and the lost event will be pushed from + * the timer thread */ + add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets, + priv->last_in_dts + duration, gap_time); + + expected += lost_packets; + priv->last_in_dts += gap_time; + } + expected_dts = priv->last_in_dts + duration; if (priv->do_retransmission) { - expected++; type = TIMER_TYPE_EXPECTED; + /* if we had a timer for the first missing packet, leave it. */ + if (find_timer (jitterbuffer, type, expected)) + expected++; } else { type = TIMER_TYPE_LOST; } while (expected < seqnum) { - add_timer (jitterbuffer, type, expected, expected_dts); + add_timer (jitterbuffer, type, expected, 0, expected_dts, duration); expected_dts += duration; expected++; } @@ -1811,6 +1911,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); /* fill in the gap with EXPECTED timers */ calculate_expected (jitterbuffer, expected, seqnum, dts, gap); + do_next_seqnum = TRUE; } } @@ -1834,6 +1935,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * while we wait */ set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts); do_next_seqnum = TRUE; + /* take rtptime and dts to calculate packet spacing */ + priv->ips_rtptime = rtptime; + priv->ips_dts = dts; } if (do_next_seqnum) { priv->last_in_seqnum = seqnum; @@ -1894,8 +1998,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, do_handle_sync (jitterbuffer); /* signal addition of new buffer when the _loop is waiting. */ - if (priv->waiting && priv->active) - JBUF_SIGNAL (priv); + if (priv->active && priv->waiting_timer) + JBUF_SIGNAL_EVENT (priv); /* let's unschedule and unblock any waiting buffers. We only want to do this * when the tail buffer changed */ @@ -2071,6 +2175,9 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) dts = GST_BUFFER_DTS (outbuf); pts = GST_BUFFER_PTS (outbuf); + dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts); + pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts); + /* apply timestamp with offset to buffer now */ GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); @@ -2191,9 +2298,9 @@ wait: } /* the timeout for when we expected a packet expired */ -static GstFlowReturn +static gboolean do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, - GstClockTimeDiff clock_jitter) + GstClockTime now) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstEvent *event; @@ -2226,95 +2333,85 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, reschedule_timer (jitterbuffer, timer, timer->seqnum, timer->rtx_base + timer->rtx_retry); - return priv->srcresult; + return FALSE; } /* a packet is lost */ -static GstFlowReturn +static gboolean do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, - GstClockTimeDiff clock_jitter) + GstClockTime now) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime duration = GST_CLOCK_TIME_NONE; - guint32 lost_packets = 1; - gboolean lost_packets_late = FALSE; - -#if 0 - if (clock_jitter > 0 - && clock_jitter > (priv->latency_ns + priv->peer_latency)) { - GstClockTimeDiff total_duration; - GstClockTime out_time_diff; - - out_time_diff = - apply_offset (jitterbuffer, timer->timeout) - timer->timeout; - total_duration = MIN (out_time_diff, clock_jitter); - - if (duration > 0) - lost_packets = total_duration / duration; - else - lost_packets = gap; - total_duration = lost_packets * duration; + GstClockTime duration, timestamp; + guint seqnum, num; + gboolean late; + + seqnum = timer->seqnum; + timestamp = apply_offset (jitterbuffer, timer->timeout); + duration = timer->duration; + if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0) + duration = priv->packet_spacing; + num = MAX (timer->num, 1); + late = timer->num > 0; - GST_DEBUG_OBJECT (jitterbuffer, - "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT - ") Cover up %d lost packets with duration %" GST_TIME_FORMAT, - GST_TIME_ARGS (clock_jitter), - lost_packets, GST_TIME_ARGS (total_duration)); + /* remove timer now */ + remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); - duration = total_duration; - lost_packets_late = TRUE; - } -#endif + send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late); - /* we had a gap and thus we lost some packets. Create an event for this. */ - if (lost_packets > 1) - GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum, - timer->seqnum + lost_packets - 1); - else - GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum); + return TRUE; +} - priv->num_late += lost_packets; - priv->discont = TRUE; +static gboolean +do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTime now) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - /* update our expected next packet */ - priv->last_popped_seqnum = timer->seqnum; - priv->last_out_time = apply_offset (jitterbuffer, timer->timeout); - if (timer->seqnum + lost_packets > priv->next_seqnum) - priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff; - /* remove timer now */ + GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); - if (priv->do_lost) { - GstEvent *event; + return TRUE; +} - /* create paket lost event */ - event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, - gst_structure_new ("GstRTPPacketLost", - "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum, - "timestamp", G_TYPE_UINT64, priv->last_out_time, - "duration", G_TYPE_UINT64, duration, - "late", G_TYPE_BOOLEAN, lost_packets_late, NULL)); - JBUF_UNLOCK (priv); - gst_pad_push_event (priv->srcpad, event); - JBUF_LOCK_CHECK (priv, flushing); - } - return GST_FLOW_OK; +static gboolean +do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTime now) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - /* ERRORS */ -flushing: - { - GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - return priv->srcresult; - } + GST_INFO_OBJECT (jitterbuffer, "got deadline timeout"); + + priv->next_seqnum = timer->seqnum; + remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); + + return TRUE; } -static GstFlowReturn -do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) +static gboolean +do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTime now) { - GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); - remove_timer (jitterbuffer, timer); + gboolean removed = FALSE; - return GST_FLOW_EOS; + switch (timer->type) { + case TIMER_TYPE_EXPECTED: + removed = do_expected_timeout (jitterbuffer, timer, now); + break; + case TIMER_TYPE_LOST: + removed = do_lost_timeout (jitterbuffer, timer, now); + break; + case TIMER_TYPE_DEADLINE: + removed = do_deadline_timeout (jitterbuffer, timer, now); + break; + case TIMER_TYPE_EOS: + removed = do_eos_timeout (jitterbuffer, timer, now); + break; + } + return removed; } /* called when we need to wait for the next timeout. @@ -2324,137 +2421,100 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) * * If there are no timers, we wait on a gcond until something new happens. */ -static GstFlowReturn +static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstFlowReturn result = GST_FLOW_OK; - gint i, len; - TimerData *timer = NULL; - GstClockTime timer_timeout = -1; - gint timer_idx; - - len = priv->timers->len; - for (i = 0; i < len; i++) { - TimerData *test = &g_array_index (priv->timers, TimerData, i); - GstClockTime test_timeout = get_timeout (jitterbuffer, test); + GstClockTime now = 0; - GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT, - i, test->seqnum, GST_TIME_ARGS (test_timeout)); - - /* find the smallest timeout */ - if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) { - timer = test; - timer_timeout = test_timeout; - if (timer_timeout == -1) - break; + JBUF_LOCK (priv); + while (priv->timer_running) { + TimerData *timer = NULL; + GstClockTime timer_timeout = -1; + gint i, len; + + GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT, + GST_TIME_ARGS (now)); + + len = priv->timers->len; + for (i = 0; i < len; i++) { + TimerData *test = &g_array_index (priv->timers, TimerData, i); + GstClockTime test_timeout = get_timeout (jitterbuffer, test); + + GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT, + i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout)); + + /* no timestamp, timeout immeditately */ + if (test_timeout == -1 || test_timeout <= now) { + if (do_timeout (jitterbuffer, test, now)) + len--; + i--; + } else if (timer == NULL || test_timeout < timer_timeout) { + /* find the smallest timeout */ + timer = test; + timer_timeout = test_timeout; + } } - } - if (timer) { - GstClock *clock; - GstClockTime sync_time; - GstClockID id; - GstClockReturn ret; - GstClockTimeDiff clock_jitter; + if (timer) { + GstClock *clock; + GstClockTime sync_time; + GstClockID id; + GstClockReturn ret; + GstClockTimeDiff clock_jitter; + + GST_OBJECT_LOCK (jitterbuffer); + clock = GST_ELEMENT_CLOCK (jitterbuffer); + if (!clock) { + GST_OBJECT_UNLOCK (jitterbuffer); + /* let's just push if there is no clock */ + GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); + now = timer_timeout; + continue; + } - /* no timestamp, timeout immeditately */ - if (timer_timeout == -1) - goto do_timeout; + /* prepare for sync against clock */ + sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; + /* add latency of peer to get input time */ + sync_time += priv->peer_latency; - GST_OBJECT_LOCK (jitterbuffer); - clock = GST_ELEMENT_CLOCK (jitterbuffer); - if (!clock) { - GST_OBJECT_UNLOCK (jitterbuffer); - /* let's just push if there is no clock */ - GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); - goto do_timeout; - } + GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT + " with sync time %" GST_TIME_FORMAT, + GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time)); - /* prepare for sync against clock */ - sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; - /* add latency of peer to get input time */ - sync_time += priv->peer_latency; + /* create an entry for the clock */ + id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); + priv->timer_timeout = timer_timeout; + priv->timer_seqnum = timer->seqnum; + GST_OBJECT_UNLOCK (jitterbuffer); - GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT - " with sync time %" GST_TIME_FORMAT, - GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time)); + /* release the lock so that the other end can push stuff or unlock */ + JBUF_UNLOCK (priv); - /* create an entry for the clock */ - id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); - priv->unscheduled = FALSE; - priv->timer_timeout = timer_timeout; - priv->timer_seqnum = timer->seqnum; - timer_idx = timer->idx; - GST_OBJECT_UNLOCK (jitterbuffer); + ret = gst_clock_id_wait (id, &clock_jitter); - /* release the lock so that the other end can push stuff or unlock */ - JBUF_UNLOCK (priv); + JBUF_LOCK (priv); + if (!priv->timer_running) + break; - ret = gst_clock_id_wait (id, &clock_jitter); + GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT, + ret, priv->timer_seqnum, clock_jitter); + /* and free the entry */ + gst_clock_id_unref (id); + priv->clock_id = NULL; - JBUF_LOCK (priv); - GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT, - ret, priv->timer_seqnum, clock_jitter); - /* and free the entry */ - gst_clock_id_unref (id); - priv->clock_id = NULL; - - /* at this point, the clock could have been unlocked by a timeout, a new - * tail element was added to the queue or because we are shutting down. Check - * for shutdown first. */ - if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) - goto flushing; - - if (priv->timers->len <= timer_idx) - goto done; - - /* we released the lock, the array might have changed */ - timer = &g_array_index (priv->timers, TimerData, timer_idx); - /* if changed to timeout immediately, do so */ - if (timer->timeout == -1) - goto do_timeout; - - /* if we got unscheduled and we are not flushing, it's because a new tail - * element became available in the queue or we flushed the queue. - * Grab it and try to push or sync. */ - if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) { - GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled"); - goto done; - } - - do_timeout: - switch (timer->type) { - case TIMER_TYPE_EXPECTED: - result = do_expected_timeout (jitterbuffer, timer, clock_jitter); - break; - case TIMER_TYPE_LOST: - result = do_lost_timeout (jitterbuffer, timer, clock_jitter); - break; - case TIMER_TYPE_DEADLINE: - priv->next_seqnum = timer->seqnum; - remove_timer (jitterbuffer, timer); - break; - case TIMER_TYPE_EOS: - result = do_eos_timeout (jitterbuffer, timer); - break; + if (ret != GST_CLOCK_UNSCHEDULED) + now = timer_timeout + MAX (clock_jitter, 0); + } else { + /* no timers, wait for activity */ + GST_DEBUG_OBJECT (jitterbuffer, "waiting"); + JBUF_WAIT_TIMER (priv); + GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); } - } else { - /* no timers, wait for activity */ - GST_DEBUG_OBJECT (jitterbuffer, "waiting"); - priv->waiting = TRUE; - JBUF_WAIT_CHECK (priv, flushing); - priv->waiting = FALSE; - GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); } + JBUF_UNLOCK (priv); -done: - return result; - -flushing: - { - GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - return priv->srcresult; - } + GST_DEBUG_OBJECT (jitterbuffer, "we are stopping"); + return; } /* @@ -2474,9 +2534,13 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) JBUF_LOCK_CHECK (priv, flushing); do { result = handle_next_buffer (jitterbuffer); - if (G_LIKELY (result == GST_FLOW_WAIT)) + if (G_LIKELY (result == GST_FLOW_WAIT)) { + GST_DEBUG_OBJECT (jitterbuffer, "waiting for event"); /* now wait for the next event */ - result = wait_next_timeout (jitterbuffer); + JBUF_WAIT_EVENT (priv, flushing); + GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done"); + result = GST_FLOW_OK; + } } while (result == GST_FLOW_OK); JBUF_UNLOCK (priv); |