Browse Source

MT#55283 split up kernelize_one

Split function into one part handling the "target" (i.e. ingress)
portion of the forwarding chain, and one function handling just the
outputs.

Change-Id: I3766da3c4bc5caee4eb6bae8978f177a83cc231a
pull/1918/head
Richard Fuchs 9 months ago
parent
commit
ef2e31b69d
1 changed files with 153 additions and 136 deletions
  1. +153
    -136
      daemon/media_socket.c

+ 153
- 136
daemon/media_socket.c View File

@ -1516,6 +1516,8 @@ typedef struct {
kernel_output_q outputs; kernel_output_q outputs;
rtp_stats_arr *payload_types; rtp_stats_arr *payload_types;
bool ignore_payload_types; // temporary until refactor bool ignore_payload_types; // temporary until refactor
bool blackhole;
bool non_forwarding;
} kernelize_state; } kernelize_state;
static void kernelize_state_clear(kernelize_state *s) { static void kernelize_state_clear(kernelize_state *s) {
@ -1527,61 +1529,22 @@ static void kernelize_state_clear(kernelize_state *s) {
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(kernelize_state, kernelize_state_clear) G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(kernelize_state, kernelize_state_clear)
/**
* The linkage between userspace and kernel module is in the kernelize_one().
*
* Called with in_lock held.
* sink_handler can be NULL.
*/
static const char *kernelize_one(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks)
{
call_t *call = stream->call;
__attribute__((nonnull(1, 2)))
static const char *kernelize_target(kernelize_state *s, struct packet_stream *stream) {
struct call_media *media = stream->media; struct call_media *media = stream->media;
struct packet_stream *sink = sink_handler ? sink_handler->sink : NULL;
bool non_forwarding = false;
bool blackhole = false;
if (sink_handler) {
if (MEDIA_ISSET(sink->media, BLOCK_EGRESS))
return NULL;
sink_handler->kernel_output_idx = -1;
}
if (MEDIA_ISSET(media, BLACKHOLE)) if (MEDIA_ISSET(media, BLACKHOLE))
blackhole = true;
else if (!sink_handler)
blackhole = true;
s->blackhole = true;
if (blackhole)
non_forwarding = true;
if (s->blackhole)
s->non_forwarding = true;
if (sink && !sink->endpoint.address.family)
return NULL;
if (sink && sink->selected_sfd)
ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s | %s -> %s%s%s",
FMT_M(endpoint_print_buf(&stream->endpoint)),
endpoint_print_buf(&stream->selected_sfd->socket.local),
endpoint_print_buf(&sink->selected_sfd->socket.local),
FMT_M(endpoint_print_buf(&sink->endpoint)));
else
ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> void",
FMT_M(endpoint_print_buf(&stream->endpoint)),
endpoint_print_buf(&stream->selected_sfd->socket.local));
const struct streamhandler *handler = __determine_handler(stream, sink_handler);
if (!handler->in->kernel || !handler->out->kernel)
return "protocol not supported by kernel module";
// fill input if needed
ilog(LOG_INFO, "Kernelizing media stream: remote %s%s%s -> local %s",
FMT_M(endpoint_print_buf(&stream->endpoint)),
endpoint_print_buf(&stream->selected_sfd->socket.local));
// fill input
__auto_type reti = &s->reti; __auto_type reti = &s->reti;
__auto_type payload_types = s->ignore_payload_types ? NULL : &s->payload_types;
if (reti->local.family)
goto output;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock); mutex_lock(&stream->out_lock);
@ -1599,11 +1562,40 @@ static const char *kernelize_one(kernelize_state *s,
reti->rtcp = PS_ISSET(stream, RTCP); reti->rtcp = PS_ISSET(stream, RTCP);
reti->dtls = MEDIA_ISSET(media, DTLS); reti->dtls = MEDIA_ISSET(media, DTLS);
reti->stun = media->ice_agent ? 1 : 0; reti->stun = media->ice_agent ? 1 : 0;
reti->non_forwarding = non_forwarding ? 1 : 0;
reti->blackhole = blackhole ? 1 : 0;
reti->non_forwarding = s->non_forwarding ? 1 : 0;
reti->blackhole = s->blackhole ? 1 : 0;
reti->rtp_stats = (rtpe_config.measure_rtp reti->rtp_stats = (rtpe_config.measure_rtp
|| MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0; || MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0;
// Grab the first stream handler for our decryption function.
// __determine_handler is in charge of only returning a NULL decrypter if it is
// in fact a pure passthrough for all sinks.
const struct streamhandler *handler = NULL;
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
handler = __determine_handler(stream, l->data);
if (handler)
break;
}
if (!handler) {
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
handler = __determine_handler(stream, l->data);
if (handler)
break;
}
}
if (!handler) {
// nothing to forward
s->non_forwarding = true;
s->blackhole = true;
return NULL;
}
if (!handler->in->kernel)
return "protocol not supported by kernel module";
handler->in->kernel(&reti->decrypt, stream); handler->in->kernel(&reti->decrypt, stream);
if (!reti->decrypt.cipher || !reti->decrypt.hmac) if (!reti->decrypt.cipher || !reti->decrypt.hmac)
return "decryption cipher or HMAC not supported by kernel module"; return "decryption cipher or HMAC not supported by kernel module";
@ -1616,71 +1608,102 @@ static const char *kernelize_one(kernelize_state *s,
} }
} }
if (proto_is_rtp(media->protocol)) {
reti->rtp = 1;
reti->ssrc_req = 1;
if (!MEDIA_ISSET(media, TRANSCODING)) {
reti->rtcp_fw = 1;
if (media->protocol->avpf)
reti->rtcp_fb_fw = 1;
}
}
recording_stream_kernel_info(stream, reti);
if (reti->rtp && sinks && sinks->length && payload_types) {
struct rtp_stats *rs;
// this code is execute only once: list therefore must be empty
assert(*payload_types == NULL);
// create sorted list of payload types
unsigned int num_pts = t_hash_table_size(stream->rtp_stats);
*payload_types = rtp_stats_arr_new_sized(num_pts);
(*payload_types)->len = num_pts;
rtp_stats_ht_iter iter;
t_hash_table_iter_init(&iter, stream->rtp_stats);
unsigned int i = 0;
while (t_hash_table_iter_next(&iter, NULL, &rs))
(*payload_types)->pdata[i++] = rs;
t_ptr_array_sort(*payload_types, __rtp_stats_pt_sort);
for (i = 0; i < num_pts; i++) {
if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module");
break;
}
rs = (*payload_types)->pdata[i];
// only add payload types that are passthrough for all sinks
bool can_kernelize = true;
for (__auto_type k = sinks->head; k; k = k->next) {
struct sink_handler *ksh = k->data;
struct packet_stream *ksink = ksh->sink;
struct codec_handler *ch = codec_handler_get(media, rs->payload_type,
ksink->media, ksh);
if (ch->kernelize)
continue;
can_kernelize = false;
break;
}
if (!can_kernelize) {
reti->pt_filter = 1;
// ensure that the final list in *payload_types reflects the payload
// types populated in reti->payload_types
t_ptr_array_remove_index(*payload_types, i);
if (!proto_is_rtp(media->protocol))
return NULL; // everything below is RTP-specific
reti->rtp = 1;
reti->ssrc_req = 1;
if (!MEDIA_ISSET(media, TRANSCODING)) {
reti->rtcp_fw = 1;
if (media->protocol->avpf)
reti->rtcp_fb_fw = 1;
}
// handle known RTP payload types:
// create sorted list of payload types
unsigned int num_pts = t_hash_table_size(stream->rtp_stats);
s->payload_types = rtp_stats_arr_new_sized(num_pts);
s->payload_types->len = num_pts;
rtp_stats_ht_iter iter;
t_hash_table_iter_init(&iter, stream->rtp_stats);
unsigned int i = 0;
struct rtp_stats *rs;
while (t_hash_table_iter_next(&iter, NULL, &rs))
s->payload_types->pdata[i++] = rs;
t_ptr_array_sort(s->payload_types, __rtp_stats_pt_sort);
for (i = 0; i < num_pts; i++) {
if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module");
break;
}
rs = s->payload_types->pdata[i];
// only add payload types that are passthrough for all sinks
bool can_kernelize = true;
for (__auto_type k = stream->rtp_sinks.head; k; k = k->next) {
struct sink_handler *ksh = k->data;
struct packet_stream *ksink = ksh->sink;
struct codec_handler *ch = codec_handler_get(media, rs->payload_type,
ksink->media, ksh);
if (ch->kernelize)
continue; continue;
}
reti->pt_stats[reti->num_payload_types] = rs;
reti->num_payload_types++;
can_kernelize = false;
break;
} }
}
else {
if (sink_handler && sink_handler->attrs.transcoding)
return NULL;
if (!can_kernelize) {
reti->pt_filter = 1;
// ensure that the final list in *payload_types reflects the payload
// types populated in reti->payload_types
t_ptr_array_remove_index(s->payload_types, i);
continue;
}
reti->pt_stats[reti->num_payload_types] = rs;
reti->num_payload_types++;
} }
recording_stream_kernel_info(stream, reti);
return NULL;
}
output:
// output section: any output at all?
if (non_forwarding || !sink || !sink->selected_sfd)
__attribute__((nonnull(1, 2, 3)))
static const char *kernelize_one(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler)
{
call_t *call = stream->call;
struct call_media *media = stream->media;
struct packet_stream *sink = sink_handler->sink;
if (MEDIA_ISSET(sink->media, BLOCK_EGRESS))
return NULL;
sink_handler->kernel_output_idx = -1;
if (!sink->endpoint.address.family)
return NULL;
if (sink->selected_sfd)
ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s | %s -> %s%s%s",
FMT_M(endpoint_print_buf(&stream->endpoint)),
endpoint_print_buf(&stream->selected_sfd->socket.local),
endpoint_print_buf(&sink->selected_sfd->socket.local),
FMT_M(endpoint_print_buf(&sink->endpoint)));
else
ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> void",
FMT_M(endpoint_print_buf(&stream->endpoint)),
endpoint_print_buf(&stream->selected_sfd->socket.local));
const struct streamhandler *handler = __determine_handler(stream, sink_handler);
if (!handler->out->kernel)
return "protocol not supported by kernel module";
__auto_type reti = &s->reti;
__auto_type payload_types = s->ignore_payload_types ? NULL : &s->payload_types;
// any output at all?
if (s->non_forwarding || !sink->selected_sfd)
return NULL; // no output return NULL; // no output
if (!PS_ISSET(sink, FILLED)) if (!PS_ISSET(sink, FILLED))
return NULL; return NULL;
@ -1766,12 +1789,12 @@ output:
} }
// helper function for kernelize() // helper function for kernelize()
static void kernelize_one_sink_handler(kernelize_state *s, static void kernelize_one_sink_handler(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks)
struct packet_stream *stream, struct sink_handler *sink_handler)
{ {
struct packet_stream *sink = sink_handler->sink; struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED)) if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return; return;
const char *err = kernelize_one(s, stream, sink_handler, &stream->rtp_sinks);
const char *err = kernelize_one(s, stream, sink_handler);
if (err) if (err)
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
} }
@ -1801,34 +1824,28 @@ void kernelize(struct packet_stream *stream) {
if (!stream->endpoint.address.family) if (!stream->endpoint.address.family)
goto no_kernel; goto no_kernel;
unsigned int num_sinks = stream->rtp_sinks.length + stream->rtcp_sinks.length;
const char *err = kernelize_target(&s, stream);
if (err)
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
if (num_sinks == 0) {
// add blackhole kernel rule
const char *err = kernelize_one(&s, stream, NULL, NULL);
if (err)
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&s, stream, sh);
} }
else {
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks);
}
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks);
}
// record number of RTP destinations
unsigned int num_rtp_dests = s.reti.num_destinations;
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
s.ignore_payload_types = true;
kernelize_one_sink_handler(&s, stream, sh, &stream->rtp_sinks);
}
s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests;
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&s, stream, sh);
}
// record number of RTP destinations
unsigned int num_rtp_dests = s.reti.num_destinations;
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
s.ignore_payload_types = true;
kernelize_one_sink_handler(&s, stream, sh);
} }
s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests;
if (!s.reti.local.family) if (!s.reti.local.family)
goto no_kernel; goto no_kernel;


Loading…
Cancel
Save