From 8c2c69f5e6888df7d7e97171caa9dadbc4d8c424 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sat, 4 Aug 2012 13:04:51 +0000 Subject: [PATCH] make the stats collection thread-safe --- daemon/call.c | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index f70114727..90aae9563 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -50,8 +50,10 @@ struct callmaster { u_int16_t lastport; BIT_ARRAY_DECLARE(ports_used, 0x10000); - struct stats statsps; - struct stats stats; + mutex_t statspslock; + struct stats statsps; /* per second stats, running timer */ + mutex_t statslock; + struct stats stats; /* copied from statsps once a second */ struct poller *poller; pcre *info_re; @@ -213,7 +215,9 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ LOG_PARAMS_C(c), r->localport, addr, ntohs(fsin->sin6_port)); r->stats.errors++; mutex_unlock(&cs->lock); + mutex_lock(&m->statspslock); m->statsps.errors++; + mutex_unlock(&m->statspslock); return 0; } @@ -318,7 +322,9 @@ forward: if (ret == -1) { r->stats.errors++; mutex_unlock(&cs->lock); + mutex_lock(&m->statspslock); m->statsps.errors++; + mutex_unlock(&m->statspslock); return -1; } @@ -327,8 +333,10 @@ drop: r->stats.bytes += l; r->last = poller_now; mutex_unlock(&cs->lock); + mutex_lock(&m->statspslock); m->statsps.packets++; m->statsps.bytes += l; + mutex_unlock(&m->statspslock); return 0; } @@ -466,6 +474,7 @@ struct iterator_helper { }; +/* called with callmaster->hashlock held */ static void call_timer_iterator(void *key, void *val, void *ptr) { struct call *c = val; struct iterator_helper *hlp = ptr; @@ -493,6 +502,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { for (j = 0; j < 2; j++) { sr = &p->rtps[j]; hlp->ports[sr->localport] = sr; + obj_hold(cs); check = cm->conf.timeout; if (!sr->peer.port) @@ -575,8 +585,12 @@ next: d = 0; \ else \ d = ke->stats.x - sr->kstats.x; \ + mutex_lock(&cs->lock); \ sr->stats.x += d; \ + mutex_unlock(&cs->lock); \ + mutex_lock(&m->statspslock); \ m->statsps.x += d; \ + mutex_unlock(&m->statspslock); \ } while (0) static void callmaster_timer(void *ptr) { struct callmaster *m = ptr; @@ -586,6 +600,8 @@ static void callmaster_timer(void *ptr) { struct mediaproxy_list_entry *ke; struct streamrelay *sr; u_int64_t d; + struct stats tmpstats; + struct callstream *cs; ZERO(hlp); @@ -593,31 +609,42 @@ static void callmaster_timer(void *ptr) { g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp); rwlock_unlock_r(&m->hashlock); - memcpy(&m->stats, &m->statsps, sizeof(m->stats)); + mutex_lock(&m->statspslock); + memcpy(&tmpstats, &m->statsps, sizeof(tmpstats)); ZERO(m->statsps); + mutex_unlock(&m->statspslock); + mutex_lock(&m->statslock); + memcpy(&m->stats, &tmpstats, sizeof(m->stats)); + mutex_unlock(&m->statslock); i = (m->conf.kernelid != -1) ? kernel_list(m->conf.kernelid) : NULL; while (i) { ke = i->data; + cs = NULL; sr = hlp.ports[ke->target.target_port]; if (!sr) goto next; + cs = sr->up->up; DS(packets); DS(bytes); DS(errors); + mutex_lock(&cs->lock); if (ke->stats.packets != sr->kstats.packets) sr->last = poller_now; sr->kstats.packets = ke->stats.packets; sr->kstats.bytes = ke->stats.bytes; sr->kstats.errors = ke->stats.errors; + mutex_unlock(&cs->lock); next: g_slice_free1(sizeof(*ke), ke); i = g_list_delete_link(i, i); + if (cs) + obj_put(cs); } if (m->conf.b2b_url) @@ -1793,12 +1820,18 @@ next: } void calls_status(struct callmaster *m, struct control_stream *s) { + struct stats st; + + mutex_lock(&m->statslock); + st = m->stats; + mutex_unlock(&m->statslock); + rwlock_lock_r(&m->hashlock); control_stream_printf(s, "proxy %u %llu/%llu/%llu\n", g_hash_table_size(m->callhash), - (long long unsigned int) m->stats.bytes, - (long long unsigned int) m->stats.bytes - m->stats.errors, - (long long unsigned int) m->stats.bytes * 2 - m->stats.errors); + (long long unsigned int) st.bytes, + (long long unsigned int) st.bytes - st.errors, + (long long unsigned int) st.bytes * 2 - st.errors); g_hash_table_foreach(m->callhash, call_status_iterator, s); rwlock_unlock_r(&m->hashlock);