diff --git a/daemon/call.c b/daemon/call.c index 424b324c9..c24533929 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3014,6 +3014,7 @@ static void monologue_stop(struct call_monologue *ml) { for (GList *l = ml->medias.head; l; l = l->next) { struct call_media *m = l->data; t38_gateway_stop(m->t38_gateway); + codec_handlers_stop(&m->codec_handlers_store); } } diff --git a/daemon/codec.c b/daemon/codec.c index ffd19775b..8180bb53b 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -15,6 +15,7 @@ #include "t38.h" #include "media_player.h" #include "timerthread.h" +#include "log_funcs.h" @@ -63,6 +64,24 @@ static GList *__delete_receiver_codec(struct call_media *receiver, GList *link) +struct codec_ssrc_handler; + +struct dtx_buffer { + struct timerthread_queue ttq; + mutex_t lock; + struct codec_ssrc_handler *csh; + int ptime; // ms per packet + int tspp; // timestamp increment per packet + struct call *call; + unsigned long ts; +}; +struct dtx_entry { + struct timerthread_queue_entry ttq_entry; + struct transcode_packet *packet; + struct media_packet mp; + unsigned long ts; +}; + struct codec_ssrc_handler { struct ssrc_entry h; // must be first struct codec_handler *handler; @@ -77,6 +96,7 @@ struct codec_ssrc_handler { struct timeval first_send; unsigned long first_send_ts; GString *sample_buffer; + struct dtx_buffer *dtx_buffer; // DTMF DSP stuff dtmf_rx_state_t *dtmf_dsp; @@ -1380,7 +1400,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa atomic64_set(&ssrc_in->packets_lost, ssrc_in_p->sequencer.lost_count); atomic64_set(&ssrc_in->last_seq, ssrc_in_p->sequencer.ext_seq); - ilog(LOG_DEBUG, "Decoding RTP packet: seq %u, TS %lu", + ilog(LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu", packet->p.seq, packet->ts); if (seq_ret == 1) { @@ -1834,6 +1854,153 @@ static int codec_decoder_event(enum codec_event event, void *ptr, void *data) { return 0; } +static void __dtx_add_callback(struct dtx_buffer *dtxb, const struct timeval *base, unsigned int offset, + const struct media_packet *mp, unsigned long ts, int seq_add) +{ + struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); + dtxe->ttq_entry.when = *base; + timeval_add_usec(&dtxe->ttq_entry.when, offset); + dtxe->ts = ts; + media_packet_copy(&dtxe->mp, mp); + dtxe->mp.rtp->seq_num += htons(seq_add); + timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); +} + +static void __dtx_entry_free(void *p) { + struct dtx_entry *dtxe = p; + if (dtxe->packet) + __transcode_packet_free(dtxe->packet); + media_packet_release(&dtxe->mp); + g_slice_free1(sizeof(*dtxe), dtxe); +} +static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { + struct dtx_buffer *dtxb = (void *) ttq; + struct dtx_entry *dtxe = p; + struct transcode_packet *packet = dtxe->packet; + struct media_packet *mp = &dtxe->mp; + struct packet_stream *ps = mp->stream; + struct packet_stream *sink = ps->rtp_sink; + int ret = 0; + + mutex_lock(&dtxb->lock); + struct codec_ssrc_handler *ch = dtxb->csh ? obj_get(&dtxb->csh->h) : NULL; + struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL; + mutex_unlock(&dtxb->lock); + + if (!call) + goto out; // shut down + + log_info_stream_fd(mp->sfd); + + rwlock_lock_r(&call->master_lock); + __ssrc_lock_both(mp); + + if (packet) { + ilog(LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now", packet->ts); + + ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, + ch->handler->packet_decoded, ch, &dtxe->mp); + mp->ssrc_out->parent->seq_diff--; + if (ret) + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Decoder error while processing buffered RTP packet"); + } + else { + unsigned long dtxe_ts = dtxe->ts; + + mutex_lock(&dtxb->lock); + unsigned long dtxb_ts = dtxb->ts; + + if (dtxe_ts == dtxb_ts) { + ilog(LOG_DEBUG, "RTP media for TS %lu+ missing, triggering DTX", + dtxe_ts); + + dtxb_ts += dtxb->tspp; + dtxe_ts = dtxb_ts; + dtxb->ts = dtxb_ts; + mutex_unlock(&dtxb->lock); + + ret = decoder_lost_packet(ch->decoder, dtxe_ts, + ch->handler->packet_decoded, ch, &dtxe->mp); + if (ret) + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Decoder error handling DTX/lost packet"); + + __dtx_add_callback(dtxb, &dtxe->ttq_entry.when, dtxb->ptime * 1000, mp, dtxe_ts, 0); + } + else + mutex_unlock(&dtxb->lock); + } + + __ssrc_unlock_both(mp); + + if (mp->packets_out.length && ret == 0) { + if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, mp)) + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); + + mutex_lock(&sink->out_lock); + if (media_socket_dequeue(mp, sink)) + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Error sending buffered media to RTP sink"); + mutex_unlock(&sink->out_lock); + } + + rwlock_unlock_r(&call->master_lock); + obj_put(call); + obj_put(&ch->h); + +out: + __dtx_entry_free(dtxe); + log_info_clear(); +} +static void __dtx_shutdown(struct dtx_buffer *dtxb) { + if (dtxb->csh) + obj_put(&dtxb->csh->h); + dtxb->csh = NULL; + if (dtxb->call) + obj_put(dtxb->call); + dtxb->call = NULL; +} +static void __dtx_free(void *p) { + struct dtx_buffer *dtxb = p; + ilog(LOG_DEBUG, "__dtx_free"); + __dtx_shutdown(dtxb); + mutex_destroy(&dtxb->lock); +} +static void __dtx_setup(struct codec_ssrc_handler *ch) { + if (!ch->handler->source_pt.codec_def->packet_lost || ch->dtx_buffer) + return; + + if (!rtpe_config.dtx_delay) + return; + + struct dtx_buffer *dtx = + ch->dtx_buffer = timerthread_queue_new("dtx_buffer", sizeof(*ch->dtx_buffer), + &codec_timers_thread, NULL, __dtx_send_later, __dtx_free, __dtx_entry_free); + dtx->csh = obj_get(&ch->h); + dtx->call = obj_get(ch->handler->media->call); + mutex_init(&dtx->lock); + dtx->ptime = ch->ptime; + if (!dtx->ptime) + dtx->ptime = 20; // XXX ? + dtx->tspp = dtx->ptime * ch->handler->source_pt.clock_rate / 1000; +} +void __ssrc_handler_stop(void *p) { + struct codec_ssrc_handler *ch = p; + if (ch->dtx_buffer) { + mutex_lock(&ch->dtx_buffer->lock); + __dtx_shutdown(ch->dtx_buffer); + mutex_unlock(&ch->dtx_buffer->lock); + + obj_put(&ch->dtx_buffer->ttq.tt_obj); + ch->dtx_buffer = NULL; + } +} +void codec_handlers_stop(GQueue *q) { + for (GList *l = q->head; l; l = l->next) { + struct codec_handler *h = l->data; + ssrc_hash_foreach(h->ssrc_hash, __ssrc_handler_stop); + } +} + + static struct ssrc_entry *__ssrc_handler_transcode_new(void *p) { struct codec_handler *h = p; @@ -1894,6 +2061,8 @@ static struct ssrc_entry *__ssrc_handler_transcode_new(void *p) { ch->bytes_per_packet = (ch->encoder->samples_per_packet ? : ch->encoder->samples_per_frame) * h->dest_pt.codec_def->bits_per_sample / 8; + __dtx_setup(ch); + ilog(LOG_DEBUG, "Encoder created with clockrate %i, %i channels, using sample format %i " "(ptime %i for %i samples per frame and %i samples (%i bytes) per packet, bitrate %i)", ch->encoder_format.clockrate, ch->encoder_format.channels, ch->encoder_format.format, @@ -1930,6 +2099,8 @@ static void __free_ssrc_handler(void *chp) { dtmf_rx_free(ch->dtmf_dsp); resample_shutdown(&ch->dtmf_resampler); g_queue_clear_full(&ch->dtmf_events, dtmf_event_free); + if (ch->dtx_buffer) + obj_put(&ch->dtx_buffer->ttq.tt_obj); } static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { @@ -2116,11 +2287,43 @@ static int packet_decoded_direct(decoder_t *decoder, AVFrame *frame, void *u1, v static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { + int ret = 0; + if (!ch->first_ts) ch->first_ts = packet->ts; - int ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, ch->handler->packet_decoded, ch, mp); - //mp->iter_in++; - mp->ssrc_out->parent->seq_diff--; + + if (ch->dtx_buffer && mp->sfd && mp->ssrc_in && mp->ssrc_out) { + ilog(LOG_DEBUG, "Adding packet to DTX buffer"); + + struct dtx_buffer *dtxb = ch->dtx_buffer; + unsigned long ts = packet->ts; + + mutex_lock(&dtxb->lock); + if (ts != dtxb->ts) + dtxb->ts = ts; + mutex_unlock(&dtxb->lock); + + struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); + dtxe->ttq_entry.when = rtpe_now; + timeval_add_usec(&dtxe->ttq_entry.when, rtpe_config.dtx_delay * 1000); + dtxe->packet = packet; + media_packet_copy(&dtxe->mp, mp); + timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); + // packet now consumed + packet = NULL; + + __dtx_add_callback(dtxb, &rtpe_now, (rtpe_config.dtx_delay + dtxb->ptime) * 1000, mp, ts, 1); + + ret = 1; + } + else { + ilog(LOG_DEBUG, "Decoding RTP packet now"); + ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, ch->handler->packet_decoded, + ch, mp); + ret = ret ? -1 : 0; + mp->ssrc_out->parent->seq_diff--; + } + return ret; } diff --git a/daemon/main.c b/daemon/main.c index 81ed4b784..cfc862fca 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -459,6 +459,9 @@ static void options(int *argc, char ***argv) { { "https-cert", 0,0, G_OPTION_ARG_STRING, &rtpe_config.https_cert,"Certificate for HTTPS and WSS","FILE"}, { "https-key", 0,0, G_OPTION_ARG_STRING, &rtpe_config.https_key, "Private key for HTTPS and WSS","FILE"}, { "http-threads", 0,0, G_OPTION_ARG_INT, &rtpe_config.http_threads,"Number of worker threads for HTTP and WS","INT"}, +#ifdef WITH_TRANSCODING + { "dtx-delay", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_delay, "Delay in milliseconds to trigger DTX handling","INT"}, +#endif { NULL, } }; diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index 18c760d4d..13888f442 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -726,6 +726,18 @@ Number of worker threads for HTTP/HTTPS/WS/WSS. If not specified, then the same number as given under B will be used. If no HTTP listeners are enabled, then no threads are created. +=item B<--dtx-delay=>I + +Processing delay in milliseconds to handle discontinuous transmission (DTX) or +other transmission gaps for codecs that support it (currently only AMR and +AMR-WB). Defaults to zero (disabled) and applicable to transcoded audio streams +only. When enabled, delays processing of received packets for the specified +time (much like a jitter buffer) in order to trigger DTX handling when a +transmission gap occurs. The decoder is then instructed to fill in the missing +time during a transmission gap, for example by generating comfort noise. The +delay should be configured to just slightly more than the expected incoming +jitter. + =back =head1 INTERFACES diff --git a/include/codec.h b/include/codec.h index 469d91a88..c3aa41adb 100644 --- a/include/codec.h +++ b/include/codec.h @@ -100,6 +100,7 @@ void codec_decoder_skip_pts(struct codec_ssrc_handler *ch, uint64_t); uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch); void codec_tracker_init(struct call_media *); void codec_tracker_finish(struct call_media *); +void codec_handlers_stop(GQueue *); #else @@ -108,6 +109,7 @@ INLINE void codec_handlers_update(struct call_media *receiver, struct call_media INLINE void codec_handler_free(struct codec_handler **handler) { } INLINE void codec_tracker_init(struct call_media *m) { } INLINE void codec_tracker_finish(struct call_media *m) { } +INLINE void codec_handlers_stop(GQueue *q) { } #endif diff --git a/include/main.h b/include/main.h index 8d35c6543..5ff524609 100644 --- a/include/main.h +++ b/include/main.h @@ -106,6 +106,7 @@ struct rtpengine_config { char *https_cert; char *https_key; int http_threads; + int dtx_delay; }; diff --git a/lib/codeclib.c b/lib/codeclib.c index 3ea482b2f..76b92ad3d 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -65,6 +65,8 @@ static int dtmf_decoder_input(decoder_t *dec, const str *data, GQueue *out); static int format_cmp_ignore(const struct rtp_payload_type *, const struct rtp_payload_type *); +static int amr_packet_lost(decoder_t *, GQueue *); + @@ -370,6 +372,7 @@ static codec_def_t __codec_defs[] = { .codec_type = &codec_type_amr, .set_enc_options = amr_set_enc_options, .set_dec_options = amr_set_dec_options, + .packet_lost = amr_packet_lost, }, { .rtpname = "AMR-WB", @@ -387,6 +390,7 @@ static codec_def_t __codec_defs[] = { .codec_type = &codec_type_amr, .set_enc_options = amr_set_enc_options, .set_dec_options = amr_set_dec_options, + .packet_lost = amr_packet_lost, }, { .rtpname = "telephone-event", @@ -1887,7 +1891,7 @@ static int amr_decoder_input(decoder_t *dec, const str *data, GQueue *out) { unsigned int bits = dec->codec_options.amr.bits_per_frame[ft]; - // AMR encoder expects an octet aligned TOC byte plus the payload + // AMR decoder expects an octet aligned TOC byte plus the payload unsigned char frame_buf[(bits + 7) / 8 + 1 + 1]; str frame = STR_CONST_INIT_BUF(frame_buf); str_shift(&frame, 1); @@ -2081,6 +2085,15 @@ static int packetizer_amr(AVPacket *pkt, GString *buf, str *output, encoder_t *e return 0; } +static int amr_packet_lost(decoder_t *dec, GQueue *out) { + ilog(LOG_DEBUG, "pushing empty/lost frame to AMR decoder"); + unsigned char frame_buf[1]; + frame_buf[0] = 0xf << 3; // no data + str frame = STR_CONST_INIT_BUF(frame_buf); + if (avc_decoder_input(dec, &frame, out)) + ilog(LOG_WARN | LOG_FLAG_LIMIT, "Error while writing 'no data' frame to AMR decoder"); + return 0; +} diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 6e9debdef..e527adb64 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -36,6 +36,168 @@ my ($sock_a, $sock_b, $sock_c, $sock_d, $port_a, $port_b, $ssrc, $resp, +if (0) { + +# AMR SID, needs --cn-delay=.. + +($sock_a, $sock_b) = new_call([qw(198.51.100.10 4024)], [qw(198.51.100.10 4026)]); + +($port_a) = offer('AMR SID', + { ICE => 'remove', replace => ['origin'], codec => { transcode => ['PCMA'], + 'set' => ['AMR-WB/16000/1/23850'] } }, < PCM CMR', + { ICE => 'remove', replace => ['origin'] }, < ft() }); + + + +($sock_a, $sock_b) = new_call([qw(198.51.100.10 4026)], [qw(198.51.100.10 4028)]); + +($port_a) = offer('AMR SID TS gap', + { ICE => 'remove', replace => ['origin'], codec => { transcode => ['PCMA'], + 'set' => ['AMR-WB/16000/1/23850'] } }, < PCM CMR', + { ICE => 'remove', replace => ['origin'] }, < ft() }); + + + + +done_testing; +exit; + + +} + + + + if (0) { # GH 1098 @@ -1359,6 +1521,8 @@ rcv($sock_a, $port_b, rtpm(96, 1004, 4200, $ssrc, "\xf0\x14\x41\x00\x30\x44\x41\ } + + new_call; offer('DTMF-inject w tp-e', { diff --git a/t/transcode-test.c b/t/transcode-test.c index 8ac454569..ab484c57d 100644 --- a/t/transcode-test.c +++ b/t/transcode-test.c @@ -318,6 +318,7 @@ int main(void) { codeclib_init(0); srandom(time(NULL)); statistics_init(); + codecs_init(); // plain start();