Browse Source

MT#55283 move RTP stats into shm

Allocate per-payload-type stats buffers in shared bufferpool. Push these
to the kernel module for direct tracking of bytes/packets stats. This
eliminates the need to return these values in the /blist data and
continuously update the userspace stats.

The accomodate updating the payload-type tracker, we use the "last
received" timestamp from the kernel module together with the "last seen
payload type" to give similar results as before.

Change-Id: I524791a1940b5d1ff2d82716c3b7a262f64e106c
pull/1826/head
Richard Fuchs 2 years ago
parent
commit
89984967e2
4 changed files with 50 additions and 72 deletions
  1. +9
    -35
      daemon/media_socket.c
  2. +4
    -0
      kernel-module/common_stats.h
  3. +36
    -31
      kernel-module/xt_RTPENGINE.c
  4. +1
    -6
      kernel-module/xt_RTPENGINE.h

+ 9
- 35
daemon/media_socket.c View File

@ -1510,20 +1510,18 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
*payload_types = g_hash_table_get_values(stream->rtp_stats);
*payload_types = g_list_sort(*payload_types, __rtp_stats_pt_sort);
for (l = *payload_types; l; ) {
if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_input)) {
if (reti->num_payload_types >= G_N_ELEMENTS(reti->pt_stats)) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Too many RTP payload types for kernel module");
break;
}
rs = l->data;
// only add payload types that are passthrough for all sinks
bool can_kernelize = true;
unsigned int clockrate = 0;
for (__auto_type k = sinks->head; k; k = k->next) {
struct sink_handler *ksh = k->data;
struct packet_stream *ksink = ksh->sink;
struct codec_handler *ch = codec_handler_get(media, rs->payload_type,
ksink->media, ksh);
clockrate = ch->source_pt.clock_rate;
if (ch->kernelize)
continue;
can_kernelize = false;
@ -1539,9 +1537,8 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
continue;
}
struct rtpengine_pt_input *rpt = &reti->pt_input[reti->num_payload_types++];
rpt->pt_num = rs->payload_type;
rpt->clock_rate = clockrate;
reti->pt_stats[reti->num_payload_types] = rs;
reti->num_payload_types++;
l = l->next;
}
@ -3480,9 +3477,6 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_
enum thread_looper_action kernel_stats_updater(void) {
struct rtpengine_list_entry *ke;
struct packet_stream *ps;
int j;
struct rtp_stats *rs;
unsigned int pt;
endpoint_t ep;
/* TODO: should we realy check the count of call timers? `call_timer_iterator()` */
@ -3512,28 +3506,6 @@ enum thread_looper_action kernel_stats_updater(void) {
ps->in_tos_tclass = ke->tos;
uint64_t max_diff = 0;
int max_pt = -1;
for (j = 0; j < ke->target.num_payload_types; j++) {
pt = ke->target.pt_input[j].pt_num;
rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt));
if (!rs)
continue;
if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) {
uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets);
atomic64_add(&rs->packets, diff);
if (diff > max_diff) {
max_diff = diff;
max_pt = pt;
}
}
if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes))
atomic64_add(&rs->bytes,
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
bool update = false;
bool active_media = (rtpe_now.tv_sec - packet_stream_last_packet(ps) < 1);
@ -3562,8 +3534,9 @@ enum thread_looper_action kernel_stats_updater(void) {
sink->ssrc_out, 0);
if (!ctx)
continue;
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ps->stats_in->last_pt));
if (sink->crypto.params.crypto_suite
&& o->encrypt.last_rtp_index[u] - ctx->srtp_index > 0x4000)
{
@ -3594,8 +3567,9 @@ enum thread_looper_action kernel_stats_updater(void) {
// TODO: add in SSRC stats similar to __stream_update_stats
atomic64_set(&ctx->last_seq, ke->target.decrypt.last_rtp_index[u]);
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ps->stats_in->last_pt));
if (sfd->crypto.params.crypto_suite
&& ke->target.decrypt.last_rtp_index[u]


+ 4
- 0
kernel-module/common_stats.h View File

@ -5,6 +5,9 @@
#ifdef __KERNEL__
typedef atomic64_t atomic64;
static_assert(sizeof(atomic64_t) == sizeof(int64_t), "atomic64_t != int64_t");
static_assert(sizeof(atomic_t) == sizeof(int), "atomic_t != int");
#else
typedef int atomic_t;
#endif
@ -44,6 +47,7 @@ struct stream_stats {
atomic64 bytes;
atomic64 errors;
atomic64 last_packet;
atomic_t last_pt;
};
struct rtp_stats {


+ 36
- 31
kernel-module/xt_RTPENGINE.c View File

@ -309,10 +309,6 @@ struct re_crypto_context {
const struct re_hmac *hmac;
};
struct rtpengine_rtp_stats_a {
atomic64_t packets;
atomic64_t bytes;
};
struct rtpengine_output {
struct rtpengine_output_info output;
struct re_crypto_context encrypt_rtp;
@ -325,7 +321,6 @@ struct rtpengine_target {
unsigned int last_pt; // index into pt_input[] and pt_output[]
atomic_t tos;
struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES];
spinlock_t ssrc_stats_lock;
struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING];
@ -1480,11 +1475,6 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
opp->tos = atomic_read(&g->tos);
for (i = 0; i < g->target.num_payload_types; i++) {
opp->rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets);
opp->rtp_stats[i].bytes = atomic64_read(&g->rtp_stats[i].bytes);
}
spin_lock_irqsave(&g->decrypt_rtp.lock, flags);
for (i = 0; i < ARRAY_SIZE(opp->target.decrypt.last_rtp_index); i++)
opp->target.decrypt.last_rtp_index[i] = g->target.decrypt.last_rtp_index[i];
@ -1700,9 +1690,9 @@ static int proc_list_show(struct seq_file *f, void *v) {
(unsigned long long) atomic64_read(&g->target.stats->errors));
for (i = 0; i < g->target.num_payload_types; i++) {
seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n",
g->target.pt_input[i].pt_num,
(unsigned long long) atomic64_read(&g->rtp_stats[i].bytes),
(unsigned long long) atomic64_read(&g->rtp_stats[i].packets));
g->target.pt_stats[i]->payload_type,
(unsigned long long) atomic64_read(&g->target.pt_stats[i]->bytes),
(unsigned long long) atomic64_read(&g->target.pt_stats[i]->packets));
}
seq_printf(f, " SSRC in:");
@ -1775,7 +1765,7 @@ static int proc_list_show(struct seq_file *f, void *v) {
if (o->output.pt_output[j].replace_pattern_len || o->output.pt_output[j].min_payload_len)
seq_printf(f, " RTP payload type %3u: "
"%u bytes replacement payload, min payload len %u\n",
g->target.pt_input[j].pt_num,
g->target.pt_stats[j]->payload_type,
o->output.pt_output[j].replace_pattern_len,
o->output.pt_output[j].min_payload_len);
}
@ -2421,6 +2411,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
unsigned int u;
struct interface_stats_block *iface_stats;
struct stream_stats *stats;
struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES];
/* validation */
@ -2432,6 +2423,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
return -EINVAL;
if (i->num_rtcp_destinations > i->num_destinations)
return -EINVAL;
if (i->num_payload_types > RTPE_NUM_PAYLOAD_TYPES)
return -EINVAL;
if (!i->non_forwarding) {
if (!i->num_destinations)
return -EINVAL;
@ -2449,6 +2442,11 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
stats = shm_map_resolve(i->stats, sizeof(*stats));
if (!stats)
return -EFAULT;
for (u = 0; u < i->num_payload_types; u++) {
pt_stats[u] = shm_map_resolve(i->pt_stats[u], sizeof(*pt_stats[u]));
if (!pt_stats[u])
return -EFAULT;
}
DBG("Creating new target\n");
@ -2472,6 +2470,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
rwlock_init(&g->outputs_lock);
g->target.iface_stats = iface_stats;
g->target.stats = stats;
for (u = 0; u < i->num_payload_types; u++)
g->target.pt_stats[u] = pt_stats[u];
if (i->num_destinations) {
err = -ENOMEM;
@ -5048,12 +5048,13 @@ static inline int is_dtls(struct sk_buff *skb) {
return 1;
}
static int rtp_payload_match(const void *a, const void *b) {
const struct rtpengine_pt_input *A = a, *B = b;
static int rtp_payload_match(const void *a, const void *bp) {
const struct rtp_stats *const *b = bp;
const struct rtp_stats *A = a, *B = *b;
if (A->pt_num < B->pt_num)
if (A->payload_type < B->payload_type)
return -1;
if (A->pt_num > B->pt_num)
if (A->payload_type > B->payload_type)
return 1;
return 0;
}
@ -5061,20 +5062,20 @@ static int rtp_payload_match(const void *a, const void *b) {
static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rtpengine_target_info *tg,
int *last_pt)
{
struct rtpengine_pt_input pt;
const struct rtpengine_pt_input *match;
struct rtp_stats pt;
struct rtp_stats *const *pmatch;
pt.pt_num = hdr->m_pt & 0x7f;
pt.payload_type = hdr->m_pt & 0x7f;
if (*last_pt < tg->num_payload_types) {
match = &tg->pt_input[*last_pt];
if (rtp_payload_match(match, &pt) == 0)
pmatch = &tg->pt_stats[*last_pt];
if (rtp_payload_match(&pt, pmatch) == 0)
goto found;
}
match = bsearch(&pt, tg->pt_input, tg->num_payload_types, sizeof(pt), rtp_payload_match);
if (!match)
pmatch = bsearch(&pt, tg->pt_stats, tg->num_payload_types, sizeof(*pmatch), rtp_payload_match);
if (!pmatch)
return -1;
found:
*last_pt = match - tg->pt_input;
*last_pt = pmatch - tg->pt_stats;
return *last_pt;
}
@ -5283,7 +5284,7 @@ static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 ar
// jitter
// RFC 3550 A.8
clockrate = g->target.pt_input[pt_idx].clock_rate;
clockrate = g->target.pt_stats[pt_idx]->clock_rate;
transit = ((uint32_t) (div64_s64(arrival_time, 1000) * clockrate) / 1000) - ts;
d = 0;
if (s->transit)
@ -5431,8 +5432,12 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
goto out_error;
// if RTP, only forward packets of known/passthrough payload types
if (g->target.pt_filter && rtp_pt_idx < 0)
goto out;
if (rtp_pt_idx < 0) {
if (g->target.pt_filter)
goto out;
}
else
atomic_set(&g->target.stats->last_pt, g->target.pt_stats[rtp_pt_idx]->payload_type);
errstr = "SRTP decryption failed";
err = srtp_decrypt(&g->decrypt_rtp, &g->target.decrypt, &rtp, &pkt_idx);
@ -5554,8 +5559,8 @@ do_stats:
atomic64_add(datalen, &t->rtpe_stats->bytes_kernel);
if (rtp_pt_idx >= 0) {
atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets);
atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes);
atomic64_inc(&g->target.pt_stats[rtp_pt_idx]->packets);
atomic64_add(datalen, &g->target.pt_stats[rtp_pt_idx]->bytes);
}
else if (rtp_pt_idx == -2)
/* not RTP */ ;


+ 1
- 6
kernel-module/xt_RTPENGINE.h View File

@ -90,10 +90,6 @@ enum rtpengine_src_mismatch {
MSM_PROPAGATE, /* propagate to userspace daemon */
};
struct rtpengine_pt_input {
unsigned char pt_num;
uint32_t clock_rate;
};
struct rtpengine_pt_output {
unsigned int min_payload_len;
char replace_pattern[16];
@ -111,7 +107,7 @@ struct rtpengine_target_info {
struct rtpengine_srtp decrypt;
uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync.
struct rtpengine_pt_input pt_input[RTPE_NUM_PAYLOAD_TYPES]; /* must be sorted */
struct rtp_stats *pt_stats[RTPE_NUM_PAYLOAD_TYPES]; // must be sorted by PT
unsigned int num_payload_types;
struct interface_stats_block *iface_stats; // for ingress stats
@ -272,7 +268,6 @@ struct rtpengine_command_send_packet {
struct rtpengine_list_entry {
struct rtpengine_target_info target;
struct rtpengine_rtp_stats rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; // same index as pt_input
struct rtpengine_output_info outputs[RTPE_MAX_FORWARD_DESTINATIONS];
int tos;
};


Loading…
Cancel
Save