diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 351754d34..02f21da76 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -45,19 +45,19 @@ #define MAX_RECV_LOOP_STRIKES 5 #endif -#define DS_io(x, ps, ke, io) do { \ - uint64_t ks_val; \ - ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \ - if ((ke)->x < ks_val) \ +#define DS_io(x, ps, io) do { \ + uint64_t ks_val, cur_val; \ + ks_val = atomic64_get_na(&ps->kernel_stats_ ## io.x); \ + cur_val = atomic64_get_na(&ps->stats_ ## io->x); \ + if (cur_val < ks_val) \ diff_ ## x ## _ ## io = 0; \ else \ - diff_ ## x ## _ ## io = (ke)->x - ks_val; \ - atomic64_add_na(&ps->stats_ ## io->x, diff_ ## x ## _ ## io); \ + diff_ ## x ## _ ## io = cur_val - ks_val; \ RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \ } while (0) -#define DS(x) DS_io(x, ps, &ke->stats_in, in) -#define DSo(x) DS_io(x, sink, stats_o, out) +#define DS(x) DS_io(x, ps, in) +#define DSo(x) DS_io(x, sink, out) struct intf_rr { @@ -1486,6 +1486,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out __re_address_translate_ep(&reti->local, &stream->selected_sfd->socket.local); reti->iface_stats = stream->selected_sfd->local_intf->stats; + reti->stats = stream->stats_in; reti->rtcp_mux = MEDIA_ISSET(media, RTCP_MUX); reti->rtcp = PS_ISSET(stream, RTCP); reti->dtls = MEDIA_ISSET(media, DTLS); @@ -1622,6 +1623,7 @@ output: __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); redi->output.iface_stats = sink->selected_sfd->local_intf->stats; + redi->output.stats = sink->stats_out; if (reti->track_ssrc) { for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { @@ -3525,16 +3527,16 @@ enum thread_looper_action kernel_stats_updater(void) { DS(bytes); DS(errors); - if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) { + if (diff_packets_in != 0) { atomic64_set(&ps->last_packet, rtpe_now.tv_sec); count_stream_stats_kernel(ps); } - ps->in_tos_tclass = ke->stats_in.tos; + ps->in_tos_tclass = ke->tos; - atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes); - atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets); - atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors); + atomic64_set_na(&ps->kernel_stats_in.bytes, atomic64_get_na(&ps->stats_in->bytes)); + atomic64_set_na(&ps->kernel_stats_in.packets, atomic64_get_na(&ps->stats_in->packets)); + atomic64_set_na(&ps->kernel_stats_in.errors, atomic64_get_na(&ps->stats_in->errors)); uint64_t max_diff = 0; int max_pt = -1; @@ -3573,15 +3575,14 @@ enum thread_looper_action kernel_stats_updater(void) { continue; struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx]; - struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx]; DSo(bytes); DSo(packets); DSo(errors); - atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes); - atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets); - atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors); + atomic64_set_na(&sink->kernel_stats_out.bytes, atomic64_get_na(&sink->stats_out->bytes)); + atomic64_set_na(&sink->kernel_stats_out.packets, atomic64_get_na(&sink->stats_out->packets)); + atomic64_set_na(&sink->kernel_stats_out.errors, atomic64_get_na(&sink->stats_out->errors)); mutex_lock(&sink->out_lock); for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 3f5d2abf8..3abf4a944 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -302,12 +302,6 @@ struct re_crypto_context { const struct re_hmac *hmac; }; -struct rtpengine_stats_a { - atomic64_t packets; - atomic64_t bytes; - atomic64_t errors; - atomic_t tos; -}; struct rtpengine_rtp_stats_a { atomic64_t packets; atomic64_t bytes; @@ -316,7 +310,6 @@ struct rtpengine_output { struct rtpengine_output_info output; struct re_crypto_context encrypt_rtp; struct re_crypto_context encrypt_rtcp; - struct rtpengine_stats_a stats_out; }; struct rtpengine_target { atomic_t refcnt; @@ -324,7 +317,7 @@ struct rtpengine_target { struct rtpengine_target_info target; unsigned int last_pt; // index into pt_input[] and pt_output[] - struct rtpengine_stats_a stats_in; + atomic_t tos; struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; spinlock_t ssrc_stats_lock; struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING]; @@ -1476,10 +1469,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t memcpy(&opp->target, &g->target, sizeof(opp->target)); - opp->stats_in.packets = atomic64_read(&g->stats_in.packets); - opp->stats_in.bytes = atomic64_read(&g->stats_in.bytes); - opp->stats_in.errors = atomic64_read(&g->stats_in.errors); - opp->stats_in.tos = atomic_read(&g->stats_in.tos); + opp->tos = atomic_read(&g->tos); for (i = 0; i < g->target.num_payload_types; i++) { opp->rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets); @@ -1499,10 +1489,6 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t spin_lock_irqsave(&o->encrypt_rtp.lock, flags); opp->outputs[i] = o->output; spin_unlock_irqrestore(&o->encrypt_rtp.lock, flags); - - opp->stats_out[i].packets = atomic64_read(&o->stats_out.packets); - opp->stats_out[i].bytes = atomic64_read(&o->stats_out.bytes); - opp->stats_out[i].errors = atomic64_read(&o->stats_out.errors); } } else @@ -1700,9 +1686,9 @@ static int proc_list_show(struct seq_file *f, void *v) { if (g->target.src_mismatch > 0 && g->target.src_mismatch <= ARRAY_SIZE(re_msm_strings)) seq_printf(f, " src mismatch action: %s\n", re_msm_strings[g->target.src_mismatch]); seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n", - (unsigned long long) atomic64_read(&g->stats_in.bytes), - (unsigned long long) atomic64_read(&g->stats_in.packets), - (unsigned long long) atomic64_read(&g->stats_in.errors)); + (unsigned long long) atomic64_read(&g->target.stats->bytes), + (unsigned long long) atomic64_read(&g->target.stats->packets), + (unsigned long long) atomic64_read(&g->target.stats->errors)); for (i = 0; i < g->target.num_payload_types; i++) { seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n", g->target.pt_input[i].pt_num, @@ -1763,9 +1749,9 @@ static int proc_list_show(struct seq_file *f, void *v) { proc_list_addr_print(f, "dst", &o->output.dst_addr); seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n", - (unsigned long long) atomic64_read(&o->stats_out.bytes), - (unsigned long long) atomic64_read(&o->stats_out.packets), - (unsigned long long) atomic64_read(&o->stats_out.errors)); + (unsigned long long) atomic64_read(&o->output.stats->bytes), + (unsigned long long) atomic64_read(&o->output.stats->packets), + (unsigned long long) atomic64_read(&o->output.stats->errors)); seq_printf(f, " SSRC out:"); for (j = 0; j < ARRAY_SIZE(o->output.ssrc_out); j++) { @@ -2425,6 +2411,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i unsigned long flags; unsigned int u; struct interface_stats_block *iface_stats; + struct stream_stats *stats; /* validation */ @@ -2448,6 +2435,9 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i iface_stats = shm_map_resolve(i->iface_stats, sizeof(*iface_stats)); if (!iface_stats) return -EFAULT; + stats = shm_map_resolve(i->stats, sizeof(*stats)); + if (!stats) + return -EFAULT; DBG("Creating new target\n"); @@ -2470,6 +2460,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i g->ssrc_stats[u].lost_bits = -1; rwlock_init(&g->outputs_lock); g->target.iface_stats = iface_stats; + g->target.stats = stats; if (i->num_destinations) { err = -ENOMEM; @@ -2589,6 +2580,7 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des int err; struct rtpengine_target *g; struct interface_stats_block *iface_stats; + struct stream_stats *stats; // validate input @@ -2604,6 +2596,10 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des iface_stats = shm_map_resolve(i->output.iface_stats, sizeof(*iface_stats)); if (!iface_stats) return -EFAULT; + stats = shm_map_resolve(i->output.stats, sizeof(*stats)); + if (!stats) + return -EFAULT; + g = get_target(t, &i->local); if (!g) @@ -2629,6 +2625,7 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des g->outputs[i->num].output = i->output; g->outputs[i->num].output.iface_stats = iface_stats; + g->outputs[i->num].output.stats = stats; // init crypto stuff lock free: the "output" is already filled so we // know it's there, but outputs_unfilled hasn't been decreased yet, so @@ -5488,7 +5485,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb, skb2 = skb_copy_expand(skb, MAX_HEADER, MAX_SKB_TAIL_ROOM, GFP_ATOMIC); if (!skb2) { log_err("out of memory while creating skb copy"); - atomic64_inc(&g->stats_in.errors); + atomic64_inc(&g->target.stats->errors); atomic64_inc(&g->target.iface_stats->in.errors); continue; } @@ -5505,25 +5502,24 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb, err = send_proxy_packet_output(skb2, g, rtp_pt_idx, o, &rtp2, ssrc_idx, par); if (err) { - atomic64_inc(&g->stats_in.errors); + atomic64_inc(&g->target.stats->errors); atomic64_inc(&g->target.iface_stats->in.errors); - atomic64_inc(&o->stats_out.errors); + atomic64_inc(&o->output.stats->errors); atomic64_inc(&o->output.iface_stats->out.errors); } else { - atomic64_inc(&o->stats_out.packets); - atomic64_add(datalen_out, &o->stats_out.bytes); + atomic64_inc(&o->output.stats->packets); + atomic64_add(datalen_out, &o->output.stats->bytes); atomic64_inc(&o->output.iface_stats->out.packets); atomic64_add(datalen_out, &o->output.iface_stats->out.bytes); } } do_stats: - if (atomic64_read(&g->stats_in.packets)==0) - atomic_set(&g->stats_in.tos,in_tos); + atomic_set(&g->tos, in_tos); - atomic64_inc(&g->stats_in.packets); - atomic64_add(datalen, &g->stats_in.bytes); + atomic64_inc(&g->target.stats->packets); + atomic64_add(datalen, &g->target.stats->bytes); atomic64_inc(&g->target.iface_stats->in.packets); atomic64_add(datalen, &g->target.iface_stats->in.bytes); @@ -5534,7 +5530,7 @@ do_stats: else if (rtp_pt_idx == -2) /* not RTP */ ; else if (rtp_pt_idx == -1) { - atomic64_inc(&g->stats_in.errors); + atomic64_inc(&g->target.stats->errors); atomic64_inc(&g->target.iface_stats->in.errors); } @@ -5547,7 +5543,7 @@ do_stats: out_error: log_err("x_tables action failed: %s", errstr); - atomic64_inc(&g->stats_in.errors); + atomic64_inc(&g->target.stats->errors); atomic64_inc(&g->target.iface_stats->in.errors); out: target_put(g); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 9931c1384..c2f2f2d80 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -15,12 +15,6 @@ struct xt_rtpengine_info { unsigned int id; }; -struct rtpengine_stats { - uint64_t packets; - uint64_t bytes; - uint64_t errors; - uint8_t tos; -}; struct rtpengine_rtp_stats { uint64_t packets; uint64_t bytes; @@ -119,6 +113,7 @@ struct rtpengine_target_info { unsigned int num_payload_types; struct interface_stats_block *iface_stats; // for ingress stats + struct stream_stats *stats; // for ingress stats unsigned int rtcp_mux:1, dtls:1, @@ -146,6 +141,7 @@ struct rtpengine_output_info { struct rtpengine_pt_output pt_output[RTPE_NUM_PAYLOAD_TYPES]; // same indexes as pt_input struct interface_stats_block *iface_stats; // for egress stats + struct stream_stats *stats; // for egress stats unsigned char tos; unsigned int ssrc_subst:1; @@ -273,10 +269,9 @@ struct rtpengine_command_send_packet { struct rtpengine_list_entry { struct rtpengine_target_info target; - struct rtpengine_stats stats_in; struct rtpengine_rtp_stats rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; // same index as pt_input struct rtpengine_output_info outputs[RTPE_MAX_FORWARD_DESTINATIONS]; - struct rtpengine_stats stats_out[RTPE_MAX_FORWARD_DESTINATIONS]; + int tos; };