diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 118ed4758..3a297b5c8 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -350,7 +350,7 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, // update interval statistics RTPE_STATS_INC(ng_commands[command]); - RTPE_GAUGE_SET(ng_command_times[command], timeval_us(&cmd_process_time)); + RTPE_STATS_SAMPLE(ng_command_times[command], timeval_us(&cmd_process_time)); goto send_resp; diff --git a/daemon/graphite.c b/daemon/graphite.c index edd7083ae..8eb1e6b3c 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -36,13 +36,18 @@ static struct timeval graphite_interval_tv; struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases static struct global_stats_counter rtpe_stats_graphite_intv; // copied out when graphite stats run - -struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max; -struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval; +struct global_gauge_min_max rtpe_gauge_graphite_min_max; +struct global_gauge_min_max rtpe_gauge_graphite_min_max_sampled; struct global_rate_min_max rtpe_rate_graphite_min_max; struct global_rate_min_max_avg rtpe_rate_graphite_min_max_avg_sampled; +struct global_sampled_min_max rtpe_sampled_graphite_min_max; +struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; +static struct global_stats_sampled rtpe_sampled_graphite_min_max_diff; +static struct global_stats_sampled rtpe_sampled_graphite_min_max_intv; +struct global_sampled_avg rtpe_sampled_graphite_avg; + void set_graphite_interval_tv(struct timeval *tv) { graphite_interval_tv = *tv; @@ -93,8 +98,16 @@ GString *print_graphite_data(void) { rtpe_latest_graphite_interval_start = rtpe_now; stats_counters_calc_diff(&rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff); - stats_rate_min_max_avg_sample(&rtpe_rate_graphite_min_max, &rtpe_rate_graphite_min_max_avg_sampled, time_diff_us, &rtpe_stats_graphite_diff); - stats_gauge_calc_avg_reset(&rtpe_stats_gauge_graphite_min_max_interval, &rtpe_stats_gauge_graphite_min_max); + stats_rate_min_max_avg_sample(&rtpe_rate_graphite_min_max, &rtpe_rate_graphite_min_max_avg_sampled, + time_diff_us, &rtpe_stats_graphite_diff); + + stats_gauge_min_max_sample(&rtpe_gauge_graphite_min_max_sampled, &rtpe_gauge_graphite_min_max, + &rtpe_stats_gauge); + + stats_sampled_calc_diff(&rtpe_stats_sampled, &rtpe_sampled_graphite_min_max_intv, + &rtpe_sampled_graphite_min_max_diff); + stats_sampled_min_max_sample(&rtpe_sampled_graphite_min_max, &rtpe_sampled_graphite_min_max_sampled); + stats_sampled_avg(&rtpe_sampled_graphite_avg, &rtpe_sampled_graphite_min_max_diff); GString *graph_str = g_string_new(""); @@ -105,11 +118,11 @@ GString *print_graphite_data(void) { for (int i = 0; i < NGC_COUNT; i++) { GPF("%s_time_min %.6f", ng_command_strings[i], - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.ng_command_times[i]) / 1000000.0); + (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.min.ng_command_times[i]) / 1000000.0); GPF("%s_time_max %.6f", ng_command_strings[i], - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.ng_command_times[i]) / 1000000.0); + (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0); GPF("%s_time_avg %.6f", ng_command_strings[i], - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.avg.ng_command_times[i]) / 1000000.0); + (double) atomic64_get(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0); GPF("%ss_ps_min " UINT64F, ng_command_strings[i], atomic64_get(&rtpe_rate_graphite_min_max_avg_sampled.min.ng_commands[i])); GPF("%ss_ps_max " UINT64F, ng_command_strings[i], atomic64_get(&rtpe_rate_graphite_min_max_avg_sampled.max.ng_commands[i])); @@ -117,9 +130,9 @@ GString *print_graphite_data(void) { ilog(LOG_DEBUG, "Min/Max/Avg %s processing delay: %.6f/%.6f/%.6f sec", ng_command_strings[i], - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.ng_command_times[i]) / 1000000.0, - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.ng_command_times[i]) / 1000000.0, - (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.avg.ng_command_times[i]) / 1000000.0); + (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.min.ng_command_times[i]) / 1000000.0, + (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0, + (double) atomic64_get(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0); GPF("%s_count %" PRIu64, ng_command_strings[i], atomic64_get(&rtpe_stats.ng_commands[i])); } @@ -134,8 +147,8 @@ GString *print_graphite_data(void) { GPF("average_call_dur %llu.%06llu",(unsigned long long)avg_duration.tv_sec,(unsigned long long)avg_duration.tv_usec); GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.forced_term_sess)); GPF("managed_sess "UINT64F, atomic64_get(&rtpe_stats.managed_sess)); - GPF("managed_sess_min "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions)); - GPF("managed_sess_max "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions)); + GPF("managed_sess_min "UINT64F, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions)); + GPF("managed_sess_max "UINT64F, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions)); GPF("current_sessions_total "UINT64F, atomic64_get(&rtpe_stats_gauge.total_sessions)); GPF("current_sessions_own "UINT64F, atomic64_get(&rtpe_stats_gauge.total_sessions) - atomic64_get(&rtpe_stats_gauge.foreign_sessions)); GPF("current_sessions_foreign "UINT64F, atomic64_get(&rtpe_stats_gauge.foreign_sessions)); @@ -204,8 +217,8 @@ GString *print_graphite_data(void) { ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%.6f at time %llu\n", - (unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions), - (unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions), + (unsigned long long) atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions), + (unsigned long long) atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions), (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0, (unsigned long long ) rtpe_now.tv_sec); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 788763522..1c40cc5b8 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -396,17 +396,17 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor .packetloss = (unsigned int) rr->fraction_lost * 100 / 256, }; - RTPE_GAUGE_SET(jitter, jitter); - RTPE_GAUGE_SET(rtt_e2e, rtt_end2end); - RTPE_GAUGE_SET(rtt_dsct, rtt); - RTPE_GAUGE_SET(packetloss, ssb->packetloss); + RTPE_STATS_SAMPLE(jitter, jitter); + RTPE_STATS_SAMPLE(rtt_e2e, rtt_end2end); + RTPE_STATS_SAMPLE(rtt_dsct, rtt); + RTPE_STATS_SAMPLE(packetloss, ssb->packetloss); other_e->packets_lost = rr->packets_lost; mos_calc(ssb); if (ssb->mos) { ilog(LOG_DEBUG, "Calculated MOS from RR for %s%x%s is %.1f", FMT_M(rr->from), (double) ssb->mos / 10.0); - RTPE_GAUGE_SET(mos, ssb->mos); + RTPE_STATS_SAMPLE(mos, ssb->mos); } // got a new stats block, add it to reporting ssrc @@ -623,6 +623,6 @@ void ssrc_collect_metrics(struct call_media *media) { e->jitter = e->jitter * 1000 / rpt->clock_rate; } - RTPE_GAUGE_SET(jitter_measured, e->jitter); + RTPE_STATS_SAMPLE(jitter_measured, e->jitter); } } diff --git a/daemon/statistics.c b/daemon/statistics.c index f5b4c6428..14afacf51 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -14,8 +14,11 @@ mutex_t rtpe_codec_stats_lock; GHashTable *rtpe_codec_stats; -struct global_stats_gauge rtpe_stats_gauge; -struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative; +struct global_stats_gauge rtpe_stats_gauge; // master values +struct global_gauge_min_max rtpe_gauge_min_max; // master lifetime min/max + +struct global_stats_sampled rtpe_stats_sampled; // master cumulative values +struct global_sampled_min_max rtpe_sampled_min_max; // master lifetime min/max struct global_stats_counter rtpe_stats; // total, cumulative, master struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run @@ -456,8 +459,8 @@ GQueue *statistics_gather_metrics(void) { METRICva("totalcallsduration_stddev", "Total calls duration standard deviation", "%.6f", "%.6f seconds", sqrt(variance) / 1000.0); calls_dur_iv = (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0; - min_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions); - max_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions); + min_sess_iv = atomic64_get(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions); + max_sess_iv = atomic64_get(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions); HEADER(NULL, ""); HEADER("}", ""); @@ -471,9 +474,9 @@ GQueue *statistics_gather_metrics(void) { METRIC("maxmanagedsessions", "Max managed sessions", UINT64F, UINT64F, max_sess_iv); for (int i = 0; i < NGC_COUNT; i++) { - double min = (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.ng_command_times[i]) / 1000000.0; - double max = (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.ng_command_times[i]) / 1000000.0; - double avg = (double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.avg.ng_command_times[i]) / 1000000.0; + double min = (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.min.ng_command_times[i]) / 1000000.0; + double max = (double) atomic64_get(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0; + double avg = (double) atomic64_get(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0; AUTO_CLEANUP(char *min_label, free_gbuf) = g_strdup_printf("min%sdelay", ng_command_strings[i]); AUTO_CLEANUP(char *max_label, free_gbuf) = g_strdup_printf("max%sdelay", ng_command_strings[i]); AUTO_CLEANUP(char *avg_label, free_gbuf) = g_strdup_printf("avg%sdelay", ng_command_strings[i]); @@ -501,27 +504,23 @@ GQueue *statistics_gather_metrics(void) { HEADER(NULL, ""); HEADER("}", ""); - uint64_t metric_num, metric_tot, metric2_tot; - double metric_mean, metric_variance; + struct global_sampled_avg sampled_avgs; + stats_sampled_avg(&sampled_avgs, &rtpe_stats_sampled); #define STAT_GET_PRINT(stat_name, name, divisor) \ - metric_num = atomic64_get(&rtpe_stats_gauge_cumulative.count.stat_name); \ - metric_tot = atomic64_get(&rtpe_stats_gauge_cumulative.avg.stat_name); \ - metric2_tot = atomic64_get(&rtpe_stats_gauge_cumulative.stddev.stat_name); \ - metric_mean = metric_num ? (double) metric_tot / (double) metric_num : 0.0; \ - metric_variance = metric_num \ - ? fabs((double) metric2_tot / (double) metric_num - metric_mean * metric_mean) \ - : 0.0; \ METRIC(#stat_name "_total", "Sum of all " name " values sampled", "%.6f", "%.6f", \ - (double) metric_tot / (divisor)); \ + (double) atomic64_get(&rtpe_stats_sampled.sums.stat_name) / (divisor)); \ PROM(#stat_name "_total", "counter"); \ METRIC(#stat_name "2_total", "Sum of all " name " square values sampled", "%.6f", "%.6f", \ - (double) metric2_tot / (divisor * divisor)); \ + (double) atomic64_get(&rtpe_stats_sampled.sums_squared.stat_name) / (divisor * divisor)); \ PROM(#stat_name "2_total", "counter"); \ - METRIC(#stat_name "_samples_total", "Total number of " name " samples", UINT64F, UINT64F, metric_num); \ + METRIC(#stat_name "_samples_total", "Total number of " name " samples", UINT64F, UINT64F, \ + atomic64_get(&rtpe_stats_sampled.counts.stat_name)); \ PROM(#stat_name "_samples_total", "counter"); \ - METRIC(#stat_name "_average", "Average " name, "%.6f", "%.6f", metric_mean / 10.0); \ - METRIC(#stat_name "_stddev", name " standard deviation", "%.6f", "%.6f", sqrt(metric_variance) / 10.0); \ + METRIC(#stat_name "_average", "Average " name, "%.6f", "%.6f", \ + (double) atomic64_get(&sampled_avgs.avg.stat_name) / (divisor)); \ + METRIC(#stat_name "_stddev", name " standard deviation", "%.6f", "%.6f", \ + (double) atomic64_get(&sampled_avgs.stddev.stat_name) / (divisor * divisor)); HEADER("mos", "MOS statistics:"); HEADER("{", ""); diff --git a/include/aux.h b/include/aux.h index 30b5b752e..a075fe704 100644 --- a/include/aux.h +++ b/include/aux.h @@ -529,7 +529,7 @@ INLINE void atomic64_calc_rate(const atomic64 *ax_var, long long run_diff_us, uint64_t ax = atomic64_get(ax_var); uint64_t old_intv = atomic64_get(intv_var); atomic64_set(intv_var, ax); - atomic64_set(rate_var, (ax - old_intv) * 1000000LL / run_diff_us); + atomic64_set(rate_var, run_diff_us ? (ax - old_intv) * 1000000LL / run_diff_us : 0); } INLINE void atomic64_calc_diff(const atomic64 *ax_var, atomic64 *intv_var, atomic64 *diff_var) { uint64_t ax = atomic64_get(ax_var); @@ -543,7 +543,41 @@ INLINE void atomic64_mina(atomic64 *min, atomic64 *inp) { INLINE void atomic64_maxa(atomic64 *max, atomic64 *inp) { atomic64_max(max, atomic64_get(inp)); } +INLINE double atomic64_div(const atomic64 *n, const atomic64 *d) { + int64_t dd = atomic64_get(d); + if (!dd) + return 0.; + return (double) atomic64_get(n) / (double) dd; +} + + + +/*** STATS HELPERS ***/ + +#define STAT_MIN_MAX_RESET_ZERO(x, mm, loc) \ + atomic64_set(&loc->min.x, atomic64_get_set(&mm->min.x, 0)); \ + atomic64_set(&loc->max.x, atomic64_get_set(&mm->max.x, 0)); +#define STAT_MIN_MAX(x, loc, mm, cur) \ + atomic64_set(&loc->min.x, atomic64_get_set(&mm->min.x, atomic64_get(&cur->x))); \ + atomic64_set(&loc->max.x, atomic64_get_set(&mm->max.x, atomic64_get(&cur->x))); + +#define STAT_MIN_MAX_AVG(x, mm, loc, run_diff_us, counter_diff) \ + atomic64_set(&loc->min.x, atomic64_get_set(&mm->min.x, 0)); \ + atomic64_set(&loc->max.x, atomic64_get_set(&mm->max.x, 0)); \ + atomic64_set(&loc->avg.x, run_diff_us ? atomic64_get(&counter_diff->x) * 1000000LL / run_diff_us : 0); + +#define STAT_SAMPLED_CALC_DIFF(x, stats, intv, diff) \ + atomic64_calc_diff(&stats->sums.x, &intv->sums.x, &diff->sums.x); \ + atomic64_calc_diff(&stats->sums_squared.x, &intv->sums_squared.x, &diff->sums_squared.x); \ + atomic64_calc_diff(&stats->counts.x, &intv->counts.x, &diff->counts.x); + +#define STAT_SAMPLED_AVG_STDDEV(x, loc, diff) { \ + double __mean = atomic64_div(&diff->sums.x, &diff->counts.x); \ + atomic64_set(&loc->avg.x, __mean); \ + atomic64_set(&loc->stddev.x, sqrt(fabs(atomic64_div(&diff->sums_squared.x, &diff->counts.x) \ + - __mean * __mean))); \ + } diff --git a/include/gauge_stats_fields.inc b/include/gauge_stats_fields.inc index 9a88047ef..ee212b8ff 100644 --- a/include/gauge_stats_fields.inc +++ b/include/gauge_stats_fields.inc @@ -4,13 +4,6 @@ F(transcoded_media) F(ipv4_sessions) F(ipv6_sessions) F(mixed_sessions) -FdA(ng_command_times, NGC_COUNT) F(userspace_streams) F(kernel_only_streams) F(kernel_user_streams) -F(mos) -F(jitter) -F(rtt_e2e) -F(rtt_dsct) -F(packetloss) -F(jitter_measured) diff --git a/include/graphite.h b/include/graphite.h index e655b19b9..2f0ad4633 100644 --- a/include/graphite.h +++ b/include/graphite.h @@ -18,13 +18,17 @@ enum connection_state { extern struct timeval rtpe_latest_graphite_interval_start; -extern struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max; -extern struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval; - extern struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases extern struct global_rate_min_max rtpe_rate_graphite_min_max; // running min/max, reset when graphite runs extern struct global_rate_min_max_avg rtpe_rate_graphite_min_max_avg_sampled; // updated once per graphite run +extern struct global_gauge_min_max rtpe_gauge_graphite_min_max; // running min/max, reset when graphite runs +extern struct global_gauge_min_max rtpe_gauge_graphite_min_max_sampled; // updated once per graphite run + +extern struct global_sampled_min_max rtpe_sampled_graphite_min_max; // running min/max, reset when graphite runs +extern struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; // updated once per graphite run +extern struct global_sampled_avg rtpe_sampled_graphite_avg; // updated once per graphite run + void set_prefix(char* prefix); void free_prefix(void); diff --git a/include/sampled_stats_fields.inc b/include/sampled_stats_fields.inc new file mode 100644 index 000000000..0c3895d2a --- /dev/null +++ b/include/sampled_stats_fields.inc @@ -0,0 +1,7 @@ +FA(ng_command_times, NGC_COUNT) +F(mos) +F(jitter) +F(rtt_e2e) +F(rtt_dsct) +F(packetloss) +F(jitter_measured) diff --git a/include/statistics.h b/include/statistics.h index 45956c08b..17416fe95 100644 --- a/include/statistics.h +++ b/include/statistics.h @@ -27,26 +27,38 @@ struct stream_stats { // "gauge" style stats struct global_stats_gauge { -// F(x) : real gauge that has a continuous value -// Fd(x) : gauge that receives values in discreet samples -// FA(x, n) / FdA(x, n) : array of the above #define F(x) atomic64 x; -#define Fd(x) F(x) -#define FA(x, n) atomic64 x[n]; -#define FdA(x, n) FA(x, n) #include "gauge_stats_fields.inc" #undef F -#undef Fd -#undef FA -#undef FdA }; -struct global_stats_gauge_min_max { +// high/low water marks +struct global_gauge_min_max { struct global_stats_gauge min; struct global_stats_gauge max; - struct global_stats_gauge avg; // sum while accumulation is running - struct global_stats_gauge stddev; // sum^2 while accumulation is running - struct global_stats_gauge count; +}; + +// "sampled" style stats +struct global_stats_sampled_fields { +#define F(x) atomic64 x; +#define FA(x, n) atomic64 x[n]; +#include "sampled_stats_fields.inc" +#undef F +#undef FA +}; + +struct global_stats_sampled { + struct global_stats_sampled_fields sums; + struct global_stats_sampled_fields sums_squared; + struct global_stats_sampled_fields counts; +}; +struct global_sampled_min_max { + struct global_stats_sampled_fields min; + struct global_stats_sampled_fields max; +}; +struct global_sampled_avg { + struct global_stats_sampled_fields avg; + struct global_stats_sampled_fields stddev; }; // "counter" style stats that are incremental and are kept cumulative or per-interval @@ -120,23 +132,44 @@ extern mutex_t rtpe_codec_stats_lock; extern GHashTable *rtpe_codec_stats; -extern struct global_stats_gauge rtpe_stats_gauge; -extern struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative; // lifetime min/max/average/sums +extern struct global_stats_gauge rtpe_stats_gauge; // master values +extern struct global_gauge_min_max rtpe_gauge_min_max; // master lifetime min/max +#define RTPE_GAUGE_SET_MIN_MAX(field, min_max_struct, val) \ + do { \ + atomic64_min(&min_max_struct.min.field, val); \ + atomic64_max(&min_max_struct.max.field, val); \ + } while (0) #define RTPE_GAUGE_SET(field, num) \ do { \ atomic64_set(&rtpe_stats_gauge.field, num); \ - RTPE_GAUGE_SET_MIN_MAX(field, rtpe_stats_gauge_cumulative, num); \ - RTPE_GAUGE_SET_MIN_MAX(field, rtpe_stats_gauge_graphite_min_max, num); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_gauge_min_max, num); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_gauge_graphite_min_max, num); \ } while (0) #define RTPE_GAUGE_ADD(field, num) \ do { \ uint64_t __old = atomic64_add(&rtpe_stats_gauge.field, num); \ - RTPE_GAUGE_SET_MIN_MAX(field, rtpe_stats_gauge_graphite_min_max, __old + num); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_gauge_min_max, __old + num); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_gauge_graphite_min_max, __old + num); \ } while (0) +// TODO: ^ skip doing this for graphite if it's not actually enabled #define RTPE_GAUGE_INC(field) RTPE_GAUGE_ADD(field, 1) #define RTPE_GAUGE_DEC(field) RTPE_GAUGE_ADD(field, -1) + +extern struct global_stats_sampled rtpe_stats_sampled; // master cumulative values +extern struct global_sampled_min_max rtpe_sampled_min_max; // master lifetime min/max + +#define RTPE_STATS_SAMPLE(field, num) \ + do { \ + atomic64_add(&rtpe_stats_sampled.sums.field, num); \ + atomic64_add(&rtpe_stats_sampled.sums_squared.field, num * num); \ + atomic64_inc(&rtpe_stats_sampled.counts.field); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_sampled_min_max, num); \ + RTPE_GAUGE_SET_MIN_MAX(field, rtpe_sampled_graphite_min_max, num); \ + } while (0) +// TODO: ^ skip doing this for graphite if it's not actually enabled + extern struct global_stats_counter rtpe_stats; // total, cumulative, master extern struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run @@ -157,13 +190,11 @@ const char *statistics_ng(bencode_item_t *input, bencode_item_t *output); INLINE void stats_counters_calc_rate(const struct global_stats_counter *stats, long long run_diff_us, struct global_stats_counter *intv, struct global_stats_counter *rate) { - if (run_diff_us <= 0) - return; - #define F(x) atomic64_calc_rate(&stats->x, run_diff_us, &intv->x, &rate->x); #define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } #include "counter_stats_fields.inc" #undef F +#undef FA } INLINE void stats_counters_calc_diff(const struct global_stats_counter *stats, @@ -173,6 +204,7 @@ INLINE void stats_counters_calc_diff(const struct global_stats_counter *stats, #define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } #include "counter_stats_fields.inc" #undef F +#undef FA } // update the running min/max counter `mm` with the newly calculated per-sec rate values `inp` @@ -180,55 +212,56 @@ INLINE void stats_rate_min_max(struct global_rate_min_max *mm, struct global_sta #define F(x) \ atomic64_mina(&mm->min.x, &inp->x); \ atomic64_maxa(&mm->max.x, &inp->x); +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } #include "counter_stats_fields.inc" #undef F +#undef FA } // sample running min/max from `mm` into `loc` and reset `mm` to zero. // calculate average values in `loc` from `counter_diff` and `time_diff_us` INLINE void stats_rate_min_max_avg_sample(struct global_rate_min_max *mm, struct global_rate_min_max_avg *loc, long long run_diff_us, const struct global_stats_counter *counter_diff) { -#define F(x) \ - atomic64_set(&loc->min.x, atomic64_get_set(&mm->min.x, 0)); \ - atomic64_set(&loc->max.x, atomic64_get_set(&mm->max.x, 0)); \ - atomic64_set(&loc->avg.x, run_diff_us ? atomic64_get(&counter_diff->x) * 1000000LL / run_diff_us : 0); +#define F(x) STAT_MIN_MAX_AVG(x, mm, loc, run_diff_us, counter_diff) +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } #include "counter_stats_fields.inc" #undef F +#undef FA } -#define RTPE_GAUGE_SET_MIN_MAX(field, min_max_struct, val) \ - do { \ - atomic64_min(&min_max_struct.min.field, val); \ - atomic64_max(&min_max_struct.max.field, val); \ - atomic64_add(&min_max_struct.avg.field, val); \ - atomic64_add(&min_max_struct.stddev.field, (val) * (val)); \ - atomic64_inc(&min_max_struct.count.field); \ - } while (0) - -extern struct global_stats_gauge rtpe_stats_gauge; +INLINE void stats_sampled_calc_diff(const struct global_stats_sampled *stats, + struct global_stats_sampled *intv, struct global_stats_sampled *diff) +{ +#define F(x) STAT_SAMPLED_CALC_DIFF(x, stats, intv, diff) +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } +#include "sampled_stats_fields.inc" +#undef F +#undef FA +} +// sample running min/max from `mm` into `loc` and reset `mm` to zero. +INLINE void stats_sampled_min_max_sample(struct global_sampled_min_max *mm, + struct global_sampled_min_max *loc) { +#define F(x) STAT_MIN_MAX_RESET_ZERO(x, mm, loc) +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } +#include "sampled_stats_fields.inc" +#undef F +#undef FA +} +INLINE void stats_sampled_avg(struct global_sampled_avg *loc, + const struct global_stats_sampled *diff) { +#define F(x) STAT_SAMPLED_AVG_STDDEV(x, loc, diff) +#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) } +#include "sampled_stats_fields.inc" +#undef F +#undef FA +} -INLINE void stats_gauge_calc_avg_reset(struct global_stats_gauge_min_max *out, - struct global_stats_gauge_min_max *in_reset) +// sample running min/max from `in_reset` into `out` and reset `in_reset` to the current value. +INLINE void stats_gauge_min_max_sample(struct global_gauge_min_max *out, + struct global_gauge_min_max *in_reset, const struct global_stats_gauge *cur) { - uint64_t cur, count; - -#define Fc(x) \ - atomic64_set(&out->min.x, atomic64_get_set(&in_reset->min.x, cur)); \ - atomic64_set(&out->max.x, atomic64_get_set(&in_reset->max.x, cur)); \ - count = atomic64_get_set(&in_reset->count.x, 0); \ - atomic64_set(&out->count.x, count); \ - atomic64_set(&out->avg.x, count ? atomic64_get_set(&in_reset->avg.x, 0) / count : 0); -#define F(x) \ - cur = atomic64_get(&rtpe_stats_gauge.x); \ - Fc(x) -#define Fd(x) \ - cur = 0; \ - Fc(x) -#define FdA(x, n) for (int i = 0; i < n; i++) { Fd(x[i]) } +#define F(x) STAT_MIN_MAX(x, out, in_reset, cur) #include "gauge_stats_fields.inc" -#undef Fc #undef F -#undef Fd -#undef FdA } diff --git a/t/test-payload-tracker.c b/t/test-payload-tracker.c index 59b1b2f3a..81c4b3c86 100644 --- a/t/test-payload-tracker.c +++ b/t/test-payload-tracker.c @@ -7,15 +7,13 @@ struct rtpengine_config rtpe_config; struct global_stats_gauge rtpe_stats_gauge; -struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative; -struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max; -struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval; +struct global_gauge_min_max rtpe_gauge_min_max; struct global_stats_counter rtpe_stats; struct global_stats_counter rtpe_stats_rate; -struct global_stats_counter rtpe_stats_graphite_diff; -struct global_rate_min_max rtpe_rate_graphite_min_max; -struct global_rate_min_max_avg rtpe_rate_graphite_min_max_avg_sampled; - +struct global_stats_sampled rtpe_stats_sampled; +struct global_sampled_min_max rtpe_sampled_min_max; +struct global_sampled_min_max rtpe_sampled_graphite_min_max; +struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { char buf[1024] = ""; diff --git a/t/test-stats.c b/t/test-stats.c index 59444ad09..2ee2d0974 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -1122,10 +1122,10 @@ int main(void) { "}\n"); RTPE_STATS_INC(ng_commands[NGC_OFFER]); - RTPE_GAUGE_SET(ng_command_times[NGC_OFFER], 2500000LL); + RTPE_STATS_SAMPLE(ng_command_times[NGC_OFFER], 2500000LL); RTPE_STATS_INC(ng_commands[NGC_OFFER]); - RTPE_GAUGE_SET(ng_command_times[NGC_OFFER], 3200000LL); + RTPE_STATS_SAMPLE(ng_command_times[NGC_OFFER], 3200000LL); graph_str = print_graphite_data(); assert_g_string_eq(graph_str, @@ -2175,7 +2175,7 @@ int main(void) { "}\n"); RTPE_STATS_INC(ng_commands[NGC_ANSWER]); - RTPE_GAUGE_SET(ng_command_times[NGC_ANSWER], 3200000LL); + RTPE_STATS_SAMPLE(ng_command_times[NGC_ANSWER], 3200000LL); graph_str = print_graphite_data(); assert_g_string_eq(graph_str,