diff --git a/daemon/aux.c b/daemon/aux.c index ad34fc449..3331dd2f0 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -34,6 +34,10 @@ static cond_t threads_cond = COND_STATIC_INIT; static struct thread_buf __thread t_bufs[NUM_THREAD_BUFS]; static int __thread t_buf_idx; +#ifdef NEED_ATOMIC64_MUTEX +mutex_t __atomic64_mutex = MUTEX_STATIC_INIT; +#endif + GList *g_list_link(GList *list, GList *el) { diff --git a/daemon/aux.h b/daemon/aux.h index 96e7df160..0a9ad5893 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -510,4 +510,99 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) { unsigned int in6_addr_hash(const void *p); int in6_addr_eq(const void *a, const void *b); + + +#if GLIB_SIZEOF_VOID_P >= 8 + +typedef struct { + void *p; +} atomic64; + +INLINE u_int64_t atomic64_get(const atomic64 *u) { + return (u_int64_t) g_atomic_pointer_get(&u->p); +} +INLINE u_int64_t atomic64_get_na(const atomic64 *u) { + return (u_int64_t) u->p; +} +INLINE void atomic64_set(atomic64 *u, u_int64_t a) { + g_atomic_pointer_set(&u->p, (void *) a); +} +INLINE void atomic64_set_na(atomic64 *u, u_int64_t a) { + u->p = (void *) a; +} +INLINE void atomic64_add(atomic64 *u, u_int64_t a) { + g_atomic_pointer_add(&u->p, a); +} +INLINE void atomic64_add_na(atomic64 *u, u_int64_t a) { + u->p = (void *) (((u_int64_t) u->p) + a); +} +INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) { + u_int64_t old; + do { + old = atomic64_get(u); + if (g_atomic_pointer_compare_and_exchange(&u->p, (void *) old, (void *) a)) + return old; + } while (1); +} + +#else + +/* Simulate atomic u64 with a global mutex on non-64-bit platforms. + * Bad performance possible, thus not recommended. */ + +typedef struct { + u_int64_t u; +} atomic64; + +#define NEED_ATOMIC64_MUTEX +extern mutex_t __atomic64_mutex; + +INLINE u_int64_t atomic64_get(const atomic64 *u) { + u_int64_t ret; + mutex_lock(&__atomic64_mutex); + ret = u->u; + mutex_unlock(&__atomic64_mutex); + return ret; +} +INLINE u_int64_t atomic64_get_na(const atomic64 *u) { + return u->u; +} +INLINE void atomic64_set(atomic64 *u, u_int64_t a) { + mutex_lock(&__atomic64_mutex); + u->u = a; + mutex_unlock(&__atomic64_mutex); +} +INLINE void atomic64_set_na(atomic64 *u, u_int64_t a) { + u->u = a; +} +INLINE void atomic64_add(atomic64 *u, u_int64_t a) { + mutex_lock(&__atomic64_mutex); + u->u += a; + mutex_unlock(&__atomic64_mutex); +} +INLINE void atomic64_add_na(atomic64 *u, u_int64_t a) { + u->u += a; +} +INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) { + u_int64_t old; + mutex_lock(&__atomic64_mutex); + old = u->u; + u->u = a; + mutex_unlock(&__atomic64_mutex); + return old; +} + +#endif + +INLINE void atomic64_inc(atomic64 *u) { + atomic64_add(u, 1); +} +INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) { + atomic64_set_na(dst, atomic64_get_set(src, 0)); +} +#define atomic64_local_copy_zero_struct(d, s, member) \ + atomic64_local_copy_zero(&((d)->member), &((s)->member)) + + + #endif diff --git a/daemon/call.c b/daemon/call.c index 6492a0abb..b3652e99e 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -687,12 +687,9 @@ loop_ok: if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) { ilog(LOG_WARNING, "RTP packet from %s discarded", addr); - mutex_lock(&stream->in_lock); - stream->stats.errors++; - mutex_lock(&cm->statspslock); - cm->statsps.errors++; - mutex_unlock(&cm->statspslock); - goto done; + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); + goto unlock_out; } mutex_lock(&in_srtp->in_lock); @@ -755,7 +752,7 @@ use_cand: mutex_unlock(&stream->out_lock); if (tmp && PS_ISSET(stream, STRICT_SOURCE)) { - stream->stats.errors++; + atomic64_inc(&stream->stats.errors); goto drop; } } @@ -844,10 +841,8 @@ forward: if (ret == -1) { ret = -errno; ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - stream->stats.errors++; - mutex_lock(&cm->statspslock); - cm->statsps.errors++; - mutex_unlock(&cm->statspslock); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); goto out; } @@ -857,13 +852,11 @@ drop: if (sink) mutex_unlock(&sink->out_lock); ret = 0; - stream->stats.packets++; - stream->stats.bytes += s->len; - stream->last_packet = poller_now; - mutex_lock(&cm->statspslock); - cm->statsps.packets++; - cm->statsps.bytes += s->len; - mutex_unlock(&cm->statspslock); + atomic64_inc(&stream->stats.packets); + atomic64_add(&stream->stats.bytes, s->len); + atomic64_set(&stream->last_packet, poller_now); + atomic64_inc(&cm->statsps.packets); + atomic64_add(&cm->statsps.bytes, s->len); out: if (ret == 0 && update) @@ -1078,7 +1071,7 @@ no_sfd: tmp_t_reason = 2; } - if (poller_now - ps->last_packet < check) + if (poller_now - atomic64_get(&ps->last_packet) < check) good = 1; next: @@ -1093,7 +1086,6 @@ next: for (i = c->monologues; i; i = i->next) { ml = i->data; - memset(&ml->terminated,0,sizeof(struct timeval)); gettimeofday(&(ml->terminated),NULL); if (tmp_t_reason==1) { ml->term_reason = TIMEOUT; @@ -1288,16 +1280,14 @@ destroy: #define DS(x) do { \ - mutex_lock(&ps->in_lock); \ - if (ke->stats.x < ps->kernel_stats.x) \ + u_int64_t ks_val, d; \ + ks_val = atomic64_get(&ps->kernel_stats.x); \ + if (ke->stats.x < ks_val) \ d = 0; \ else \ - d = ke->stats.x - ps->kernel_stats.x; \ - ps->stats.x += d; \ - mutex_unlock(&ps->in_lock); \ - mutex_lock(&m->statspslock); \ - m->statsps.x += d; \ - mutex_unlock(&m->statspslock); \ + d = ke->stats.x - ks_val; \ + atomic64_add(&ps->stats.x, d); \ + atomic64_add(&m->statsps.x, d); \ } while (0) static void callmaster_timer(void *ptr) { struct callmaster *m = ptr; @@ -1305,7 +1295,6 @@ static void callmaster_timer(void *ptr) { GList *i; struct rtpengine_list_entry *ke; struct packet_stream *ps, *sink; - u_int64_t d; struct stats tmpstats; int j, update; struct stream_fd *sfd; @@ -1316,13 +1305,13 @@ static void callmaster_timer(void *ptr) { g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp); rwlock_unlock_r(&m->hashlock); - mutex_lock(&m->statspslock); - memcpy(&tmpstats, &m->statsps, sizeof(tmpstats)); - ZERO(m->statsps); - mutex_unlock(&m->statspslock); - mutex_lock(&m->statslock); - memcpy(&m->stats, &tmpstats, sizeof(m->stats)); - mutex_unlock(&m->statslock); + atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, bytes); + atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, packets); + atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, errors); + + atomic64_set(&m->stats.bytes, atomic64_get_na(&tmpstats.bytes)); + atomic64_set(&m->stats.packets, atomic64_get_na(&tmpstats.packets)); + atomic64_set(&m->stats.errors, atomic64_get_na(&tmpstats.errors)); i = (m->conf.kernelid >= 0) ? kernel_list(m->conf.kernelid) : NULL; while (i) { @@ -1346,12 +1335,12 @@ static void callmaster_timer(void *ptr) { mutex_lock(&ps->in_lock); - if (ke->stats.packets != ps->kernel_stats.packets) - ps->last_packet = poller_now; + if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets)) + atomic64_set(&ps->last_packet, poller_now); - ps->kernel_stats.packets = ke->stats.packets; - ps->kernel_stats.bytes = ke->stats.bytes; - ps->kernel_stats.errors = ke->stats.errors; + atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes); + atomic64_set(&ps->kernel_stats.packets, ke->stats.packets); + atomic64_set(&ps->kernel_stats.errors, ke->stats.errors); update = 0; @@ -1423,7 +1412,8 @@ struct callmaster *callmaster_new(struct poller *p) { poller_add_timer(p, callmaster_timer, &c->obj); - mutex_init(&c->totalstats_lock); + mutex_init(&c->totalstats.total_average_lock); + mutex_init(&c->totalstats_interval.total_average_lock); c->totalstats.started = poller_now; mutex_init(&c->cngs_lock); @@ -1764,7 +1754,7 @@ struct packet_stream *__packet_stream_new(struct call *call) { mutex_init(&stream->in_lock); mutex_init(&stream->out_lock); stream->call = call; - stream->last_packet = poller_now; + atomic64_set_na(&stream->last_packet, poller_now); call->streams = g_slist_prepend(call->streams, stream); return stream; @@ -2376,31 +2366,56 @@ static void unkernelize(struct packet_stream *p) { } void timeval_subtract (struct timeval *result, const struct timeval *a, const struct timeval *b) { - long microseconds=0; - microseconds = ((long)a->tv_sec - (long)b->tv_sec) * 1000000 + ((long)a->tv_usec - (long)b->tv_usec); - result->tv_sec = microseconds/(long)1000000; - result->tv_usec = microseconds%(long)1000000; + u_int64_t microseconds=0; + microseconds = ((u_int64_t)a->tv_sec - (u_int64_t)b->tv_sec) * 1000000LLU + (a->tv_usec - b->tv_usec); + result->tv_sec = microseconds/1000000LLU; + result->tv_usec = microseconds%1000000LLU; } void timeval_multiply(struct timeval *result, const struct timeval *a, const long multiplier) { - long microseconds=0; - microseconds = (((long)a->tv_sec * 1000000) + (long)a->tv_usec) * multiplier; - result->tv_sec = microseconds/(long)1000000; - result->tv_usec = microseconds%(long)1000000; + u_int64_t microseconds=0; + microseconds = (((u_int64_t)a->tv_sec * 1000000LLU) + a->tv_usec) * multiplier; + result->tv_sec = microseconds/1000000LLU; + result->tv_usec = microseconds%1000000LLU; } -void timeval_devide(struct timeval *result, const struct timeval *a, const long devisor) { - long microseconds=0; - microseconds = (((long)a->tv_sec * 1000000) + (long)a->tv_usec) / devisor; - result->tv_sec = microseconds/(long)1000000; - result->tv_usec = microseconds%(long)1000000; +void timeval_divide(struct timeval *result, const struct timeval *a, const long divisor) { + u_int64_t microseconds=0; + microseconds = (((u_int64_t)a->tv_sec * 1000000LLU) + a->tv_usec) / divisor; + result->tv_sec = microseconds/1000000LLU; + result->tv_usec = microseconds%1000000LLU; } void timeval_add(struct timeval *result, const struct timeval *a, const struct timeval *b) { - long microseconds=0; - microseconds = ((long)a->tv_sec + (long)b->tv_sec) * (long)1000000 + ((long)a->tv_usec + (long)b->tv_usec); - result->tv_sec = microseconds/(long)1000000; - result->tv_usec = microseconds%(long)1000000; + u_int64_t microseconds=0; + microseconds = ((u_int64_t)a->tv_sec + (u_int64_t)b->tv_sec) * 1000000LLU + (a->tv_usec + b->tv_usec); + result->tv_sec = microseconds/1000000LLU; + result->tv_usec = microseconds%1000000LLU; +} + +static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { + struct timeval dp, oa; + + mutex_lock(&s->total_average_lock); + + // new average = ((old average * old num sessions) + datapoint) / new num sessions + // ... but this will overflow when num sessions becomes very large + + // timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess); + // timeval_add(&t, &t, add); + // s->total_managed_sess++; + // timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess); + + // alternative: + // new average = old average + (datapoint / new num sessions) - (old average / new num sessions) + + s->total_managed_sess++; + timeval_divide(&dp, add, s->total_managed_sess); + timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess); + timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp); + timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa); + + mutex_unlock(&s->total_average_lock); } /* called lock-free, but must hold a reference to the call */ @@ -2447,9 +2462,8 @@ void call_destroy(struct call *c) { cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos); for (l = c->monologues; l; l = l->next) { ml = l->data; + timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); if (_log_facility_cdr) { - memset(&tim_result_duration,0,sizeof(struct timeval)); - timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); cdrbufcur += sprintf(cdrbufcur, "ml%i_start_time=%ld.%06lu, " "ml%i_end_time=%ld.%06ld, " "ml%i_duration=%ld.%06ld, " @@ -2491,36 +2505,42 @@ void call_destroy(struct call *c) { "ml%i_midx%u_%s_endpoint_ip=%s, " "ml%i_midx%u_%s_endpoint_port=%u, " "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets=%llu, " - "ml%i_midx%u_%s_relayed_bytes=%llu, " - "ml%i_midx%u_%s_relayed_errors=%llu, " - "ml%i_midx%u_%s_last_packet=%llu, ", + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", ", cdrlinecnt, md->index, protocol, addr, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet); + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet)); } ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e, %llu last_packet", + ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), addr, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - (unsigned long long) ps->stats.packets, - (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors, - (unsigned long long) ps->last_packet); - - mutex_lock(&m->totalstats_lock); - m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; - m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; - m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; - m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors; - mutex_unlock(&m->totalstats_lock); + atomic64_get(&ps->stats.packets), + atomic64_get(&ps->stats.bytes), + atomic64_get(&ps->stats.errors), + atomic64_get(&ps->last_packet)); + + atomic64_add(&m->totalstats.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats_interval.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats.total_relayed_errors, + atomic64_get(&ps->stats.errors)); + atomic64_add(&m->totalstats_interval.total_relayed_errors, + atomic64_get(&ps->stats.errors)); } } if (_log_facility_cdr) @@ -2528,10 +2548,7 @@ void call_destroy(struct call *c) { } // --- for statistics getting one way stream or no relay at all - mutex_lock(&m->totalstats_lock); - m->totalstats.total_nopacket_relayed_sess *= 2; - m->totalstats_interval.total_nopacket_relayed_sess *= 2; - mutex_unlock(&m->totalstats_lock); + int total_nopacket_relayed_sess = 0; for (l = c->monologues; l; l = l->next) { ml = l->data; @@ -2569,52 +2586,38 @@ void call_destroy(struct call *c) { } } - if (ps && ps2 && ps2->stats.packets==0) { - mutex_lock(&m->totalstats_lock); - if (ps->stats.packets!=0) { - m->totalstats.total_oneway_stream_sess++; - m->totalstats_interval.total_oneway_stream_sess++; + if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { + if (atomic64_get(&ps->stats.packets)!=0) { + atomic64_inc(&m->totalstats.total_oneway_stream_sess); + atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); } else { - m->totalstats.total_nopacket_relayed_sess++; - m->totalstats_interval.total_nopacket_relayed_sess++; + total_nopacket_relayed_sess++; } - mutex_unlock(&m->totalstats_lock); } } - mutex_lock(&m->totalstats_lock); - m->totalstats.total_nopacket_relayed_sess /= 2; - m->totalstats_interval.total_nopacket_relayed_sess /= 2; - - m->totalstats.total_managed_sess += 1; - m->totalstats_interval.total_managed_sess += 1; + atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); ml = c->monologues->data; if (ml->term_reason==TIMEOUT) { - m->totalstats.total_timeout_sess++; - m->totalstats_interval.total_timeout_sess++; + atomic64_inc(&m->totalstats.total_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_timeout_sess); } else if (ml->term_reason==SILENT_TIMEOUT) { - m->totalstats.total_silent_timeout_sess++; - m->totalstats_interval.total_silent_timeout_sess++; + atomic64_inc(&m->totalstats.total_silent_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); } else if (ml->term_reason==REGULAR) { - m->totalstats.total_regular_term_sess++; - m->totalstats_interval.total_regular_term_sess++; + atomic64_inc(&m->totalstats.total_regular_term_sess); + atomic64_inc(&m->totalstats_interval.total_regular_term_sess); } else if (ml->term_reason==FORCED) { - m->totalstats.total_forced_term_sess++; - m->totalstats_interval.total_forced_term_sess++; + atomic64_inc(&m->totalstats.total_forced_term_sess); + atomic64_inc(&m->totalstats_interval.total_forced_term_sess); } - timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1); - timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration); - timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess); - - timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1); - timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration); - timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess); - - mutex_unlock(&m->totalstats_lock); + timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); + timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); if (_log_facility_cdr) /* log it */ @@ -2852,6 +2855,7 @@ struct call_monologue *__monologue_create(struct call *call) { ret->created = poller_now; ret->other_tags = g_hash_table_new(str_hash, str_equal); g_queue_init(&ret->medias); + gettimeofday(&ret->started, NULL); call->monologues = g_slist_prepend(call->monologues, ret); @@ -3026,7 +3030,6 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc for (i = c->monologues; i; i = i->next) { ml = i->data; - memset(&ml->terminated,0,sizeof(struct timeval)); gettimeofday(&(ml->terminated), NULL); ml->term_reason = REGULAR; } diff --git a/daemon/call.h b/daemon/call.h index 8be512487..67572d0de 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -12,6 +12,7 @@ #include #include "compat.h" #include "control_ng.h" +#include "aux.h" enum termination_reason { UNKNOWN=0, @@ -187,23 +188,25 @@ extern const struct transport_protocol transport_protocols[]; struct stats { - u_int64_t packets; - u_int64_t bytes; - u_int64_t errors; + atomic64 packets; + atomic64 bytes; + atomic64 errors; }; struct totalstats { time_t started; + atomic64 total_timeout_sess; + atomic64 total_silent_timeout_sess; + atomic64 total_regular_term_sess; + atomic64 total_forced_term_sess; + atomic64 total_relayed_packets; + atomic64 total_relayed_errors; + atomic64 total_nopacket_relayed_sess; + atomic64 total_oneway_stream_sess; + + mutex_t total_average_lock; /* for these two below */ u_int64_t total_managed_sess; - u_int64_t total_timeout_sess; - u_int64_t total_silent_timeout_sess; - u_int64_t total_regular_term_sess; - u_int64_t total_forced_term_sess; - u_int64_t total_relayed_packets; - u_int64_t total_relayed_errors; - u_int64_t total_nopacket_relayed_sess; - u_int64_t total_oneway_stream_sess; - struct timeval total_average_call_dur; + struct timeval total_average_call_dur; }; struct udp_fd { @@ -268,9 +271,9 @@ struct packet_stream { struct endpoint advertised_endpoint; /* RO */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct stats stats; /* LOCK: in_lock */ - struct stats kernel_stats; /* LOCK: in_lock */ - time_t last_packet; /* LOCK: in_lock */ + struct stats stats; + struct stats kernel_stats; + atomic64 last_packet; #if RTP_LOOP_PROTECT /* LOCK: in_lock: */ @@ -406,11 +409,8 @@ struct callmaster { BIT_ARRAY_DECLARE(ports_used, 0x10000); /* XXX rework these */ - mutex_t statspslock; struct stats statsps; /* per second stats, running timer */ - mutex_t statslock; struct stats stats; /* copied from statsps once a second */ - mutex_t totalstats_lock; /* for both of them */ struct totalstats totalstats; struct totalstats totalstats_interval; /* control_ng_stats stuff */ @@ -469,7 +469,7 @@ const struct transport_protocol *transport_protocol(const str *s); void timeval_subtract (struct timeval *result, const struct timeval *a, const struct timeval *b); void timeval_multiply(struct timeval *result, const struct timeval *a, const long multiplier); -void timeval_devide(struct timeval *result, const struct timeval *a, const long devisor); +void timeval_divide(struct timeval *result, const struct timeval *a, const long divisor); void timeval_add(struct timeval *result, const struct timeval *a, const struct timeval *b); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 67ce881f5..fa6af5243 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -386,8 +386,8 @@ str *call_query_udp(char **out, struct callmaster *m) { ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE], (long long int) m->conf.silent_timeout - (poller_now - stats.last_packet), - stats.totals[0].packets, stats.totals[1].packets, - stats.totals[2].packets, stats.totals[3].packets); + atomic64_get_na(&stats.totals[0].packets), atomic64_get_na(&stats.totals[1].packets), + atomic64_get_na(&stats.totals[2].packets), atomic64_get_na(&stats.totals[3].packets)); goto out; err: @@ -431,20 +431,14 @@ static void call_status_iterator(struct call *c, struct control_stream *s) { } void calls_status_tcp(struct callmaster *m, struct control_stream *s) { - struct stats st; GQueue q = G_QUEUE_INIT; struct call *c; - mutex_lock(&m->statslock); - st = m->stats; - mutex_unlock(&m->statslock); - callmaster_get_all_calls(m, &q); - control_stream_printf(s, "proxy %u "UINT64F"/"UINT64F"/"UINT64F"\n", + control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n", g_queue_get_length(&q), - st.bytes, st.bytes - st.errors, - st.bytes * 2 - st.errors); + atomic64_get(&m->stats.bytes), 0, 0); while (q.head) { c = g_queue_pop_head(&q); @@ -726,14 +720,14 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_ } static void ng_stats(bencode_item_t *d, const struct stats *s, struct stats *totals) { - bencode_dictionary_add_integer(d, "packets", s->packets); - bencode_dictionary_add_integer(d, "bytes", s->bytes); - bencode_dictionary_add_integer(d, "errors", s->errors); + bencode_dictionary_add_integer(d, "packets", atomic64_get(&s->packets)); + bencode_dictionary_add_integer(d, "bytes", atomic64_get(&s->bytes)); + bencode_dictionary_add_integer(d, "errors", atomic64_get(&s->errors)); if (!totals) return; - totals->packets += s->packets; - totals->bytes += s->bytes; - totals->errors += s->errors; + atomic64_add_na(&totals->packets, atomic64_get(&s->packets)); + atomic64_add_na(&totals->bytes, atomic64_get(&s->bytes)); + atomic64_add_na(&totals->errors, atomic64_get(&s->errors)); } static void ng_stats_endpoint(bencode_item_t *dict, const struct endpoint *ep) { @@ -772,7 +766,7 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps if (ps->crypto.params.crypto_suite) bencode_dictionary_add_string(dict, "crypto suite", ps->crypto.params.crypto_suite->name); - bencode_dictionary_add_integer(dict, "last packet", ps->last_packet); + bencode_dictionary_add_integer(dict, "last packet", atomic64_get(&ps->last_packet)); flags = bencode_dictionary_add_list(dict, "flags"); @@ -788,8 +782,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps BF_PS("media handover", MEDIA_HANDOVER); stats: - if (totals->last_packet < ps->last_packet) - totals->last_packet = ps->last_packet; + if (totals->last_packet < atomic64_get(&ps->last_packet)) + totals->last_packet = atomic64_get(&ps->last_packet); /* XXX distinguish between input and output */ s = &totals->totals[0]; diff --git a/daemon/cli.c b/daemon/cli.c index 37e9a6aad..57cf70b7e 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -25,36 +25,39 @@ static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { int printlen=0; + struct timeval avg; + u_int64_t num_sessions; - mutex_lock(&m->totalstats_lock); + mutex_lock(&m->totalstats.total_average_lock); + avg = m->totalstats.total_average_call_dur; + num_sessions = m->totalstats.total_managed_sess; + mutex_unlock(&m->totalstats.total_average_lock); printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :%llu\n", (unsigned long long)m->totalstats.total_managed_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :%llu\n",(unsigned long long)m->totalstats.total_timeout_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via SILENT_TIMEOUT :%llu\n",(unsigned long long)m->totalstats.total_silent_timeout_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via SILENT_TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_silent_timeout_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total regular terminated sessions :%llu\n",(unsigned long long)m->totalstats.total_regular_term_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total regular terminated sessions :"UINT64F"\n",atomic64_get(&m->totalstats.total_regular_term_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total forced terminated sessions :%llu\n",(unsigned long long)m->totalstats.total_forced_term_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total forced terminated sessions :"UINT64F"\n",atomic64_get(&m->totalstats.total_forced_term_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packets :%llu\n",(unsigned long long)m->totalstats.total_relayed_packets); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packets :"UINT64F"\n",atomic64_get(&m->totalstats.total_relayed_packets)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packet errors :%llu\n",(unsigned long long)m->totalstats.total_relayed_errors); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packet errors :"UINT64F"\n",atomic64_get(&m->totalstats.total_relayed_errors)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of streams with no relayed packets :%llu\n", (unsigned long long)m->totalstats.total_nopacket_relayed_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of streams with no relayed packets :"UINT64F"\n", atomic64_get(&m->totalstats.total_nopacket_relayed_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of 1-way streams :%llu\n",(unsigned long long)m->totalstats.total_oneway_stream_sess); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of 1-way streams :"UINT64F"\n",atomic64_get(&m->totalstats.total_oneway_stream_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",avg.tv_sec,avg.tv_usec); ADJUSTLEN(printlen,outbufend,replybuffer); - mutex_unlock(&m->totalstats_lock); - printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n", @@ -148,15 +151,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m continue; printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e, %llu last_packet\n", + ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - (unsigned long long) ps->stats.packets, - (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors, - (unsigned long long) ps->last_packet); + atomic64_get(&ps->stats.packets), + atomic64_get(&ps->stats.bytes), + atomic64_get(&ps->stats.errors), + atomic64_get(&ps->last_packet)); ADJUSTLEN(printlen,outbufend,replybuffer); } } @@ -309,7 +312,7 @@ next: sprintf(replybuffer, "Could currently not accept CLI commands. Reason:%s\n", strerror(errno)); goto cleanup; } - ilog(LOG_INFO, "Accept error:%s\n", strerror(errno)); + ilog(LOG_INFO, "Accept error:%s", strerror(errno)); goto next; } @@ -319,15 +322,15 @@ next: readbytes = read(nfd, inbuf+inlen, MAXINPUT); if (readbytes == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s\n", strerror(errno)); + ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s", strerror(errno)); goto cleanup; } - ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s\n", strerror(errno)); + ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s", strerror(errno)); } inlen += readbytes; } while (readbytes > 0); - ilog(LOG_INFO, "Got CLI command:%s\n",inbuf); + ilog(LOG_INFO, "Got CLI command:%s",inbuf); static const char* LIST = "list"; static const char* TERMINATE = "terminate"; diff --git a/daemon/graphite.c b/daemon/graphite.c index f886584af..ba96e2e6a 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -87,23 +87,36 @@ int send_graphite_data() { char data_to_send[8192]; memset(&data_to_send,0,8192); char* ptr = data_to_send; - mutex_lock(&cm->totalstats_lock); - - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)g_now); ptr += rc; - rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)g_now); ptr += rc; - - ZERO(cm->totalstats_interval); - - mutex_unlock(&cm->totalstats_lock); + struct totalstats ts; + + /* atomically copy values to stack and reset to zero */ + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_timeout_sess); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_silent_timeout_sess); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_regular_term_sess); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_forced_term_sess); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_relayed_packets); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_relayed_errors); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_nopacket_relayed_sess); + atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_oneway_stream_sess); + + mutex_lock(&cm->totalstats_interval.total_average_lock); + ts.total_average_call_dur = cm->totalstats_interval.total_average_call_dur; + ts.total_managed_sess = cm->totalstats_interval.total_managed_sess; + ZERO(ts.total_average_call_dur); + ZERO(ts.total_managed_sess); + mutex_unlock(&cm->totalstats_interval.total_average_lock); + + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) ts.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %lu %llu\n",hostname, ts.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_regular_term_sess),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_errors),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_packets),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_silent_timeout_sess),(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_timeout_sess),(unsigned long long)g_now); ptr += rc; rc = write(graphite_sock, data_to_send, strlen(data_to_send)); if (rc<0) { diff --git a/daemon/main.c b/daemon/main.c index e7f433c3f..b7e1aa575 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -30,7 +30,7 @@ -#define REDIS_MODULE_VERSION "redis/6" +#define REDIS_MODULE_VERSION "redis/7" diff --git a/daemon/sdp.c b/daemon/sdp.c index 497adc757..f0aa3030c 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -910,7 +910,7 @@ int sdp_parse(str *body, GQueue *sessions) { return 0; error: - ilog(LOG_WARNING, "Error parsing SDP at offset %li: %s", b - body->s, errstr); + ilog(LOG_WARNING, "Error parsing SDP at offset %li: %s", (long) (b - body->s), errstr); sdp_free(sessions); return -1; }