diff --git a/daemon/codec.c b/daemon/codec.c index fe6f84755..89f794a1e 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -75,16 +75,16 @@ struct dtx_buffer { int ptime; // ms per packet int tspp; // timestamp increment per packet struct call *call; - unsigned long ts; - unsigned int ts_seq; // for subsequent packets with same TS, e.g. DTMF + GQueue packets; + struct media_packet last_mp; + unsigned long head_ts; + uint32_t ssrc; + struct timerthread_queue_entry ttq_entry; time_t start; }; -struct dtx_entry { - struct timerthread_queue_entry ttq_entry; +struct dtx_packet { struct transcode_packet *packet; struct media_packet mp; - unsigned long ts; - void *ssrc_ptr; // opaque pointer, doesn't hold a reference struct codec_ssrc_handler *decoder_handler; // holds reference int (*func)(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp); }; @@ -2377,18 +2377,6 @@ static int codec_decoder_event(enum codec_event event, void *ptr, void *data) { return 0; } -static void __dtx_add_callback(struct dtx_buffer *dtxb, const struct timeval *base, unsigned int offset, - const struct media_packet *mp, unsigned long ts, int seq_add, void *ssrc_ptr) -{ - struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); - dtxe->ttq_entry.when = *base; - timeval_add_usec(&dtxe->ttq_entry.when, offset); - dtxe->ts = ts; - media_packet_copy(&dtxe->mp, mp); - dtxe->mp.rtp->seq_num += htons(seq_add); - dtxe->ssrc_ptr = ssrc_ptr; - timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); -} // consumes `packet` if buffered (returns 1) static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler, struct transcode_packet *packet, struct media_packet *mp, @@ -2398,135 +2386,227 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco if (!dtxb || !mp->sfd || !mp->ssrc_in || !mp->ssrc_out) return 0; - ilogs(transcoding, LOG_DEBUG, "Adding packet to DTX buffer"); - unsigned long ts = packet->ts; + // allocate packet object + struct dtx_packet *dtxp = g_slice_alloc0(sizeof(*dtxp)); + dtxp->packet = packet; + dtxp->func = func; + if (decoder_handler) + dtxp->decoder_handler = obj_get(&decoder_handler->h); + media_packet_copy(&dtxp->mp, mp); + + // add to processing queue + mutex_lock(&dtxb->lock); - if (ts != dtxb->ts) { - dtxb->ts = ts; - dtxb->ts_seq = 0; - } - else - dtxb->ts_seq++; - unsigned int ts_seq = dtxb->ts_seq; + dtxb->start = rtpe_now.tv_sec; - mutex_unlock(&dtxb->lock); + g_queue_push_tail(&dtxb->packets, dtxp); + ilogs(dtx, LOG_DEBUG, "Adding packet (TS %lu) to DTX buffer; now %i packets in DTX queue", + ts, dtxb->packets.length); - struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); - dtxe->ttq_entry.when = rtpe_now; - timeval_add_usec(&dtxe->ttq_entry.when, rtpe_config.dtx_delay * 1000); - dtxe->packet = packet; - dtxe->func = func; - if (decoder_handler) - dtxe->decoder_handler = obj_get(&decoder_handler->h); - media_packet_copy(&dtxe->mp, mp); - timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); - // packet now consumed - packet = NULL; + // schedule timer if not running yet + if (!dtxb->ttq_entry.when.tv_sec) { + if (!dtxb->ssrc) + dtxb->ssrc = mp->ssrc_in->parent->h.ssrc; + dtxb->ttq_entry.when = mp->tv; + timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_delay * 1000); + timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry); + } - __dtx_add_callback(dtxb, &rtpe_now, (rtpe_config.dtx_delay + dtxb->ptime) * 1000, mp, ts + ts_seq, 1, - mp->stream->ssrc_in); + mutex_unlock(&dtxb->lock); return 1; } -static void __dtx_entry_free(void *p) { - struct dtx_entry *dtxe = p; - if (dtxe->packet) - __transcode_packet_free(dtxe->packet); - media_packet_release(&dtxe->mp); - if (dtxe->decoder_handler) - obj_put(&dtxe->decoder_handler->h); - g_slice_free1(sizeof(*dtxe), dtxe); +static void dtx_packet_free(struct dtx_packet *dtxp) { + if (dtxp->packet) + __transcode_packet_free(dtxp->packet); + media_packet_release(&dtxp->mp); + if (dtxp->decoder_handler) + obj_put(&dtxp->decoder_handler->h); + g_slice_free1(sizeof(*dtxp), dtxp); } static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { struct dtx_buffer *dtxb = (void *) ttq; - struct dtx_entry *dtxe = p; - struct transcode_packet *packet = dtxe->packet; - struct media_packet *mp = &dtxe->mp; - struct packet_stream *ps = mp->stream; - int ret = 0; + struct media_packet mp_copy = {0,}; + int ret = 0, discard = 0; + unsigned long ts; + int p_left = 0; + long tv_diff = -1, ts_diff = 0; mutex_lock(&dtxb->lock); - struct codec_ssrc_handler *ch = dtxe->decoder_handler ? obj_get(&dtxe->decoder_handler->h) : NULL; + + // do we have a packet? + struct dtx_packet *dtxp = g_queue_peek_head(&dtxb->packets); + if (dtxp) { + // inspect head packet and check TS, see if it's ready to be decoded + ts = dtxp->packet->ts; + ts_diff = ts - dtxb->head_ts; + + if (!dtxb->head_ts) + ; // first packet + else if (ts_diff < 0) + ilogs(dtx, LOG_DEBUG, "DTX timestamp reset (from %lu to %lu)", dtxb->head_ts, ts); + else if (ts_diff > 100000) // arbitrary value + ilogs(dtx, LOG_DEBUG, "DTX timestamp reset (from %lu to %lu)", dtxb->head_ts, ts); + else if (ts_diff > dtxb->tspp) { + ilogs(dtx, LOG_DEBUG, "First packet in DTX buffer not ready yet (packet TS %lu, " + "DTX TS %lu, diff %li)", + ts, dtxb->head_ts, ts_diff); + dtxp = NULL; + } + + // go or no go? + if (dtxp) + g_queue_pop_head(&dtxb->packets); + } + + p_left = dtxb->packets.length; + + if (dtxp) { + // save the `mp` for possible future DTX + media_packet_release(&dtxb->last_mp); + media_packet_copy(&dtxb->last_mp, &dtxp->mp); + media_packet_copy(&mp_copy, &dtxp->mp); + ts_diff = dtxp->packet->ts - dtxb->head_ts; + ts = dtxb->head_ts = dtxp->packet->ts; + tv_diff = timeval_diff(&rtpe_now, &mp_copy.tv); + } + else { + // no packet ready to decode: DTX + media_packet_copy(&mp_copy, &dtxb->last_mp); + // shift forward TS + dtxb->head_ts += dtxb->tspp; + ts = dtxb->head_ts; + } + struct packet_stream *ps = mp_copy.stream; + log_info_stream_fd(mp_copy.sfd); + + // copy out other fields so we can unlock + struct codec_ssrc_handler *ch = (dtxp && dtxp->decoder_handler) ? obj_get(&dtxp->decoder_handler->h) + : NULL; if (!ch && dtxb->csh) ch = obj_get(&dtxb->csh->h); struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL; - mutex_unlock(&dtxb->lock); - if (!call || !ch) + if (!call || !ch || !ps || !ps->ssrc_in + || dtxb->ssrc != ps->ssrc_in->parent->h.ssrc + || dtxb->ttq_entry.when.tv_sec == 0) { + // shut down or SSRC change + ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc); + dtxb->ttq_entry.when.tv_sec = 0; + mutex_unlock(&dtxb->lock); goto out; // shut down + } + + // schedule next run + timeval_add_usec(&dtxb->ttq_entry.when, dtxb->ptime * 1000); + + // handle timer drifts + if (dtxp && tv_diff < rtpe_config.dtx_delay * 1000) { + // timer underflow + ilogs(dtx, LOG_DEBUG, "Packet reception time has caught up with DTX timer " + "(%li ms < %i ms), " + "pushing DTX timer forward my %i ms", + tv_diff / 1000, rtpe_config.dtx_delay, rtpe_config.dtx_shift); + timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000); + } + else if (dtxp && ts_diff < dtxb->tspp) { + // TS underflow + // special case: DTMF timestamps are static + if (ts_diff == 0 && ch->handler->source_pt.codec_def->dtmf) { + ; + } + else { + ilogs(dtx, LOG_DEBUG, "Packet timestamps have caught up with DTX timer " + "(TS %lu, diff %li), " + "pushing DTX timer forward by %i ms and discarding packet", + ts, ts_diff, rtpe_config.dtx_shift); + timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000); + discard = 1; + } + } + else if (dtxp && dtxb->packets.length >= rtpe_config.dtx_buffer) { + // inspect TS is most recent packet + struct dtx_packet *dtxp_last = g_queue_peek_tail(&dtxb->packets); + ts_diff = dtxp_last->packet->ts - ts; + long long ts_diff_us = (long long) ts_diff * 1000000 / ch->handler->source_pt.clock_rate; + if (ts_diff_us >= rtpe_config.dtx_lag * 1000) { + // overflow + ilogs(dtx, LOG_DEBUG, "DTX timer queue overflowing (%i packets in queue, " + "%lli ms delay), speeding up DTX timer by %i ms", + dtxb->packets.length, ts_diff_us / 1000, rtpe_config.dtx_shift); + timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * -1000); + } + } - log_info_stream_fd(mp->sfd); + timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry); + + mutex_unlock(&dtxb->lock); rwlock_lock_r(&call->master_lock); - __ssrc_lock_both(mp); + __ssrc_lock_both(&mp_copy); - if (packet) { - ilogs(transcoding, LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now", packet->ts); + if (dtxp) { + if (!discard) { + ilogs(dtx, LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now; " + "%i packets left in queue", ts, p_left); - ret = dtxe->func(ch, packet, &dtxe->mp); - if (ret) - ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error while processing buffered RTP packet"); + ret = dtxp->func(ch, dtxp->packet, &mp_copy); + if (ret) + ilogs(dtx, LOG_WARN | LOG_FLAG_LIMIT, + "Decoder error while processing buffered RTP packet"); + } } else { - unsigned long dtxe_ts = dtxe->ts; - - mutex_lock(&dtxb->lock); unsigned int diff = rtpe_now.tv_sec - dtxb->start; - unsigned long dtxb_ts = dtxb->ts + dtxb->ts_seq; - void *ssrc_ptr = dtxe->mp.stream->ssrc_in; - - if (dtxe_ts == dtxb_ts - && ssrc_ptr == dtxe->ssrc_ptr - && (rtpe_config.max_dtx <= 0 || diff < rtpe_config.max_dtx)) - { - ilogs(transcoding, LOG_DEBUG, "RTP media for TS %lu+ missing, triggering DTX", - dtxe_ts); - - dtxb_ts += dtxb->tspp; - dtxb_ts -= dtxb->ts_seq; - dtxe_ts = dtxb_ts; - dtxb->ts = dtxb_ts; - dtxb->ts_seq = 0; - mutex_unlock(&dtxb->lock); - - ret = decoder_lost_packet(ch->decoder, dtxe_ts, - ch->handler->packet_decoded, ch, &dtxe->mp); - if (ret) - ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error handling DTX/lost packet"); - __dtx_add_callback(dtxb, &dtxe->ttq_entry.when, dtxb->ptime * 1000, mp, dtxe_ts, 0, ssrc_ptr); + if (rtpe_config.max_dtx <= 0 || diff < rtpe_config.max_dtx) { + ilogs(dtx, LOG_DEBUG, "RTP media for TS %lu missing, triggering DTX", ts); + + // synthetic packet + mp_copy.rtp->seq_num += htons(1); + + ret = decoder_lost_packet(ch->decoder, ts, + ch->handler->packet_decoded, ch, &mp_copy); + if (ret) + ilogs(dtx, LOG_WARN | LOG_FLAG_LIMIT, + "Decoder error handling DTX/lost packet"); } else - mutex_unlock(&dtxb->lock); + ilogs(dtx, LOG_DEBUG, "Stopping DTX at TS %lu", ts); } - __ssrc_unlock_both(mp); + __ssrc_unlock_both(&mp_copy); - if (mp->packets_out.length && ret == 0) { + if (mp_copy.packets_out.length && ret == 0) { struct packet_stream *sink = ps->rtp_sink; if (!sink) - media_socket_dequeue(mp, NULL); // just free + media_socket_dequeue(&mp_copy, NULL); // just free else { - if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, mp)) - ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); + if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, &mp_copy)) + ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); mutex_lock(&sink->out_lock); - if (media_socket_dequeue(mp, sink)) - ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error sending buffered media to RTP sink"); + if (media_socket_dequeue(&mp_copy, sink)) + ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, + "Error sending buffered media to RTP sink"); mutex_unlock(&sink->out_lock); } } rwlock_unlock_r(&call->master_lock); - obj_put(call); - obj_put(&ch->h); out: - __dtx_entry_free(dtxe); + if (call) + obj_put(call); + if (ch) + obj_put(&ch->h); + if (dtxp) + dtx_packet_free(dtxp); + media_packet_release(&mp_copy); log_info_clear(); } static void __dtx_shutdown(struct dtx_buffer *dtxb) { @@ -2536,10 +2616,12 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) { if (dtxb->call) obj_put(dtxb->call); dtxb->call = NULL; + g_queue_clear_full(&dtxb->packets, (GDestroyNotify) dtx_packet_free); } static void __dtx_free(void *p) { struct dtx_buffer *dtxb = p; __dtx_shutdown(dtxb); + media_packet_release(&dtxb->last_mp); mutex_destroy(&dtxb->lock); } static void __dtx_setup(struct codec_ssrc_handler *ch) { @@ -2551,14 +2633,14 @@ static void __dtx_setup(struct codec_ssrc_handler *ch) { struct dtx_buffer *dtx = ch->dtx_buffer = timerthread_queue_new("dtx_buffer", sizeof(*ch->dtx_buffer), - &codec_timers_thread, NULL, __dtx_send_later, __dtx_free, __dtx_entry_free); + &codec_timers_thread, NULL, __dtx_send_later, __dtx_free, NULL); dtx->csh = obj_get(&ch->h); dtx->call = obj_get(ch->handler->media->call); mutex_init(&dtx->lock); dtx->ptime = ch->ptime; if (!dtx->ptime) - dtx->ptime = 20; // XXX ? - dtx->tspp = dtx->ptime * ch->handler->source_pt.clock_rate / 1000; + dtx->ptime = 20; // XXX should be replaced with length of actual decoded packet + dtx->tspp = dtx->ptime * ch->handler->source_pt.clock_rate / 1000; // XXX ditto } static void __ssrc_handler_stop(void *p) { struct codec_ssrc_handler *ch = p; @@ -3862,6 +3944,7 @@ void codec_rtp_payload_types(struct call_media *media, struct call_media *other_ void codecs_init(void) { #ifdef WITH_TRANSCODING + // XXX not real queue timer - unify to simple timerthread timerthread_init(&codec_timers_thread, timerthread_queue_run); rtcp_timer_queue = timerthread_queue_new("rtcp_timer_queue", sizeof(*rtcp_timer_queue), &codec_timers_thread, NULL, __rtcp_timer_run, NULL, __rtcp_timer_free); @@ -3869,6 +3952,7 @@ void codecs_init(void) { } void codecs_cleanup(void) { #ifdef WITH_TRANSCODING + obj_put(&rtcp_timer_queue->ttq.tt_obj); timerthread_free(&codec_timers_thread); #endif } diff --git a/daemon/loglevels.h b/daemon/loglevels.h index b2e9c951b..0c7e81128 100644 --- a/daemon/loglevels.h +++ b/daemon/loglevels.h @@ -10,3 +10,4 @@ ll(srtp, "SRTP encryption and decryption") ll(internals, "Noisy low-level internals") ll(http, "HTTP, HTTPS, Websockets") ll(control, "Control protocols including SDP exchanges, CLI") +ll(dtx, "DTX timer/buffer") diff --git a/daemon/main.c b/daemon/main.c index 61f1d7685..526ad7d1f 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -81,6 +81,9 @@ struct rtpengine_config rtpe_config = { .dtls_rsa_key_size = 2048, .dtls_signature = 256, .max_dtx = 30, + .dtx_shift = 5, + .dtx_buffer = 10, + .dtx_lag = 100, .common = { .log_levels = { [log_level_index_internals] = -1, @@ -483,6 +486,9 @@ static void options(int *argc, char ***argv) { #ifdef WITH_TRANSCODING { "dtx-delay", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_delay, "Delay in milliseconds to trigger DTX handling","INT"}, { "max-dtx", 0,0, G_OPTION_ARG_INT, &rtpe_config.max_dtx, "Maximum duration of DTX handling", "INT"}, + { "dtx-buffer", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_buffer,"Maxmium number of packets held in DTX buffer", "INT"}, + { "dtx-lag", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_lag, "Maxmium time span in milliseconds held in DTX buffer", "INT"}, + { "dtx-shift", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_shift, "Length of time (in ms) to shift DTX buffer after over/underflow", "INT"}, { "silence-detect",0,0, G_OPTION_ARG_DOUBLE, &silence_detect, "Audio level threshold in percent for silence detection","FLOAT"}, { "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."}, { "reorder-codecs",0,0, G_OPTION_ARG_NONE, &rtpe_config.reorder_codecs,"Reorder answer codecs based on sender preference",NULL}, diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index fb0427ede..0b8c133ad 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -788,8 +788,7 @@ only. When enabled, delays processing of received packets for the specified time (much like a jitter buffer) in order to trigger DTX handling when a transmission gap occurs. The decoder is then instructed to fill in the missing time during a transmission gap, for example by generating comfort noise. The -delay should be configured to just slightly more than the expected incoming -jitter. +delay should be configured to be higher than the expected incoming jitter. =item B<--max-dtx=>I @@ -798,6 +797,26 @@ received within this time frame, then DTX processing will stop. Can be set to zero or negative to disable and keep DTX processing on indefinitely. Defaults to 30 seconds. +=item B<--dtx-buffer=>I + +=item B<--dtx-lag=>I + +These two options together control the maximum number of packets and amount of +audio that is allowed to be held in the DTX buffer. The B option +limits the number of packets held in the DTX buffer, while the B +option limits the amount of audio (in milliseconds) to be held in the DTX +buffer. A DTX buffer overflow is declared when both limits are exceeded, in +which case DTX processing is sped up by B milliseconds. + +The defauls are 10 packets and 100 milliseconds. + +=item B<--dtx-shift=>I + +Amount of time in milliseconds that DTX processing is shifted forward (sped up) +or backwards (delayed) in case of a DTX buffer overflow or underflow. An +underflow occurs when RTP packets are received slower than expected, while an +overflow occurs when packets are received faster than expected. + =item B<--silence-detect=>I Enable silence detection and specify threshold in percent. This option is diff --git a/include/main.h b/include/main.h index f861e453e..721163705 100644 --- a/include/main.h +++ b/include/main.h @@ -111,6 +111,9 @@ struct rtpengine_config { int http_threads; int dtx_delay; int max_dtx; + int dtx_buffer; + int dtx_lag; + int dtx_shift; double silence_detect_double; uint32_t silence_detect_int; str cn_payload; diff --git a/t/loglevels.h b/t/loglevels.h index b2e9c951b..0c7e81128 100644 --- a/t/loglevels.h +++ b/t/loglevels.h @@ -10,3 +10,4 @@ ll(srtp, "SRTP encryption and decryption") ll(internals, "Noisy low-level internals") ll(http, "HTTP, HTTPS, Websockets") ll(control, "Control protocols including SDP exchanges, CLI") +ll(dtx, "DTX timer/buffer")