From 89984967e2ccd575787c5c9fef62a8b905e324d6 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 5 Apr 2024 10:44:29 -0400 Subject: [PATCH] MT#55283 move RTP stats into shm Allocate per-payload-type stats buffers in shared bufferpool. Push these to the kernel module for direct tracking of bytes/packets stats. This eliminates the need to return these values in the /blist data and continuously update the userspace stats. The accomodate updating the payload-type tracker, we use the "last received" timestamp from the kernel module together with the "last seen payload type" to give similar results as before. Change-Id: I524791a1940b5d1ff2d82716c3b7a262f64e106c --- daemon/media_socket.c | 44 +++++------------------ kernel-module/common_stats.h | 4 +++ kernel-module/xt_RTPENGINE.c | 67 +++++++++++++++++++----------------- kernel-module/xt_RTPENGINE.h | 7 +--- 4 files changed, 50 insertions(+), 72 deletions(-) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 4a932fd62..20e43260d 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1510,20 +1510,18 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out *payload_types = g_hash_table_get_values(stream->rtp_stats); *payload_types = g_list_sort(*payload_types, __rtp_stats_pt_sort); for (l = *payload_types; l; ) { - if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_input)) { + if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module"); break; } rs = l->data; // only add payload types that are passthrough for all sinks bool can_kernelize = true; - unsigned int clockrate = 0; for (__auto_type k = sinks->head; k; k = k->next) { struct sink_handler *ksh = k->data; struct packet_stream *ksink = ksh->sink; struct codec_handler *ch = codec_handler_get(media, rs->payload_type, ksink->media, ksh); - clockrate = ch->source_pt.clock_rate; if (ch->kernelize) continue; can_kernelize = false; @@ -1539,9 +1537,8 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out continue; } - struct rtpengine_pt_input *rpt = &reti->pt_input[reti->num_payload_types++]; - rpt->pt_num = rs->payload_type; - rpt->clock_rate = clockrate; + reti->pt_stats[reti->num_payload_types] = rs; + reti->num_payload_types++; l = l->next; } @@ -3480,9 +3477,6 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_ enum thread_looper_action kernel_stats_updater(void) { struct rtpengine_list_entry *ke; struct packet_stream *ps; - int j; - struct rtp_stats *rs; - unsigned int pt; endpoint_t ep; /* TODO: should we realy check the count of call timers? `call_timer_iterator()` */ @@ -3512,28 +3506,6 @@ enum thread_looper_action kernel_stats_updater(void) { ps->in_tos_tclass = ke->tos; - uint64_t max_diff = 0; - int max_pt = -1; - for (j = 0; j < ke->target.num_payload_types; j++) { - pt = ke->target.pt_input[j].pt_num; - rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt)); - if (!rs) - continue; - if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) { - uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets); - atomic64_add(&rs->packets, diff); - if (diff > max_diff) { - max_diff = diff; - max_pt = pt; - } - } - if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes)) - atomic64_add(&rs->bytes, - ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); - atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); - atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); - } - bool update = false; bool active_media = (rtpe_now.tv_sec - packet_stream_last_packet(ps) < 1); @@ -3562,8 +3534,9 @@ enum thread_looper_action kernel_stats_updater(void) { sink->ssrc_out, 0); if (!ctx) continue; - if (max_pt != -1) - payload_tracker_add(&ctx->tracker, max_pt); + if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2) + payload_tracker_add(&ctx->tracker, + atomic_get_na(&ps->stats_in->last_pt)); if (sink->crypto.params.crypto_suite && o->encrypt.last_rtp_index[u] - ctx->srtp_index > 0x4000) { @@ -3594,8 +3567,9 @@ enum thread_looper_action kernel_stats_updater(void) { // TODO: add in SSRC stats similar to __stream_update_stats atomic64_set(&ctx->last_seq, ke->target.decrypt.last_rtp_index[u]); - if (max_pt != -1) - payload_tracker_add(&ctx->tracker, max_pt); + if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2) + payload_tracker_add(&ctx->tracker, + atomic_get_na(&ps->stats_in->last_pt)); if (sfd->crypto.params.crypto_suite && ke->target.decrypt.last_rtp_index[u] diff --git a/kernel-module/common_stats.h b/kernel-module/common_stats.h index b2a009c63..34d564337 100644 --- a/kernel-module/common_stats.h +++ b/kernel-module/common_stats.h @@ -5,6 +5,9 @@ #ifdef __KERNEL__ typedef atomic64_t atomic64; static_assert(sizeof(atomic64_t) == sizeof(int64_t), "atomic64_t != int64_t"); +static_assert(sizeof(atomic_t) == sizeof(int), "atomic_t != int"); +#else +typedef int atomic_t; #endif @@ -44,6 +47,7 @@ struct stream_stats { atomic64 bytes; atomic64 errors; atomic64 last_packet; + atomic_t last_pt; }; struct rtp_stats { diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 6f2b12cca..1d6eed4b0 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -309,10 +309,6 @@ struct re_crypto_context { const struct re_hmac *hmac; }; -struct rtpengine_rtp_stats_a { - atomic64_t packets; - atomic64_t bytes; -}; struct rtpengine_output { struct rtpengine_output_info output; struct re_crypto_context encrypt_rtp; @@ -325,7 +321,6 @@ struct rtpengine_target { unsigned int last_pt; // index into pt_input[] and pt_output[] atomic_t tos; - struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; spinlock_t ssrc_stats_lock; struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING]; @@ -1480,11 +1475,6 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t opp->tos = atomic_read(&g->tos); - for (i = 0; i < g->target.num_payload_types; i++) { - opp->rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets); - opp->rtp_stats[i].bytes = atomic64_read(&g->rtp_stats[i].bytes); - } - spin_lock_irqsave(&g->decrypt_rtp.lock, flags); for (i = 0; i < ARRAY_SIZE(opp->target.decrypt.last_rtp_index); i++) opp->target.decrypt.last_rtp_index[i] = g->target.decrypt.last_rtp_index[i]; @@ -1700,9 +1690,9 @@ static int proc_list_show(struct seq_file *f, void *v) { (unsigned long long) atomic64_read(&g->target.stats->errors)); for (i = 0; i < g->target.num_payload_types; i++) { seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n", - g->target.pt_input[i].pt_num, - (unsigned long long) atomic64_read(&g->rtp_stats[i].bytes), - (unsigned long long) atomic64_read(&g->rtp_stats[i].packets)); + g->target.pt_stats[i]->payload_type, + (unsigned long long) atomic64_read(&g->target.pt_stats[i]->bytes), + (unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets)); } seq_printf(f, " SSRC in:"); @@ -1775,7 +1765,7 @@ static int proc_list_show(struct seq_file *f, void *v) { if (o->output.pt_output[j].replace_pattern_len || o->output.pt_output[j].min_payload_len) seq_printf(f, " RTP payload type %3u: " "%u bytes replacement payload, min payload len %u\n", - g->target.pt_input[j].pt_num, + g->target.pt_stats[j]->payload_type, o->output.pt_output[j].replace_pattern_len, o->output.pt_output[j].min_payload_len); } @@ -2421,6 +2411,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i unsigned int u; struct interface_stats_block *iface_stats; struct stream_stats *stats; + struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES]; /* validation */ @@ -2432,6 +2423,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i return -EINVAL; if (i->num_rtcp_destinations > i->num_destinations) return -EINVAL; + if (i->num_payload_types > RTPE_NUM_PAYLOAD_TYPES) + return -EINVAL; if (!i->non_forwarding) { if (!i->num_destinations) return -EINVAL; @@ -2449,6 +2442,11 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i stats = shm_map_resolve(i->stats, sizeof(*stats)); if (!stats) return -EFAULT; + for (u = 0; u < i->num_payload_types; u++) { + pt_stats[u] = shm_map_resolve(i->pt_stats[u], sizeof(*pt_stats[u])); + if (!pt_stats[u]) + return -EFAULT; + } DBG("Creating new target\n"); @@ -2472,6 +2470,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i rwlock_init(&g->outputs_lock); g->target.iface_stats = iface_stats; g->target.stats = stats; + for (u = 0; u < i->num_payload_types; u++) + g->target.pt_stats[u] = pt_stats[u]; if (i->num_destinations) { err = -ENOMEM; @@ -5048,12 +5048,13 @@ static inline int is_dtls(struct sk_buff *skb) { return 1; } -static int rtp_payload_match(const void *a, const void *b) { - const struct rtpengine_pt_input *A = a, *B = b; +static int rtp_payload_match(const void *a, const void *bp) { + const struct rtp_stats *const *b = bp; + const struct rtp_stats *A = a, *B = *b; - if (A->pt_num < B->pt_num) + if (A->payload_type < B->payload_type) return -1; - if (A->pt_num > B->pt_num) + if (A->payload_type > B->payload_type) return 1; return 0; } @@ -5061,20 +5062,20 @@ static int rtp_payload_match(const void *a, const void *b) { static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rtpengine_target_info *tg, int *last_pt) { - struct rtpengine_pt_input pt; - const struct rtpengine_pt_input *match; + struct rtp_stats pt; + struct rtp_stats *const *pmatch; - pt.pt_num = hdr->m_pt & 0x7f; + pt.payload_type = hdr->m_pt & 0x7f; if (*last_pt < tg->num_payload_types) { - match = &tg->pt_input[*last_pt]; - if (rtp_payload_match(match, &pt) == 0) + pmatch = &tg->pt_stats[*last_pt]; + if (rtp_payload_match(&pt, pmatch) == 0) goto found; } - match = bsearch(&pt, tg->pt_input, tg->num_payload_types, sizeof(pt), rtp_payload_match); - if (!match) + pmatch = bsearch(&pt, tg->pt_stats, tg->num_payload_types, sizeof(*pmatch), rtp_payload_match); + if (!pmatch) return -1; found: - *last_pt = match - tg->pt_input; + *last_pt = pmatch - tg->pt_stats; return *last_pt; } @@ -5283,7 +5284,7 @@ static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 ar // jitter // RFC 3550 A.8 - clockrate = g->target.pt_input[pt_idx].clock_rate; + clockrate = g->target.pt_stats[pt_idx]->clock_rate; transit = ((uint32_t) (div64_s64(arrival_time, 1000) * clockrate) / 1000) - ts; d = 0; if (s->transit) @@ -5431,8 +5432,12 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb, goto out_error; // if RTP, only forward packets of known/passthrough payload types - if (g->target.pt_filter && rtp_pt_idx < 0) - goto out; + if (rtp_pt_idx < 0) { + if (g->target.pt_filter) + goto out; + } + else + atomic_set(&g->target.stats->last_pt, g->target.pt_stats[rtp_pt_idx]->payload_type); errstr = "SRTP decryption failed"; err = srtp_decrypt(&g->decrypt_rtp, &g->target.decrypt, &rtp, &pkt_idx); @@ -5554,8 +5559,8 @@ do_stats: atomic64_add(datalen, &t->rtpe_stats->bytes_kernel); if (rtp_pt_idx >= 0) { - atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets); - atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes); + atomic64_inc(&g->target.pt_stats[rtp_pt_idx]->packets); + atomic64_add(datalen, &g->target.pt_stats[rtp_pt_idx]->bytes); } else if (rtp_pt_idx == -2) /* not RTP */ ; diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index cd371158a..bb078dc9b 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -90,10 +90,6 @@ enum rtpengine_src_mismatch { MSM_PROPAGATE, /* propagate to userspace daemon */ }; -struct rtpengine_pt_input { - unsigned char pt_num; - uint32_t clock_rate; -}; struct rtpengine_pt_output { unsigned int min_payload_len; char replace_pattern[16]; @@ -111,7 +107,7 @@ struct rtpengine_target_info { struct rtpengine_srtp decrypt; uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync. - struct rtpengine_pt_input pt_input[RTPE_NUM_PAYLOAD_TYPES]; /* must be sorted */ + struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES]; // must be sorted by PT unsigned int num_payload_types; struct interface_stats_block *iface_stats; // for ingress stats @@ -272,7 +268,6 @@ struct rtpengine_command_send_packet { struct rtpengine_list_entry { struct rtpengine_target_info target; - struct rtpengine_rtp_stats rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; // same index as pt_input struct rtpengine_output_info outputs[RTPE_MAX_FORWARD_DESTINATIONS]; int tos; };