Browse Source

MT#55283 move stream stats to shm

This leaves the TOS value as the only leftover to be not in shm.

The kernel_stats structures remain for the time being as they're needed
to determine whether kernel forwarding is done or not, as well as for
global stats.

Change-Id: I158f18098e018d2870b797f1a196baa03a0e0fb7
pull/1826/head
Richard Fuchs 2 years ago
parent
commit
065270ba49
3 changed files with 50 additions and 58 deletions
  1. +18
    -17
      daemon/media_socket.c
  2. +29
    -33
      kernel-module/xt_RTPENGINE.c
  3. +3
    -8
      kernel-module/xt_RTPENGINE.h

+ 18
- 17
daemon/media_socket.c View File

@ -45,19 +45,19 @@
#define MAX_RECV_LOOP_STRIKES 5
#endif
#define DS_io(x, ps, ke, io) do { \
uint64_t ks_val; \
ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \
if ((ke)->x < ks_val) \
#define DS_io(x, ps, io) do { \
uint64_t ks_val, cur_val; \
ks_val = atomic64_get_na(&ps->kernel_stats_ ## io.x); \
cur_val = atomic64_get_na(&ps->stats_ ## io->x); \
if (cur_val < ks_val) \
diff_ ## x ## _ ## io = 0; \
else \
diff_ ## x ## _ ## io = (ke)->x - ks_val; \
atomic64_add_na(&ps->stats_ ## io->x, diff_ ## x ## _ ## io); \
diff_ ## x ## _ ## io = cur_val - ks_val; \
RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \
} while (0)
#define DS(x) DS_io(x, ps, &ke->stats_in, in)
#define DSo(x) DS_io(x, sink, stats_o, out)
#define DS(x) DS_io(x, ps, in)
#define DSo(x) DS_io(x, sink, out)
struct intf_rr {
@ -1486,6 +1486,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
__re_address_translate_ep(&reti->local, &stream->selected_sfd->socket.local);
reti->iface_stats = stream->selected_sfd->local_intf->stats;
reti->stats = stream->stats_in;
reti->rtcp_mux = MEDIA_ISSET(media, RTCP_MUX);
reti->rtcp = PS_ISSET(stream, RTCP);
reti->dtls = MEDIA_ISSET(media, DTLS);
@ -1622,6 +1623,7 @@ output:
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
redi->output.iface_stats = sink->selected_sfd->local_intf->stats;
redi->output.stats = sink->stats_out;
if (reti->track_ssrc) {
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
@ -3525,16 +3527,16 @@ enum thread_looper_action kernel_stats_updater(void) {
DS(bytes);
DS(errors);
if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) {
if (diff_packets_in != 0) {
atomic64_set(&ps->last_packet, rtpe_now.tv_sec);
count_stream_stats_kernel(ps);
}
ps->in_tos_tclass = ke->stats_in.tos;
ps->in_tos_tclass = ke->tos;
atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes);
atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets);
atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors);
atomic64_set_na(&ps->kernel_stats_in.bytes, atomic64_get_na(&ps->stats_in->bytes));
atomic64_set_na(&ps->kernel_stats_in.packets, atomic64_get_na(&ps->stats_in->packets));
atomic64_set_na(&ps->kernel_stats_in.errors, atomic64_get_na(&ps->stats_in->errors));
uint64_t max_diff = 0;
int max_pt = -1;
@ -3573,15 +3575,14 @@ enum thread_looper_action kernel_stats_updater(void) {
continue;
struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx];
struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx];
DSo(bytes);
DSo(packets);
DSo(errors);
atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes);
atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets);
atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors);
atomic64_set_na(&sink->kernel_stats_out.bytes, atomic64_get_na(&sink->stats_out->bytes));
atomic64_set_na(&sink->kernel_stats_out.packets, atomic64_get_na(&sink->stats_out->packets));
atomic64_set_na(&sink->kernel_stats_out.errors, atomic64_get_na(&sink->stats_out->errors));
mutex_lock(&sink->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {


+ 29
- 33
kernel-module/xt_RTPENGINE.c View File

@ -302,12 +302,6 @@ struct re_crypto_context {
const struct re_hmac *hmac;
};
struct rtpengine_stats_a {
atomic64_t packets;
atomic64_t bytes;
atomic64_t errors;
atomic_t tos;
};
struct rtpengine_rtp_stats_a {
atomic64_t packets;
atomic64_t bytes;
@ -316,7 +310,6 @@ struct rtpengine_output {
struct rtpengine_output_info output;
struct re_crypto_context encrypt_rtp;
struct re_crypto_context encrypt_rtcp;
struct rtpengine_stats_a stats_out;
};
struct rtpengine_target {
atomic_t refcnt;
@ -324,7 +317,7 @@ struct rtpengine_target {
struct rtpengine_target_info target;
unsigned int last_pt; // index into pt_input[] and pt_output[]
struct rtpengine_stats_a stats_in;
atomic_t tos;
struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES];
spinlock_t ssrc_stats_lock;
struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING];
@ -1476,10 +1469,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
memcpy(&opp->target, &g->target, sizeof(opp->target));
opp->stats_in.packets = atomic64_read(&g->stats_in.packets);
opp->stats_in.bytes = atomic64_read(&g->stats_in.bytes);
opp->stats_in.errors = atomic64_read(&g->stats_in.errors);
opp->stats_in.tos = atomic_read(&g->stats_in.tos);
opp->tos = atomic_read(&g->tos);
for (i = 0; i < g->target.num_payload_types; i++) {
opp->rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets);
@ -1499,10 +1489,6 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
spin_lock_irqsave(&o->encrypt_rtp.lock, flags);
opp->outputs[i] = o->output;
spin_unlock_irqrestore(&o->encrypt_rtp.lock, flags);
opp->stats_out[i].packets = atomic64_read(&o->stats_out.packets);
opp->stats_out[i].bytes = atomic64_read(&o->stats_out.bytes);
opp->stats_out[i].errors = atomic64_read(&o->stats_out.errors);
}
}
else
@ -1700,9 +1686,9 @@ static int proc_list_show(struct seq_file *f, void *v) {
if (g->target.src_mismatch > 0 && g->target.src_mismatch <= ARRAY_SIZE(re_msm_strings))
seq_printf(f, " src mismatch action: %s\n", re_msm_strings[g->target.src_mismatch]);
seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n",
(unsigned long long) atomic64_read(&g->stats_in.bytes),
(unsigned long long) atomic64_read(&g->stats_in.packets),
(unsigned long long) atomic64_read(&g->stats_in.errors));
(unsigned long long) atomic64_read(&g->target.stats->bytes),
(unsigned long long) atomic64_read(&g->target.stats->packets),
(unsigned long long) atomic64_read(&g->target.stats->errors));
for (i = 0; i < g->target.num_payload_types; i++) {
seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n",
g->target.pt_input[i].pt_num,
@ -1763,9 +1749,9 @@ static int proc_list_show(struct seq_file *f, void *v) {
proc_list_addr_print(f, "dst", &o->output.dst_addr);
seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n",
(unsigned long long) atomic64_read(&o->stats_out.bytes),
(unsigned long long) atomic64_read(&o->stats_out.packets),
(unsigned long long) atomic64_read(&o->stats_out.errors));
(unsigned long long) atomic64_read(&o->output.stats->bytes),
(unsigned long long) atomic64_read(&o->output.stats->packets),
(unsigned long long) atomic64_read(&o->output.stats->errors));
seq_printf(f, " SSRC out:");
for (j = 0; j < ARRAY_SIZE(o->output.ssrc_out); j++) {
@ -2425,6 +2411,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
unsigned long flags;
unsigned int u;
struct interface_stats_block *iface_stats;
struct stream_stats *stats;
/* validation */
@ -2448,6 +2435,9 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
iface_stats = shm_map_resolve(i->iface_stats, sizeof(*iface_stats));
if (!iface_stats)
return -EFAULT;
stats = shm_map_resolve(i->stats, sizeof(*stats));
if (!stats)
return -EFAULT;
DBG("Creating new target\n");
@ -2470,6 +2460,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
g->ssrc_stats[u].lost_bits = -1;
rwlock_init(&g->outputs_lock);
g->target.iface_stats = iface_stats;
g->target.stats = stats;
if (i->num_destinations) {
err = -ENOMEM;
@ -2589,6 +2580,7 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
int err;
struct rtpengine_target *g;
struct interface_stats_block *iface_stats;
struct stream_stats *stats;
// validate input
@ -2604,6 +2596,10 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
iface_stats = shm_map_resolve(i->output.iface_stats, sizeof(*iface_stats));
if (!iface_stats)
return -EFAULT;
stats = shm_map_resolve(i->output.stats, sizeof(*stats));
if (!stats)
return -EFAULT;
g = get_target(t, &i->local);
if (!g)
@ -2629,6 +2625,7 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
g->outputs[i->num].output = i->output;
g->outputs[i->num].output.iface_stats = iface_stats;
g->outputs[i->num].output.stats = stats;
// init crypto stuff lock free: the "output" is already filled so we
// know it's there, but outputs_unfilled hasn't been decreased yet, so
@ -5488,7 +5485,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
skb2 = skb_copy_expand(skb, MAX_HEADER, MAX_SKB_TAIL_ROOM, GFP_ATOMIC);
if (!skb2) {
log_err("out of memory while creating skb copy");
atomic64_inc(&g->stats_in.errors);
atomic64_inc(&g->target.stats->errors);
atomic64_inc(&g->target.iface_stats->in.errors);
continue;
}
@ -5505,25 +5502,24 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
err = send_proxy_packet_output(skb2, g, rtp_pt_idx, o, &rtp2, ssrc_idx, par);
if (err) {
atomic64_inc(&g->stats_in.errors);
atomic64_inc(&g->target.stats->errors);
atomic64_inc(&g->target.iface_stats->in.errors);
atomic64_inc(&o->stats_out.errors);
atomic64_inc(&o->output.stats->errors);
atomic64_inc(&o->output.iface_stats->out.errors);
}
else {
atomic64_inc(&o->stats_out.packets);
atomic64_add(datalen_out, &o->stats_out.bytes);
atomic64_inc(&o->output.stats->packets);
atomic64_add(datalen_out, &o->output.stats->bytes);
atomic64_inc(&o->output.iface_stats->out.packets);
atomic64_add(datalen_out, &o->output.iface_stats->out.bytes);
}
}
do_stats:
if (atomic64_read(&g->stats_in.packets)==0)
atomic_set(&g->stats_in.tos,in_tos);
atomic_set(&g->tos, in_tos);
atomic64_inc(&g->stats_in.packets);
atomic64_add(datalen, &g->stats_in.bytes);
atomic64_inc(&g->target.stats->packets);
atomic64_add(datalen, &g->target.stats->bytes);
atomic64_inc(&g->target.iface_stats->in.packets);
atomic64_add(datalen, &g->target.iface_stats->in.bytes);
@ -5534,7 +5530,7 @@ do_stats:
else if (rtp_pt_idx == -2)
/* not RTP */ ;
else if (rtp_pt_idx == -1) {
atomic64_inc(&g->stats_in.errors);
atomic64_inc(&g->target.stats->errors);
atomic64_inc(&g->target.iface_stats->in.errors);
}
@ -5547,7 +5543,7 @@ do_stats:
out_error:
log_err("x_tables action failed: %s", errstr);
atomic64_inc(&g->stats_in.errors);
atomic64_inc(&g->target.stats->errors);
atomic64_inc(&g->target.iface_stats->in.errors);
out:
target_put(g);


+ 3
- 8
kernel-module/xt_RTPENGINE.h View File

@ -15,12 +15,6 @@ struct xt_rtpengine_info {
unsigned int id;
};
struct rtpengine_stats {
uint64_t packets;
uint64_t bytes;
uint64_t errors;
uint8_t tos;
};
struct rtpengine_rtp_stats {
uint64_t packets;
uint64_t bytes;
@ -119,6 +113,7 @@ struct rtpengine_target_info {
unsigned int num_payload_types;
struct interface_stats_block *iface_stats; // for ingress stats
struct stream_stats *stats; // for ingress stats
unsigned int rtcp_mux:1,
dtls:1,
@ -146,6 +141,7 @@ struct rtpengine_output_info {
struct rtpengine_pt_output pt_output[RTPE_NUM_PAYLOAD_TYPES]; // same indexes as pt_input
struct interface_stats_block *iface_stats; // for egress stats
struct stream_stats *stats; // for egress stats
unsigned char tos;
unsigned int ssrc_subst:1;
@ -273,10 +269,9 @@ struct rtpengine_command_send_packet {
struct rtpengine_list_entry {
struct rtpengine_target_info target;
struct rtpengine_stats stats_in;
struct rtpengine_rtp_stats rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; // same index as pt_input
struct rtpengine_output_info outputs[RTPE_MAX_FORWARD_DESTINATIONS];
struct rtpengine_stats stats_out[RTPE_MAX_FORWARD_DESTINATIONS];
int tos;
};


Loading…
Cancel
Save