From 8cf038e4fa7869d8f5601384b82fc99eb1109437 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 10 Sep 2025 10:30:59 -0400 Subject: [PATCH] MT#63317 rework kernel output grouping Change-Id: I3e798e5f1e5ae2f4af247819e3d323cdbf56b36b --- daemon/call.c | 11 +++- daemon/media_socket.c | 102 +++++++++++++++++++++++------------ include/call.h | 5 +- kernel-module/xt_RTPENGINE.c | 56 +++++++++++++------ kernel-module/xt_RTPENGINE.h | 17 +++++- 5 files changed, 139 insertions(+), 52 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 028e5db5a..6e27ff43c 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3265,7 +3265,7 @@ static void media_update_transcoding_flag(struct call_media *media) { * For handling sdp media level manipulations (media sessions remove). * This function just adds a fictitious media for this side, pretending it had 0 port. */ -static struct call_media * monologue_add_zero_media(struct call_monologue *sender_ml, struct stream_params *sp, +static struct call_media *monologue_add_zero_media(struct call_monologue *sender_ml, struct stream_params *sp, unsigned int *num_ports_other, sdp_ng_flags *flags, str_ht tracker) { struct call_media *sender_media = NULL; @@ -3289,6 +3289,15 @@ static struct call_media * monologue_add_zero_media(struct call_monologue *sende return sender_media; } +struct packet_stream *get_media_component(struct call_media *media, unsigned int component) { + // XXX maybe turn into array? + for (__auto_type l = media->streams.head; l; l = l->next) { + if (l->data->component == component) + return l->data; + } + return NULL; +} + // reset all bundle state __attribute__((nonnull(1))) static void monologue_bundle_reset(struct call_monologue *ml) { diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 40590bafc..842678add 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1553,6 +1553,9 @@ typedef struct { struct rtpengine_target_info reti; struct ssrc_entry_call *ssrc[RTPE_NUM_SSRC_TRACKING]; kernel_output_q outputs; + sink_handler_q *rtp_sinks[RTPE_NUM_OUTPUT_MEDIA]; + sink_handler_q *rtp_mirrors[RTPE_NUM_OUTPUT_MEDIA]; + sink_handler_q *rtcp_sinks[RTPE_NUM_OUTPUT_MEDIA]; struct rtp_stats *payload_types[RTPE_NUM_PAYLOAD_TYPES]; unsigned int num_payload_types; bool blackhole; @@ -1572,6 +1575,10 @@ G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(kernelize_state, kernelize_state_clear) __attribute__((nonnull(1, 2))) static const char *kernelize_target(kernelize_state *s, struct packet_stream *stream) { struct call_media *media = stream->media; + unsigned int media_idx = media->index - 1; + + if (media_idx >= RTPE_NUM_OUTPUT_MEDIA) + return "media index too large"; if (MEDIA_ISSET(media, BLACKHOLE)) s->blackhole = true; @@ -1643,7 +1650,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st reti->track_ssrc = 1; unsigned int u = 0; - for (GList *l = stream->media->ssrc_hash_in.nq.head; l; l = l->next) { + for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) { struct ssrc_entry_call *se = l->data; if (u >= G_N_ELEMENTS(reti->ssrc)) break; @@ -1655,6 +1662,11 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st recording_stream_kernel_info(stream, reti); + // record our outputs for this media + s->rtp_sinks[media_idx] = &stream->rtp_sinks; + s->rtp_mirrors[media_idx] = &stream->rtp_mirrors; + s->rtcp_sinks[media_idx] = &stream->rtcp_sinks; + if (!proto_is_rtp(media->protocol)) return NULL; // everything below is RTP-specific @@ -1712,6 +1724,8 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st } reti->pt_stats[i] = rs; + reti->pt_media_idx[i] = media_idx; + i++; } @@ -1727,7 +1741,8 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st */ __attribute__((nonnull(1, 2, 3))) static const char *kernelize_one(kernelize_state *s, - struct packet_stream *stream, struct sink_handler *sink_handler) + struct packet_stream *stream, struct sink_handler *sink_handler, + bool rtcp) { call_t *call = stream->call; struct call_media *media = stream->media; @@ -1767,6 +1782,7 @@ static const char *kernelize_one(kernelize_state *s, __auto_type redi = g_new0(struct rtpengine_destination_info, 1); redi->local = reti->local; redi->output.tos = call->tos; + redi->output.rtcp = rtcp; // PT manipulations bool silenced = s->silenced || sink_handler->attrs.silence_media; @@ -1850,13 +1866,15 @@ static const char *kernelize_one(kernelize_state *s, } // helper function for kernelize() // called with stream->lock held +__attribute__((nonnull(1, 2, 3))) static bool kernelize_one_sink_handler(kernelize_state *s, - struct packet_stream *stream, struct sink_handler *sink_handler) + struct packet_stream *stream, struct sink_handler *sink_handler, + bool rtcp) { struct packet_stream *sink = sink_handler->sink; if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) return true; - const char *err = kernelize_one(s, stream, sink_handler); + const char *err = kernelize_one(s, stream, sink_handler, rtcp); if (err) { if (!*err) return false; // indicate deadlock @@ -1864,11 +1882,21 @@ static bool kernelize_one_sink_handler(kernelize_state *s, } return true; } + /* called with master_lock held */ static void kernelize(struct packet_stream *stream) { call_t *call = stream->call; struct call_media *media = stream->media; + // act on bundle head if there is one + if (media->bundle) { + unsigned int component = stream->component; + media = media->bundle; + stream = get_media_component(media, component); + if (!stream) + return; // nothing to do? + } + while (true) { g_auto(kernelize_state) s = {0}; @@ -1900,38 +1928,46 @@ static void kernelize(struct packet_stream *stream) { if (err) ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); - // primary RTP sinks - for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) { - struct sink_handler *sh = l->data; - if (sh->attrs.block_media) - continue; - bool ok = kernelize_one_sink_handler(&s, stream, sh); - if (!ok) - continue; // retry - } - // RTP egress mirrors - for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { - struct sink_handler *sh = l->data; - bool ok = kernelize_one_sink_handler(&s, stream, sh); - if (!ok) - continue; // retry - } - // RTP -> RTCP sinks - // record number of RTP destinations up to now - unsigned int num_rtp_dests = s.reti.num_destinations; - // ignore RTP payload types - for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { - struct sink_handler *sh = l->data; - bool ok = kernelize_one_sink_handler(&s, stream, sh); - if (!ok) - continue; // retry - } - // mark the start of RTCP outputs - s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; - if (!s.reti.local.family) goto no_kernel; + for (unsigned int mi = 0; mi < RTPE_NUM_OUTPUT_MEDIA; mi++) { + if (!s.rtp_sinks[mi]) + continue; // not filled + + // primary RTP sinks + s.reti.media_output_idxs[mi].rtp_start_idx = s.reti.num_destinations; + for (__auto_type l = s.rtp_sinks[mi]->head; l; l = l->next) { + struct sink_handler *sh = l->data; + if (sh->attrs.block_media) + continue; + bool ok = kernelize_one_sink_handler(&s, stream, sh, false); + if (!ok) + continue; // retry + } + // RTP egress mirrors + for (__auto_type l = s.rtp_mirrors[mi]->head; l; l = l->next) { + struct sink_handler *sh = l->data; + bool ok = kernelize_one_sink_handler(&s, stream, sh, false); + if (!ok) + continue; // retry + } + // RTP -> RTCP sinks + // record number of RTP destinations up to now + s.reti.media_output_idxs[mi].rtp_end_idx = s.reti.num_destinations; + // also marks the start of RTCP outputs + s.reti.media_output_idxs[mi].rtcp_start_idx = s.reti.num_destinations; + // ignore RTP payload types + for (__auto_type l = s.rtcp_sinks[mi]->head; l; l = l->next) { + struct sink_handler *sh = l->data; + bool ok = kernelize_one_sink_handler(&s, stream, sh, true); + if (!ok) + continue; // retry + } + // mark the end of RTCP outputs + s.reti.media_output_idxs[mi].rtcp_end_idx = s.reti.num_destinations; + } + if (!s.outputs.length && !s.reti.non_forwarding) { s.reti.non_forwarding = 1; ilog(LOG_NOTICE | LOG_FLAG_LIMIT, "Setting 'non-forwarding' flag for kernel stream due to " diff --git a/include/call.h b/include/call.h index 9f529b02e..e67844f5b 100644 --- a/include/call.h +++ b/include/call.h @@ -847,7 +847,10 @@ struct media_subscription *call_ml_get_top_ms(struct call_monologue *ml); bool call_ml_sendonly_inactive(struct call_monologue *ml); struct media_subscription *call_media_get_top_ms(struct call_media * cm); struct media_subscription *call_get_media_subscription(subscription_ht ht, struct call_media * cm); -struct call_monologue * ml_medias_subscribed_to_single_ml(struct call_monologue *ml); +struct call_monologue *ml_medias_subscribed_to_single_ml(struct call_monologue *ml); + +__attribute__((nonnull(1))) +struct packet_stream *get_media_component(struct call_media *media, unsigned int component); void free_sink_handler(struct sink_handler *); void __add_sink_handler(sink_handler_q *, struct packet_stream *, const struct sink_attrs *); diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index f5b87e7de..7e2e75478 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -340,7 +340,6 @@ struct rtpengine_target { rwlock_t outputs_lock; struct rtpengine_output *outputs; - unsigned int num_rtp_destinations; unsigned int outputs_unfilled; // only ever decreases }; @@ -1715,10 +1714,11 @@ static int proc_list_show(struct seq_file *f, void *v) { (unsigned long long) atomic64_read(&g->target.stats->packets), (unsigned long long) atomic64_read(&g->target.stats->errors)); for (i = 0; i < g->target.num_payload_types; i++) { - seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n", + seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets [group #%u]\n", g->target.pt_stats[i]->payload_type, (unsigned long long) atomic64_read(&g->target.pt_stats[i]->bytes), - (unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets)); + (unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets), + g->target.pt_media_idx[i]); } seq_printf(f, " last packet: %lli", @@ -1767,12 +1767,26 @@ static int proc_list_show(struct seq_file *f, void *v) { seq_printf(f, " forward-RTCP-FB"); seq_printf(f, "\n"); + seq_printf(f, " output groups:"); + for (i = 0; i < RTPE_NUM_OUTPUT_MEDIA; i++) { + if (g->target.media_output_idxs[i].rtp_start_idx + != g->target.media_output_idxs[i].rtp_end_idx + || g->target.media_output_idxs[i].rtcp_start_idx + != g->target.media_output_idxs[i].rtcp_end_idx) + seq_printf(f, " [%u]=(%u->%u/%u->%u)", i, + g->target.media_output_idxs[i].rtp_start_idx, + g->target.media_output_idxs[i].rtp_end_idx, + g->target.media_output_idxs[i].rtcp_start_idx, + g->target.media_output_idxs[i].rtcp_end_idx); + } + seq_printf(f, "\n"); + for (i = 0; i < g->target.num_destinations; i++) { struct rtpengine_output *o = &g->outputs[i]; - if (i < g->num_rtp_destinations) - seq_printf(f, " output #%u\n", i); - else - seq_printf(f, " output #%u (RTCP)\n", i); + + seq_printf(f, " output #%u (media #%u%s)\n", i, o->output.media_idx, + o->output.rtcp ? ", RTCP" : ""); + proc_list_addr_print(f, "src", &o->output.src_addr); proc_list_addr_print(f, "dst", &o->output.dst_addr); @@ -2459,8 +2473,20 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i return -EINVAL; if (i->num_destinations > RTPE_MAX_FORWARD_DESTINATIONS) return -EINVAL; - if (i->num_rtcp_destinations > i->num_destinations) - return -EINVAL; + for (u = 0; u < RTPE_NUM_OUTPUT_MEDIA; u++) { + if (i->media_output_idxs[u].rtp_start_idx >= i->num_destinations) + return -EINVAL; + if (i->media_output_idxs[u].rtp_end_idx > i->num_destinations) + return -EINVAL; + if (i->media_output_idxs[u].rtp_end_idx < i->media_output_idxs[u].rtp_start_idx) + return -EINVAL; + if (i->media_output_idxs[u].rtcp_start_idx >= i->num_destinations) + return -EINVAL; + if (i->media_output_idxs[u].rtcp_end_idx > i->num_destinations) + return -EINVAL; + if (i->media_output_idxs[u].rtcp_end_idx < i->media_output_idxs[u].rtcp_start_idx) + return -EINVAL; + } if (i->num_payload_types > RTPE_NUM_PAYLOAD_TYPES) return -EINVAL; if (!i->non_forwarding) { @@ -2484,6 +2510,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i pt_stats[u] = shm_map_resolve(i->pt_stats[u], sizeof(*pt_stats[u])); if (!pt_stats[u]) return -EFAULT; + if (i->pt_media_idx[u] > RTPE_NUM_OUTPUT_MEDIA) + return -EINVAL; } for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { if (!i->ssrc[u]) @@ -2526,7 +2554,6 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i if (!g->outputs) goto fail2; g->outputs_unfilled = i->num_destinations; - g->num_rtp_destinations = i->num_destinations - i->num_rtcp_destinations; } err = gen_rtp_session_keys(&g->decrypt_rtp, &g->target.decrypt); @@ -6276,6 +6303,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb, unsigned int start_idx, end_idx; enum {NOT_RTCP = 0, RTCP, RTCP_FORWARD} is_rtcp; ktime_t packet_ts; + struct rtpengine_output_group *output_group; skb_reset_transport_header(skb); uh = udp_hdr(skb); @@ -6446,11 +6474,9 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb, } // output - start_idx = (is_rtcp != NOT_RTCP) ? g->num_rtp_destinations : 0; - end_idx = (is_rtcp != NOT_RTCP) ? g->target.num_destinations : g->num_rtp_destinations; - - if (start_idx == end_idx) - goto out; // pass to userspace + output_group = &g->target.media_output_idxs[0]; + start_idx = (is_rtcp != NOT_RTCP) ? output_group->rtcp_start_idx : output_group->rtp_start_idx; + end_idx = (is_rtcp != NOT_RTCP) ? output_group->rtcp_end_idx : output_group->rtp_end_idx; for (i = start_idx; i < end_idx; i++) { struct rtpengine_output *o = &g->outputs[i]; diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 28db4d172..b7c327cb1 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -9,6 +9,7 @@ #define RTPE_MAX_FORWARD_DESTINATIONS 32 #define RTPE_NUM_SSRC_TRACKING 4 #define RTPE_NUM_EXTMAP_FILTER 32 +#define RTPE_NUM_OUTPUT_MEDIA 8 @@ -82,19 +83,28 @@ struct rtpengine_pt_output { unsigned int blackhole:1; }; +struct rtpengine_output_group { + unsigned int rtp_start_idx; + unsigned int rtp_end_idx; + unsigned int rtcp_start_idx; + unsigned int rtcp_end_idx; +}; + struct rtpengine_target_info { struct re_address local; struct re_address expected_src; /* for incoming packets */ enum rtpengine_src_mismatch src_mismatch; unsigned int num_destinations; // total - unsigned int num_rtcp_destinations; unsigned int intercept_stream_idx; + struct rtpengine_output_group media_output_idxs[RTPE_NUM_OUTPUT_MEDIA]; + struct rtpengine_srtp decrypt; uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync. struct ssrc_stats *ssrc_stats[RTPE_NUM_SSRC_TRACKING]; struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES]; // must be sorted by PT + unsigned int pt_media_idx[RTPE_NUM_PAYLOAD_TYPES]; // same idx as pt_stats unsigned int num_payload_types; struct interface_stats_block *iface_stats; // for ingress stats @@ -121,6 +131,8 @@ struct rtpengine_output_info { struct re_address src_addr; /* for outgoing packets */ struct re_address dst_addr; + unsigned int media_idx; + struct rtpengine_srtp encrypt; uint32_t ssrc_out[RTPE_NUM_SSRC_TRACKING]; // Rewrite SSRC uint32_t seq_offset[RTPE_NUM_SSRC_TRACKING]; // Rewrite output seq @@ -139,7 +151,8 @@ struct rtpengine_output_info { unsigned char tos; unsigned int ssrc_subst:1, - extmap:1; + extmap:1, + rtcp:1; }; struct rtpengine_destination_info {