diff --git a/daemon/aux.h b/daemon/aux.h index 96e7df160..3552393da 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -510,4 +510,46 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) { unsigned int in6_addr_hash(const void *p); int in6_addr_eq(const void *a, const void *b); + + +#if GLIB_SIZEOF_VOID_P >= 8 + +typedef struct { + void *p; +} atomic_uint64; + +INLINE u_int64_t atomic_uint64_get(const atomic_uint64 *u) { + return (u_int64_t) g_atomic_pointer_get(&u->p); +} +INLINE u_int64_t atomic_uint64_get_na(const atomic_uint64 *u) { + return (u_int64_t) u->p; +} +INLINE void atomic_uint64_set(atomic_uint64 *u, u_int64_t a) { + g_atomic_pointer_set(&u->p, (void *) a); +} +INLINE void atomic_uint64_set_na(atomic_uint64 *u, u_int64_t a) { + u->p = (void *) a; +} +INLINE void atomic_uint64_inc(atomic_uint64 *u) { + g_atomic_pointer_add(&u->p, 1); +} +INLINE void atomic_uint64_add(atomic_uint64 *u, u_int64_t a) { + g_atomic_pointer_add(&u->p, a); +} +INLINE void atomic_uint64_add_na(atomic_uint64 *u, u_int64_t a) { + u->p = (void *) (((u_int64_t) u->p) + a); +} +INLINE u_int64_t atomic_uint64_get_set(atomic_uint64 *u, u_int64_t a) { + u_int64_t old; + do { + old = atomic_uint64_get(u); + if (g_atomic_pointer_compare_and_exchange(&u->p, (void *) old, (void *) a)) + return old; + } while (1); +} + +#endif + + + #endif diff --git a/daemon/call.c b/daemon/call.c index 6492a0abb..990f45d60 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -687,12 +687,9 @@ loop_ok: if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) { ilog(LOG_WARNING, "RTP packet from %s discarded", addr); - mutex_lock(&stream->in_lock); - stream->stats.errors++; - mutex_lock(&cm->statspslock); - cm->statsps.errors++; - mutex_unlock(&cm->statspslock); - goto done; + atomic_uint64_inc(&stream->stats.errors); + atomic_uint64_inc(&cm->statsps.errors); + goto unlock_out; } mutex_lock(&in_srtp->in_lock); @@ -755,7 +752,7 @@ use_cand: mutex_unlock(&stream->out_lock); if (tmp && PS_ISSET(stream, STRICT_SOURCE)) { - stream->stats.errors++; + atomic_uint64_inc(&stream->stats.errors); goto drop; } } @@ -844,10 +841,8 @@ forward: if (ret == -1) { ret = -errno; ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - stream->stats.errors++; - mutex_lock(&cm->statspslock); - cm->statsps.errors++; - mutex_unlock(&cm->statspslock); + atomic_uint64_inc(&stream->stats.errors); + atomic_uint64_inc(&cm->statsps.errors); goto out; } @@ -857,13 +852,11 @@ drop: if (sink) mutex_unlock(&sink->out_lock); ret = 0; - stream->stats.packets++; - stream->stats.bytes += s->len; - stream->last_packet = poller_now; - mutex_lock(&cm->statspslock); - cm->statsps.packets++; - cm->statsps.bytes += s->len; - mutex_unlock(&cm->statspslock); + atomic_uint64_inc(&stream->stats.packets); + atomic_uint64_add(&stream->stats.bytes, s->len); + atomic_uint64_set(&stream->last_packet, poller_now); + atomic_uint64_inc(&cm->statsps.packets); + atomic_uint64_add(&cm->statsps.bytes, s->len); out: if (ret == 0 && update) @@ -1078,7 +1071,7 @@ no_sfd: tmp_t_reason = 2; } - if (poller_now - ps->last_packet < check) + if (poller_now - atomic_uint64_get(&ps->last_packet) < check) good = 1; next: @@ -1288,16 +1281,14 @@ destroy: #define DS(x) do { \ - mutex_lock(&ps->in_lock); \ - if (ke->stats.x < ps->kernel_stats.x) \ + u_int64_t ks_val, d; \ + ks_val = atomic_uint64_get(&ps->kernel_stats.x); \ + if (ke->stats.x < ks_val) \ d = 0; \ else \ - d = ke->stats.x - ps->kernel_stats.x; \ - ps->stats.x += d; \ - mutex_unlock(&ps->in_lock); \ - mutex_lock(&m->statspslock); \ - m->statsps.x += d; \ - mutex_unlock(&m->statspslock); \ + d = ke->stats.x - ks_val; \ + atomic_uint64_add(&ps->stats.x, d); \ + atomic_uint64_add(&m->statsps.x, d); \ } while (0) static void callmaster_timer(void *ptr) { struct callmaster *m = ptr; @@ -1305,7 +1296,6 @@ static void callmaster_timer(void *ptr) { GList *i; struct rtpengine_list_entry *ke; struct packet_stream *ps, *sink; - u_int64_t d; struct stats tmpstats; int j, update; struct stream_fd *sfd; @@ -1316,13 +1306,13 @@ static void callmaster_timer(void *ptr) { g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp); rwlock_unlock_r(&m->hashlock); - mutex_lock(&m->statspslock); - memcpy(&tmpstats, &m->statsps, sizeof(tmpstats)); - ZERO(m->statsps); - mutex_unlock(&m->statspslock); - mutex_lock(&m->statslock); - memcpy(&m->stats, &tmpstats, sizeof(m->stats)); - mutex_unlock(&m->statslock); + atomic_uint64_set_na(&tmpstats.bytes, atomic_uint64_get_set(&m->statsps.bytes, 0)); + atomic_uint64_set_na(&tmpstats.packets, atomic_uint64_get_set(&m->statsps.packets, 0)); + atomic_uint64_set_na(&tmpstats.errors, atomic_uint64_get_set(&m->statsps.errors, 0)); + + atomic_uint64_set(&m->stats.bytes, atomic_uint64_get_na(&tmpstats.bytes)); + atomic_uint64_set(&m->stats.packets, atomic_uint64_get_na(&tmpstats.packets)); + atomic_uint64_set(&m->stats.errors, atomic_uint64_get_na(&tmpstats.errors)); i = (m->conf.kernelid >= 0) ? kernel_list(m->conf.kernelid) : NULL; while (i) { @@ -1346,12 +1336,12 @@ static void callmaster_timer(void *ptr) { mutex_lock(&ps->in_lock); - if (ke->stats.packets != ps->kernel_stats.packets) - ps->last_packet = poller_now; + if (ke->stats.packets != atomic_uint64_get(&ps->kernel_stats.packets)) + atomic_uint64_set(&ps->last_packet, poller_now); - ps->kernel_stats.packets = ke->stats.packets; - ps->kernel_stats.bytes = ke->stats.bytes; - ps->kernel_stats.errors = ke->stats.errors; + atomic_uint64_set(&ps->kernel_stats.bytes, ke->stats.bytes); + atomic_uint64_set(&ps->kernel_stats.packets, ke->stats.packets); + atomic_uint64_set(&ps->kernel_stats.errors, ke->stats.errors); update = 0; @@ -1764,7 +1754,7 @@ struct packet_stream *__packet_stream_new(struct call *call) { mutex_init(&stream->in_lock); mutex_init(&stream->out_lock); stream->call = call; - stream->last_packet = poller_now; + atomic_uint64_set_na(&stream->last_packet, poller_now); call->streams = g_slist_prepend(call->streams, stream); return stream; @@ -2498,10 +2488,14 @@ void call_destroy(struct call *c) { cdrlinecnt, md->index, protocol, addr, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet); + cdrlinecnt, md->index, protocol, + (unsigned long long) atomic_uint64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + (unsigned long long) atomic_uint64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + (unsigned long long) atomic_uint64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + (unsigned long long) atomic_uint64_get(&ps->last_packet)); } ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " @@ -2510,16 +2504,16 @@ void call_destroy(struct call *c) { (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), addr, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - (unsigned long long) ps->stats.packets, - (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors, - (unsigned long long) ps->last_packet); + (unsigned long long) atomic_uint64_get(&ps->stats.packets), + (unsigned long long) atomic_uint64_get(&ps->stats.bytes), + (unsigned long long) atomic_uint64_get(&ps->stats.errors), + (unsigned long long) atomic_uint64_get(&ps->last_packet)); mutex_lock(&m->totalstats_lock); - m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; - m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; - m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; - m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors; + m->totalstats.total_relayed_packets += atomic_uint64_get(&ps->stats.packets); + m->totalstats_interval.total_relayed_packets += atomic_uint64_get(&ps->stats.packets); + m->totalstats.total_relayed_errors += atomic_uint64_get(&ps->stats.errors); + m->totalstats_interval.total_relayed_errors += atomic_uint64_get(&ps->stats.errors); mutex_unlock(&m->totalstats_lock); } } @@ -2569,9 +2563,9 @@ void call_destroy(struct call *c) { } } - if (ps && ps2 && ps2->stats.packets==0) { + if (ps && ps2 && atomic_uint64_get(&ps2->stats.packets)==0) { mutex_lock(&m->totalstats_lock); - if (ps->stats.packets!=0) { + if (atomic_uint64_get(&ps->stats.packets)!=0) { m->totalstats.total_oneway_stream_sess++; m->totalstats_interval.total_oneway_stream_sess++; } diff --git a/daemon/call.h b/daemon/call.h index 8be512487..b1954b88c 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -12,6 +12,7 @@ #include #include "compat.h" #include "control_ng.h" +#include "aux.h" enum termination_reason { UNKNOWN=0, @@ -187,9 +188,9 @@ extern const struct transport_protocol transport_protocols[]; struct stats { - u_int64_t packets; - u_int64_t bytes; - u_int64_t errors; + atomic_uint64 packets; + atomic_uint64 bytes; + atomic_uint64 errors; }; struct totalstats { @@ -268,9 +269,9 @@ struct packet_stream { struct endpoint advertised_endpoint; /* RO */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct stats stats; /* LOCK: in_lock */ - struct stats kernel_stats; /* LOCK: in_lock */ - time_t last_packet; /* LOCK: in_lock */ + struct stats stats; + struct stats kernel_stats; + atomic_uint64 last_packet; #if RTP_LOOP_PROTECT /* LOCK: in_lock: */ @@ -406,9 +407,7 @@ struct callmaster { BIT_ARRAY_DECLARE(ports_used, 0x10000); /* XXX rework these */ - mutex_t statspslock; struct stats statsps; /* per second stats, running timer */ - mutex_t statslock; struct stats stats; /* copied from statsps once a second */ mutex_t totalstats_lock; /* for both of them */ struct totalstats totalstats; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 67ce881f5..651b80a71 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -386,8 +386,8 @@ str *call_query_udp(char **out, struct callmaster *m) { ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE], (long long int) m->conf.silent_timeout - (poller_now - stats.last_packet), - stats.totals[0].packets, stats.totals[1].packets, - stats.totals[2].packets, stats.totals[3].packets); + atomic_uint64_get_na(&stats.totals[0].packets), atomic_uint64_get_na(&stats.totals[1].packets), + atomic_uint64_get_na(&stats.totals[2].packets), atomic_uint64_get_na(&stats.totals[3].packets)); goto out; err: @@ -431,20 +431,14 @@ static void call_status_iterator(struct call *c, struct control_stream *s) { } void calls_status_tcp(struct callmaster *m, struct control_stream *s) { - struct stats st; GQueue q = G_QUEUE_INIT; struct call *c; - mutex_lock(&m->statslock); - st = m->stats; - mutex_unlock(&m->statslock); - callmaster_get_all_calls(m, &q); - control_stream_printf(s, "proxy %u "UINT64F"/"UINT64F"/"UINT64F"\n", + control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n", g_queue_get_length(&q), - st.bytes, st.bytes - st.errors, - st.bytes * 2 - st.errors); + atomic_uint64_get(&m->stats.bytes), 0, 0); while (q.head) { c = g_queue_pop_head(&q); @@ -726,14 +720,14 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_ } static void ng_stats(bencode_item_t *d, const struct stats *s, struct stats *totals) { - bencode_dictionary_add_integer(d, "packets", s->packets); - bencode_dictionary_add_integer(d, "bytes", s->bytes); - bencode_dictionary_add_integer(d, "errors", s->errors); + bencode_dictionary_add_integer(d, "packets", atomic_uint64_get(&s->packets)); + bencode_dictionary_add_integer(d, "bytes", atomic_uint64_get(&s->bytes)); + bencode_dictionary_add_integer(d, "errors", atomic_uint64_get(&s->errors)); if (!totals) return; - totals->packets += s->packets; - totals->bytes += s->bytes; - totals->errors += s->errors; + atomic_uint64_add_na(&totals->packets, atomic_uint64_get(&s->packets)); + atomic_uint64_add_na(&totals->bytes, atomic_uint64_get(&s->bytes)); + atomic_uint64_add_na(&totals->errors, atomic_uint64_get(&s->errors)); } static void ng_stats_endpoint(bencode_item_t *dict, const struct endpoint *ep) { @@ -772,7 +766,7 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps if (ps->crypto.params.crypto_suite) bencode_dictionary_add_string(dict, "crypto suite", ps->crypto.params.crypto_suite->name); - bencode_dictionary_add_integer(dict, "last packet", ps->last_packet); + bencode_dictionary_add_integer(dict, "last packet", atomic_uint64_get(&ps->last_packet)); flags = bencode_dictionary_add_list(dict, "flags"); @@ -788,8 +782,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps BF_PS("media handover", MEDIA_HANDOVER); stats: - if (totals->last_packet < ps->last_packet) - totals->last_packet = ps->last_packet; + if (totals->last_packet < atomic_uint64_get(&ps->last_packet)) + totals->last_packet = atomic_uint64_get(&ps->last_packet); /* XXX distinguish between input and output */ s = &totals->totals[0]; diff --git a/daemon/cli.c b/daemon/cli.c index e6e4300ab..7941f0193 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -153,10 +153,10 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - (unsigned long long) ps->stats.packets, - (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors, - (unsigned long long) ps->last_packet); + (unsigned long long) atomic_uint64_get(&ps->stats.packets), + (unsigned long long) atomic_uint64_get(&ps->stats.bytes), + (unsigned long long) atomic_uint64_get(&ps->stats.errors), + (unsigned long long) atomic_uint64_get(&ps->last_packet)); ADJUSTLEN(printlen,outbufend,replybuffer); } }