Browse Source

MT#63317 rework kernel output grouping

Change-Id: I3e798e5f1e5ae2f4af247819e3d323cdbf56b36b
rfuchs/dtls-ice
Richard Fuchs 3 months ago
parent
commit
8cf038e4fa
5 changed files with 139 additions and 52 deletions
  1. +10
    -1
      daemon/call.c
  2. +69
    -33
      daemon/media_socket.c
  3. +4
    -1
      include/call.h
  4. +41
    -15
      kernel-module/xt_RTPENGINE.c
  5. +15
    -2
      kernel-module/xt_RTPENGINE.h

+ 10
- 1
daemon/call.c View File

@ -3265,7 +3265,7 @@ static void media_update_transcoding_flag(struct call_media *media) {
* For handling sdp media level manipulations (media sessions remove).
* This function just adds a fictitious media for this side, pretending it had 0 port.
*/
static struct call_media * monologue_add_zero_media(struct call_monologue *sender_ml, struct stream_params *sp,
static struct call_media *monologue_add_zero_media(struct call_monologue *sender_ml, struct stream_params *sp,
unsigned int *num_ports_other, sdp_ng_flags *flags, str_ht tracker)
{
struct call_media *sender_media = NULL;
@ -3289,6 +3289,15 @@ static struct call_media * monologue_add_zero_media(struct call_monologue *sende
return sender_media;
}
struct packet_stream *get_media_component(struct call_media *media, unsigned int component) {
// XXX maybe turn into array?
for (__auto_type l = media->streams.head; l; l = l->next) {
if (l->data->component == component)
return l->data;
}
return NULL;
}
// reset all bundle state
__attribute__((nonnull(1)))
static void monologue_bundle_reset(struct call_monologue *ml) {


+ 69
- 33
daemon/media_socket.c View File

@ -1553,6 +1553,9 @@ typedef struct {
struct rtpengine_target_info reti;
struct ssrc_entry_call *ssrc[RTPE_NUM_SSRC_TRACKING];
kernel_output_q outputs;
sink_handler_q *rtp_sinks[RTPE_NUM_OUTPUT_MEDIA];
sink_handler_q *rtp_mirrors[RTPE_NUM_OUTPUT_MEDIA];
sink_handler_q *rtcp_sinks[RTPE_NUM_OUTPUT_MEDIA];
struct rtp_stats *payload_types[RTPE_NUM_PAYLOAD_TYPES];
unsigned int num_payload_types;
bool blackhole;
@ -1572,6 +1575,10 @@ G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(kernelize_state, kernelize_state_clear)
__attribute__((nonnull(1, 2)))
static const char *kernelize_target(kernelize_state *s, struct packet_stream *stream) {
struct call_media *media = stream->media;
unsigned int media_idx = media->index - 1;
if (media_idx >= RTPE_NUM_OUTPUT_MEDIA)
return "media index too large";
if (MEDIA_ISSET(media, BLACKHOLE))
s->blackhole = true;
@ -1643,7 +1650,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
reti->track_ssrc = 1;
unsigned int u = 0;
for (GList *l = stream->media->ssrc_hash_in.nq.head; l; l = l->next) {
for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
if (u >= G_N_ELEMENTS(reti->ssrc))
break;
@ -1655,6 +1662,11 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
recording_stream_kernel_info(stream, reti);
// record our outputs for this media
s->rtp_sinks[media_idx] = &stream->rtp_sinks;
s->rtp_mirrors[media_idx] = &stream->rtp_mirrors;
s->rtcp_sinks[media_idx] = &stream->rtcp_sinks;
if (!proto_is_rtp(media->protocol))
return NULL; // everything below is RTP-specific
@ -1712,6 +1724,8 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
}
reti->pt_stats[i] = rs;
reti->pt_media_idx[i] = media_idx;
i++;
}
@ -1727,7 +1741,8 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
*/
__attribute__((nonnull(1, 2, 3)))
static const char *kernelize_one(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler)
struct packet_stream *stream, struct sink_handler *sink_handler,
bool rtcp)
{
call_t *call = stream->call;
struct call_media *media = stream->media;
@ -1767,6 +1782,7 @@ static const char *kernelize_one(kernelize_state *s,
__auto_type redi = g_new0(struct rtpengine_destination_info, 1);
redi->local = reti->local;
redi->output.tos = call->tos;
redi->output.rtcp = rtcp;
// PT manipulations
bool silenced = s->silenced || sink_handler->attrs.silence_media;
@ -1850,13 +1866,15 @@ static const char *kernelize_one(kernelize_state *s,
}
// helper function for kernelize()
// called with stream->lock held
__attribute__((nonnull(1, 2, 3)))
static bool kernelize_one_sink_handler(kernelize_state *s,
struct packet_stream *stream, struct sink_handler *sink_handler)
struct packet_stream *stream, struct sink_handler *sink_handler,
bool rtcp)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return true;
const char *err = kernelize_one(s, stream, sink_handler);
const char *err = kernelize_one(s, stream, sink_handler, rtcp);
if (err) {
if (!*err)
return false; // indicate deadlock
@ -1864,11 +1882,21 @@ static bool kernelize_one_sink_handler(kernelize_state *s,
}
return true;
}
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
call_t *call = stream->call;
struct call_media *media = stream->media;
// act on bundle head if there is one
if (media->bundle) {
unsigned int component = stream->component;
media = media->bundle;
stream = get_media_component(media, component);
if (!stream)
return; // nothing to do?
}
while (true) {
g_auto(kernelize_state) s = {0};
@ -1900,38 +1928,46 @@ static void kernelize(struct packet_stream *stream) {
if (err)
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
// primary RTP sinks
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
continue; // retry
}
// RTP egress mirrors
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
continue; // retry
}
// RTP -> RTCP sinks
// record number of RTP destinations up to now
unsigned int num_rtp_dests = s.reti.num_destinations;
// ignore RTP payload types
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
bool ok = kernelize_one_sink_handler(&s, stream, sh);
if (!ok)
continue; // retry
}
// mark the start of RTCP outputs
s.reti.num_rtcp_destinations = s.reti.num_destinations - num_rtp_dests;
if (!s.reti.local.family)
goto no_kernel;
for (unsigned int mi = 0; mi < RTPE_NUM_OUTPUT_MEDIA; mi++) {
if (!s.rtp_sinks[mi])
continue; // not filled
// primary RTP sinks
s.reti.media_output_idxs[mi].rtp_start_idx = s.reti.num_destinations;
for (__auto_type l = s.rtp_sinks[mi]->head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
bool ok = kernelize_one_sink_handler(&s, stream, sh, false);
if (!ok)
continue; // retry
}
// RTP egress mirrors
for (__auto_type l = s.rtp_mirrors[mi]->head; l; l = l->next) {
struct sink_handler *sh = l->data;
bool ok = kernelize_one_sink_handler(&s, stream, sh, false);
if (!ok)
continue; // retry
}
// RTP -> RTCP sinks
// record number of RTP destinations up to now
s.reti.media_output_idxs[mi].rtp_end_idx = s.reti.num_destinations;
// also marks the start of RTCP outputs
s.reti.media_output_idxs[mi].rtcp_start_idx = s.reti.num_destinations;
// ignore RTP payload types
for (__auto_type l = s.rtcp_sinks[mi]->head; l; l = l->next) {
struct sink_handler *sh = l->data;
bool ok = kernelize_one_sink_handler(&s, stream, sh, true);
if (!ok)
continue; // retry
}
// mark the end of RTCP outputs
s.reti.media_output_idxs[mi].rtcp_end_idx = s.reti.num_destinations;
}
if (!s.outputs.length && !s.reti.non_forwarding) {
s.reti.non_forwarding = 1;
ilog(LOG_NOTICE | LOG_FLAG_LIMIT, "Setting 'non-forwarding' flag for kernel stream due to "


+ 4
- 1
include/call.h View File

@ -847,7 +847,10 @@ struct media_subscription *call_ml_get_top_ms(struct call_monologue *ml);
bool call_ml_sendonly_inactive(struct call_monologue *ml);
struct media_subscription *call_media_get_top_ms(struct call_media * cm);
struct media_subscription *call_get_media_subscription(subscription_ht ht, struct call_media * cm);
struct call_monologue * ml_medias_subscribed_to_single_ml(struct call_monologue *ml);
struct call_monologue *ml_medias_subscribed_to_single_ml(struct call_monologue *ml);
__attribute__((nonnull(1)))
struct packet_stream *get_media_component(struct call_media *media, unsigned int component);
void free_sink_handler(struct sink_handler *);
void __add_sink_handler(sink_handler_q *, struct packet_stream *, const struct sink_attrs *);


+ 41
- 15
kernel-module/xt_RTPENGINE.c View File

@ -340,7 +340,6 @@ struct rtpengine_target {
rwlock_t outputs_lock;
struct rtpengine_output *outputs;
unsigned int num_rtp_destinations;
unsigned int outputs_unfilled; // only ever decreases
};
@ -1715,10 +1714,11 @@ static int proc_list_show(struct seq_file *f, void *v) {
(unsigned long long) atomic64_read(&g->target.stats->packets),
(unsigned long long) atomic64_read(&g->target.stats->errors));
for (i = 0; i < g->target.num_payload_types; i++) {
seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n",
seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets [group #%u]\n",
g->target.pt_stats[i]->payload_type,
(unsigned long long) atomic64_read(&g->target.pt_stats[i]->bytes),
(unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets));
(unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets),
g->target.pt_media_idx[i]);
}
seq_printf(f, " last packet: %lli",
@ -1767,12 +1767,26 @@ static int proc_list_show(struct seq_file *f, void *v) {
seq_printf(f, " forward-RTCP-FB");
seq_printf(f, "\n");
seq_printf(f, " output groups:");
for (i = 0; i < RTPE_NUM_OUTPUT_MEDIA; i++) {
if (g->target.media_output_idxs[i].rtp_start_idx
!= g->target.media_output_idxs[i].rtp_end_idx
|| g->target.media_output_idxs[i].rtcp_start_idx
!= g->target.media_output_idxs[i].rtcp_end_idx)
seq_printf(f, " [%u]=(%u->%u/%u->%u)", i,
g->target.media_output_idxs[i].rtp_start_idx,
g->target.media_output_idxs[i].rtp_end_idx,
g->target.media_output_idxs[i].rtcp_start_idx,
g->target.media_output_idxs[i].rtcp_end_idx);
}
seq_printf(f, "\n");
for (i = 0; i < g->target.num_destinations; i++) {
struct rtpengine_output *o = &g->outputs[i];
if (i < g->num_rtp_destinations)
seq_printf(f, " output #%u\n", i);
else
seq_printf(f, " output #%u (RTCP)\n", i);
seq_printf(f, " output #%u (media #%u%s)\n", i, o->output.media_idx,
o->output.rtcp ? ", RTCP" : "");
proc_list_addr_print(f, "src", &o->output.src_addr);
proc_list_addr_print(f, "dst", &o->output.dst_addr);
@ -2459,8 +2473,20 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
return -EINVAL;
if (i->num_destinations > RTPE_MAX_FORWARD_DESTINATIONS)
return -EINVAL;
if (i->num_rtcp_destinations > i->num_destinations)
return -EINVAL;
for (u = 0; u < RTPE_NUM_OUTPUT_MEDIA; u++) {
if (i->media_output_idxs[u].rtp_start_idx >= i->num_destinations)
return -EINVAL;
if (i->media_output_idxs[u].rtp_end_idx > i->num_destinations)
return -EINVAL;
if (i->media_output_idxs[u].rtp_end_idx < i->media_output_idxs[u].rtp_start_idx)
return -EINVAL;
if (i->media_output_idxs[u].rtcp_start_idx >= i->num_destinations)
return -EINVAL;
if (i->media_output_idxs[u].rtcp_end_idx > i->num_destinations)
return -EINVAL;
if (i->media_output_idxs[u].rtcp_end_idx < i->media_output_idxs[u].rtcp_start_idx)
return -EINVAL;
}
if (i->num_payload_types > RTPE_NUM_PAYLOAD_TYPES)
return -EINVAL;
if (!i->non_forwarding) {
@ -2484,6 +2510,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
pt_stats[u] = shm_map_resolve(i->pt_stats[u], sizeof(*pt_stats[u]));
if (!pt_stats[u])
return -EFAULT;
if (i->pt_media_idx[u] > RTPE_NUM_OUTPUT_MEDIA)
return -EINVAL;
}
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!i->ssrc[u])
@ -2526,7 +2554,6 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
if (!g->outputs)
goto fail2;
g->outputs_unfilled = i->num_destinations;
g->num_rtp_destinations = i->num_destinations - i->num_rtcp_destinations;
}
err = gen_rtp_session_keys(&g->decrypt_rtp, &g->target.decrypt);
@ -6276,6 +6303,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
unsigned int start_idx, end_idx;
enum {NOT_RTCP = 0, RTCP, RTCP_FORWARD} is_rtcp;
ktime_t packet_ts;
struct rtpengine_output_group *output_group;
skb_reset_transport_header(skb);
uh = udp_hdr(skb);
@ -6446,11 +6474,9 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
}
// output
start_idx = (is_rtcp != NOT_RTCP) ? g->num_rtp_destinations : 0;
end_idx = (is_rtcp != NOT_RTCP) ? g->target.num_destinations : g->num_rtp_destinations;
if (start_idx == end_idx)
goto out; // pass to userspace
output_group = &g->target.media_output_idxs[0];
start_idx = (is_rtcp != NOT_RTCP) ? output_group->rtcp_start_idx : output_group->rtp_start_idx;
end_idx = (is_rtcp != NOT_RTCP) ? output_group->rtcp_end_idx : output_group->rtp_end_idx;
for (i = start_idx; i < end_idx; i++) {
struct rtpengine_output *o = &g->outputs[i];


+ 15
- 2
kernel-module/xt_RTPENGINE.h View File

@ -9,6 +9,7 @@
#define RTPE_MAX_FORWARD_DESTINATIONS 32
#define RTPE_NUM_SSRC_TRACKING 4
#define RTPE_NUM_EXTMAP_FILTER 32
#define RTPE_NUM_OUTPUT_MEDIA 8
@ -82,19 +83,28 @@ struct rtpengine_pt_output {
unsigned int blackhole:1;
};
struct rtpengine_output_group {
unsigned int rtp_start_idx;
unsigned int rtp_end_idx;
unsigned int rtcp_start_idx;
unsigned int rtcp_end_idx;
};
struct rtpengine_target_info {
struct re_address local;
struct re_address expected_src; /* for incoming packets */
enum rtpengine_src_mismatch src_mismatch;
unsigned int num_destinations; // total
unsigned int num_rtcp_destinations;
unsigned int intercept_stream_idx;
struct rtpengine_output_group media_output_idxs[RTPE_NUM_OUTPUT_MEDIA];
struct rtpengine_srtp decrypt;
uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync.
struct ssrc_stats *ssrc_stats[RTPE_NUM_SSRC_TRACKING];
struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES]; // must be sorted by PT
unsigned int pt_media_idx[RTPE_NUM_PAYLOAD_TYPES]; // same idx as pt_stats
unsigned int num_payload_types;
struct interface_stats_block *iface_stats; // for ingress stats
@ -121,6 +131,8 @@ struct rtpengine_output_info {
struct re_address src_addr; /* for outgoing packets */
struct re_address dst_addr;
unsigned int media_idx;
struct rtpengine_srtp encrypt;
uint32_t ssrc_out[RTPE_NUM_SSRC_TRACKING]; // Rewrite SSRC
uint32_t seq_offset[RTPE_NUM_SSRC_TRACKING]; // Rewrite output seq
@ -139,7 +151,8 @@ struct rtpengine_output_info {
unsigned char tos;
unsigned int ssrc_subst:1,
extmap:1;
extmap:1,
rtcp:1;
};
struct rtpengine_destination_info {


Loading…
Cancel
Save