From ef2e31b69d038272b758634b3af9c5c5d7df2cdb Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 7 Mar 2025 09:41:34 -0400 Subject: [PATCH] MT#55283 split up kernelize_one Split function into one part handling the "target" (i.e. ingress) portion of the forwarding chain, and one function handling just the outputs. Change-Id: I3766da3c4bc5caee4eb6bae8978f177a83cc231a --- daemon/media_socket.c | 289 ++++++++++++++++++++++-------------------- 1 file changed, 153 insertions(+), 136 deletions(-) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index c3faed65a..2c36dde07 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1516,6 +1516,8 @@ typedef struct { kernel_output_q outputs; rtp_stats_arr *payload_types; bool ignore_payload_types; // temporary until refactor + bool blackhole; + bool non_forwarding; } kernelize_state; static void kernelize_state_clear(kernelize_state *s) { @@ -1527,61 +1529,22 @@ static void kernelize_state_clear(kernelize_state *s) { G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(kernelize_state, kernelize_state_clear) -/** - * The linkage between userspace and kernel module is in the kernelize_one(). - * - * Called with in_lock held. - * sink_handler can be NULL. - */ -static const char *kernelize_one(kernelize_state *s, - struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks) -{ - call_t *call = stream->call; +__attribute__((nonnull(1, 2))) +static const char *kernelize_target(kernelize_state *s, struct packet_stream *stream) { struct call_media *media = stream->media; - struct packet_stream *sink = sink_handler ? sink_handler->sink : NULL; - bool non_forwarding = false; - bool blackhole = false; - - if (sink_handler) { - if (MEDIA_ISSET(sink->media, BLOCK_EGRESS)) - return NULL; - sink_handler->kernel_output_idx = -1; - } if (MEDIA_ISSET(media, BLACKHOLE)) - blackhole = true; - else if (!sink_handler) - blackhole = true; + s->blackhole = true; - if (blackhole) - non_forwarding = true; + if (s->blackhole) + s->non_forwarding = true; - if (sink && !sink->endpoint.address.family) - return NULL; - - if (sink && sink->selected_sfd) - ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s | %s -> %s%s%s", - FMT_M(endpoint_print_buf(&stream->endpoint)), - endpoint_print_buf(&stream->selected_sfd->socket.local), - endpoint_print_buf(&sink->selected_sfd->socket.local), - FMT_M(endpoint_print_buf(&sink->endpoint))); - else - ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> void", - FMT_M(endpoint_print_buf(&stream->endpoint)), - endpoint_print_buf(&stream->selected_sfd->socket.local)); - - const struct streamhandler *handler = __determine_handler(stream, sink_handler); - - if (!handler->in->kernel || !handler->out->kernel) - return "protocol not supported by kernel module"; - - // fill input if needed + ilog(LOG_INFO, "Kernelizing media stream: remote %s%s%s -> local %s", + FMT_M(endpoint_print_buf(&stream->endpoint)), + endpoint_print_buf(&stream->selected_sfd->socket.local)); + // fill input __auto_type reti = &s->reti; - __auto_type payload_types = s->ignore_payload_types ? NULL : &s->payload_types; - - if (reti->local.family) - goto output; if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { mutex_lock(&stream->out_lock); @@ -1599,11 +1562,40 @@ static const char *kernelize_one(kernelize_state *s, reti->rtcp = PS_ISSET(stream, RTCP); reti->dtls = MEDIA_ISSET(media, DTLS); reti->stun = media->ice_agent ? 1 : 0; - reti->non_forwarding = non_forwarding ? 1 : 0; - reti->blackhole = blackhole ? 1 : 0; + reti->non_forwarding = s->non_forwarding ? 1 : 0; + reti->blackhole = s->blackhole ? 1 : 0; reti->rtp_stats = (rtpe_config.measure_rtp || MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0; + // Grab the first stream handler for our decryption function. + // __determine_handler is in charge of only returning a NULL decrypter if it is + // in fact a pure passthrough for all sinks. + const struct streamhandler *handler = NULL; + + for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) { + handler = __determine_handler(stream, l->data); + if (handler) + break; + } + + if (!handler) { + for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { + handler = __determine_handler(stream, l->data); + if (handler) + break; + } + } + + if (!handler) { + // nothing to forward + s->non_forwarding = true; + s->blackhole = true; + return NULL; + } + + if (!handler->in->kernel) + return "protocol not supported by kernel module"; + handler->in->kernel(&reti->decrypt, stream); if (!reti->decrypt.cipher || !reti->decrypt.hmac) return "decryption cipher or HMAC not supported by kernel module"; @@ -1616,71 +1608,102 @@ static const char *kernelize_one(kernelize_state *s, } } - if (proto_is_rtp(media->protocol)) { - reti->rtp = 1; - reti->ssrc_req = 1; - if (!MEDIA_ISSET(media, TRANSCODING)) { - reti->rtcp_fw = 1; - if (media->protocol->avpf) - reti->rtcp_fb_fw = 1; - } - } + recording_stream_kernel_info(stream, reti); - if (reti->rtp && sinks && sinks->length && payload_types) { - struct rtp_stats *rs; - - // this code is execute only once: list therefore must be empty - assert(*payload_types == NULL); - // create sorted list of payload types - unsigned int num_pts = t_hash_table_size(stream->rtp_stats); - *payload_types = rtp_stats_arr_new_sized(num_pts); - (*payload_types)->len = num_pts; - rtp_stats_ht_iter iter; - t_hash_table_iter_init(&iter, stream->rtp_stats); - unsigned int i = 0; - while (t_hash_table_iter_next(&iter, NULL, &rs)) - (*payload_types)->pdata[i++] = rs; - t_ptr_array_sort(*payload_types, __rtp_stats_pt_sort); - for (i = 0; i < num_pts; i++) { - if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) { - ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module"); - break; - } - rs = (*payload_types)->pdata[i]; - // only add payload types that are passthrough for all sinks - bool can_kernelize = true; - for (__auto_type k = sinks->head; k; k = k->next) { - struct sink_handler *ksh = k->data; - struct packet_stream *ksink = ksh->sink; - struct codec_handler *ch = codec_handler_get(media, rs->payload_type, - ksink->media, ksh); - if (ch->kernelize) - continue; - can_kernelize = false; - break; - } - if (!can_kernelize) { - reti->pt_filter = 1; - // ensure that the final list in *payload_types reflects the payload - // types populated in reti->payload_types - t_ptr_array_remove_index(*payload_types, i); + if (!proto_is_rtp(media->protocol)) + return NULL; // everything below is RTP-specific + + reti->rtp = 1; + reti->ssrc_req = 1; + if (!MEDIA_ISSET(media, TRANSCODING)) { + reti->rtcp_fw = 1; + if (media->protocol->avpf) + reti->rtcp_fb_fw = 1; + } + + // handle known RTP payload types: + // create sorted list of payload types + unsigned int num_pts = t_hash_table_size(stream->rtp_stats); + s->payload_types = rtp_stats_arr_new_sized(num_pts); + s->payload_types->len = num_pts; + + rtp_stats_ht_iter iter; + t_hash_table_iter_init(&iter, stream->rtp_stats); + unsigned int i = 0; + struct rtp_stats *rs; + while (t_hash_table_iter_next(&iter, NULL, &rs)) + s->payload_types->pdata[i++] = rs; + t_ptr_array_sort(s->payload_types, __rtp_stats_pt_sort); + + for (i = 0; i < num_pts; i++) { + if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module"); + break; + } + rs = s->payload_types->pdata[i]; + // only add payload types that are passthrough for all sinks + bool can_kernelize = true; + for (__auto_type k = stream->rtp_sinks.head; k; k = k->next) { + struct sink_handler *ksh = k->data; + struct packet_stream *ksink = ksh->sink; + struct codec_handler *ch = codec_handler_get(media, rs->payload_type, + ksink->media, ksh); + if (ch->kernelize) continue; - } - - reti->pt_stats[reti->num_payload_types] = rs; - reti->num_payload_types++; + can_kernelize = false; + break; } - } - else { - if (sink_handler && sink_handler->attrs.transcoding) - return NULL; + if (!can_kernelize) { + reti->pt_filter = 1; + // ensure that the final list in *payload_types reflects the payload + // types populated in reti->payload_types + t_ptr_array_remove_index(s->payload_types, i); + continue; + } + + reti->pt_stats[reti->num_payload_types] = rs; + reti->num_payload_types++; } - recording_stream_kernel_info(stream, reti); + return NULL; +} -output: - // output section: any output at all? - if (non_forwarding || !sink || !sink->selected_sfd) +__attribute__((nonnull(1, 2, 3))) +static const char *kernelize_one(kernelize_state *s, + struct packet_stream *stream, struct sink_handler *sink_handler) +{ + call_t *call = stream->call; + struct call_media *media = stream->media; + struct packet_stream *sink = sink_handler->sink; + + if (MEDIA_ISSET(sink->media, BLOCK_EGRESS)) + return NULL; + sink_handler->kernel_output_idx = -1; + + if (!sink->endpoint.address.family) + return NULL; + + if (sink->selected_sfd) + ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s | %s -> %s%s%s", + FMT_M(endpoint_print_buf(&stream->endpoint)), + endpoint_print_buf(&stream->selected_sfd->socket.local), + endpoint_print_buf(&sink->selected_sfd->socket.local), + FMT_M(endpoint_print_buf(&sink->endpoint))); + else + ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> void", + FMT_M(endpoint_print_buf(&stream->endpoint)), + endpoint_print_buf(&stream->selected_sfd->socket.local)); + + const struct streamhandler *handler = __determine_handler(stream, sink_handler); + + if (!handler->out->kernel) + return "protocol not supported by kernel module"; + + __auto_type reti = &s->reti; + __auto_type payload_types = s->ignore_payload_types ? NULL : &s->payload_types; + + // any output at all? + if (s->non_forwarding || !sink->selected_sfd) return NULL; // no output if (!PS_ISSET(sink, FILLED)) return NULL; @@ -1766,12 +1789,12 @@ output: } // helper function for kernelize() static void kernelize_one_sink_handler(kernelize_state *s, - struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks) + struct packet_stream *stream, struct sink_handler *sink_handler) { struct packet_stream *sink = sink_handler->sink; if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) return; - const char *err = kernelize_one(s, stream, sink_handler, &stream->rtp_sinks); + const char *err = kernelize_one(s, stream, sink_handler); if (err) ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); } @@ -1801,34 +1824,28 @@ void kernelize(struct packet_stream *stream) { if (!stream->endpoint.address.family) goto no_kernel; - unsigned int num_sinks = stream->rtp_sinks.length + stream->rtcp_sinks.length; + const char *err = kernelize_target(&s, stream); + if (err) + ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); - if (num_sinks == 0) { - // add blackhole kernel rule - const char *err = kernelize_one(&s, stream, NULL, NULL); - if (err) - ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); + for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + if (sh->attrs.block_media) + continue; + kernelize_one_sink_handler(&s, stream, sh); } - else { - for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) { - struct sink_handler *sh = l->data; - if (sh->attrs.block_media) - continue; - kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks); - } - for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { - struct sink_handler *sh = l->data; - kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks); - } - // record number of RTP destinations - unsigned int num_rtp_dests = s.reti.num_destinations; - for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { - struct sink_handler *sh = l->data; - s.ignore_payload_types = true; - kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks); - } - s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; + for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) { + struct sink_handler *sh = l->data; + kernelize_one_sink_handler(&s, stream, sh); + } + // record number of RTP destinations + unsigned int num_rtp_dests = s.reti.num_destinations; + for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + s.ignore_payload_types = true; + kernelize_one_sink_handler(&s, stream, sh); } + s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests; if (!s.reti.local.family) goto no_kernel;