Browse Source

use atomic ops for stats

pull/81/head
Richard Fuchs 11 years ago
parent
commit
411a888f9b
5 changed files with 115 additions and 86 deletions
  1. +42
    -0
      daemon/aux.h
  2. +49
    -55
      daemon/call.c
  3. +7
    -8
      daemon/call.h
  4. +13
    -19
      daemon/call_interfaces.c
  5. +4
    -4
      daemon/cli.c

+ 42
- 0
daemon/aux.h View File

@ -510,4 +510,46 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) {
unsigned int in6_addr_hash(const void *p);
int in6_addr_eq(const void *a, const void *b);
#if GLIB_SIZEOF_VOID_P >= 8
typedef struct {
void *p;
} atomic_uint64;
INLINE u_int64_t atomic_uint64_get(const atomic_uint64 *u) {
return (u_int64_t) g_atomic_pointer_get(&u->p);
}
INLINE u_int64_t atomic_uint64_get_na(const atomic_uint64 *u) {
return (u_int64_t) u->p;
}
INLINE void atomic_uint64_set(atomic_uint64 *u, u_int64_t a) {
g_atomic_pointer_set(&u->p, (void *) a);
}
INLINE void atomic_uint64_set_na(atomic_uint64 *u, u_int64_t a) {
u->p = (void *) a;
}
INLINE void atomic_uint64_inc(atomic_uint64 *u) {
g_atomic_pointer_add(&u->p, 1);
}
INLINE void atomic_uint64_add(atomic_uint64 *u, u_int64_t a) {
g_atomic_pointer_add(&u->p, a);
}
INLINE void atomic_uint64_add_na(atomic_uint64 *u, u_int64_t a) {
u->p = (void *) (((u_int64_t) u->p) + a);
}
INLINE u_int64_t atomic_uint64_get_set(atomic_uint64 *u, u_int64_t a) {
u_int64_t old;
do {
old = atomic_uint64_get(u);
if (g_atomic_pointer_compare_and_exchange(&u->p, (void *) old, (void *) a))
return old;
} while (1);
}
#endif
#endif

+ 49
- 55
daemon/call.c View File

@ -687,12 +687,9 @@ loop_ok:
if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) {
ilog(LOG_WARNING, "RTP packet from %s discarded", addr);
mutex_lock(&stream->in_lock);
stream->stats.errors++;
mutex_lock(&cm->statspslock);
cm->statsps.errors++;
mutex_unlock(&cm->statspslock);
goto done;
atomic_uint64_inc(&stream->stats.errors);
atomic_uint64_inc(&cm->statsps.errors);
goto unlock_out;
}
mutex_lock(&in_srtp->in_lock);
@ -755,7 +752,7 @@ use_cand:
mutex_unlock(&stream->out_lock);
if (tmp && PS_ISSET(stream, STRICT_SOURCE)) {
stream->stats.errors++;
atomic_uint64_inc(&stream->stats.errors);
goto drop;
}
}
@ -844,10 +841,8 @@ forward:
if (ret == -1) {
ret = -errno;
ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno));
stream->stats.errors++;
mutex_lock(&cm->statspslock);
cm->statsps.errors++;
mutex_unlock(&cm->statspslock);
atomic_uint64_inc(&stream->stats.errors);
atomic_uint64_inc(&cm->statsps.errors);
goto out;
}
@ -857,13 +852,11 @@ drop:
if (sink)
mutex_unlock(&sink->out_lock);
ret = 0;
stream->stats.packets++;
stream->stats.bytes += s->len;
stream->last_packet = poller_now;
mutex_lock(&cm->statspslock);
cm->statsps.packets++;
cm->statsps.bytes += s->len;
mutex_unlock(&cm->statspslock);
atomic_uint64_inc(&stream->stats.packets);
atomic_uint64_add(&stream->stats.bytes, s->len);
atomic_uint64_set(&stream->last_packet, poller_now);
atomic_uint64_inc(&cm->statsps.packets);
atomic_uint64_add(&cm->statsps.bytes, s->len);
out:
if (ret == 0 && update)
@ -1078,7 +1071,7 @@ no_sfd:
tmp_t_reason = 2;
}
if (poller_now - ps->last_packet < check)
if (poller_now - atomic_uint64_get(&ps->last_packet) < check)
good = 1;
next:
@ -1288,16 +1281,14 @@ destroy:
#define DS(x) do { \
mutex_lock(&ps->in_lock); \
if (ke->stats.x < ps->kernel_stats.x) \
u_int64_t ks_val, d; \
ks_val = atomic_uint64_get(&ps->kernel_stats.x); \
if (ke->stats.x < ks_val) \
d = 0; \
else \
d = ke->stats.x - ps->kernel_stats.x; \
ps->stats.x += d; \
mutex_unlock(&ps->in_lock); \
mutex_lock(&m->statspslock); \
m->statsps.x += d; \
mutex_unlock(&m->statspslock); \
d = ke->stats.x - ks_val; \
atomic_uint64_add(&ps->stats.x, d); \
atomic_uint64_add(&m->statsps.x, d); \
} while (0)
static void callmaster_timer(void *ptr) {
struct callmaster *m = ptr;
@ -1305,7 +1296,6 @@ static void callmaster_timer(void *ptr) {
GList *i;
struct rtpengine_list_entry *ke;
struct packet_stream *ps, *sink;
u_int64_t d;
struct stats tmpstats;
int j, update;
struct stream_fd *sfd;
@ -1316,13 +1306,13 @@ static void callmaster_timer(void *ptr) {
g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp);
rwlock_unlock_r(&m->hashlock);
mutex_lock(&m->statspslock);
memcpy(&tmpstats, &m->statsps, sizeof(tmpstats));
ZERO(m->statsps);
mutex_unlock(&m->statspslock);
mutex_lock(&m->statslock);
memcpy(&m->stats, &tmpstats, sizeof(m->stats));
mutex_unlock(&m->statslock);
atomic_uint64_set_na(&tmpstats.bytes, atomic_uint64_get_set(&m->statsps.bytes, 0));
atomic_uint64_set_na(&tmpstats.packets, atomic_uint64_get_set(&m->statsps.packets, 0));
atomic_uint64_set_na(&tmpstats.errors, atomic_uint64_get_set(&m->statsps.errors, 0));
atomic_uint64_set(&m->stats.bytes, atomic_uint64_get_na(&tmpstats.bytes));
atomic_uint64_set(&m->stats.packets, atomic_uint64_get_na(&tmpstats.packets));
atomic_uint64_set(&m->stats.errors, atomic_uint64_get_na(&tmpstats.errors));
i = (m->conf.kernelid >= 0) ? kernel_list(m->conf.kernelid) : NULL;
while (i) {
@ -1346,12 +1336,12 @@ static void callmaster_timer(void *ptr) {
mutex_lock(&ps->in_lock);
if (ke->stats.packets != ps->kernel_stats.packets)
ps->last_packet = poller_now;
if (ke->stats.packets != atomic_uint64_get(&ps->kernel_stats.packets))
atomic_uint64_set(&ps->last_packet, poller_now);
ps->kernel_stats.packets = ke->stats.packets;
ps->kernel_stats.bytes = ke->stats.bytes;
ps->kernel_stats.errors = ke->stats.errors;
atomic_uint64_set(&ps->kernel_stats.bytes, ke->stats.bytes);
atomic_uint64_set(&ps->kernel_stats.packets, ke->stats.packets);
atomic_uint64_set(&ps->kernel_stats.errors, ke->stats.errors);
update = 0;
@ -1764,7 +1754,7 @@ struct packet_stream *__packet_stream_new(struct call *call) {
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
stream->call = call;
stream->last_packet = poller_now;
atomic_uint64_set_na(&stream->last_packet, poller_now);
call->streams = g_slist_prepend(call->streams, stream);
return stream;
@ -2498,10 +2488,14 @@ void call_destroy(struct call *c) {
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
cdrlinecnt, md->index, protocol,
(unsigned long long) atomic_uint64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
(unsigned long long) atomic_uint64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
(unsigned long long) atomic_uint64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
(unsigned long long) atomic_uint64_get(&ps->last_packet));
}
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "
@ -2510,16 +2504,16 @@ void call_destroy(struct call *c) {
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
addr, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
(unsigned long long) atomic_uint64_get(&ps->stats.packets),
(unsigned long long) atomic_uint64_get(&ps->stats.bytes),
(unsigned long long) atomic_uint64_get(&ps->stats.errors),
(unsigned long long) atomic_uint64_get(&ps->last_packet));
mutex_lock(&m->totalstats_lock);
m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors;
m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors;
m->totalstats.total_relayed_packets += atomic_uint64_get(&ps->stats.packets);
m->totalstats_interval.total_relayed_packets += atomic_uint64_get(&ps->stats.packets);
m->totalstats.total_relayed_errors += atomic_uint64_get(&ps->stats.errors);
m->totalstats_interval.total_relayed_errors += atomic_uint64_get(&ps->stats.errors);
mutex_unlock(&m->totalstats_lock);
}
}
@ -2569,9 +2563,9 @@ void call_destroy(struct call *c) {
}
}
if (ps && ps2 && ps2->stats.packets==0) {
if (ps && ps2 && atomic_uint64_get(&ps2->stats.packets)==0) {
mutex_lock(&m->totalstats_lock);
if (ps->stats.packets!=0) {
if (atomic_uint64_get(&ps->stats.packets)!=0) {
m->totalstats.total_oneway_stream_sess++;
m->totalstats_interval.total_oneway_stream_sess++;
}


+ 7
- 8
daemon/call.h View File

@ -12,6 +12,7 @@
#include <openssl/x509.h>
#include "compat.h"
#include "control_ng.h"
#include "aux.h"
enum termination_reason {
UNKNOWN=0,
@ -187,9 +188,9 @@ extern const struct transport_protocol transport_protocols[];
struct stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
atomic_uint64 packets;
atomic_uint64 bytes;
atomic_uint64 errors;
};
struct totalstats {
@ -268,9 +269,9 @@ struct packet_stream {
struct endpoint advertised_endpoint; /* RO */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct stats stats; /* LOCK: in_lock */
struct stats kernel_stats; /* LOCK: in_lock */
time_t last_packet; /* LOCK: in_lock */
struct stats stats;
struct stats kernel_stats;
atomic_uint64 last_packet;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
@ -406,9 +407,7 @@ struct callmaster {
BIT_ARRAY_DECLARE(ports_used, 0x10000);
/* XXX rework these */
mutex_t statspslock;
struct stats statsps; /* per second stats, running timer */
mutex_t statslock;
struct stats stats; /* copied from statsps once a second */
mutex_t totalstats_lock; /* for both of them */
struct totalstats totalstats;


+ 13
- 19
daemon/call_interfaces.c View File

@ -386,8 +386,8 @@ str *call_query_udp(char **out, struct callmaster *m) {
ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE],
(long long int) m->conf.silent_timeout - (poller_now - stats.last_packet),
stats.totals[0].packets, stats.totals[1].packets,
stats.totals[2].packets, stats.totals[3].packets);
atomic_uint64_get_na(&stats.totals[0].packets), atomic_uint64_get_na(&stats.totals[1].packets),
atomic_uint64_get_na(&stats.totals[2].packets), atomic_uint64_get_na(&stats.totals[3].packets));
goto out;
err:
@ -431,20 +431,14 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
}
void calls_status_tcp(struct callmaster *m, struct control_stream *s) {
struct stats st;
GQueue q = G_QUEUE_INIT;
struct call *c;
mutex_lock(&m->statslock);
st = m->stats;
mutex_unlock(&m->statslock);
callmaster_get_all_calls(m, &q);
control_stream_printf(s, "proxy %u "UINT64F"/"UINT64F"/"UINT64F"\n",
control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n",
g_queue_get_length(&q),
st.bytes, st.bytes - st.errors,
st.bytes * 2 - st.errors);
atomic_uint64_get(&m->stats.bytes), 0, 0);
while (q.head) {
c = g_queue_pop_head(&q);
@ -726,14 +720,14 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_
}
static void ng_stats(bencode_item_t *d, const struct stats *s, struct stats *totals) {
bencode_dictionary_add_integer(d, "packets", s->packets);
bencode_dictionary_add_integer(d, "bytes", s->bytes);
bencode_dictionary_add_integer(d, "errors", s->errors);
bencode_dictionary_add_integer(d, "packets", atomic_uint64_get(&s->packets));
bencode_dictionary_add_integer(d, "bytes", atomic_uint64_get(&s->bytes));
bencode_dictionary_add_integer(d, "errors", atomic_uint64_get(&s->errors));
if (!totals)
return;
totals->packets += s->packets;
totals->bytes += s->bytes;
totals->errors += s->errors;
atomic_uint64_add_na(&totals->packets, atomic_uint64_get(&s->packets));
atomic_uint64_add_na(&totals->bytes, atomic_uint64_get(&s->bytes));
atomic_uint64_add_na(&totals->errors, atomic_uint64_get(&s->errors));
}
static void ng_stats_endpoint(bencode_item_t *dict, const struct endpoint *ep) {
@ -772,7 +766,7 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
if (ps->crypto.params.crypto_suite)
bencode_dictionary_add_string(dict, "crypto suite",
ps->crypto.params.crypto_suite->name);
bencode_dictionary_add_integer(dict, "last packet", ps->last_packet);
bencode_dictionary_add_integer(dict, "last packet", atomic_uint64_get(&ps->last_packet));
flags = bencode_dictionary_add_list(dict, "flags");
@ -788,8 +782,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
BF_PS("media handover", MEDIA_HANDOVER);
stats:
if (totals->last_packet < ps->last_packet)
totals->last_packet = ps->last_packet;
if (totals->last_packet < atomic_uint64_get(&ps->last_packet))
totals->last_packet = atomic_uint64_get(&ps->last_packet);
/* XXX distinguish between input and output */
s = &totals->totals[0];


+ 4
- 4
daemon/cli.c View File

@ -153,10 +153,10 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
(unsigned long long) atomic_uint64_get(&ps->stats.packets),
(unsigned long long) atomic_uint64_get(&ps->stats.bytes),
(unsigned long long) atomic_uint64_get(&ps->stats.errors),
(unsigned long long) atomic_uint64_get(&ps->last_packet));
ADJUSTLEN(printlen,outbufend,replybuffer);
}
}


Loading…
Cancel
Save