From c8dd521e334234e3534cd777e57b4a6b82b829dc Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 22 Apr 2025 14:31:55 -0400 Subject: [PATCH] MT#55283 obsolete SSRC tracking Change-Id: Ic0a3c7826180e1e1f4a783dcc6d775c92fe38d1f --- daemon/call.c | 70 +++++++++++------------- daemon/call_interfaces.c | 114 +++++++++++++++++++-------------------- daemon/cli.c | 3 +- daemon/codec.c | 48 ++++++++--------- daemon/dtmf.c | 2 +- daemon/media_socket.c | 93 +++++++++++++++----------------- daemon/mqtt.c | 63 ++++++++++++++-------- daemon/ssrc.c | 12 ++--- daemon/t38.c | 2 +- include/call.h | 5 -- include/ssrc.h | 4 ++ t/auto-daemon-tests.pl | 26 +++------ 12 files changed, 215 insertions(+), 227 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index e369f51cb..01c874d53 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -74,6 +74,7 @@ static void media_stop(struct call_media *m); __attribute__((nonnull(1, 2, 4))) static struct media_subscription *__subscribe_medias_both_ways(struct call_media * a, struct call_media * b, bool is_offer, medias_q *); +static void call_stream_crypto_reset(struct packet_stream *ps); /* called with call->master_lock held in R */ static int call_timer_delete_monologues(call_t *c) { @@ -212,26 +213,6 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { if (active_media) CALL_CLEAR(sfd->call, FOREIGN_MEDIA); - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { - struct ssrc_entry_call *ctx = ps->ssrc_in[u]; - if (!ctx) - break; - - if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L) - payload_tracker_add(&ctx->tracker, - atomic_get_na(&ctx->stats->last_pt)); - } - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { - struct ssrc_entry_call *ctx = ps->ssrc_out[u]; - if (!ctx) - break; - - if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L) - payload_tracker_add(&ctx->tracker, - atomic_get_na(&ctx->stats->last_pt)); - } - - no_sfd: if (good) goto next; @@ -263,6 +244,22 @@ next: ssrc_collect_metrics(media); if (MEDIA_ISSET(media, TRANSCODING)) hlp->transcoded_media++; + + for (__auto_type l = media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *ctx = l->data; + + if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L) + payload_tracker_add(&ctx->tracker, + atomic_get_na(&ctx->stats->last_pt)); + } + + for (__auto_type l = media->ssrc_hash_out.nq.head; l; l = l->next) { + struct ssrc_entry_call *ctx = l->data; + + if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L) + payload_tracker_add(&ctx->tracker, + atomic_get_na(&ctx->stats->last_pt)); + } } if (good) { @@ -1078,27 +1075,27 @@ static void __fill_stream(struct packet_stream *ps, const struct endpoint *epp, PS_SET(ps, NAT_WAIT); } -void call_stream_crypto_reset(struct packet_stream *ps) { +static void call_stream_crypto_reset(struct packet_stream *ps) { ilog(LOG_DEBUG, "Resetting crypto context"); crypto_reset(&ps->crypto); + struct call_media *media = ps->media; + if (PS_ISSET(ps, RTP)) { - mutex_lock(&ps->in_lock); - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { - if (!ps->ssrc_in[u]) // end of list - break; - atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0); + mutex_lock(&media->ssrc_hash_in.lock); + for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + atomic_set_na(&se->stats->ext_seq, 0); } - mutex_unlock(&ps->in_lock); + mutex_unlock(&media->ssrc_hash_in.lock); - mutex_lock(&ps->out_lock); - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { - if (!ps->ssrc_out[u]) // end of list - break; - atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0); + mutex_lock(&media->ssrc_hash_out.lock); + for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + atomic_set_na(&se->stats->ext_seq, 0); } - mutex_unlock(&ps->out_lock); + mutex_unlock(&media->ssrc_hash_out.lock); } } @@ -4163,6 +4160,7 @@ void call_destroy(call_t *c) { char *addr = sockaddr_print_buf(&ps->endpoint.address); endpoint_t *local_endpoint = packet_stream_local_addr(ps); char *local_addr = sockaddr_print_buf(&local_endpoint->address); + struct ssrc_entry_call *se = call_get_first_ssrc(&ps->media->ssrc_hash_in); ilog(LOG_INFO, "--------- Port %15s:%-5u <> %s%15s:%-5u%s%s, SSRC %s%" PRIx32 "%s, in " "%" PRIu64 " p, %" PRIu64 " b, %" PRIu64 " e, %" PRIu64 " ts, " @@ -4171,7 +4169,7 @@ void call_destroy(call_t *c) { (unsigned int) local_endpoint->port, FMT_M(addr, ps->endpoint.port), (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0), + FMT_M(se ? se->h.ssrc : 0), atomic64_get_na(&ps->stats_in->packets), atomic64_get_na(&ps->stats_in->bytes), atomic64_get_na(&ps->stats_in->errors), @@ -4360,10 +4358,6 @@ static void __call_free(call_t *c) { crypto_cleanup(&ps->crypto); t_queue_clear(&ps->sfds); t_hash_table_destroy(ps->rtp_stats); - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) - ssrc_entry_release(ps->ssrc_in[u]); - for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) - ssrc_entry_release(ps->ssrc_out[u]); bufferpool_unref(ps->stats_in); bufferpool_unref(ps->stats_out); g_free(ps); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 9f7bf9820..9ac818397 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -64,7 +64,8 @@ static void call_ng_flags_list(const ng_parser_t *, parser_arg list, void (*item_callback)(const ng_parser_t *, parser_arg, helper_arg), helper_arg); static void call_ng_flags_esc_str_list(str *s, unsigned int, helper_arg); -static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, const struct ssrc_hash *ht); +static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, parser_arg list, + const struct ssrc_hash *ht); static str *str_dup_escape(const str *s); static void call_set_dtmf_block(call_t *call, struct call_monologue *monologue, sdp_ng_flags *flags); @@ -2748,27 +2749,6 @@ static void ng_stats_endpoint(const ng_parser_t *parser, parser_arg dict, const parser->dict_add_int(dict, "port", ep->port); } -static void ng_stats_stream_ssrc(const ng_parser_t *parser, parser_arg dict, - struct ssrc_entry_call *const ssrcs[RTPE_NUM_SSRC_TRACKING], - const char *label) -{ - parser_arg list = parser->dict_add_list(dict, label); - - for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - struct ssrc_entry_call *c = ssrcs[i]; - if (!c) - break; - - parser_arg ssrc = parser->list_add_dict(list); - - parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->h.ssrc); - parser->dict_add_int(ssrc, "bytes", atomic64_get_na(&c->stats->bytes)); - parser->dict_add_int(ssrc, "packets", atomic64_get_na(&c->stats->packets)); - parser->dict_add_int(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp)); - parser->dict_add_int(ssrc, "last RTP seq", atomic_get_na(&c->stats->ext_seq)); - } -} - #define BF_PS(k, f) if (PS_ISSET(ps, f)) parser->list_add_string(flags, k) static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct packet_stream *ps, @@ -2800,6 +2780,10 @@ static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct parser->dict_add_int(dict, "last kernel packet", atomic64_get_na(&ps->stats_in->last_packet_us) / 1000000L); parser->dict_add_int(dict, "last user packet", atomic64_get_na(&ps->last_packet_us) / 1000000L); + __auto_type se = call_get_first_ssrc(&ps->media->ssrc_hash_in); + if (se) + parser->dict_add_int(dict, "SSRC", se->h.ssrc); + flags = parser->dict_add_list(dict, "flags"); BF_PS("RTP", RTP); @@ -2814,9 +2798,6 @@ static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct BF_PS("media handover", MEDIA_HANDOVER); BF_PS("ICE", ICE); - ng_stats_stream_ssrc(parser, dict, ps->ssrc_in, "ingress SSRCs"); - ng_stats_stream_ssrc(parser, dict, ps->ssrc_out, "egress SSRCs"); - stats: if (totals->last_packet_us < packet_stream_last_packet(ps)) totals->last_packet_us = packet_stream_last_packet(ps); @@ -2888,7 +2869,8 @@ static void ng_stats_media(ng_command_ctx_t *ctx, parser_arg list, const struct BF_M("transcoding", TRANSCODING); BF_M("block egress", BLOCK_EGRESS); - ng_stats_ssrc(parser, ssrc, &m->ssrc_hash_in); // XXX out + ng_stats_ssrc(parser, ssrc, parser->dict_add_list(dict, "ingress SSRCs"), &m->ssrc_hash_in); + ng_stats_ssrc(parser, NULL, parser->dict_add_list(dict, "egress SSRCs"), &m->ssrc_hash_out); stats: for (auto_iter(l, m->streams.head); l; l = l->next) { @@ -3030,45 +3012,61 @@ static void ng_stats_ssrc_mos_entry_dict_avg(const ng_parser_t *parser, parser_a parser->dict_add_int(subent, "samples", div); } -static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, const struct ssrc_hash *ht) { +static void ng_stats_ssrc_1(const ng_parser_t *parser, parser_arg ent, struct ssrc_entry_call *se) { + parser->dict_add_int(ent, "bytes", atomic64_get_na(&se->stats->bytes)); + parser->dict_add_int(ent, "packets", atomic64_get_na(&se->stats->packets)); + parser->dict_add_int(ent, "last RTP timestamp", atomic_get_na(&se->stats->timestamp)); + parser->dict_add_int(ent, "last RTP seq", atomic_get_na(&se->stats->ext_seq)); + + parser->dict_add_int(ent, "cumulative loss", se->packets_lost); + + int mos_samples = se->stats_blocks.length - se->no_mos_count; + if (mos_samples < 1) mos_samples = 1; + ng_stats_ssrc_mos_entry_dict_avg(parser, ent, "average MOS", &se->average_mos, mos_samples); + ng_stats_ssrc_mos_entry_dict(parser, ent, "lowest MOS", se->lowest_mos); + ng_stats_ssrc_mos_entry_dict(parser, ent, "highest MOS", se->highest_mos); + + parser_arg progdict = parser->dict_add_dict(ent, "MOS progression"); + // aim for about 10 entries to the list + GList *listent = se->stats_blocks.head; + struct ssrc_stats_block *sb = listent->data; + int64_t interval + = ((struct ssrc_stats_block *) se->stats_blocks.tail->data)->reported + - sb->reported; + interval /= 10; + parser->dict_add_int(progdict, "interval", interval / 1000000L); + int64_t next_step = sb->reported; + parser_arg entlist = parser->dict_add_list(progdict, "entries"); + + for (; listent; listent = listent->next) { + sb = listent->data; + if (sb->reported < next_step) + continue; + next_step += interval; + parser_arg cent = parser->list_add_dict(entlist); + ng_stats_ssrc_mos_entry(parser, cent, sb); + } +} + +static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, parser_arg list, + const struct ssrc_hash *ht) +{ for (GList *l = ht->nq.head; l; l = l->next) { struct ssrc_entry_call *se = l->data; char tmp[12]; snprintf(tmp, sizeof(tmp), "%" PRIu32, se->h.ssrc); - if (parser->dict_contains(dict, tmp)) - continue; if (!se->stats_blocks.length || !se->lowest_mos || !se->highest_mos) continue; - parser_arg ent = parser->dict_add_dict_dup(dict, tmp); - - parser->dict_add_int(ent, "cumulative loss", se->packets_lost); - - int mos_samples = se->stats_blocks.length - se->no_mos_count; - if (mos_samples < 1) mos_samples = 1; - ng_stats_ssrc_mos_entry_dict_avg(parser, ent, "average MOS", &se->average_mos, mos_samples); - ng_stats_ssrc_mos_entry_dict(parser, ent, "lowest MOS", se->lowest_mos); - ng_stats_ssrc_mos_entry_dict(parser, ent, "highest MOS", se->highest_mos); - - parser_arg progdict = parser->dict_add_dict(ent, "MOS progression"); - // aim for about 10 entries to the list - GList *listent = se->stats_blocks.head; - struct ssrc_stats_block *sb = listent->data; - int64_t interval - = ((struct ssrc_stats_block *) se->stats_blocks.tail->data)->reported - - sb->reported; - interval /= 10; - parser->dict_add_int(progdict, "interval", interval / 1000000L); - int64_t next_step = sb->reported; - parser_arg entlist = parser->dict_add_list(progdict, "entries"); - - for (; listent; listent = listent->next) { - sb = listent->data; - if (sb->reported < next_step) - continue; - next_step += interval; - parser_arg cent = parser->list_add_dict(entlist); - ng_stats_ssrc_mos_entry(parser, cent, sb); + parser_arg ent = parser->list_add_dict(list); + + parser->dict_add_int(ent, "SSRC", se->h.ssrc); + + ng_stats_ssrc_1(parser, ent, se); + + if (dict.gen && !parser->dict_contains(dict, tmp)) { + ent = parser->dict_add_dict_dup(dict, tmp); + ng_stats_ssrc_1(parser, ent, se); } } } diff --git a/daemon/cli.c b/daemon/cli.c index c4b69b021..dd51e6c41 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -799,6 +799,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml) endpoint_t *local_endpoint = packet_stream_local_addr(ps); local_addr = sockaddr_print_buf(&local_endpoint->address); + struct ssrc_entry_call *se = call_get_first_ssrc(&md->ssrc_hash_in); cw->cw_printf(cw, "-------- Port %15s:%-5u <> %15s:%-5u%s, SSRC %" PRIx32 ", " "%" PRIu64 " p, %" PRIu64 " b, %" PRIu64 " e, %" PRIu64 " uts " @@ -808,7 +809,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml) sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0, + se ? se->h.ssrc : 0, atomic64_get_na(&ps->stats_in->packets), atomic64_get_na(&ps->stats_in->bytes), atomic64_get_na(&ps->stats_in->errors), diff --git a/daemon/codec.c b/daemon/codec.c index e5e7a55ab..a12d09fee 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1315,32 +1315,24 @@ static void __rtcp_timer_run(struct codec_timer *ct) { rwlock_lock_r(&rt->call->master_lock); // copy out references to SSRCs for lock-free handling - struct ssrc_entry_call *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; - if (media->streams.head) { - struct packet_stream *ps = media->streams.head->data; - mutex_lock(&ps->out_lock); - for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { - if (!ps->ssrc_out[u]) // end of list - break; - ssrc_out[u] = ps->ssrc_out[u]; - ssrc_entry_hold(ssrc_out[u]); - } - mutex_unlock(&ps->out_lock); + GQueue ssrc_out = G_QUEUE_INIT; + mutex_lock(&media->ssrc_hash_out.lock); + for (GList *l = media->ssrc_hash_out.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + g_queue_push_tail(&ssrc_out, ssrc_entry_hold(se)); } + mutex_unlock(&media->ssrc_hash_out.lock); - for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { - if (!ssrc_out[u]) // end of list - break; - // coverity[use : FALSE] - rtcp_send_report(media, ssrc_out[u]); + for (GList *l = ssrc_out.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + rtcp_send_report(media, se); } rwlock_unlock_r(&rt->call->master_lock); - for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { - if (!ssrc_out[u]) // end of list - break; - ssrc_entry_release(ssrc_out[u]); + while (ssrc_out.length) { + struct ssrc_entry_call *se = g_queue_pop_head(&ssrc_out); + ssrc_entry_release(se); } out: @@ -3769,6 +3761,8 @@ static void __dtx_send_later(struct codec_timer *ct) { ts = dtxb->head_ts; } ps = mp_copy.stream; + struct call_media *media = ps->media; + struct ssrc_entry_call *se = call_get_first_ssrc(&media->ssrc_hash_in); log_info_stream_fd(mp_copy.sfd); // copy out other fields so we can unlock @@ -3787,20 +3781,20 @@ static void __dtx_send_later(struct codec_timer *ct) { shutdown = true; else if (!ps) shutdown = true; - else if (!ps->ssrc_in[0]) + else if (!se) shutdown = true; - else if (dtxb->ssrc != ps->ssrc_in[0]->h.ssrc) + else if (dtxb->ssrc != se->h.ssrc) shutdown = true; else if (dtxb->ct.next == 0) shutdown = true; else { shutdown = true; // default if no last used PTs are known - for (int i = 0; i < G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts); i++) { - int pt_idx = ps->ssrc_in[0]->tracker.last_pt_idx - i; - pt_idx += G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts); - pt_idx %= G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts); - int last_pt = ps->ssrc_in[0]->tracker.last_pts[pt_idx]; + for (int i = 0; i < G_N_ELEMENTS(se->tracker.last_pts); i++) { + int pt_idx = se->tracker.last_pt_idx - i; + pt_idx += G_N_ELEMENTS(se->tracker.last_pts); + pt_idx %= G_N_ELEMENTS(se->tracker.last_pts); + int last_pt = se->tracker.last_pts[pt_idx]; if (last_pt == 255) break; diff --git a/daemon/dtmf.c b/daemon/dtmf.c index eddd894f4..ac3c949ef 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -802,7 +802,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura if (!media->streams.head) return "Media doesn't have an RTP stream"; struct packet_stream *ps = media->streams.head->data; - struct ssrc_entry_call *ssrc_in = ps->ssrc_in[0]; + struct ssrc_entry_call *ssrc_in = call_get_first_ssrc(&media->ssrc_hash_in); if (!ssrc_in) return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 9baac3b2e..6d9a7515d 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1512,6 +1512,7 @@ TYPED_GQUEUE(kernel_output, struct rtpengine_destination_info) typedef struct { struct rtpengine_target_info reti; + struct ssrc_entry_call *ssrc[RTPE_NUM_SSRC_TRACKING]; kernel_output_q outputs; rtp_stats_arr *payload_types; bool blackhole; @@ -1604,11 +1605,15 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st s->manipulate_pt = s->silenced || ML_ISSET(media->monologue, BLOCK_SHORT); reti->track_ssrc = 1; - for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { - if (stream->ssrc_in[u]) { - reti->ssrc[u] = htonl(stream->ssrc_in[u]->h.ssrc); - reti->ssrc_stats[u] = stream->ssrc_in[u]->stats; - } + unsigned int u = 0; + for (GList *l = stream->media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + if (u >= G_N_ELEMENTS(reti->ssrc)) + break; + s->ssrc[u] = se; // no reference needed + reti->ssrc[u] = htonl(se->h.ssrc); + reti->ssrc_stats[u] = se->stats; + u++; } recording_stream_kernel_info(stream, reti); @@ -1766,15 +1771,20 @@ static const char *kernelize_one(kernelize_state *s, redi->output.stats = sink->stats_out; if (reti->track_ssrc) { - for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { - if (sink->ssrc_out[u]) { - // XXX order can be different from ingress? - redi->output.seq_offset[u] = sink->ssrc_out[u]->seq_diff; - redi->output.ssrc_stats[u] = sink->ssrc_out[u]->stats; - } + unsigned int u = 0; + for (GList *l = sink->media->ssrc_hash_out.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + if (u >= G_N_ELEMENTS(redi->output.ssrc_out)) + break; + - if (redi->output.ssrc_subst && stream->ssrc_in[u]) - redi->output.ssrc_out[u] = htonl(stream->ssrc_in[u]->ssrc_map_out); + redi->output.seq_offset[u] = se->seq_diff; + redi->output.ssrc_stats[u] = se->stats; + + if (redi->output.ssrc_subst && s->ssrc[u]) + redi->output.ssrc_out[u] = htonl(s->ssrc[u]->ssrc_map_out); + + u++; } } @@ -2069,44 +2079,32 @@ noop: // returns non-null with reason string if stream should be removed from kernel -static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock, - struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p, +static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, uint32_t output_ssrc, - struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash, const char *label) + struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash, + const char *label) { const char *ret = NULL; - mutex_lock(lock); - - int ctx_idx = __hunt_ssrc_ctx_idx(ssrc, list, 0); - if (ctx_idx == -1) { - // SSRC mismatch - get the new entry: - ctx_idx = *ctx_idx_p; - // move to next slot - *ctx_idx_p = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING; - // eject old entry if present - if (list[ctx_idx]) - ssrc_entry_release(list[ctx_idx]); - // get new entry - list[ctx_idx] = - get_ssrc(ssrc, ssrc_hash); + mutex_lock(&ssrc_hash->lock); + struct ssrc_entry_call *first = call_get_first_ssrc(ssrc_hash); + if (first && first->h.ssrc == ssrc) + ssrc_entry_hold(first); + else + first = NULL; + mutex_unlock(&ssrc_hash->lock); + struct ssrc_entry_call *se = first ?: get_ssrc(ssrc, ssrc_hash); + + if (se != first) { ret = "SSRC changed"; ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label, - FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc)); - } - if (ctx_idx != 0) { - // move most recent entry to front of the list - struct ssrc_entry_call *tmp = list[0]; - list[0] = list[ctx_idx]; - list[ctx_idx] = tmp; - ctx_idx = 0; + FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc)); } // extract and hold entry ssrc_entry_release(*output); - *output = list[ctx_idx]; - ssrc_entry_hold(*output); + *output = se; // reverse SSRC mapping if (!output_ssrc) @@ -2114,7 +2112,6 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, else (*output)->ssrc_map_out = output_ssrc; - mutex_unlock(lock); return ret; } // check and update input SSRC pointers @@ -2122,8 +2119,8 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, struct ssrc_entry_call **ssrc_in_p, struct ssrc_hash *ssrc_hash) { - return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in, - &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, "ingress"); + return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), + 0, ssrc_in_p, ssrc_hash, "ingress"); } // check and update output SSRC pointers // returns non-null with reason string if stream should be removed from kernel @@ -2133,14 +2130,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss bool ssrc_change) { if (ssrc_change) - return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock, - out_srtp->ssrc_out, - &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, + return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, + ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, "egress (mapped)"); - return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock, - out_srtp->ssrc_out, - &out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, + return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), + 0, ssrc_out_p, ssrc_hash, "egress (direct)"); } diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 8e245daaa..bcfbd0aba 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -367,17 +367,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_begin_object(json); mqtt_stream_stats_dir(ps->stats_in, json); - json_builder_set_member_name(json, "SSRC"); - json_builder_begin_array(json); - for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - if (!ps->ssrc_in[i]) - break; - json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_in[i], json, ps->media); - json_builder_end_object(json); - } - json_builder_end_array(json); - json_builder_end_object(json); mutex_unlock(&ps->in_lock); @@ -388,17 +377,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_begin_object(json); mqtt_stream_stats_dir(ps->stats_out, json); - json_builder_set_member_name(json, "SSRC"); - json_builder_begin_array(json); - for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - if (!ps->ssrc_out[i]) - break; - json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_out[i], json, ps->media); - json_builder_end_object(json); - } - json_builder_end_array(json); - json_builder_end_object(json); mutex_unlock(&ps->out_lock); @@ -434,6 +412,47 @@ static void mqtt_media_stats(struct call_media *media, JsonBuilder *json) { json_builder_add_string_value(json, "inactive"); } + + mutex_lock(&media->ssrc_hash_in.lock); + + json_builder_set_member_name(json, "ingress"); + json_builder_begin_object(json); + + json_builder_set_member_name(json, "SSRC"); + json_builder_begin_array(json); + for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + json_builder_begin_object(json); + mqtt_ssrc_stats(se, json, media); + json_builder_end_object(json); + } + json_builder_end_array(json); + + json_builder_end_object(json); + + mutex_unlock(&media->ssrc_hash_in.lock); + + + mutex_lock(&media->ssrc_hash_out.lock); + + json_builder_set_member_name(json, "egress"); + json_builder_begin_object(json); + + json_builder_set_member_name(json, "SSRC"); + json_builder_begin_array(json); + for (GList *l = media->ssrc_hash_out.nq.head; l; l = l->next) { + struct ssrc_entry_call *se = l->data; + json_builder_begin_object(json); + mqtt_ssrc_stats(se, json, media); + json_builder_end_object(json); + } + json_builder_end_array(json); + + json_builder_end_object(json); + + mutex_unlock(&media->ssrc_hash_out.lock); + + struct packet_stream *ps = media->streams.head ? media->streams.head->data : NULL; if (ps) mqtt_stream_stats(ps, json); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 0fbd230b4..f3e1d1134 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -704,11 +704,8 @@ out: // call master lock held in R void ssrc_collect_metrics(struct call_media *media) { - if (!media->streams.head) - return; - struct packet_stream *ps = media->streams.head->data; - for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - struct ssrc_entry_call *s = ps->ssrc_in[i]; + for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) { + struct ssrc_entry_call *s = l->data; if (!s) break; // end of list @@ -718,11 +715,12 @@ void ssrc_collect_metrics(struct call_media *media) { if (s->tracker.most_len > 0 && s->tracker.most[0] != 255) { const rtp_payload_type *rpt = get_rtp_payload_type(s->tracker.most[0], - &ps->media->codecs); + &media->codecs); if (rpt && rpt->clock_rate) s->jitter = s->jitter * 1000 / rpt->clock_rate; } - RTPE_SAMPLE_SFD(jitter_measured, s->jitter, ps->selected_sfd); + if (media->streams.head) + RTPE_SAMPLE_SFD(jitter_measured, s->jitter, media->streams.head->data->selected_sfd); } } diff --git a/daemon/t38.c b/daemon/t38.c index c44478637..24b136520 100644 --- a/daemon/t38.c +++ b/daemon/t38.c @@ -416,7 +416,7 @@ int t38_gateway_pair(struct call_media *t38_media, struct call_media *pcm_media, goto err; media_player_new(&tg->pcm_player, pcm_media->monologue, - (pcm_media->streams.length ? pcm_media->streams.head->data->ssrc_out[0] : NULL), + call_get_first_ssrc(&pcm_media->ssrc_hash_out), NULL); // even though we call media_player_set_media() here, we need to call it again in // t38_gateway_start because our sink might not have any streams added here yet, diff --git a/include/call.h b/include/call.h index 1f24be7e4..488ec911e 100644 --- a/include/call.h +++ b/include/call.h @@ -439,10 +439,6 @@ struct packet_stream { struct endpoint advertised_endpoint; /* RO */ struct endpoint learned_endpoint; /* LOCK: out_lock */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct ssrc_entry_call *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ - *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */ - unsigned int ssrc_in_idx, /* LOCK: in_lock */ - ssrc_out_idx; /* LOCK: out_lock */ struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ int64_t kernel_time_us; @@ -898,7 +894,6 @@ enum thread_looper_action call_timer(void); void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *); bool __init_stream(struct packet_stream *ps); -void call_stream_crypto_reset(struct packet_stream *ps); const rtp_payload_type *__rtp_stats_codec(struct call_media *m); diff --git a/include/ssrc.h b/include/ssrc.h index 3f783af23..b499f98a2 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -202,6 +202,10 @@ INLINE void *get_ssrc(uint32_t ssrc, struct ssrc_hash *ht) { return get_ssrc_full(ssrc, ht, NULL); } +INLINE struct ssrc_entry_call *call_get_first_ssrc(struct ssrc_hash *ht) { + return ht->nq.head ? ht->nq.head->data : NULL; +} + void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, int64_t); void ssrc_receiver_report(struct call_media *, stream_fd *, const struct ssrc_receiver_report *, int64_t); void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *rr, int64_t); diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 42fe56f78..e4bcc9c9f 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -2021,7 +2021,6 @@ $resp = rtpe_req('query', 'unsolicited to-tag w/ via-branch', { }); Test2::Tools::Compare::like($resp, { 'result' => 'ok', 'last redis update' => '0', - 'SSRC' => {}, 'last signal' => qr//, 'tags' => { ft() => { @@ -2034,17 +2033,17 @@ Test2::Tools::Compare::like($resp, { ], 'medias' => [ { + 'ingress SSRCs' => [], + 'egress SSRCs' => [], 'index' => '1', 'streams' => [ { 'last user packet' => qr//, - 'egress SSRCs' => [], 'advertised endpoint' => { 'address' => '198.51.100.1', 'family' => 'IPv4', 'port' => '3000' }, - 'ingress SSRCs' => [], 'flags' => [ 'RTP', 'filled' @@ -2071,13 +2070,11 @@ Test2::Tools::Compare::like($resp, { 'last packet' => qr// }, { - 'egress SSRCs' => [], 'advertised endpoint' => { 'address' => '198.51.100.1', 'family' => 'IPv4', 'port' => '3001' }, - 'ingress SSRCs' => [], 'flags' => [ 'RTCP', 'filled' @@ -2127,6 +2124,8 @@ Test2::Tools::Compare::like($resp, { 'tag' => tt(), 'medias' => [ { + 'ingress SSRCs' => [], + 'egress SSRCs' => [], 'streams' => [ { 'stats_out' => { @@ -2134,8 +2133,6 @@ Test2::Tools::Compare::like($resp, { 'packets' => '0', 'errors' => '0' }, - 'egress SSRCs' => [], - 'ingress SSRCs' => [], 'advertised endpoint' => { 'port' => '4000', 'family' => 'IPv4', @@ -2182,13 +2179,11 @@ Test2::Tools::Compare::like($resp, { 'RTCP', 'filled' ], - 'ingress SSRCs' => [], 'advertised endpoint' => { 'port' => '4001', 'address' => '198.51.100.1', 'family' => 'IPv4' }, - 'egress SSRCs' => [], 'last user packet' => qr//, 'stats_out' => { 'bytes' => '0', @@ -2311,7 +2306,6 @@ $resp = rtpe_req('query', 'unsolicited to-tag w/ via-branch', { }); Test2::Tools::Compare::like($resp, { 'result' => 'ok', 'last redis update' => '0', - 'SSRC' => {}, 'last signal' => qr//, 'tags' => { ft() => { @@ -2324,17 +2318,17 @@ Test2::Tools::Compare::like($resp, { ], 'medias' => [ { + 'ingress SSRCs' => [], + 'egress SSRCs' => [], 'index' => '1', 'streams' => [ { 'last user packet' => qr//, - 'egress SSRCs' => [], 'advertised endpoint' => { 'address' => '198.51.100.1', 'family' => 'IPv4', 'port' => '3000' }, - 'ingress SSRCs' => [], 'flags' => [ 'RTP', 'filled' @@ -2361,13 +2355,11 @@ Test2::Tools::Compare::like($resp, { 'last packet' => qr// }, { - 'egress SSRCs' => [], 'advertised endpoint' => { 'address' => '198.51.100.1', 'family' => 'IPv4', 'port' => '3001' }, - 'ingress SSRCs' => [], 'flags' => [ 'RTCP', 'filled' @@ -2417,6 +2409,8 @@ Test2::Tools::Compare::like($resp, { 'tag' => tt(), 'medias' => [ { + 'ingress SSRCs' => [], + 'egress SSRCs' => [], 'streams' => [ { 'stats_out' => { @@ -2424,8 +2418,6 @@ Test2::Tools::Compare::like($resp, { 'packets' => '0', 'errors' => '0' }, - 'egress SSRCs' => [], - 'ingress SSRCs' => [], 'advertised endpoint' => { 'port' => '4000', 'family' => 'IPv4', @@ -2472,13 +2464,11 @@ Test2::Tools::Compare::like($resp, { 'RTCP', 'filled' ], - 'ingress SSRCs' => [], 'advertised endpoint' => { 'port' => '4001', 'address' => '198.51.100.1', 'family' => 'IPv4' }, - 'egress SSRCs' => [], 'last user packet' => qr//, 'stats_out' => { 'bytes' => '0',