From dddaa60afb0f3626ca99ad8cb5b9d15aba072734 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 30 Jan 2023 15:41:42 -0500 Subject: [PATCH] MT#55283 simplify/clarify stats gathering Perform accumulation of stats only once (i.e. increasing an actual counter) and report stats based on differences to previous values, instead of carrying multiple stats counters for each metric and resetting each counter to zero whenever stats are reported. `rtpe_stats` is the global master accumulator. `_intv` variables are intermediate and local storage for values sampled from `rtpe_stats` at regular intervals. `_rate` and `_diff` variables hold stats calculated from `rtpe_stats` and the respective `_intv` variable whenever the sampling and reporting occurs. `stats_counters_calc_diff` is used to calculate stats as differences between `rtpe_stats` and the last sampled `_intv` `stats_counters_calc_rate` does the same but calculates a per-second rate, based on a microsecond duration. Eliminate now-useless struct global_stats_ax Change-Id: Ic4ca630161787025219b67e49b41995204d60573 --- daemon/call.c | 12 +++--- daemon/call_interfaces.c | 6 +-- daemon/cli.c | 24 ++++++------ daemon/graphite.c | 60 ++++++++++++++-------------- daemon/statistics.c | 84 ++++++++++++++++++++-------------------- include/aux.h | 20 ++++++++++ include/call.h | 15 ++----- include/statistics.h | 36 +++++++---------- t/test-payload-tracker.c | 8 ++-- 9 files changed, 133 insertions(+), 132 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 29c2c663f..d1cad5641 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -69,11 +69,9 @@ struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative; struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max; struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval; -struct global_stats_ax rtpe_stats; -struct global_stats_counter rtpe_stats_interval; -struct global_stats_counter rtpe_stats_cumulative; -struct global_stats_ax rtpe_stats_graphite; -struct global_stats_counter rtpe_stats_graphite_interval; +struct global_stats_counter rtpe_stats; // total, cumulative, master +static struct global_stats_counter rtpe_stats_intv; // copied out once per timer run +struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run struct global_stats_min_max rtpe_stats_graphite_min_max; struct global_stats_min_max rtpe_stats_graphite_min_max_interval; @@ -578,9 +576,9 @@ void call_timer(void *ptr) { call_timer_iterator(c, &hlp); ITERATE_CALL_LIST_NEXT_END(c); - stats_counters_ax_calc_avg(&rtpe_stats, run_diff_us, &rtpe_stats_interval); + stats_counters_calc_rate(&rtpe_stats, run_diff_us, &rtpe_stats_intv, &rtpe_stats_rate); - stats_counters_min_max(&rtpe_stats_graphite_min_max, &rtpe_stats.intv); + stats_counters_min_max(&rtpe_stats_graphite_min_max, &rtpe_stats_rate); // stats derived while iterating calls RTPE_GAUGE_SET(transcoded_media, hlp.transcoded_media); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 0751394ad..c7f9dc669 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -463,7 +463,7 @@ void calls_status_tcp(struct streambuf_stream *s) { rwlock_lock_r(&rtpe_callhash_lock); streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n", g_hash_table_size(rtpe_callhash), - atomic64_get(&rtpe_stats.intv.bytes_user) + atomic64_get(&rtpe_stats.intv.bytes_kernel), 0, 0); + atomic64_get(&rtpe_stats_rate.bytes_user) + atomic64_get(&rtpe_stats_rate.bytes_kernel), 0, 0); rwlock_unlock_r(&rtpe_callhash_lock); ITERATE_CALL_LIST_START(CALL_ITERATOR_MAIN, c); @@ -1658,8 +1658,8 @@ static enum load_limit_reasons call_offer_session_limit(void) { } if (ret == LOAD_LIMIT_NONE && rtpe_config.bw_limit) { - uint64_t bw = atomic64_get(&rtpe_stats.intv.bytes_user) + - atomic64_get(&rtpe_stats.intv.bytes_kernel); + uint64_t bw = atomic64_get(&rtpe_stats_rate.bytes_user) + + atomic64_get(&rtpe_stats_rate.bytes_kernel); if (bw >= rtpe_config.bw_limit) { ilog(LOG_WARN, "Bandwidth limit exceeded (%" PRIu64 " > %" PRIu64 ")", bw, rtpe_config.bw_limit); diff --git a/daemon/cli.c b/daemon/cli.c index dbba70b34..281ef1d8d 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -450,26 +450,26 @@ static void cli_incoming_params_revert(str *instr, struct cli_writer *cw) { static void cli_incoming_list_counters(str *instr, struct cli_writer *cw) { cw->cw_printf(cw, "\nCurrent per-second counters:\n\n"); cw->cw_printf(cw, " Packets per second (userspace) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.packets_user)); + atomic64_get(&rtpe_stats_rate.packets_user)); cw->cw_printf(cw, " Bytes per second (userspace) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.bytes_user)); + atomic64_get(&rtpe_stats_rate.bytes_user)); cw->cw_printf(cw, " Errors per second (userspace) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.errors_user)); + atomic64_get(&rtpe_stats_rate.errors_user)); cw->cw_printf(cw, " Packets per second (kernel) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.packets_kernel)); + atomic64_get(&rtpe_stats_rate.packets_kernel)); cw->cw_printf(cw, " Bytes per second (kernel) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.bytes_kernel)); + atomic64_get(&rtpe_stats_rate.bytes_kernel)); cw->cw_printf(cw, " Errors per second (kernel) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.errors_kernel)); + atomic64_get(&rtpe_stats_rate.errors_kernel)); cw->cw_printf(cw, " Packets per second (total) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.packets_user) + - atomic64_get(&rtpe_stats.intv.packets_kernel)); + atomic64_get(&rtpe_stats_rate.packets_user) + + atomic64_get(&rtpe_stats_rate.packets_kernel)); cw->cw_printf(cw, " Bytes per second (total) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.bytes_user) + - atomic64_get(&rtpe_stats.intv.bytes_kernel)); + atomic64_get(&rtpe_stats_rate.bytes_user) + + atomic64_get(&rtpe_stats_rate.bytes_kernel)); cw->cw_printf(cw, " Errors per second (total) :%" PRIu64 "\n", - atomic64_get(&rtpe_stats.intv.errors_user) + - atomic64_get(&rtpe_stats.intv.errors_kernel)); + atomic64_get(&rtpe_stats_rate.errors_user) + + atomic64_get(&rtpe_stats_rate.errors_kernel)); } static void cli_incoming_list_totals(str *instr, struct cli_writer *cw) { diff --git a/daemon/graphite.c b/daemon/graphite.c index b502851ea..450ee8fe3 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -33,6 +33,9 @@ static time_t next_run; static char* graphite_prefix = NULL; static struct timeval graphite_interval_tv; +struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases +static struct global_stats_counter rtpe_stats_graphite_intv; // copied out when graphite stats run + void set_graphite_interval_tv(struct timeval *tv) { graphite_interval_tv = *tv; } @@ -78,8 +81,7 @@ static int connect_to_graphite_server(const endpoint_t *graphite_ep) { GString *print_graphite_data(void) { - long long time_diff_us = timeval_diff(&rtpe_now, &rtpe_latest_graphite_interval_start); - stats_counters_ax_calc_avg(&rtpe_stats_graphite, time_diff_us, &rtpe_stats_graphite_interval); + stats_counters_calc_diff(&rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff); stats_counters_min_max_reset(&rtpe_stats_graphite_min_max, &rtpe_stats_graphite_min_max_interval); stats_gauge_calc_avg_reset(&rtpe_stats_gauge_graphite_min_max_interval, &rtpe_stats_gauge_graphite_min_max); @@ -108,19 +110,19 @@ GString *print_graphite_data(void) { (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.ng_command_times[i]) / 1000000.0, (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.avg.ng_command_times[i]) / 1000000.0); - GPF("%s_count %" PRIu64, ng_command_strings[i], atomic64_get(&rtpe_stats_cumulative.ng_commands[i])); + GPF("%s_count %" PRIu64, ng_command_strings[i], atomic64_get(&rtpe_stats.ng_commands[i])); } - GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0); + GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0); struct timeval avg_duration; - uint64_t managed_sess = atomic64_get_na(&rtpe_stats_graphite_interval.managed_sess); + uint64_t managed_sess = atomic64_get_na(&rtpe_stats_graphite_diff.managed_sess); if (managed_sess) - timeval_from_us(&avg_duration, atomic64_get_na(&rtpe_stats_graphite_interval.call_duration) / managed_sess); + timeval_from_us(&avg_duration, atomic64_get_na(&rtpe_stats_graphite_diff.call_duration) / managed_sess); else avg_duration = (struct timeval) {0,0}; GPF("average_call_dur %llu.%06llu",(unsigned long long)avg_duration.tv_sec,(unsigned long long)avg_duration.tv_usec); - GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.forced_term_sess)); - GPF("managed_sess "UINT64F, atomic64_get(&rtpe_stats.ax.managed_sess)); + GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.forced_term_sess)); + GPF("managed_sess "UINT64F, atomic64_get(&rtpe_stats.managed_sess)); GPF("managed_sess_min "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions)); GPF("managed_sess_max "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions)); GPF("current_sessions_total "UINT64F, atomic64_get(&rtpe_stats_gauge.total_sessions)); @@ -130,26 +132,26 @@ GString *print_graphite_data(void) { GPF("current_sessions_ipv4 "UINT64F, atomic64_get(&rtpe_stats_gauge.ipv4_sessions)); GPF("current_sessions_ipv6 "UINT64F, atomic64_get(&rtpe_stats_gauge.ipv6_sessions)); GPF("current_sessions_mixed "UINT64F, atomic64_get(&rtpe_stats_gauge.mixed_sessions)); - GPF("nopacket_relayed_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.nopacket_relayed_sess)); - GPF("oneway_stream_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.oneway_stream_sess)); - GPF("regular_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.regular_term_sess)); - GPF("relayed_errors_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_user)); - GPF("relayed_packets_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_user)); - GPF("relayed_bytes_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_user)); - GPF("relayed_errors_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_kernel)); - GPF("relayed_packets_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_kernel)); - GPF("relayed_bytes_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_kernel)); - GPF("relayed_errors "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_user) + - atomic64_get_na(&rtpe_stats_graphite_interval.errors_kernel)); - GPF("relayed_packets "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_user) + - atomic64_get_na(&rtpe_stats_graphite_interval.packets_kernel)); - GPF("relayed_bytes "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_user) + - atomic64_get_na(&rtpe_stats_graphite_interval.bytes_kernel)); - GPF("silent_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.silent_timeout_sess)); - GPF("final_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.final_timeout_sess)); - GPF("offer_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.offer_timeout_sess)); - GPF("timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.timeout_sess)); - GPF("reject_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.rejected_sess)); + GPF("nopacket_relayed_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.nopacket_relayed_sess)); + GPF("oneway_stream_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.oneway_stream_sess)); + GPF("regular_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.regular_term_sess)); + GPF("relayed_errors_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user)); + GPF("relayed_packets_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user)); + GPF("relayed_bytes_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user)); + GPF("relayed_errors_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel)); + GPF("relayed_packets_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel)); + GPF("relayed_bytes_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel)); + GPF("relayed_errors "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user) + + atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel)); + GPF("relayed_packets "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user) + + atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel)); + GPF("relayed_bytes "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user) + + atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel)); + GPF("silent_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.silent_timeout_sess)); + GPF("final_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.final_timeout_sess)); + GPF("offer_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.offer_timeout_sess)); + GPF("timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.timeout_sess)); + GPF("reject_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.rejected_sess)); for (GList *l = all_local_interfaces.head; l; l = l->next) { struct local_intf *lif = l->data; @@ -193,7 +195,7 @@ GString *print_graphite_data(void) { ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%.6f at time %llu\n", (unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions), (unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions), - (double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0, + (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0, (unsigned long long ) rtpe_now.tv_sec); return graph_str; diff --git a/daemon/statistics.c b/daemon/statistics.c index 645ba7e9e..1860ecfda 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -324,26 +324,26 @@ GQueue *statistics_gather_metrics(void) { PROM("transcoded_media", "gauge"); METRIC("packetrate_user", "Packets per second (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.packets_user)); + atomic64_get(&rtpe_stats_rate.packets_user)); METRIC("byterate_user", "Bytes per second (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.bytes_user)); + atomic64_get(&rtpe_stats_rate.bytes_user)); METRIC("errorrate_user", "Errors per second (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.errors_user)); + atomic64_get(&rtpe_stats_rate.errors_user)); METRIC("packetrate_kernel", "Packets per second (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.packets_kernel)); + atomic64_get(&rtpe_stats_rate.packets_kernel)); METRIC("byterate_kernel", "Bytes per second (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.bytes_kernel)); + atomic64_get(&rtpe_stats_rate.bytes_kernel)); METRIC("errorrate_kernel", "Errors per second (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.errors_kernel)); + atomic64_get(&rtpe_stats_rate.errors_kernel)); METRIC("packetrate", "Packets per second (total)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.packets_user) + - atomic64_get(&rtpe_stats.intv.packets_kernel)); + atomic64_get(&rtpe_stats_rate.packets_user) + + atomic64_get(&rtpe_stats_rate.packets_kernel)); METRIC("byterate", "Bytes per second (total)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.bytes_user) + - atomic64_get(&rtpe_stats.intv.bytes_kernel)); + atomic64_get(&rtpe_stats_rate.bytes_user) + + atomic64_get(&rtpe_stats_rate.bytes_kernel)); METRIC("errorrate", "Errors per second (total)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats.intv.errors_user) + - atomic64_get(&rtpe_stats.intv.errors_kernel)); + atomic64_get(&rtpe_stats_rate.errors_user) + + atomic64_get(&rtpe_stats_rate.errors_kernel)); METRIC("media_userspace", "Userspace-only media streams", UINT64F, UINT64F, atomic64_get(&rtpe_stats_gauge.userspace_streams)); @@ -360,8 +360,8 @@ GQueue *statistics_gather_metrics(void) { PROM("mediastreams", "gauge"); PROMLAB("type=\"mixed\""); - num_sessions = atomic64_get(&rtpe_stats_cumulative.managed_sess); - uint64_t total_duration = atomic64_get(&rtpe_stats_cumulative.call_duration); + num_sessions = atomic64_get(&rtpe_stats.managed_sess); + uint64_t total_duration = atomic64_get(&rtpe_stats.call_duration); uint64_t avg_us = num_sessions ? total_duration / num_sessions : 0; HEADER("}", ""); @@ -373,67 +373,67 @@ GQueue *statistics_gather_metrics(void) { METRIC("managedsessions", "Total managed sessions", UINT64F, UINT64F, num_sessions); PROM("sessions_total", "counter"); - METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.rejected_sess)); + METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.rejected_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"rejected\""); - METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.timeout_sess)); + METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_stats.timeout_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"timeout\""); - METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.silent_timeout_sess)); + METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.silent_timeout_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"silent_timeout\""); - METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.final_timeout_sess)); + METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.final_timeout_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"final_timeout\""); - METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.offer_timeout_sess)); + METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.offer_timeout_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"offer_timeout\""); - METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.regular_term_sess)); + METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.regular_term_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"terminated\""); - METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.forced_term_sess)); + METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.forced_term_sess)); PROM("closed_sessions_total", "counter"); PROMLAB("reason=\"force_terminated\""); METRIC("relayedpackets_user", "Total relayed packets (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.packets_user)); + atomic64_get(&rtpe_stats.packets_user)); PROM("packets_total", "counter"); PROMLAB("type=\"userspace\""); METRIC("relayedpacketerrors_user", "Total relayed packet errors (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.errors_user)); + atomic64_get(&rtpe_stats.errors_user)); PROM("packet_errors_total", "counter"); PROMLAB("type=\"userspace\""); METRIC("relayedbytes_user", "Total relayed bytes (userspace)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.bytes_user)); + atomic64_get(&rtpe_stats.bytes_user)); PROM("bytes_total", "counter"); PROMLAB("type=\"userspace\""); METRIC("relayedpackets_kernel", "Total relayed packets (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.packets_kernel)); + atomic64_get(&rtpe_stats.packets_kernel)); PROM("packets_total", "counter"); PROMLAB("type=\"kernel\""); METRIC("relayedpacketerrors_kernel", "Total relayed packet errors (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.errors_kernel)); + atomic64_get(&rtpe_stats.errors_kernel)); PROM("packet_errors_total", "counter"); PROMLAB("type=\"kernel\""); METRIC("relayedbytes_kernel", "Total relayed bytes (kernel)", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.bytes_kernel)); + atomic64_get(&rtpe_stats.bytes_kernel)); PROM("bytes_total", "counter"); PROMLAB("type=\"kernel\""); METRIC("relayedpackets", "Total relayed packets", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.packets_kernel) + - atomic64_get(&rtpe_stats_cumulative.packets_user)); + atomic64_get(&rtpe_stats.packets_kernel) + + atomic64_get(&rtpe_stats.packets_user)); METRIC("relayedpacketerrors", "Total relayed packet errors", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.errors_kernel) + - atomic64_get(&rtpe_stats_cumulative.errors_user)); + atomic64_get(&rtpe_stats.errors_kernel) + + atomic64_get(&rtpe_stats.errors_user)); METRIC("relayedbytes", "Total relayed bytes", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.bytes_kernel) + - atomic64_get(&rtpe_stats_cumulative.bytes_user)); + atomic64_get(&rtpe_stats.bytes_kernel) + + atomic64_get(&rtpe_stats.bytes_user)); - METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.nopacket_relayed_sess)); + METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_stats.nopacket_relayed_sess)); PROM("zero_packet_streams_total", "counter"); - METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.oneway_stream_sess)); + METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_stats.oneway_stream_sess)); PROM("one_way_sessions_total", "counter"); METRICva("avgcallduration", "Average call duration", "%.6f", "%.6f seconds", (double) avg_us / 1000000.0); PROM("call_duration_avg", "gauge"); @@ -441,14 +441,14 @@ GQueue *statistics_gather_metrics(void) { METRICva("totalcallsduration", "Total calls duration", "%.6f", "%.6f seconds", (double) total_duration / 1000000.0); PROM("call_duration_total", "counter"); - total_duration = atomic64_get(&rtpe_stats_cumulative.call_duration2); + total_duration = atomic64_get(&rtpe_stats.call_duration2); METRICva("totalcallsduration2", "Total calls duration squared", "%.6f", "%.6f seconds squared", (double) total_duration / 1000000.0); PROM("call_duration2_total", "counter"); double variance = num_sessions ? fabs((double) total_duration / (double) num_sessions - ((double) avg_us / 1000.0) * ((double) avg_us / 1000.0)) : 0.0; METRICva("totalcallsduration_stddev", "Total calls duration standard deviation", "%.6f", "%.6f seconds", sqrt(variance) / 1000.0); - calls_dur_iv = (double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0; + calls_dur_iv = (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0; min_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions); max_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions); @@ -530,19 +530,19 @@ GQueue *statistics_gather_metrics(void) { STAT_GET_PRINT(packetloss, "packet loss", 1.0); STAT_GET_PRINT(jitter_measured, "jitter (measured)", 1.0); METRIC("packets_lost", "Packets lost", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.packets_lost)); + atomic64_get(&rtpe_stats.packets_lost)); PROM("packets_lost", "counter"); METRIC("rtp_duplicates", "Duplicate RTP packets", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.rtp_duplicates)); + atomic64_get(&rtpe_stats.rtp_duplicates)); PROM("rtp_duplicates", "counter"); METRIC("rtp_skips", "RTP sequence skips", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.rtp_skips)); + atomic64_get(&rtpe_stats.rtp_skips)); PROM("rtp_skips", "counter"); METRIC("rtp_seq_resets", "RTP sequence resets", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.rtp_seq_resets)); + atomic64_get(&rtpe_stats.rtp_seq_resets)); PROM("rtp_seq_resets", "counter"); METRIC("rtp_reordered", "Out-of-order RTP packets", UINT64F, UINT64F, - atomic64_get(&rtpe_stats_cumulative.rtp_reordered)); + atomic64_get(&rtpe_stats.rtp_reordered)); PROM("rtp_reordered", "counter"); HEADER(NULL, ""); HEADER("}", ""); diff --git a/include/aux.h b/include/aux.h index 37cff0341..30b5b752e 100644 --- a/include/aux.h +++ b/include/aux.h @@ -523,6 +523,26 @@ INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) { } while (1); \ } while (0) +INLINE void atomic64_calc_rate(const atomic64 *ax_var, long long run_diff_us, + atomic64 *intv_var, atomic64 *rate_var) +{ + uint64_t ax = atomic64_get(ax_var); + uint64_t old_intv = atomic64_get(intv_var); + atomic64_set(intv_var, ax); + atomic64_set(rate_var, (ax - old_intv) * 1000000LL / run_diff_us); +} +INLINE void atomic64_calc_diff(const atomic64 *ax_var, atomic64 *intv_var, atomic64 *diff_var) { + uint64_t ax = atomic64_get(ax_var); + uint64_t old_intv = atomic64_get(intv_var); + atomic64_set(intv_var, ax); + atomic64_set(diff_var, ax - old_intv); +} +INLINE void atomic64_mina(atomic64 *min, atomic64 *inp) { + atomic64_min(min, atomic64_get(inp)); +} +INLINE void atomic64_maxa(atomic64 *max, atomic64 *inp) { + atomic64_max(max, atomic64_get(inp)); +} diff --git a/include/call.h b/include/call.h index 94ebcc1ca..6ef3d7866 100644 --- a/include/call.h +++ b/include/call.h @@ -704,20 +704,13 @@ extern struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_inter #define RTPE_GAUGE_INC(field) RTPE_GAUGE_ADD(field, 1) #define RTPE_GAUGE_DEC(field) RTPE_GAUGE_ADD(field, -1) -extern struct global_stats_ax rtpe_stats; -extern struct global_stats_counter rtpe_stats_interval; // accumulators copied out once per interval -extern struct global_stats_counter rtpe_stats_cumulative; // total, cumulative -extern struct global_stats_ax rtpe_stats_graphite; -extern struct global_stats_counter rtpe_stats_graphite_interval; // copied out when graphite stats run +extern struct global_stats_counter rtpe_stats; // total, cumulative, master +extern struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run +extern struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases extern struct global_stats_min_max rtpe_stats_graphite_min_max; // running min/max extern struct global_stats_min_max rtpe_stats_graphite_min_max_interval; // updated once per graphite run -#define RTPE_STATS_ADD(field, num) \ - do { \ - atomic64_add(&rtpe_stats.ax.field, num); \ - atomic64_add(&rtpe_stats_cumulative.field, num); \ - atomic64_add(&rtpe_stats_graphite.ax.field, num); \ - } while (0) +#define RTPE_STATS_ADD(field, num) atomic64_add(&rtpe_stats.field, num) #define RTPE_STATS_INC(field) RTPE_STATS_ADD(field, 1) diff --git a/include/statistics.h b/include/statistics.h index fb1a19fa0..05d8f7c3f 100644 --- a/include/statistics.h +++ b/include/statistics.h @@ -55,10 +55,6 @@ struct global_stats_counter { #undef FA }; -struct global_stats_ax { - struct global_stats_counter ax; // running accumulator - struct global_stats_counter intv; // last per-interval values -}; struct global_stats_min_max { struct global_stats_counter min; struct global_stats_counter max; @@ -126,37 +122,31 @@ GQueue *statistics_gather_metrics(void); void statistics_free_metrics(GQueue **); const char *statistics_ng(bencode_item_t *input, bencode_item_t *output); -INLINE void stats_counters_ax_calc_avg1(atomic64 *ax_var, atomic64 *intv_var, atomic64 *loc_var, - long long run_diff_us) -{ - uint64_t tmp = atomic64_get_set(ax_var, 0); - if (loc_var) - atomic64_set(loc_var, tmp); - atomic64_set(intv_var, tmp * 1000000LL / run_diff_us); -} - -INLINE void stats_counters_ax_calc_avg(struct global_stats_ax *stats, long long run_diff_us, - struct global_stats_counter *loc) +INLINE void stats_counters_calc_rate(const struct global_stats_counter *stats, long long run_diff_us, + struct global_stats_counter *intv, struct global_stats_counter *rate) { if (run_diff_us <= 0) return; -#define F(x) stats_counters_ax_calc_avg1(&stats->ax.x, &stats->intv.x, loc ? &loc->x : NULL, run_diff_us); +#define F(x) atomic64_calc_rate(&stats->x, run_diff_us, &intv->x, &rate->x); #define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } #include "counter_stats_fields.inc" #undef F } -INLINE void stats_counters_min1(atomic64 *min, atomic64 *inp) { - atomic64_min(min, atomic64_get(inp)); -} -INLINE void stats_counters_max1(atomic64 *max, atomic64 *inp) { - atomic64_max(max, atomic64_get(inp)); +INLINE void stats_counters_calc_diff(const struct global_stats_counter *stats, + struct global_stats_counter *intv, struct global_stats_counter *diff) +{ +#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x); +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } +#include "counter_stats_fields.inc" +#undef F } + INLINE void stats_counters_min_max(struct global_stats_min_max *mm, struct global_stats_counter *inp) { #define F(x) \ - stats_counters_min1(&mm->min.x, &inp->x); \ - stats_counters_max1(&mm->max.x, &inp->x); \ + atomic64_mina(&mm->min.x, &inp->x); \ + atomic64_maxa(&mm->max.x, &inp->x); \ atomic64_add(&mm->avg.x, atomic64_get(&inp->x)); #include "counter_stats_fields.inc" #undef F diff --git a/t/test-payload-tracker.c b/t/test-payload-tracker.c index 00c794f52..c86a501cc 100644 --- a/t/test-payload-tracker.c +++ b/t/test-payload-tracker.c @@ -10,11 +10,9 @@ struct global_stats_gauge rtpe_stats_gauge; struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative; struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max; struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval; -struct global_stats_ax rtpe_stats; -struct global_stats_counter rtpe_stats_interval; -struct global_stats_counter rtpe_stats_cumulative; -struct global_stats_ax rtpe_stats_graphite; -struct global_stats_counter rtpe_stats_graphite_interval; +struct global_stats_counter rtpe_stats; +struct global_stats_counter rtpe_stats_rate; +struct global_stats_counter rtpe_stats_graphite_diff; struct global_stats_min_max rtpe_stats_graphite_min_max; struct global_stats_min_max rtpe_stats_graphite_min_max_interval;