diff --git a/daemon/codec.c b/daemon/codec.c index 66f155ab2..5dd94c354 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -182,12 +182,21 @@ struct silence_event { }; TYPED_GQUEUE(silence_event, struct silence_event) +struct transcode_job { + struct media_packet mp; + struct codec_ssrc_handler *ch; + struct codec_ssrc_handler *input_ch; + struct transcode_packet *packet; + bool done; // needed for in-order processing +}; +TYPED_GQUEUE(transcode_job, struct transcode_job); + struct codec_ssrc_handler { struct ssrc_entry h; // must be first struct codec_handler *handler; decoder_t *decoder; encoder_t *encoder; - codec_chain_t *chain; + codec_cc_t *chain; format_t encoder_format; int bitrate; int ptime; @@ -195,6 +204,7 @@ struct codec_ssrc_handler { struct codec_scheduler csch; GString *sample_buffer; struct dtx_buffer *dtx_buffer; + transcode_job_q async_jobs; // DTMF DSP stuff dtmf_rx_state_t *dtmf_dsp; @@ -244,6 +254,20 @@ struct rtcp_timer { +static mutex_t transcode_lock = MUTEX_STATIC_INIT; +static cond_t transcode_cond = COND_STATIC_INIT; +static transcode_job_q transcode_jobs = TYPED_GQUEUE_INIT; + +static tc_code (*__rtp_decode)(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch, + struct transcode_packet *packet, struct media_packet *mp); +static void transcode_job_free(struct transcode_job *j); +static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact); +static void packet_encoded_tx_seq_own(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact); + + + static codec_handler_func handler_func_passthrough_ssrc; static codec_handler_func handler_func_transcode; static codec_handler_func handler_func_playback; @@ -3635,6 +3659,7 @@ static void __ssrc_handler_stop(void *p, void *arg) { mutex_unlock(&ch->dtx_buffer->lock); 
dtx_buffer_stop(&ch->dtx_buffer); + codec_cc_stop(ch->chain); } } void codec_handlers_stop(codec_handlers_q *q, struct call_media *sink) { @@ -3751,6 +3776,40 @@ silence: +static void *async_chain_start(void *x, void *y, void *z) { + struct codec_ssrc_handler *ch = x; + struct codec_ssrc_handler *input_ch = y; + struct media_packet *mp = z; + + struct transcode_job *j = g_new0(__typeof(*j), 1); + //printf("call %p inc refs %p %p job %p\n", mp->call, ch, input_ch, j); + media_packet_copy(&j->mp, mp); + j->ch = obj_get(&ch->h); + j->input_ch = obj_get(&input_ch->h); + + return j; +} +static void async_chain_finish(AVPacket *pkt, void *async_cb_obj) { + struct transcode_job *j = async_cb_obj; + struct call *call = j->mp.call; + + gettimeofday(&rtpe_now, NULL); + + if (pkt) { + rwlock_lock_r(&call->master_lock); + __ssrc_lock_both(&j->mp); + + static const struct fraction chain_fact = {1,1}; + packet_encoded_packetize(pkt, j->ch, &j->mp, packetizer_passthrough, NULL, &chain_fact, + packet_encoded_tx_seq_own); + + __ssrc_unlock_both(&j->mp); + send_buffered(&j->mp, log_level_index_transcoding); + rwlock_unlock_r(&call->master_lock); + } + + transcode_job_free(j); +} static bool __ssrc_handler_decode_common(struct codec_ssrc_handler *ch, struct codec_handler *h, const format_t *enc_format) @@ -3816,9 +3875,9 @@ static struct ssrc_entry *__ssrc_handler_transcode_new(void *p) { // see if there's a complete codec chain usable for this if (!h->pcm_dtmf_detect) - ch->chain = codec_chain_new(h->source_pt.codec_def, &dec_format, + ch->chain = codec_cc_new(h->source_pt.codec_def, &dec_format, h->dest_pt.codec_def, &enc_format, - ch->bitrate, ch->ptime); + ch->bitrate, ch->ptime, async_chain_start, async_chain_finish); if (ch->chain) { ilogs(codec, LOG_DEBUG, "Using codec chain to transcode from " STR_FORMAT "/" STR_FORMAT @@ -3904,6 +3963,7 @@ static void __free_ssrc_handler(void *chp) { } while (going); encoder_free(ch->encoder); } + codec_cc_free(&ch->chain); if 
(ch->sample_buffer) g_string_free(ch->sample_buffer, TRUE); if (ch->dtmf_dsp) @@ -3911,6 +3971,7 @@ static void __free_ssrc_handler(void *chp) { resample_shutdown(&ch->dtmf_resampler); t_queue_clear_full(&ch->dtmf_events, dtmf_event_free); t_queue_clear_full(&ch->silence_events, silence_event_free); + t_queue_clear(&ch->async_jobs); dtx_buffer_stop(&ch->dtx_buffer); } @@ -3959,9 +4020,6 @@ void packet_encoded_packetize(AVPacket *pkt, struct codec_ssrc_handler *ch, stru } } -static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, - str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact); - static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { struct codec_ssrc_handler *ch = u1; struct media_packet *mp = u2; @@ -3975,8 +4033,33 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { return 0; } -static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, - str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact) +static void __codec_output_rtp_seq_passthrough(struct media_packet *mp, struct codec_scheduler *csch, + struct codec_handler *handler, + char *buf, // malloc'd, room for rtp_header + filled-in payload + unsigned int payload_len, + unsigned long payload_ts, + int marker, int payload_type, + unsigned long ts_delay) +{ + codec_output_rtp(mp, csch, handler, buf, payload_len, payload_ts, marker, -1, 0, payload_type, ts_delay); +} + +static void __codec_output_rtp_seq_own(struct media_packet *mp, struct codec_scheduler *csch, + struct codec_handler *handler, + char *buf, // malloc'd, room for rtp_header + filled-in payload + unsigned int payload_len, + unsigned long payload_ts, + int marker, int payload_type, + unsigned long ts_delay) +{ + // XXX this bypasses the send timer + codec_output_rtp(mp, csch, handler, buf, payload_len, payload_ts, marker, mp->ssrc_out->seq_out++, + 0, payload_type, ts_delay); +} + 
+static void __packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact, + __typeof(__codec_output_rtp_seq_passthrough) func) { // check special payloads @@ -4018,15 +4101,26 @@ static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, stru send_buf = malloc(pkt_len); memcpy(send_buf, buf, pkt_len); } - codec_output_rtp(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts + func(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts + fraction_divl(pkt->pts, cr_fact), - ch->rtp_mark ? 1 : 0, -1, 0, + ch->rtp_mark ? 1 : 0, payload_type, ts_delay); mp->ssrc_out->parent->seq_diff++; ch->rtp_mark = 0; } while (repeats--); } +static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact) +{ + __packet_encoded_tx(pkt, ch, mp, inout, buf, pkt_len, cr_fact, __codec_output_rtp_seq_passthrough); +} +static void packet_encoded_tx_seq_own(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len, const struct fraction *cr_fact) +{ + __packet_encoded_tx(pkt, ch, mp, inout, buf, pkt_len, cr_fact, __codec_output_rtp_seq_own); +} + static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) { @@ -4165,18 +4259,24 @@ static int packet_decoded_audio_player(decoder_t *decoder, AVFrame *frame, void return 0; } -static tc_code __rtp_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch, +static tc_code __rtp_decode_direct(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch, struct transcode_packet *packet, struct media_packet *mp) { tc_code code = TCC_OK; if (packet) { +#ifdef HAVE_CODEC_CHAIN if (ch->chain) { +#else + if (false) { +#endif static const struct fraction chain_fact = {1,1}; - AVPacket *pkt = 
codec_chain_input_data(ch->chain, packet->payload, packet->ts); - assert(pkt != NULL); - packet_encoded_packetize(pkt, ch, mp, packetizer_passthrough, NULL, &chain_fact, - packet_encoded_tx); - av_packet_unref(pkt); + AVPacket *pkt = codec_cc_input_data(ch->chain, packet->payload, packet->ts, + /* x, y, z: */ ch, input_ch, mp); + if (pkt) { + packet_encoded_packetize(pkt, ch, mp, packetizer_passthrough, NULL, &chain_fact, + packet_encoded_tx); + av_packet_unref(pkt); + } } else { int ret = decoder_input_data_ptime(ch->decoder, packet->payload, packet->ts, &mp->ptime, @@ -4188,6 +4288,29 @@ static tc_code __rtp_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_han __buffer_delay_seq(input_ch->handler->delay_buffer, mp, -1); return code; } +static tc_code __rtp_decode_async(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch, + struct transcode_packet *packet, struct media_packet *mp) +{ + struct transcode_job *j = g_new(__typeof(*j), 1); + media_packet_copy(&j->mp, mp); + j->ch = obj_get(&ch->h); + j->input_ch = obj_get(&input_ch->h); + j->packet = packet; + j->done = false; + + // append-only here, with the SSRC handler locked + t_queue_push_tail(&ch->async_jobs, j); + + // if this is the first job for this SSRC handler, notify async worker + if (ch->async_jobs.length == 1) { + LOCK(&transcode_lock); + t_queue_push_tail(&transcode_jobs, j); + cond_signal(&transcode_cond); + } + + return TCC_CONSUMED; +} + static tc_code packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handler *input_ch, struct transcode_packet *packet, struct media_packet *mp) { @@ -5667,8 +5790,104 @@ static void codec_timers_run(void *p) { ct->timer_func(ct); } +#ifdef WITH_TRANSCODING +static void transcode_job_free(struct transcode_job *j) { + media_packet_release(&j->mp); + obj_put(&j->ch->h); + obj_put(&j->input_ch->h); + if (j->packet) + __transcode_packet_free(j->packet); + g_free(j); +} + +static void transcode_job_do(struct transcode_job *ref_j) { + 
struct call *call = ref_j->mp.call; + + rwlock_lock_r(&call->master_lock); + __ssrc_lock_both(&ref_j->mp); + + // the first job in the queue must be the one that was given to async worker + transcode_job_list *list = ref_j->ch->async_jobs.head; + // given: // assert(list->data == ref_j); + + do { + // nothing can remove entries while we're running. prepare to run job + __ssrc_unlock_both(&ref_j->mp); + + struct transcode_job *j = list->data; + + __ssrc_lock_both(&j->mp); + + tc_code ret = __rtp_decode_direct(j->ch, j->input_ch, j->packet, &j->mp); + if (ret == TCC_CONSUMED) + j->packet = NULL; + + // unlock and send + __ssrc_unlock_both(&j->mp); + send_buffered(&j->mp, log_level_index_transcoding); + + // reacquire primary lock and see if we're done. new jobs might have been + // added in the meantime. + __ssrc_lock_both(&ref_j->mp); + list = list->next; + } + while (list); + + // we've reached the end of the list while holding the SSRC handler lock. + // we will run no more jobs here. we take over the list for cleanup and + // then release the lock, guaranteeing that anything added afterwards will + // run later and will result in a new job given to the async worker threads. 
+ transcode_job_q q = ref_j->ch->async_jobs; + t_queue_init(&ref_j->ch->async_jobs); + __ssrc_unlock_both(&ref_j->mp); + + while ((ref_j = t_queue_pop_head(&q))) + transcode_job_free(ref_j); + + rwlock_unlock_r(&call->master_lock); +} + +static void codec_worker(void *d) { + struct thread_waker waker = { .lock = &transcode_lock, .cond = &transcode_cond }; + thread_waker_add(&waker); + + mutex_lock(&transcode_lock); + + while (!rtpe_shutdown) { + // wait once, but then loop in case of shutdown + if (transcode_jobs.length == 0) + cond_wait(&transcode_cond, &transcode_lock); + if (transcode_jobs.length == 0) + continue; + + struct transcode_job *j = t_queue_pop_head(&transcode_jobs); + + mutex_unlock(&transcode_lock); + + gettimeofday(&rtpe_now, NULL); + transcode_job_do(j); + + mutex_lock(&transcode_lock); + } + + mutex_unlock(&transcode_lock); + thread_waker_del(&waker); +} +#endif + void codecs_init(void) { timerthread_init(&codec_timers_thread, rtpe_config.media_num_threads, codec_timers_run); + +#ifdef WITH_TRANSCODING + if (rtpe_config.codec_num_threads) { + for (unsigned int i = 0; i < rtpe_config.codec_num_threads; i++) + thread_create_detach(codec_worker, NULL, "transcode"); + + __rtp_decode = __rtp_decode_async; + } + else + __rtp_decode = __rtp_decode_direct; +#endif } void codecs_cleanup(void) { timerthread_free(&codec_timers_thread); diff --git a/daemon/main.c b/daemon/main.c index 59c98f048..885c14ef7 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -568,6 +568,9 @@ static void options(int *argc, char ***argv) { { "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &rtpe_config.fmt, "XMLRPC timeout request format to use. 
0: SEMS DI, 1: call-id only, 2: Kamailio", "INT" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.num_threads, "Number of worker threads to create", "INT" }, { "media-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.media_num_threads, "Number of worker threads for media playback", "INT" }, +#ifdef WITH_TRANSCODING + { "codec-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.codec_num_threads, "Number of transcoding threads for asynchronous operation", "INT" }, +#endif { "delete-delay", 'd', 0, G_OPTION_ARG_INT, &rtpe_config.delete_delay, "Delay for deleting a session from memory.", "INT" }, { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, diff --git a/daemon/ssrc.c b/daemon/ssrc.c index b23cad4ff..42c8fd0ce 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -15,6 +15,7 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { payload_tracker_init(&c->tracker); while (!c->ssrc_map_out) c->ssrc_map_out = ssl_random(); + c->seq_out = ssl_random(); atomic64_set_na(&c->last_sample, ssrc_timeval_to_ts(&rtpe_now)); } static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) { diff --git a/docs/rtpengine.md b/docs/rtpengine.md index 94eb99d3f..a94d4ddec 100644 --- a/docs/rtpengine.md +++ b/docs/rtpengine.md @@ -443,6 +443,12 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp- So for example, if this option is set to 4, in total 8 threads will be launched. +- __\-\-codec-num-threads=__*INT* + + Enables asynchronous transcoding operation using the specified number of + worker threads. This is an experimental feature and probably doesn't bring + any benefits over normal synchronous transcoding. 
+ - __\-\-poller-size=__*INT* Set the maximum number of event items (file descriptors) to retrieve from diff --git a/include/main.h b/include/main.h index bdc68868f..c86410275 100644 --- a/include/main.h +++ b/include/main.h @@ -90,6 +90,7 @@ struct rtpengine_config { gboolean active_switchover; int num_threads; int media_num_threads; + int codec_num_threads; char *spooldir; char *rec_method; char *rec_format; diff --git a/include/ssrc.h b/include/ssrc.h index 57c0cab69..3f8ba08c8 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -48,6 +48,7 @@ struct ssrc_ctx { // for transcoding uint32_t ssrc_map_out; + uint16_t seq_out; // RTCP stats atomic64 packets, diff --git a/lib/auxlib.c b/lib/auxlib.c index f181dd788..34c950097 100644 --- a/lib/auxlib.c +++ b/lib/auxlib.c @@ -202,6 +202,9 @@ void config_load(int *argc, char ***argv, GOptionEntry *app_entries, const char { "evs-lib-path", 0,0, G_OPTION_ARG_FILENAME, &rtpe_common_config_ptr->evs_lib_path, "Location of .so for 3GPP EVS codec", "FILE" }, #ifdef HAVE_CODEC_CHAIN { "codec-chain-lib-path",0,0, G_OPTION_ARG_FILENAME, &rtpe_common_config_ptr->codec_chain_lib_path,"Location of libcodec-chain.so", "FILE" }, + { "codec-chain-runners",0,0, G_OPTION_ARG_INT, &rtpe_common_config_ptr->codec_chain_runners,"Number of chain runners per codec","INT" }, + { "codec-chain-concurrency",0,0,G_OPTION_ARG_INT, &rtpe_common_config_ptr->codec_chain_concurrency,"Max concurrent codec jobs per runner","INT" }, + { "codec-chain-async",0,0, G_OPTION_ARG_INT, &rtpe_common_config_ptr->codec_chain_async,"Number of background callback threads","INT" }, #endif { NULL, } }; @@ -386,6 +389,17 @@ out: if (rtpe_common_config_ptr->poller_size <= 0) rtpe_common_config_ptr->poller_size = 128; +#ifdef HAVE_CODEC_CHAIN + if (rtpe_common_config_ptr->codec_chain_runners <= 0) + rtpe_common_config_ptr->codec_chain_runners = 4; + + if (rtpe_common_config_ptr->codec_chain_concurrency <= 0) + rtpe_common_config_ptr->codec_chain_concurrency = 256; + + if 
(rtpe_common_config_ptr->codec_chain_async < 0) + rtpe_common_config_ptr->codec_chain_async = 0; +#endif + return; err: diff --git a/lib/auxlib.h b/lib/auxlib.h index 1ba77b4d3..95bc83391 100644 --- a/lib/auxlib.h +++ b/lib/auxlib.h @@ -37,6 +37,9 @@ struct rtpengine_common_config { int max_log_line_length; char *evs_lib_path; char *codec_chain_lib_path; + int codec_chain_runners; + int codec_chain_concurrency; + int codec_chain_async; }; extern struct rtpengine_common_config *rtpe_common_config_ptr; diff --git a/lib/codeclib.c b/lib/codeclib.c index e02479e22..0736198fb 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -143,7 +143,7 @@ static select_encoder_format_f evs_select_encoder_format; -static void *codec_chain_lib_handle; +static void *cc_lib_handle; #ifdef HAVE_CODEC_CHAIN @@ -154,14 +154,27 @@ static __typeof__(codec_chain_client_pcmu2opus_runner_new) *cc_client_pcmu2opus_ static __typeof__(codec_chain_client_opus2pcma_runner_new) *cc_client_opus2pcma_runner_new; static __typeof__(codec_chain_client_opus2pcmu_runner_new) *cc_client_opus2pcmu_runner_new; +static __typeof__(codec_chain_client_pcma2opus_async_runner_new) *cc_client_pcma2opus_async_runner_new; +static __typeof__(codec_chain_client_pcmu2opus_async_runner_new) *cc_client_pcmu2opus_async_runner_new; +static __typeof__(codec_chain_client_opus2pcma_async_runner_new) *cc_client_opus2pcma_async_runner_new; +static __typeof__(codec_chain_client_opus2pcmu_async_runner_new) *cc_client_opus2pcmu_async_runner_new; + static __typeof__(codec_chain_pcma2opus_runner_do) *cc_pcma2opus_runner_do; static __typeof__(codec_chain_pcmu2opus_runner_do) *cc_pcmu2opus_runner_do; static __typeof__(codec_chain_opus2pcma_runner_do) *cc_opus2pcma_runner_do; static __typeof__(codec_chain_opus2pcmu_runner_do) *cc_opus2pcmu_runner_do; +static __typeof__(codec_chain_pcma2opus_runner_async_do_nonblock) *cc_pcma2opus_runner_async_do_nonblock; +static __typeof__(codec_chain_pcmu2opus_runner_async_do_nonblock) 
*cc_pcmu2opus_runner_async_do_nonblock; +static __typeof__(codec_chain_opus2pcma_runner_async_do_nonblock) *cc_opus2pcma_runner_async_do_nonblock; +static __typeof__(codec_chain_opus2pcmu_runner_async_do_nonblock) *cc_opus2pcmu_runner_async_do_nonblock; + static __typeof__(codec_chain_client_float2opus_new) *cc_client_float2opus_new; static __typeof__(codec_chain_client_opus2float_new) *cc_client_opus2float_new; +static __typeof__(codec_chain_client_float2opus_free) *cc_client_float2opus_free; +static __typeof__(codec_chain_client_opus2float_free) *cc_client_opus2float_free; + static codec_chain_client *cc_client; static codec_chain_pcma2opus_runner *pcma2opus_runner; @@ -169,7 +182,25 @@ static codec_chain_pcmu2opus_runner *pcmu2opus_runner; static codec_chain_opus2pcmu_runner *opus2pcmu_runner; static codec_chain_opus2pcma_runner *opus2pcma_runner; -struct codec_chain_s { +static codec_chain_pcma2opus_async_runner *pcma2opus_async_runner; +static codec_chain_pcmu2opus_async_runner *pcmu2opus_async_runner; +static codec_chain_opus2pcmu_async_runner *opus2pcmu_async_runner; +static codec_chain_opus2pcma_async_runner *opus2pcma_async_runner; + +typedef enum { + CCC_OK, + CCC_ASYNC, + CCC_ERR, +} codec_cc_state; + +struct async_job { + str data; + unsigned long ts; + void *async_cb_obj; +}; +TYPED_GQUEUE(async_job, struct async_job); + +struct codec_cc_s { union { struct { codec_chain_pcmu2opus_runner *runner; @@ -187,12 +218,61 @@ struct codec_chain_s { codec_chain_opus2pcma_runner *runner; codec_chain_opus2float *dec; } opus2pcma; - } u; + struct { + codec_chain_pcmu2opus_async_runner *runner; + codec_chain_float2opus *enc; + } pcmu2opus_async; + struct { + codec_chain_pcma2opus_async_runner *runner; + codec_chain_float2opus *enc; + } pcma2opus_async; + struct { + codec_chain_opus2pcmu_async_runner *runner; + codec_chain_opus2float *dec; + } opus2pcmu_async; + struct { + codec_chain_opus2pcma_async_runner *runner; + codec_chain_opus2float *dec; + } opus2pcma_async; 
+ }; AVPacket *avpkt; - int (*run)(codec_chain_t *c, const str *data, unsigned long ts, AVPacket *); + codec_cc_state (*run)(codec_cc_t *c, const str *data, unsigned long ts, void *); + void (*clear)(void *); + void *clear_arg; + + mutex_t async_lock; + AVPacket *avpkt_async; + size_t data_len; + bool async_busy; // currently processing a packet + bool async_blocked; // couldn't find context + bool async_shutdown; // shutdown/free happened while busy + async_job_q async_jobs; + unsigned long ts; + void *(*async_init)(void *, void *, void *); + void (*async_callback)(AVPacket *, void *); + void *async_cb_obj; }; -#endif +static codec_cc_t *codec_cc_new_sync(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)); +static codec_cc_t *codec_cc_new_async(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)); + +static bool __cc_pcmu2opus_run_async(codec_cc_t *, const str *, unsigned long, void *); +static bool __cc_pcma2opus_run_async(codec_cc_t *, const str *, unsigned long, void *); +static bool __cc_opus2pcma_run_async(codec_cc_t *, const str *, unsigned long, void *); +static bool __cc_opus2pcmu_run_async(codec_cc_t *, const str *, unsigned long, void *); + +codec_cc_t *(*codec_cc_new)(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)); + +#endif @@ -1228,8 +1308,8 @@ void codeclib_free(void) { avformat_network_deinit(); if (evs_lib_handle) dlclose(evs_lib_handle); - if (codec_chain_lib_handle) - dlclose(codec_chain_lib_handle); + if (cc_lib_handle) + dlclose(cc_lib_handle); } @@ -1279,34 +1359,164 @@ static void *dlsym_assert(void 
*handle, const char *sym, const char *fn) { #ifdef HAVE_CODEC_CHAIN -static void codec_chain_dlsym_resolve(const char *fn) { - cc_client_connect = dlsym_assert(codec_chain_lib_handle, "codec_chain_client_connect", fn); +static void cc_dlsym_resolve(const char *fn) { + cc_client_connect = dlsym_assert(cc_lib_handle, "codec_chain_client_connect", fn); - cc_client_pcma2opus_runner_new = dlsym_assert(codec_chain_lib_handle, + cc_client_pcma2opus_runner_new = dlsym_assert(cc_lib_handle, "codec_chain_client_pcma2opus_runner_new", fn); - cc_client_pcmu2opus_runner_new = dlsym_assert(codec_chain_lib_handle, + cc_client_pcmu2opus_runner_new = dlsym_assert(cc_lib_handle, "codec_chain_client_pcmu2opus_runner_new", fn); - cc_client_opus2pcma_runner_new = dlsym_assert(codec_chain_lib_handle, + cc_client_opus2pcma_runner_new = dlsym_assert(cc_lib_handle, "codec_chain_client_opus2pcma_runner_new", fn); - cc_client_opus2pcmu_runner_new = dlsym_assert(codec_chain_lib_handle, + cc_client_opus2pcmu_runner_new = dlsym_assert(cc_lib_handle, "codec_chain_client_opus2pcmu_runner_new", fn); - cc_pcma2opus_runner_do = dlsym_assert(codec_chain_lib_handle, + cc_client_pcma2opus_async_runner_new = dlsym_assert(cc_lib_handle, + "codec_chain_client_pcma2opus_async_runner_new", fn); + cc_client_pcmu2opus_async_runner_new = dlsym_assert(cc_lib_handle, + "codec_chain_client_pcmu2opus_async_runner_new", fn); + cc_client_opus2pcma_async_runner_new = dlsym_assert(cc_lib_handle, + "codec_chain_client_opus2pcma_async_runner_new", fn); + cc_client_opus2pcmu_async_runner_new = dlsym_assert(cc_lib_handle, + "codec_chain_client_opus2pcmu_async_runner_new", fn); + + cc_pcma2opus_runner_do = dlsym_assert(cc_lib_handle, "codec_chain_pcma2opus_runner_do", fn); - cc_pcmu2opus_runner_do = dlsym_assert(codec_chain_lib_handle, + cc_pcmu2opus_runner_do = dlsym_assert(cc_lib_handle, "codec_chain_pcmu2opus_runner_do", fn); - cc_opus2pcma_runner_do = dlsym_assert(codec_chain_lib_handle, + cc_opus2pcma_runner_do = 
dlsym_assert(cc_lib_handle, "codec_chain_opus2pcma_runner_do", fn); - cc_opus2pcmu_runner_do = dlsym_assert(codec_chain_lib_handle, + cc_opus2pcmu_runner_do = dlsym_assert(cc_lib_handle, "codec_chain_opus2pcmu_runner_do", fn); - cc_client_float2opus_new = dlsym_assert(codec_chain_lib_handle, + cc_pcma2opus_runner_async_do_nonblock = dlsym_assert(cc_lib_handle, + "codec_chain_pcma2opus_runner_async_do_nonblock", fn); + cc_pcmu2opus_runner_async_do_nonblock = dlsym_assert(cc_lib_handle, + "codec_chain_pcmu2opus_runner_async_do_nonblock", fn); + cc_opus2pcma_runner_async_do_nonblock = dlsym_assert(cc_lib_handle, + "codec_chain_opus2pcma_runner_async_do_nonblock", fn); + cc_opus2pcmu_runner_async_do_nonblock = dlsym_assert(cc_lib_handle, + "codec_chain_opus2pcmu_runner_async_do_nonblock", fn); + + cc_client_float2opus_new = dlsym_assert(cc_lib_handle, "codec_chain_client_float2opus_new", fn); - cc_client_opus2float_new = dlsym_assert(codec_chain_lib_handle, + cc_client_opus2float_new = dlsym_assert(cc_lib_handle, "codec_chain_client_opus2float_new", fn); + + cc_client_float2opus_free = dlsym_assert(cc_lib_handle, + "codec_chain_client_float2opus_free", fn); + cc_client_opus2float_free = dlsym_assert(cc_lib_handle, + "codec_chain_client_opus2float_free", fn); +} + +static void cc_create_runners(void) { + pcma2opus_runner = cc_client_pcma2opus_runner_new(cc_client, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!pcma2opus_runner) + die("Failed to initialise GPU pcma2opus"); + + pcmu2opus_runner = cc_client_pcmu2opus_runner_new(cc_client, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!pcmu2opus_runner) + die("Failed to initialise GPU pcmu2opus"); + + opus2pcmu_runner = cc_client_opus2pcmu_runner_new(cc_client, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if 
(!opus2pcmu_runner) + die("Failed to initialise GPU opus2pcmu"); + + opus2pcma_runner = cc_client_opus2pcma_runner_new(cc_client, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!opus2pcma_runner) + die("Failed to initialise GPU opus2pcma"); +} + +static void cc_create_async_runners(void) { + pcma2opus_async_runner = cc_client_pcma2opus_async_runner_new(cc_client, + rtpe_common_config_ptr->codec_chain_async, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!pcma2opus_async_runner) + die("Failed to initialise GPU pcma2opus"); + + pcmu2opus_async_runner = cc_client_pcmu2opus_async_runner_new(cc_client, + rtpe_common_config_ptr->codec_chain_async, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!pcmu2opus_async_runner) + die("Failed to initialise GPU pcmu2opus"); + + opus2pcmu_async_runner = cc_client_opus2pcmu_async_runner_new(cc_client, + rtpe_common_config_ptr->codec_chain_async, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!opus2pcmu_async_runner) + die("Failed to initialise GPU opus2pcmu"); + + opus2pcma_async_runner = cc_client_opus2pcma_async_runner_new(cc_client, + rtpe_common_config_ptr->codec_chain_async, + 10000, + rtpe_common_config_ptr->codec_chain_runners, + rtpe_common_config_ptr->codec_chain_concurrency, 160); + if (!opus2pcma_async_runner) + die("Failed to initialise GPU opus2pcma"); +} + + +static codec_cc_t *codec_cc_new_dummy(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)) +{ + return NULL; } -#endif +static void cc_init(void) { + codec_cc_new = codec_cc_new_dummy; + + if (!rtpe_common_config_ptr->codec_chain_lib_path) 
+ return; + + cc_lib_handle = dlopen(rtpe_common_config_ptr->codec_chain_lib_path, RTLD_NOW | RTLD_LOCAL); + if (!cc_lib_handle) + die("Failed to load libcodec-chain.so '%s': %s", + rtpe_common_config_ptr->codec_chain_lib_path, + dlerror()); + + cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path); + + cc_client = cc_client_connect(4); + if (!cc_client) + die("Failed to connect to cudecsd"); + + if (!rtpe_common_config_ptr->codec_chain_async) { + cc_create_runners(); + codec_cc_new = codec_cc_new_sync; + } + else { + cc_create_async_runners(); + codec_cc_new = codec_cc_new_async; + } + + ilog(LOG_DEBUG, "CUDA codecs initialised"); +} + +#else + +static void cc_init(void) { } + +#endif void codeclib_init(int print) { #if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58, 9, 100) @@ -1320,39 +1530,7 @@ void codeclib_init(int print) { codecs_ht = g_hash_table_new(str_case_hash, str_case_equal); codecs_ht_by_av = g_hash_table_new(g_direct_hash, g_direct_equal); -#ifdef HAVE_CODEC_CHAIN - if (rtpe_common_config_ptr->codec_chain_lib_path) { - codec_chain_lib_handle = dlopen(rtpe_common_config_ptr->codec_chain_lib_path, RTLD_NOW | RTLD_LOCAL); - if (!codec_chain_lib_handle) - die("Failed to load libcodec-chain.so '%s': %s", - rtpe_common_config_ptr->codec_chain_lib_path, - dlerror()); - - codec_chain_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path); - - cc_client = cc_client_connect(4); - if (!cc_client) - die("Failed to connect to cudecsd"); - - pcma2opus_runner = cc_client_pcma2opus_runner_new(cc_client, 4, 3000, 160); - if (!pcma2opus_runner) - die("Failed to initialise GPU pcma2opus"); - - pcmu2opus_runner = cc_client_pcmu2opus_runner_new(cc_client, 4, 3000, 160); - if (!pcmu2opus_runner) - die("Failed to initialise GPU pcmu2opus"); - - opus2pcmu_runner = cc_client_opus2pcmu_runner_new(cc_client, 4, 3000, 160); - if (!opus2pcmu_runner) - die("Failed to initialise GPU opus2pcmu"); - - opus2pcma_runner = cc_client_opus2pcma_runner_new(cc_client, 4, 3000, 
160); - if (!opus2pcma_runner) - die("Failed to initialise GPU opus2pcma"); - - ilog(LOG_DEBUG, "CUDA codecs initialised"); - } -#endif + cc_init(); for (int i = 0; i < G_N_ELEMENTS(__codec_defs); i++) { // add to hash table @@ -4672,65 +4850,315 @@ static int evs_dtx(decoder_t *dec, GQueue *out, int ptime) { #ifdef HAVE_CODEC_CHAIN -int codec_chain_pcmu2opus_run(codec_chain_t *c, const str *data, unsigned long ts, AVPacket *pkt) { - ssize_t ret = cc_pcmu2opus_runner_do(c->u.pcmu2opus.runner, c->u.pcmu2opus.enc, +codec_cc_state cc_pcmu2opus_run(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt; + ssize_t ret = cc_pcmu2opus_runner_do(c->pcmu2opus.runner, c->pcmu2opus.enc, (unsigned char *) data->s, data->len, pkt->data, pkt->size); - assert(ret > 0); + assert(ret > 0); // XXX handle errors XXX handle input frame sizes != 160 pkt->size = ret; pkt->duration = data->len * 6L; pkt->pts = ts * 6L; - return 0; + return CCC_OK; } -int codec_chain_pcma2opus_run(codec_chain_t *c, const str *data, unsigned long ts, AVPacket *pkt) { - ssize_t ret = cc_pcma2opus_runner_do(c->u.pcma2opus.runner, c->u.pcma2opus.enc, +codec_cc_state cc_pcma2opus_run(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt; + ssize_t ret = cc_pcma2opus_runner_do(c->pcma2opus.runner, c->pcma2opus.enc, (unsigned char *) data->s, data->len, pkt->data, pkt->size); - assert(ret > 0); + assert(ret > 0); // XXX handle errors XXX handle input frame sizes != 160 pkt->size = ret; pkt->duration = data->len * 6L; pkt->pts = ts * 6L; - return 0; + return CCC_OK; } -int codec_chain_opus2pcmu_run(codec_chain_t *c, const str *data, unsigned long ts, AVPacket *pkt) { - ssize_t ret = cc_opus2pcmu_runner_do(c->u.opus2pcmu.runner, c->u.opus2pcmu.dec, +codec_cc_state cc_opus2pcmu_run(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt; + ssize_t ret = 
cc_opus2pcmu_runner_do(c->opus2pcmu.runner, c->opus2pcmu.dec, (unsigned char *) data->s, data->len, pkt->data, pkt->size); - assert(ret > 0); + assert(ret > 0); // XXX handle errors pkt->size = ret; pkt->duration = ret; pkt->pts = ts / 6L; - return 0; + return CCC_OK; } -int codec_chain_opus2pcma_run(codec_chain_t *c, const str *data, unsigned long ts, AVPacket *pkt) { - ssize_t ret = cc_opus2pcma_runner_do(c->u.opus2pcma.runner, c->u.opus2pcma.dec, +codec_cc_state cc_opus2pcma_run(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt; + ssize_t ret = cc_opus2pcma_runner_do(c->opus2pcma.runner, c->opus2pcma.dec, (unsigned char *) data->s, data->len, pkt->data, pkt->size); - assert(ret > 0); + assert(ret > 0); // XXX handle errors pkt->size = ret; pkt->duration = ret; pkt->pts = ts / 6L; - return 0; + return CCC_OK; } -#endif +static void __cc_async_job_free(struct async_job *j) { + g_free(j->data.s); + g_free(j); +} + +static void __codec_cc_free(codec_cc_t *c) { + c->clear(c->clear_arg); + while (c->async_jobs.length) { + __auto_type j = t_queue_pop_head(&c->async_jobs); + c->async_callback(NULL, j->async_cb_obj); + __cc_async_job_free(j); + } + av_packet_free(&c->avpkt); + av_packet_free(&c->avpkt_async); + g_slice_free1(sizeof(*c), c); +} -codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def_t *dst, - format_t *dst_format, int bitrate, int ptime) +// lock must be held +// append job to queue +static void __cc_async_do_add_queue(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + struct async_job *j = g_new0(__typeof__(*j), 1); + str_init_dup_str(&j->data, data); + j->async_cb_obj = async_cb_obj; + j->ts = ts; + t_queue_push_tail(&c->async_jobs, j); +} +// check busy flag and append to queue if set +// if not busy, sets busy flag +// also check blocked flag if busy: if set, try running first job +static bool __cc_async_check_busy_blocked_queue(codec_cc_t *c, const str 
*data, unsigned long ts, + void *async_cb_obj, __typeof__(__cc_pcmu2opus_run_async) run_async) +{ + struct async_job *j = NULL; + + { + LOCK(&c->async_lock); + + if (!c->async_busy) { + // we can try running + c->async_busy = true; + return false; + } + + // codec is busy (either currently running or was blocked) + // append to queue + __cc_async_do_add_queue(c, data, ts, async_cb_obj); + + // if we were blocked (not currently running), try running now + if (c->async_blocked) + j = t_queue_pop_head(&c->async_jobs); + } + + if (j) { + if (!run_async(c, &j->data, j->ts, j->async_cb_obj)) { + // still blocked. return to queue + LOCK(&c->async_lock); + t_queue_push_head(&c->async_jobs, j); + } + else { + // unblocked, running now + __cc_async_job_free(j); + LOCK(&c->async_lock); + c->async_blocked = false; + } + } + + return true; +} +// runner failed, needed to block (no available context) +// set blocked flag and append to queue +// queue is guaranteed to be empty +static void __cc_async_blocked_queue(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + LOCK(&c->async_lock); + __cc_async_do_add_queue(c, data, ts, async_cb_obj); + c->async_blocked = true; + // busy == true +} + +static codec_cc_state cc_X_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj, + __typeof__(__cc_pcmu2opus_run_async) run_async) +{ + if (__cc_async_check_busy_blocked_queue(c, data, ts, async_cb_obj, run_async)) + return CCC_ASYNC; + if (!run_async(c, data, ts, async_cb_obj)) + __cc_async_blocked_queue(c, data, ts, async_cb_obj); + return CCC_ASYNC; +} + +static void cc_X_pkt_callback(codec_cc_t *c, int size, __typeof__(__cc_pcmu2opus_run_async) run_async) { + AVPacket *pkt = c->avpkt_async; + void *async_cb_obj = c->async_cb_obj; + c->async_cb_obj = NULL; + + c->async_callback(pkt, async_cb_obj); + + pkt->size = 0; + + struct async_job *j = NULL; + bool shutdown = false; + { + LOCK(&c->async_lock); + j = t_queue_pop_head(&c->async_jobs); + if 
(!j) { + if (c->async_shutdown) + shutdown = true; + else + c->async_busy = false; + } + } + + if (shutdown) { + __codec_cc_free(c); + return; + } + + if (j) { + if (!run_async(c, &j->data, j->ts, j->async_cb_obj)) { + LOCK(&c->async_lock); + t_queue_push_head(&c->async_jobs, j); + c->async_blocked = true; + } + else { + g_free(j->data.s); + g_free(j); + LOCK(&c->async_lock); + c->async_blocked = false; + } + } +} + +static void cc_pcmX2opus_run_callback(void *p, int size, __typeof__(__cc_pcmu2opus_run_async) run_async) { + codec_cc_t *c = p; + + assert(size > 0); // XXX handle errors XXX handle input frame sizes != 160 + + AVPacket *pkt = c->avpkt_async; + + pkt->size = size; + pkt->duration = c->data_len * 6L; + pkt->pts = c->ts * 6L; + + cc_X_pkt_callback(c, size, run_async); +} + +static void cc_pcmu2opus_run_callback(void *p, int size) { + cc_pcmX2opus_run_callback(p, size, __cc_pcmu2opus_run_async); +} +static bool __cc_pcmu2opus_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt_async; + pkt->size = MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE; + + c->data_len = data->len; + c->ts = ts; + c->async_cb_obj = async_cb_obj; + + return cc_pcmu2opus_runner_async_do_nonblock(c->pcmu2opus_async.runner, c->pcmu2opus.enc, + (unsigned char *) data->s, data->len, + pkt->data, pkt->size, cc_pcmu2opus_run_callback, c); +} +codec_cc_state cc_pcmu2opus_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + return cc_X_run_async(c, data, ts, async_cb_obj, __cc_pcmu2opus_run_async); +} + +static void cc_pcma2opus_run_callback(void *p, int size) { + cc_pcmX2opus_run_callback(p, size, __cc_pcma2opus_run_async); +} +static bool __cc_pcma2opus_run_async(codec_cc_t *c, const str *data, unsigned long ts, + void *async_cb_obj) +{ + AVPacket *pkt = c->avpkt_async; + pkt->size = MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE; + + c->data_len = 
data->len; + c->ts = ts; + c->async_cb_obj = async_cb_obj; + + return cc_pcma2opus_runner_async_do_nonblock(c->pcma2opus_async.runner, c->pcma2opus.enc, + (unsigned char *) data->s, data->len, + pkt->data, pkt->size, cc_pcma2opus_run_callback, c); +} +codec_cc_state cc_pcma2opus_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + return cc_X_run_async(c, data, ts, async_cb_obj, __cc_pcma2opus_run_async); +} + +static void cc_opus2pcmX_run_callback(void *p, int size, __typeof__(__cc_opus2pcma_run_async) run_async) { + codec_cc_t *c = p; + + assert(size > 0); // XXX handle errors + + AVPacket *pkt = c->avpkt_async; + + pkt->size = size; + pkt->duration = size; + pkt->pts = c->ts / 6L; + + cc_X_pkt_callback(c, size, run_async); +} + +static void cc_opus2pcmu_run_callback(void *p, int size) { + cc_opus2pcmX_run_callback(p, size, __cc_opus2pcmu_run_async); +} +static bool __cc_opus2pcmu_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt_async; + pkt->size = 960; + + c->data_len = data->len; + c->ts = ts; + c->async_cb_obj = async_cb_obj; + + return cc_opus2pcmu_runner_async_do_nonblock(c->opus2pcmu_async.runner, c->opus2pcmu.dec, + (unsigned char *) data->s, data->len, + pkt->data, pkt->size, cc_opus2pcmu_run_callback, c); +} +codec_cc_state cc_opus2pcmu_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + return cc_X_run_async(c, data, ts, async_cb_obj, __cc_opus2pcmu_run_async); +} + +static void cc_opus2pcma_run_callback(void *p, int size) { + return cc_opus2pcmX_run_callback(p, size, __cc_opus2pcma_run_async); +} +static bool __cc_opus2pcma_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + AVPacket *pkt = c->avpkt_async; + pkt->size = 960; + + c->data_len = data->len; + c->ts = ts; + c->async_cb_obj = async_cb_obj; + + return cc_opus2pcma_runner_async_do_nonblock(c->opus2pcma_async.runner, c->opus2pcma.dec, + 
(unsigned char *) data->s, data->len, + pkt->data, pkt->size, cc_opus2pcma_run_callback, c); +} +codec_cc_state cc_opus2pcma_run_async(codec_cc_t *c, const str *data, unsigned long ts, void *async_cb_obj) { + return cc_X_run_async(c, data, ts, async_cb_obj, __cc_opus2pcma_run_async); +} + + + +static void cc_float2opus_clear(void *a) { + codec_chain_float2opus *enc = a; + cc_client_float2opus_free(cc_client, enc); +} +static void cc_opus2float_clear(void *a) { + codec_chain_opus2float *dec = a; + cc_client_opus2float_free(cc_client, dec); +} + +static codec_cc_t *codec_cc_new_sync(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)) { -#ifdef HAVE_CODEC_CHAIN if (!strcmp(dst->rtpname, "opus") && !strcmp(src->rtpname, "PCMA")) { if (src_format->clockrate != 8000) return NULL; @@ -4744,11 +5172,13 @@ codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def if (!pcma2opus_runner) return NULL; - codec_chain_t *ret = g_slice_alloc0(sizeof(*ret)); - ret->u.pcma2opus.enc = cc_client_float2opus_new(cc_client, bitrate); - ret->u.pcma2opus.runner = pcma2opus_runner; + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->pcma2opus.enc = cc_client_float2opus_new(cc_client, bitrate); + ret->clear = cc_float2opus_clear; + ret->clear_arg = ret->pcma2opus.enc; + ret->pcma2opus.runner = pcma2opus_runner; ret->avpkt = av_packet_alloc(); - ret->run = codec_chain_pcma2opus_run; + ret->run = cc_pcma2opus_run; return ret; } @@ -4765,11 +5195,13 @@ codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def if (!pcmu2opus_runner) return NULL; - codec_chain_t *ret = g_slice_alloc0(sizeof(*ret)); - ret->u.pcmu2opus.enc = cc_client_float2opus_new(cc_client, bitrate); - ret->u.pcmu2opus.runner = pcmu2opus_runner; + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->pcmu2opus.enc = 
cc_client_float2opus_new(cc_client, bitrate); + ret->clear = cc_float2opus_clear; + ret->clear_arg = ret->pcmu2opus.enc; + ret->pcmu2opus.runner = pcmu2opus_runner; ret->avpkt = av_packet_alloc(); - ret->run = codec_chain_pcmu2opus_run; + ret->run = cc_pcmu2opus_run; return ret; } @@ -4786,11 +5218,13 @@ codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def if (!opus2pcmu_runner) return NULL; - codec_chain_t *ret = g_slice_alloc0(sizeof(*ret)); - ret->u.opus2pcmu.dec = cc_client_opus2float_new(cc_client); - ret->u.opus2pcmu.runner = opus2pcmu_runner; + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->opus2pcmu.dec = cc_client_opus2float_new(cc_client); + ret->clear = cc_opus2float_clear; + ret->clear_arg = ret->opus2pcmu.dec; + ret->opus2pcmu.runner = opus2pcmu_runner; ret->avpkt = av_packet_alloc(); - ret->run = codec_chain_opus2pcmu_run; + ret->run = cc_opus2pcmu_run; return ret; } @@ -4807,27 +5241,200 @@ codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def if (!opus2pcma_runner) return NULL; - codec_chain_t *ret = g_slice_alloc0(sizeof(*ret)); - ret->u.opus2pcma.dec = cc_client_opus2float_new(cc_client); - ret->u.opus2pcma.runner = opus2pcma_runner; + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->opus2pcma.dec = cc_client_opus2float_new(cc_client); + ret->clear = cc_opus2float_clear; + ret->clear_arg = ret->opus2pcma.dec; + ret->opus2pcma.runner = opus2pcma_runner; ret->avpkt = av_packet_alloc(); - ret->run = codec_chain_opus2pcma_run; + ret->run = cc_opus2pcma_run; return ret; } -#endif return NULL; } -AVPacket *codec_chain_input_data(codec_chain_t *c, const str *data, unsigned long ts) { +static codec_cc_t *codec_cc_new_async(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*async_init)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)) +{ + // XXX check ptime, adjust avpkt sizes + if 
(!strcmp(dst->rtpname, "opus") && !strcmp(src->rtpname, "PCMA")) { + if (src_format->clockrate != 8000) + return NULL; + if (src_format->channels != 1) + return NULL; + if (dst_format->channels != 2) + return NULL; + if (dst_format->clockrate != 48000) + return NULL; + + if (!pcma2opus_async_runner) + return NULL; + + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->pcma2opus.enc = cc_client_float2opus_new(cc_client, bitrate); + ret->clear = cc_float2opus_clear; + ret->clear_arg = ret->pcma2opus.enc; + ret->pcma2opus_async.runner = pcma2opus_async_runner; + ret->run = cc_pcma2opus_run_async; + ret->avpkt_async = av_packet_alloc(); + av_new_packet(ret->avpkt_async, + MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE); + mutex_init(&ret->async_lock); + t_queue_init(&ret->async_jobs); + ret->async_init = async_init; + ret->async_callback = async_callback; + + return ret; + } + else if (!strcmp(dst->rtpname, "opus") && !strcmp(src->rtpname, "PCMU")) { + if (src_format->clockrate != 8000) + return NULL; + if (src_format->channels != 1) + return NULL; + if (dst_format->channels != 2) + return NULL; + if (dst_format->clockrate != 48000) + return NULL; + + if (!pcmu2opus_async_runner) + return NULL; + + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->pcmu2opus.enc = cc_client_float2opus_new(cc_client, bitrate); + ret->clear = cc_float2opus_clear; + ret->clear_arg = ret->pcmu2opus.enc; + ret->pcmu2opus_async.runner = pcmu2opus_async_runner; + ret->run = cc_pcmu2opus_run_async; + ret->avpkt_async = av_packet_alloc(); + av_new_packet(ret->avpkt_async, + MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE); + mutex_init(&ret->async_lock); + t_queue_init(&ret->async_jobs); + ret->async_init = async_init; + ret->async_callback = async_callback; + + return ret; + } + else if (!strcmp(dst->rtpname, "PCMU") && !strcmp(src->rtpname, "opus")) { + if (dst_format->clockrate != 8000) + return NULL; + if (dst_format->channels != 1) + 
return NULL; + if (src_format->channels != 2) + return NULL; + if (src_format->clockrate != 48000) + return NULL; + + if (!opus2pcmu_async_runner) + return NULL; + + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->opus2pcmu.dec = cc_client_opus2float_new(cc_client); + ret->clear = cc_opus2float_clear; + ret->clear_arg = ret->opus2pcmu.dec; + ret->opus2pcmu_async.runner = opus2pcmu_async_runner; + ret->run = cc_opus2pcmu_run_async; + ret->avpkt_async = av_packet_alloc(); + av_new_packet(ret->avpkt_async, 960); + mutex_init(&ret->async_lock); + t_queue_init(&ret->async_jobs); + ret->async_init = async_init; + ret->async_callback = async_callback; + + return ret; + } + else if (!strcmp(dst->rtpname, "PCMA") && !strcmp(src->rtpname, "opus")) { + if (dst_format->clockrate != 8000) + return NULL; + if (dst_format->channels != 1) + return NULL; + if (src_format->channels != 2) + return NULL; + if (src_format->clockrate != 48000) + return NULL; + + if (!opus2pcma_async_runner) + return NULL; + + codec_cc_t *ret = g_slice_alloc0(sizeof(*ret)); + ret->opus2pcma.dec = cc_client_opus2float_new(cc_client); + ret->clear = cc_opus2float_clear; + ret->clear_arg = ret->opus2pcma.dec; + ret->opus2pcma_async.runner = opus2pcma_async_runner; + ret->run = cc_opus2pcma_run_async; + ret->avpkt_async = av_packet_alloc(); + av_new_packet(ret->avpkt_async, 960); + mutex_init(&ret->async_lock); + t_queue_init(&ret->async_jobs); + ret->async_init = async_init; + ret->async_callback = async_callback; + + return ret; + } + + return NULL; +} + +void codec_cc_stop(codec_cc_t *c) { + if (!c) + return; + + // steal and fire all callbacks to release any references + + async_job_q q; + + { + LOCK(&c->async_lock); + q = c->async_jobs; + t_queue_init(&c->async_jobs); + } + + while (q.length) { + __auto_type j = t_queue_pop_head(&q); + c->async_callback(NULL, j->async_cb_obj); + __cc_async_job_free(j); + } +} + +void codec_cc_free(codec_cc_t **ccp) { + codec_cc_t *c = *ccp; + if (!c) + return; + 
*ccp = NULL; + + { + LOCK(&c->async_lock); + if (c->async_busy) { + c->async_shutdown = true; + return; // wait for callback + } + } + __codec_cc_free(c); +} + + +#endif + +AVPacket *codec_cc_input_data(codec_cc_t *c, const str *data, unsigned long ts, void *x, void *y, void *z) { #ifdef HAVE_CODEC_CHAIN - av_new_packet(c->avpkt, MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE); + if (c->avpkt) + av_new_packet(c->avpkt, MAX_OPUS_FRAME_SIZE * MAX_OPUS_FRAMES_PER_PACKET + MAX_OPUS_HEADER_SIZE); + void *async_cb_obj = NULL; + if (c->async_init) + async_cb_obj = c->async_init(x, y, z); + + codec_cc_state ret = c->run(c, data, ts, async_cb_obj); + assert(ret != CCC_ERR); - int ret = c->run(c, data, ts, c->avpkt); - assert(ret == 0); + if (ret == CCC_OK) + return c->avpkt; - return c->avpkt; + // CCC_ASYNC + return NULL; #else return NULL; diff --git a/lib/codeclib.h b/lib/codeclib.h index 4cca70477..f0abe4916 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -95,7 +95,7 @@ typedef struct seq_packet_s seq_packet_t; typedef union codec_options_u codec_options_t; typedef struct encoder_callback_s encoder_callback_t; typedef struct dtx_method_s dtx_method_t; -typedef struct codec_chain_s codec_chain_t; +typedef struct codec_cc_s codec_cc_t; typedef int packetizer_f(AVPacket *, GString *, str *, encoder_t *); typedef void format_init_f(struct rtp_payload_type *); @@ -425,9 +425,27 @@ void frame_fill_dtmf_samples(enum AVSampleFormat fmt, void *samples, unsigned in unsigned int event, unsigned int volume, unsigned int sample_rate, unsigned int channels); -codec_chain_t *codec_chain_new(codec_def_t *src, format_t *src_format, codec_def_t *dst, - format_t *dst_format, int bitrate, int ptime); -AVPacket *codec_chain_input_data(codec_chain_t *c, const str *data, unsigned long ts); +#ifdef HAVE_CODEC_CHAIN + +extern codec_cc_t *(*codec_cc_new)(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void 
*(*init_async)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)); +void codec_cc_stop(codec_cc_t *); +void codec_cc_free(codec_cc_t **); + +#else + +INLINE codec_cc_t *codec_cc_new(codec_def_t *src, format_t *src_format, codec_def_t *dst, + format_t *dst_format, int bitrate, int ptime, + void *(*init_async)(void *, void *, void *), + void (*async_callback)(AVPacket *, void *)) { return NULL; } +INLINE void codec_cc_stop(codec_cc_t *c) { } +INLINE void codec_cc_free(codec_cc_t **c) { } + +#endif + +AVPacket *codec_cc_input_data(codec_cc_t *c, const str *data, unsigned long ts, void *, void *, void *); #include "auxlib.h" diff --git a/perf-tester/main.c b/perf-tester/main.c index 7d9e5594a..876371179 100644 --- a/perf-tester/main.c +++ b/perf-tester/main.c @@ -45,10 +45,11 @@ struct stream { unsigned long long output_ts; decoder_t *decoder; encoder_t *encoder; - codec_chain_t *chain; + codec_cc_t *chain; struct testparams in_params; struct testparams out_params; uint fixture_idx; + long long encoding_start; uint dump_count; AVFormatContext *fmtctx; @@ -65,7 +66,7 @@ struct stats { }; struct stats_sample { - struct timeval tv; // last time stats were sampled + long long ts; // last time stats were sampled struct stats stats; // last sampled stats }; @@ -99,10 +100,18 @@ struct thread_freq_stats { struct freq_stats stats; }; +struct delay_stats { + long long max_actual; + long long max_allowed; + uint slots; + uint *counts; +}; + typedef void render_fn(const struct stats *stats, int line, int x, int breadth, int width, int color, const char *titlefmt, ...); +typedef void delay_fn(const struct delay_stats *stats, int line, int x, int breadth, int width); @@ -197,12 +206,20 @@ static WINDOW *popup; static long long ptime = 20000; // us TODO: support different ptimes +static mutex_t delay_stats_lock = MUTEX_STATIC_INIT; +static struct delay_stats delay_stats; + static render_fn usage_bar; static render_fn time_bar; +static render_fn no_bar; + +static 
delay_fn delay_bar; +static delay_fn no_delay; static render_fn *output_fn = usage_bar; // startup default +static delay_fn *delay_out_fn = no_delay; // startup default static bool do_cpu_stats = false; static bool do_thread_stats = false; @@ -228,6 +245,11 @@ static pthread_t thread_new(const char *name, void *(*fn)(void *), void *p) { static inline long long us_ticks_scale(long long val) { return val * ticks_per_sec / 1000000; } +static inline long long now_us(void) { + struct timeval now; + gettimeofday(&now, NULL); + return timeval_us(&now); +} // stream is locked @@ -249,6 +271,19 @@ static int got_packet_pkt(struct stream *s, AVPacket *pkt) { ssize_t ret = write(s->output_fd, pkt->data, pkt->size); (void)ret; + long long now = now_us(); + long long diff = now - s->encoding_start; + + { + LOCK(&delay_stats_lock); + if (delay_stats.max_actual < diff) + delay_stats.max_actual = diff; + if (delay_stats.max_allowed && diff < delay_stats.max_allowed) { + uint slot = diff * delay_stats.slots / delay_stats.max_allowed; + delay_stats.counts[slot]++; + } + } + if (s->fmtctx) { // mkv uses millisecond timestamps pkt->pts = pkt->dts = av_rescale(pkt->pts, 1000, out_params.clock_rate); @@ -295,11 +330,9 @@ static void *worker(void *p) { static void readable(int fd, void *o, uintptr_t x) { struct stream *s = o; + obj_hold(s); - struct timeval start; - gettimeofday(&start, NULL); - - LOCK(&s->lock); + long long start = now_us(); static const uint64_t max_iters = 10; // hard upper limit for iterations uint64_t total_iters = 0; @@ -323,6 +356,10 @@ static void readable(int fd, void *o, uintptr_t x) { break; // bail while (exp) { + LOCK(&s->lock); + + s->encoding_start = start; + AVPacket *data = s->in_params.fixture->pdata[s->fixture_idx++]; if (s->fixture_idx >= s->in_params.fixture->len) s->fixture_idx = 0; @@ -333,8 +370,11 @@ static void readable(int fd, void *o, uintptr_t x) { if (!s->chain) decoder_input_data(s->decoder, &frame, s->input_ts, got_frame, s, NULL); else { 
- AVPacket *pkt = codec_chain_input_data(s->chain, &frame, s->input_ts); - got_packet_pkt(s, pkt); + AVPacket *pkt = codec_cc_input_data(s->chain, &frame, s->input_ts, s, NULL, NULL); + if (pkt) + got_packet_pkt(s, pkt); + else + mutex_lock(&s->lock); // was unlocked by async_init } s->input_ts += data->duration; @@ -343,11 +383,12 @@ static void readable(int fd, void *o, uintptr_t x) { } } - struct timeval end; - gettimeofday(&end, NULL); + obj_put(s); + + long long end = now_us(); LOCK(&worker_self->comput_lock); - worker_self->comput += timeval_diff(&end, &start); + worker_self->comput += end - start; } static void closed(int fd, void *o, uintptr_t x) { @@ -404,6 +445,23 @@ static void stream_free(void *p) { } +static void *async_init(void *x, void *y, void *z) { + struct stream *s = x; + // unlock in case the chain is busy and this blocks, so that whoever keeps the chain + // busy can lock the stream once the result is in + mutex_unlock(&s->lock); + return obj_hold(s); +} +static void async_finish(AVPacket *pkt, void *async_cb_obj) { + struct stream *s = async_cb_obj; + { + LOCK(&s->lock); + got_packet_pkt(s, pkt); + } + obj_put(s); + av_packet_free(&pkt); +} + static void new_stream_params( const codec_def_t *in_def, const struct testparams *inprm, @@ -447,7 +505,7 @@ static void new_stream_params( format_t actual_enc_format; - s->chain = codec_chain_new(in_def, &dec_format, out_def, &enc_format, bitrate, 20); + s->chain = codec_cc_new(in_def, &dec_format, out_def, &enc_format, bitrate, 20, async_init, async_finish); if (!s->chain) { s->encoder = encoder_new(); @@ -848,10 +906,17 @@ static void *do_input(void *p) { case '1': output_fn = usage_bar; + delay_out_fn = no_delay; break; case '2': output_fn = time_bar; + delay_out_fn = no_delay; + break; + + case '3': + output_fn = no_bar; + delay_out_fn = delay_bar; break; case 'o': @@ -993,6 +1058,12 @@ static void time_bar(const struct stats *stats, int line, int x, int breadth, in } +static void no_bar(const 
struct stats *stats, int line, int x, int breadth, int width, int color, + const char *titlefmt, ...) +{ +} + + static bool thread_collect(pid_t pid, struct stats *outp, struct stats_sample *sample, char comm_out[COMM_SIZE]) { @@ -1005,8 +1076,7 @@ static bool thread_collect(pid_t pid, struct stats *outp, struct stats_sample *s if (!fp) return false; - struct timeval now; - gettimeofday(&now, NULL); + long long now = now_us(); long long utime, stime; char comm[COMM_SIZE]; @@ -1015,13 +1085,13 @@ static bool thread_collect(pid_t pid, struct stats *outp, struct stats_sample *s if (rets != 3) return false; - if (sample->tv.tv_sec) { - outp->iv = us_ticks_scale(timeval_diff(&now, &sample->tv)); + if (sample->ts) { + outp->iv = us_ticks_scale(now - sample->ts); outp->ucpu = utime - sample->stats.ucpu; outp->scpu = stime - sample->stats.scpu; } - sample->tv = now; + sample->ts = now; sample->stats.ucpu = utime; sample->stats.scpu = stime; @@ -1116,8 +1186,7 @@ static bool cpu_collect(stats_q *outp, struct stats *totals) { return false; while (!feof(fp)) { - struct timeval now; - gettimeofday(&now, NULL); + long long now = now_us(); char cpu[7]; long long utime, nice, stime; @@ -1134,13 +1203,13 @@ static bool cpu_collect(stats_q *outp, struct stats *totals) { struct stats stats = {0}; - if (cpu_stats[idx].tv.tv_sec) { - stats.iv = us_ticks_scale(timeval_diff(&now, &cpu_stats[idx].tv)); + if (cpu_stats[idx].ts) { + stats.iv = us_ticks_scale(now - cpu_stats[idx].ts); stats.ucpu = utime - cpu_stats[idx].stats.ucpu; stats.scpu = stime - cpu_stats[idx].stats.scpu; } - cpu_stats[idx].tv = now; + cpu_stats[idx].ts = now; cpu_stats[idx].stats.ucpu = utime; cpu_stats[idx].stats.scpu = stime; @@ -1189,6 +1258,42 @@ static int cpu_collect_stats(const bool do_output, int starty, int maxy, int max } +static void delay_stats_collect(struct delay_stats *local, uint slots, long long max_allowed) { + { + // copy out and reset to zero + LOCK(&delay_stats_lock); + *local = delay_stats; + + 
delay_stats = (struct delay_stats) {0}; + delay_stats.slots = slots; + delay_stats.counts = g_new0(__typeof__(*delay_stats.counts), delay_stats.slots); + delay_stats.max_allowed = max_allowed; + } +} + +static void delay_stats_free(struct delay_stats *local) { + g_free(local->counts); +} + + +static void no_delay(const struct delay_stats *stats, int line, int x, int breadth, int width) { +} + +static void delay_bar(const struct delay_stats *stats, int line, int x, int breadth, int width) { + if (!stats->slots) + return; + + uint per_slot = stats->max_allowed / stats->slots; + + for (uint i = 0; i < stats->slots; i++) { + move(line, x); + uint start = i * stats->max_allowed / stats->slots; + printw("%3.1f ms - %3.1f ms: %u", start / 1000., (start + per_slot) / 1000., stats->counts[i]); + line++; + } +} + + static void other_thread_free(struct other_thread *thr) { g_slice_free1(sizeof(*thr), thr); } @@ -1357,6 +1462,10 @@ static void *do_stats(void *p) { line += 2; } + // collect delay stats + struct delay_stats delay_stats_local; + delay_stats_collect(&delay_stats_local, maxy - 3, 20000); + // worker thread stats totals_line = line; struct stats worker_totals = {0}; @@ -1396,7 +1505,11 @@ static void *do_stats(void *p) { output_fn(&worker_totals, totals_line, 0, breadth, maxx, SUMMARY_COLOR, "Threads:"); + delay_out_fn(&delay_stats_local, totals_line, 0, breadth, maxx); + refresh_all(); + + delay_stats_free(&delay_stats_local); } thread_cancel_enable(); diff --git a/t/Makefile b/t/Makefile index 78508d198..47250713a 100644 --- a/t/Makefile +++ b/t/Makefile @@ -64,6 +64,8 @@ LDLIBS+= -lhiredis LDLIBS+= $(shell mysql_config --libs) endif +include ../lib/codec-chain.Makefile + SRCS= test-bitstr.c aes-crypt.c aead-aes-crypt.c test-const_str_hash.strhash.c LIBSRCS= loglib.c auxlib.c str.c rtplib.c ssllib.c mix_buffer.c DAEMONSRCS= crypto.c ssrc.c helpers.c rtp.c @@ -130,7 +132,7 @@ unit-tests: $(TESTS) fi daemon-tests: daemon-tests-main daemon-tests-jb 
daemon-tests-pubsub daemon-tests-websocket \ - daemon-tests-evs \ + daemon-tests-evs daemon-tests-async-tc \ daemon-tests-audio-player daemon-tests-audio-player-play-media \ daemon-tests-intfs daemon-tests-stats daemon-tests-player-cache daemon-tests-redis \ daemon-tests-rtpp-flags @@ -186,6 +188,9 @@ daemon-tests-audio-player-play-media: daemon-test-deps daemon-tests-rtpp-flags: daemon-test-deps ./auto-test-helper "$@" perl -I../perl auto-daemon-tests-rtpp-flags.pl +daemon-tests-async-tc: daemon-test-deps + ./auto-test-helper "$@" perl -I../perl auto-daemon-tests-async-tc.pl + test-bitstr: test-bitstr.o test-mix-buffer: test-mix-buffer.o $(COMMONOBJS) mix_buffer.o ssrc.o rtp.o crypto.o helpers.o \ diff --git a/t/auto-daemon-tests-async-tc.pl b/t/auto-daemon-tests-async-tc.pl new file mode 100755 index 000000000..1d677cca9 --- /dev/null +++ b/t/auto-daemon-tests-async-tc.pl @@ -0,0 +1,467 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use NGCP::Rtpengine::Test; +use NGCP::Rtpclient::SRTP; +use NGCP::Rtpengine::AutoTest; +use Test::More; +use NGCP::Rtpclient::ICE; +use POSIX; + + +autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 --codec-num-threads=2 + -n 2223 -c 12345 -f -L 7 -E -u 2222 --silence-detect=1 --log-level-internals=7)) + or die; + + +my $extended_tests = $ENV{RTPENGINE_EXTENDED_TESTS}; + + +# 100 ms sine wave + + +my $pcma_1 = 
"\xd5\xb4\xa5\xa3\xac\xac\xa3\xa5\xb7\xfc\x0a\x3a\x20\x2d\x2c\x23\x24\x31\x6c\x89\xbb\xa0\xad\xac\xa2\xa7\xb0\x96\x0c\x39\x21\x2d\x2c\x22\x27\x32\x1c\x83\xbe\xa1\xad\xac\xa2\xa6\xbd\x9a\x06\x3f\x26\x2d\x2c\x2d\x26\x3f\x06\x9a\xbd\xa6\xa2\xac\xad\xa1\xbe\x83\x1c\x32\x27\x22\x2c\x2d\x21\x39\x0c\x96\xb0\xa7\xa2\xac\xad\xa0\xbb\x89\x6c\x31\x24\x23\x2c\x2d\x20\x3a\x0a\xfc\xb7\xa5\xa3\xac\xac\xa3\xa5\xb4\x55\x34\x25\x23\x2c\x2c\x23\x25\x37\x7c\x8a\xba\xa0\xad\xac\xa3\xa4\xb1\xec\x09\x3b\x20\x2d\x2c\x22\x27\x30\x16\x8c\xb9\xa1\xad\xac\xa2\xa7\xb2\x9c\x03\x3e\x21\x2d\x2c\x22\x26\x3d\x1a\x86\xbf\xa6\xad\xac\xad\xa6\xbf\x86\x1a\x3d\x26\x22\x2c"; +my $pcma_2 = "\x2d\x21\x3e\x03\x9c\xb2\xa7\xa2\xac\xad\xa1\xb9\x8c\x16\x30\x27\x22\x2c\x2d\x20\x3b\x09\xec\xb1\xa4\xa3\xac\xad\xa0\xba\x8a\x7c\x37\x25\x23\x2c\x2c\x23\x25\x34\xd5\xb4\xa5\xa3\xac\xac\xa3\xa5\xb7\xfc\x0a\x3a\x20\x2d\x2c\x23\x24\x31\x6c\x89\xbb\xa0\xad\xac\xa2\xa7\xb0\x96\x0c\x39\x21\x2d\x2c\x22\x27\x32\x1c\x83\xbe\xa1\xad\xac\xa2\xa6\xbd\x9a\x06\x3f\x26\x2d\x2c\x2d\x26\x3f\x06\x9a\xbd\xa6\xa2\xac\xad\xa1\xbe\x83\x1c\x32\x27\x22\x2c\x2d\x21\x39\x0c\x96\xb0\xa7\xa2\xac\xad\xa0\xbb\x89\x6c\x31\x24\x23\x2c\x2d\x20\x3a\x0a\xfc\xb7\xa5\xa3\xac\xac\xa3\xa5\xb4\xd5\x34\x25\x23\x2c\x2c\x23\x25\x37\x7c\x8a\xba\xa0\xad\xac\xa3\xa4\xb1\xec\x09"; +my $pcma_3 = "\x3b\x20\x2d\x2c\x22\x27\x30\x16\x8c\xb9\xa1\xad\xac\xa2\xa7\xb2\x9c\x03\x3e\x21\x2d\x2c\x22\x26\x3d\x1a\x86\xbf\xa6\xad\xac\xad\xa6\xbf\x86\x1a\x3d\x26\x22\x2c\x2d\x21\x3e\x03\x9c\xb2\xa7\xa2\xac\xad\xa1\xb9\x8c\x16\x30\x27\x22\x2c\x2d\x20\x3b\x09\xec\xb1\xa4\xa3\xac\xad\xa0\xba\x8a\x7c\x37\x25\x23\x2c\x2c\x23\x25\x34\x55\xb4\xa5\xa3\xac\xac\xa3\xa5\xb7\xfc\x0a\x3a\x20\x2d\x2c\x23\x24\x31\x6c\x89\xbb\xa0\xad\xac\xa2\xa7\xb0\x96\x0c\x39\x21\x2d\x2c\x22\x27\x32\x1c\x83\xbe\xa1\xad\xac\xa2\xa6\xbd\x9a\x06\x3f\x26\x2d\x2c\x2d\x26\x3f\x06\x9a\xbd\xa6\xa2\xac\xad\xa1\xbe\x83\x1c\x32\x27\x22\x2c\x2d\x21\x39\x0c\x96\xb0\xa7\xa2\xac\xad\xa0"; +my $pcma_4 = 
"\xbb\x89\x6c\x31\x24\x23\x2c\x2d\x20\x3a\x0a\xfc\xb7\xa5\xa3\xac\xac\xa3\xa5\xb4\x55\x34\x25\x23\x2c\x2c\x23\x25\x37\x7c\x8a\xba\xa0\xad\xac\xa3\xa4\xb1\xec\x09\x3b\x20\x2d\x2c\x22\x27\x30\x16\x8c\xb9\xa1\xad\xac\xa2\xa7\xb2\x9c\x03\x3e\x21\x2d\x2c\x22\x26\x3d\x1a\x86\xbf\xa6\xad\xac\xad\xa6\xbf\x86\x1a\x3d\x26\x22\x2c\x2d\x21\x3e\x03\x9c\xb2\xa7\xa2\xac\xad\xa1\xb9\x8c\x16\x30\x27\x22\x2c\x2d\x20\x3b\x09\xec\xb1\xa4\xa3\xac\xad\xa0\xba\x8a\x7c\x37\x25\x23\x2c\x2c\x23\x25\x34\x55\xb4\xa5\xa3\xac\xac\xa3\xa5\xb7\xfc\x0a\x3a\x20\x2d\x2c\x23\x24\x31\x6c\x89\xbb\xa0\xad\xac\xa2\xa7\xb0\x96\x0c\x39\x21\x2d\x2c\x22\x27\x32\x1c\x83\xbe\xa1"; +my $pcma_5 = "\xad\xac\xa2\xa6\xbd\x9a\x06\x3f\x26\x2d\x2c\x2d\x26\x3f\x06\x9a\xbd\xa6\xa2\xac\xad\xa1\xbe\x83\x1c\x32\x27\x22\x2c\x2d\x21\x39\x0c\x96\xb0\xa7\xa2\xac\xad\xa0\xbb\x89\x6c\x31\x24\x23\x2c\x2d\x20\x3a\x0a\xfc\xb7\xa5\xa3\xac\xac\xa3\xa5\xb4\xd5\x34\x25\x23\x2c\x2c\x23\x25\x37\x7c\x8a\xba\xa0\xad\xac\xa3\xa4\xb1\xec\x09\x3b\x20\x2d\x2c\x22\x27\x30\x16\x8c\xb9\xa1\xad\xac\xa2\xa7\xb2\x9c\x03\x3e\x21\x2d\x2c\x22\x26\x3d\x1a\x86\xbf\xa6\xad\xac\xad\xa6\xbf\x86\x1a\x3d\x26\x22\x2c\x2d\x21\x3e\x03\x9c\xb2\xa7\xa2\xac\xad\xa1\xb9\x8c\x16\x30\x27\x22\x2c\x2d\x20\x3b\x09\xec\xb1\xa4\xa3\xac\xad\xa0\xba\x8a\x7c\x37\x25\x23\x2c\x2c\x23\x25\x34"; + + + +my ($sock_a, $sock_b, $sock_c, $sock_d, $port_a, $port_b, $ssrc, $ssrc_b, $resp, + $sock_ax, $sock_bx, $port_ax, $port_bx, + $srtp_ctx_a, $srtp_ctx_b, $srtp_ctx_a_rev, $srtp_ctx_b_rev, $ufrag_a, $ufrag_b, + @ret1, @ret2, @ret3, @ret4, $srtp_key_a, $srtp_key_b, $ts, $seq, $has_recv); + + + +if ($extended_tests) { + +($sock_a, $sock_b) = new_call([qw(198.51.100.43 6060)], [qw(198.51.100.43 6062)]); + +($port_a) = offer('opus fmtp options, full offer list', + { codec => { transcode => + ['opus/48000/2///maxaveragebitrate--40000;maxplaybackrate--32000;sprop-stereo--0;stereo--0;cbr--0;useinbandfec--0;usedtx--0;sprop-maxcapturerate--16000', + 'PCMU'], + mask => ['all'] } }, < { transcode 
=> ['PCMA'], mask => ['opus'] } }, < { transcode => ['opus'] } }, < { transcode => ['opus/48000/2///useinbandfec=1;stereo=1;sprop-stereo=1'] } }, < { transcode => ['opus/48000/2///useinbandfec=1;stereo=0;sprop-stereo=0'] } }, < { transcode => ['opus/48000/2///stereo=1;sprop-stereo=0'] } }, < { transcode => ['opus/48000/2///stereo=0;sprop-stereo=1'] } }, < { transcode => ['PCMA'], mask => ['opus'] } }, <