Browse Source

Merge branch 'rfuchs/atomic-stats'

pull/81/head
Richard Fuchs 11 years ago
parent
commit
c22cc161ed
9 changed files with 309 additions and 197 deletions
  1. +4
    -0
      daemon/aux.c
  2. +95
    -0
      daemon/aux.h
  3. +121
    -118
      daemon/call.c
  4. +19
    -19
      daemon/call.h
  5. +13
    -19
      daemon/call_interfaces.c
  6. +25
    -22
      daemon/cli.c
  7. +30
    -17
      daemon/graphite.c
  8. +1
    -1
      daemon/main.c
  9. +1
    -1
      daemon/sdp.c

+ 4
- 0
daemon/aux.c View File

@ -34,6 +34,10 @@ static cond_t threads_cond = COND_STATIC_INIT;
static struct thread_buf __thread t_bufs[NUM_THREAD_BUFS];
static int __thread t_buf_idx;
#ifdef NEED_ATOMIC64_MUTEX
mutex_t __atomic64_mutex = MUTEX_STATIC_INIT;
#endif
GList *g_list_link(GList *list, GList *el) {


+ 95
- 0
daemon/aux.h View File

@ -510,4 +510,99 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) {
unsigned int in6_addr_hash(const void *p);
int in6_addr_eq(const void *a, const void *b);
#if GLIB_SIZEOF_VOID_P >= 8
typedef struct {
void *p;
} atomic64;
INLINE u_int64_t atomic64_get(const atomic64 *u) {
return (u_int64_t) g_atomic_pointer_get(&u->p);
}
INLINE u_int64_t atomic64_get_na(const atomic64 *u) {
return (u_int64_t) u->p;
}
INLINE void atomic64_set(atomic64 *u, u_int64_t a) {
g_atomic_pointer_set(&u->p, (void *) a);
}
INLINE void atomic64_set_na(atomic64 *u, u_int64_t a) {
u->p = (void *) a;
}
INLINE void atomic64_add(atomic64 *u, u_int64_t a) {
g_atomic_pointer_add(&u->p, a);
}
INLINE void atomic64_add_na(atomic64 *u, u_int64_t a) {
u->p = (void *) (((u_int64_t) u->p) + a);
}
INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) {
u_int64_t old;
do {
old = atomic64_get(u);
if (g_atomic_pointer_compare_and_exchange(&u->p, (void *) old, (void *) a))
return old;
} while (1);
}
#else
/* Simulate atomic u64 with a global mutex on non-64-bit platforms.
* Bad performance possible, thus not recommended. */
typedef struct {
u_int64_t u;
} atomic64;
#define NEED_ATOMIC64_MUTEX
extern mutex_t __atomic64_mutex;
INLINE u_int64_t atomic64_get(const atomic64 *u) {
u_int64_t ret;
mutex_lock(&__atomic64_mutex);
ret = u->u;
mutex_unlock(&__atomic64_mutex);
return ret;
}
INLINE u_int64_t atomic64_get_na(const atomic64 *u) {
return u->u;
}
INLINE void atomic64_set(atomic64 *u, u_int64_t a) {
mutex_lock(&__atomic64_mutex);
u->u = a;
mutex_unlock(&__atomic64_mutex);
}
INLINE void atomic64_set_na(atomic64 *u, u_int64_t a) {
u->u = a;
}
INLINE void atomic64_add(atomic64 *u, u_int64_t a) {
mutex_lock(&__atomic64_mutex);
u->u += a;
mutex_unlock(&__atomic64_mutex);
}
INLINE void atomic64_add_na(atomic64 *u, u_int64_t a) {
u->u += a;
}
INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) {
u_int64_t old;
mutex_lock(&__atomic64_mutex);
old = u->u;
u->u = a;
mutex_unlock(&__atomic64_mutex);
return old;
}
#endif
INLINE void atomic64_inc(atomic64 *u) {
atomic64_add(u, 1);
}
INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) {
atomic64_set_na(dst, atomic64_get_set(src, 0));
}
#define atomic64_local_copy_zero_struct(d, s, member) \
atomic64_local_copy_zero(&((d)->member), &((s)->member))
#endif

+ 121
- 118
daemon/call.c View File

@ -687,12 +687,9 @@ loop_ok:
if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) {
ilog(LOG_WARNING, "RTP packet from %s discarded", addr);
mutex_lock(&stream->in_lock);
stream->stats.errors++;
mutex_lock(&cm->statspslock);
cm->statsps.errors++;
mutex_unlock(&cm->statspslock);
goto done;
atomic64_inc(&stream->stats.errors);
atomic64_inc(&cm->statsps.errors);
goto unlock_out;
}
mutex_lock(&in_srtp->in_lock);
@ -755,7 +752,7 @@ use_cand:
mutex_unlock(&stream->out_lock);
if (tmp && PS_ISSET(stream, STRICT_SOURCE)) {
stream->stats.errors++;
atomic64_inc(&stream->stats.errors);
goto drop;
}
}
@ -844,10 +841,8 @@ forward:
if (ret == -1) {
ret = -errno;
ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno));
stream->stats.errors++;
mutex_lock(&cm->statspslock);
cm->statsps.errors++;
mutex_unlock(&cm->statspslock);
atomic64_inc(&stream->stats.errors);
atomic64_inc(&cm->statsps.errors);
goto out;
}
@ -857,13 +852,11 @@ drop:
if (sink)
mutex_unlock(&sink->out_lock);
ret = 0;
stream->stats.packets++;
stream->stats.bytes += s->len;
stream->last_packet = poller_now;
mutex_lock(&cm->statspslock);
cm->statsps.packets++;
cm->statsps.bytes += s->len;
mutex_unlock(&cm->statspslock);
atomic64_inc(&stream->stats.packets);
atomic64_add(&stream->stats.bytes, s->len);
atomic64_set(&stream->last_packet, poller_now);
atomic64_inc(&cm->statsps.packets);
atomic64_add(&cm->statsps.bytes, s->len);
out:
if (ret == 0 && update)
@ -1078,7 +1071,7 @@ no_sfd:
tmp_t_reason = 2;
}
if (poller_now - ps->last_packet < check)
if (poller_now - atomic64_get(&ps->last_packet) < check)
good = 1;
next:
@ -1093,7 +1086,6 @@ next:
for (i = c->monologues; i; i = i->next) {
ml = i->data;
memset(&ml->terminated,0,sizeof(struct timeval));
gettimeofday(&(ml->terminated),NULL);
if (tmp_t_reason==1) {
ml->term_reason = TIMEOUT;
@ -1288,16 +1280,14 @@ destroy:
#define DS(x) do { \
mutex_lock(&ps->in_lock); \
if (ke->stats.x < ps->kernel_stats.x) \
u_int64_t ks_val, d; \
ks_val = atomic64_get(&ps->kernel_stats.x); \
if (ke->stats.x < ks_val) \
d = 0; \
else \
d = ke->stats.x - ps->kernel_stats.x; \
ps->stats.x += d; \
mutex_unlock(&ps->in_lock); \
mutex_lock(&m->statspslock); \
m->statsps.x += d; \
mutex_unlock(&m->statspslock); \
d = ke->stats.x - ks_val; \
atomic64_add(&ps->stats.x, d); \
atomic64_add(&m->statsps.x, d); \
} while (0)
static void callmaster_timer(void *ptr) {
struct callmaster *m = ptr;
@ -1305,7 +1295,6 @@ static void callmaster_timer(void *ptr) {
GList *i;
struct rtpengine_list_entry *ke;
struct packet_stream *ps, *sink;
u_int64_t d;
struct stats tmpstats;
int j, update;
struct stream_fd *sfd;
@ -1316,13 +1305,13 @@ static void callmaster_timer(void *ptr) {
g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp);
rwlock_unlock_r(&m->hashlock);
mutex_lock(&m->statspslock);
memcpy(&tmpstats, &m->statsps, sizeof(tmpstats));
ZERO(m->statsps);
mutex_unlock(&m->statspslock);
mutex_lock(&m->statslock);
memcpy(&m->stats, &tmpstats, sizeof(m->stats));
mutex_unlock(&m->statslock);
atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, bytes);
atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, packets);
atomic64_local_copy_zero_struct(&tmpstats, &m->statsps, errors);
atomic64_set(&m->stats.bytes, atomic64_get_na(&tmpstats.bytes));
atomic64_set(&m->stats.packets, atomic64_get_na(&tmpstats.packets));
atomic64_set(&m->stats.errors, atomic64_get_na(&tmpstats.errors));
i = (m->conf.kernelid >= 0) ? kernel_list(m->conf.kernelid) : NULL;
while (i) {
@ -1346,12 +1335,12 @@ static void callmaster_timer(void *ptr) {
mutex_lock(&ps->in_lock);
if (ke->stats.packets != ps->kernel_stats.packets)
ps->last_packet = poller_now;
if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets))
atomic64_set(&ps->last_packet, poller_now);
ps->kernel_stats.packets = ke->stats.packets;
ps->kernel_stats.bytes = ke->stats.bytes;
ps->kernel_stats.errors = ke->stats.errors;
atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes);
atomic64_set(&ps->kernel_stats.packets, ke->stats.packets);
atomic64_set(&ps->kernel_stats.errors, ke->stats.errors);
update = 0;
@ -1423,7 +1412,8 @@ struct callmaster *callmaster_new(struct poller *p) {
poller_add_timer(p, callmaster_timer, &c->obj);
mutex_init(&c->totalstats_lock);
mutex_init(&c->totalstats.total_average_lock);
mutex_init(&c->totalstats_interval.total_average_lock);
c->totalstats.started = poller_now;
mutex_init(&c->cngs_lock);
@ -1764,7 +1754,7 @@ struct packet_stream *__packet_stream_new(struct call *call) {
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
stream->call = call;
stream->last_packet = poller_now;
atomic64_set_na(&stream->last_packet, poller_now);
call->streams = g_slist_prepend(call->streams, stream);
return stream;
@ -2376,31 +2366,56 @@ static void unkernelize(struct packet_stream *p) {
}
void timeval_subtract (struct timeval *result, const struct timeval *a, const struct timeval *b) {
long microseconds=0;
microseconds = ((long)a->tv_sec - (long)b->tv_sec) * 1000000 + ((long)a->tv_usec - (long)b->tv_usec);
result->tv_sec = microseconds/(long)1000000;
result->tv_usec = microseconds%(long)1000000;
u_int64_t microseconds=0;
microseconds = ((u_int64_t)a->tv_sec - (u_int64_t)b->tv_sec) * 1000000LLU + (a->tv_usec - b->tv_usec);
result->tv_sec = microseconds/1000000LLU;
result->tv_usec = microseconds%1000000LLU;
}
void timeval_multiply(struct timeval *result, const struct timeval *a, const long multiplier) {
long microseconds=0;
microseconds = (((long)a->tv_sec * 1000000) + (long)a->tv_usec) * multiplier;
result->tv_sec = microseconds/(long)1000000;
result->tv_usec = microseconds%(long)1000000;
u_int64_t microseconds=0;
microseconds = (((u_int64_t)a->tv_sec * 1000000LLU) + a->tv_usec) * multiplier;
result->tv_sec = microseconds/1000000LLU;
result->tv_usec = microseconds%1000000LLU;
}
void timeval_devide(struct timeval *result, const struct timeval *a, const long devisor) {
long microseconds=0;
microseconds = (((long)a->tv_sec * 1000000) + (long)a->tv_usec) / devisor;
result->tv_sec = microseconds/(long)1000000;
result->tv_usec = microseconds%(long)1000000;
void timeval_divide(struct timeval *result, const struct timeval *a, const long divisor) {
u_int64_t microseconds=0;
microseconds = (((u_int64_t)a->tv_sec * 1000000LLU) + a->tv_usec) / divisor;
result->tv_sec = microseconds/1000000LLU;
result->tv_usec = microseconds%1000000LLU;
}
void timeval_add(struct timeval *result, const struct timeval *a, const struct timeval *b) {
long microseconds=0;
microseconds = ((long)a->tv_sec + (long)b->tv_sec) * (long)1000000 + ((long)a->tv_usec + (long)b->tv_usec);
result->tv_sec = microseconds/(long)1000000;
result->tv_usec = microseconds%(long)1000000;
u_int64_t microseconds=0;
microseconds = ((u_int64_t)a->tv_sec + (u_int64_t)b->tv_sec) * 1000000LLU + (a->tv_usec + b->tv_usec);
result->tv_sec = microseconds/1000000LLU;
result->tv_usec = microseconds%1000000LLU;
}
static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) {
struct timeval dp, oa;
mutex_lock(&s->total_average_lock);
// new average = ((old average * old num sessions) + datapoint) / new num sessions
// ... but this will overflow when num sessions becomes very large
// timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess);
// timeval_add(&t, &t, add);
// s->total_managed_sess++;
// timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess);
// alternative:
// new average = old average + (datapoint / new num sessions) - (old average / new num sessions)
s->total_managed_sess++;
timeval_divide(&dp, add, s->total_managed_sess);
timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess);
timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp);
timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa);
mutex_unlock(&s->total_average_lock);
}
/* called lock-free, but must hold a reference to the call */
@ -2447,9 +2462,8 @@ void call_destroy(struct call *c) {
cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos);
for (l = c->monologues; l; l = l->next) {
ml = l->data;
timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started);
if (_log_facility_cdr) {
memset(&tim_result_duration,0,sizeof(struct timeval));
timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started);
cdrbufcur += sprintf(cdrbufcur, "ml%i_start_time=%ld.%06lu, "
"ml%i_end_time=%ld.%06ld, "
"ml%i_duration=%ld.%06ld, "
@ -2491,36 +2505,42 @@ void call_destroy(struct call *c) {
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet));
}
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e, %llu last_packet",
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
addr, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
mutex_lock(&m->totalstats_lock);
m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors;
m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors;
mutex_unlock(&m->totalstats_lock);
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
atomic64_add(&m->totalstats.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats_interval.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats.total_relayed_errors,
atomic64_get(&ps->stats.errors));
atomic64_add(&m->totalstats_interval.total_relayed_errors,
atomic64_get(&ps->stats.errors));
}
}
if (_log_facility_cdr)
@ -2528,10 +2548,7 @@ void call_destroy(struct call *c) {
}
// --- for statistics getting one way stream or no relay at all
mutex_lock(&m->totalstats_lock);
m->totalstats.total_nopacket_relayed_sess *= 2;
m->totalstats_interval.total_nopacket_relayed_sess *= 2;
mutex_unlock(&m->totalstats_lock);
int total_nopacket_relayed_sess = 0;
for (l = c->monologues; l; l = l->next) {
ml = l->data;
@ -2569,52 +2586,38 @@ void call_destroy(struct call *c) {
}
}
if (ps && ps2 && ps2->stats.packets==0) {
mutex_lock(&m->totalstats_lock);
if (ps->stats.packets!=0) {
m->totalstats.total_oneway_stream_sess++;
m->totalstats_interval.total_oneway_stream_sess++;
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
}
else {
m->totalstats.total_nopacket_relayed_sess++;
m->totalstats_interval.total_nopacket_relayed_sess++;
total_nopacket_relayed_sess++;
}
mutex_unlock(&m->totalstats_lock);
}
}
mutex_lock(&m->totalstats_lock);
m->totalstats.total_nopacket_relayed_sess /= 2;
m->totalstats_interval.total_nopacket_relayed_sess /= 2;
m->totalstats.total_managed_sess += 1;
m->totalstats_interval.total_managed_sess += 1;
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
ml = c->monologues->data;
if (ml->term_reason==TIMEOUT) {
m->totalstats.total_timeout_sess++;
m->totalstats_interval.total_timeout_sess++;
atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess);
} else if (ml->term_reason==SILENT_TIMEOUT) {
m->totalstats.total_silent_timeout_sess++;
m->totalstats_interval.total_silent_timeout_sess++;
atomic64_inc(&m->totalstats.total_silent_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess);
} else if (ml->term_reason==REGULAR) {
m->totalstats.total_regular_term_sess++;
m->totalstats_interval.total_regular_term_sess++;
atomic64_inc(&m->totalstats.total_regular_term_sess);
atomic64_inc(&m->totalstats_interval.total_regular_term_sess);
} else if (ml->term_reason==FORCED) {
m->totalstats.total_forced_term_sess++;
m->totalstats_interval.total_forced_term_sess++;
atomic64_inc(&m->totalstats.total_forced_term_sess);
atomic64_inc(&m->totalstats_interval.total_forced_term_sess);
}
timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1);
timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess);
timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1);
timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess);
mutex_unlock(&m->totalstats_lock);
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
if (_log_facility_cdr)
/* log it */
@ -2852,6 +2855,7 @@ struct call_monologue *__monologue_create(struct call *call) {
ret->created = poller_now;
ret->other_tags = g_hash_table_new(str_hash, str_equal);
g_queue_init(&ret->medias);
gettimeofday(&ret->started, NULL);
call->monologues = g_slist_prepend(call->monologues, ret);
@ -3026,7 +3030,6 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc
for (i = c->monologues; i; i = i->next) {
ml = i->data;
memset(&ml->terminated,0,sizeof(struct timeval));
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = REGULAR;
}


+ 19
- 19
daemon/call.h View File

@ -12,6 +12,7 @@
#include <openssl/x509.h>
#include "compat.h"
#include "control_ng.h"
#include "aux.h"
enum termination_reason {
UNKNOWN=0,
@ -187,23 +188,25 @@ extern const struct transport_protocol transport_protocols[];
struct stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
atomic64 packets;
atomic64 bytes;
atomic64 errors;
};
struct totalstats {
time_t started;
atomic64 total_timeout_sess;
atomic64 total_silent_timeout_sess;
atomic64 total_regular_term_sess;
atomic64 total_forced_term_sess;
atomic64 total_relayed_packets;
atomic64 total_relayed_errors;
atomic64 total_nopacket_relayed_sess;
atomic64 total_oneway_stream_sess;
mutex_t total_average_lock; /* for these two below */
u_int64_t total_managed_sess;
u_int64_t total_timeout_sess;
u_int64_t total_silent_timeout_sess;
u_int64_t total_regular_term_sess;
u_int64_t total_forced_term_sess;
u_int64_t total_relayed_packets;
u_int64_t total_relayed_errors;
u_int64_t total_nopacket_relayed_sess;
u_int64_t total_oneway_stream_sess;
struct timeval total_average_call_dur;
struct timeval total_average_call_dur;
};
struct udp_fd {
@ -268,9 +271,9 @@ struct packet_stream {
struct endpoint advertised_endpoint; /* RO */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct stats stats; /* LOCK: in_lock */
struct stats kernel_stats; /* LOCK: in_lock */
time_t last_packet; /* LOCK: in_lock */
struct stats stats;
struct stats kernel_stats;
atomic64 last_packet;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
@ -406,11 +409,8 @@ struct callmaster {
BIT_ARRAY_DECLARE(ports_used, 0x10000);
/* XXX rework these */
mutex_t statspslock;
struct stats statsps; /* per second stats, running timer */
mutex_t statslock;
struct stats stats; /* copied from statsps once a second */
mutex_t totalstats_lock; /* for both of them */
struct totalstats totalstats;
struct totalstats totalstats_interval;
/* control_ng_stats stuff */
@ -469,7 +469,7 @@ const struct transport_protocol *transport_protocol(const str *s);
void timeval_subtract (struct timeval *result, const struct timeval *a, const struct timeval *b);
void timeval_multiply(struct timeval *result, const struct timeval *a, const long multiplier);
void timeval_devide(struct timeval *result, const struct timeval *a, const long devisor);
void timeval_divide(struct timeval *result, const struct timeval *a, const long divisor);
void timeval_add(struct timeval *result, const struct timeval *a, const struct timeval *b);


+ 13
- 19
daemon/call_interfaces.c View File

@ -386,8 +386,8 @@ str *call_query_udp(char **out, struct callmaster *m) {
ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE],
(long long int) m->conf.silent_timeout - (poller_now - stats.last_packet),
stats.totals[0].packets, stats.totals[1].packets,
stats.totals[2].packets, stats.totals[3].packets);
atomic64_get_na(&stats.totals[0].packets), atomic64_get_na(&stats.totals[1].packets),
atomic64_get_na(&stats.totals[2].packets), atomic64_get_na(&stats.totals[3].packets));
goto out;
err:
@ -431,20 +431,14 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
}
void calls_status_tcp(struct callmaster *m, struct control_stream *s) {
struct stats st;
GQueue q = G_QUEUE_INIT;
struct call *c;
mutex_lock(&m->statslock);
st = m->stats;
mutex_unlock(&m->statslock);
callmaster_get_all_calls(m, &q);
control_stream_printf(s, "proxy %u "UINT64F"/"UINT64F"/"UINT64F"\n",
control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n",
g_queue_get_length(&q),
st.bytes, st.bytes - st.errors,
st.bytes * 2 - st.errors);
atomic64_get(&m->stats.bytes), 0, 0);
while (q.head) {
c = g_queue_pop_head(&q);
@ -726,14 +720,14 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_
}
static void ng_stats(bencode_item_t *d, const struct stats *s, struct stats *totals) {
bencode_dictionary_add_integer(d, "packets", s->packets);
bencode_dictionary_add_integer(d, "bytes", s->bytes);
bencode_dictionary_add_integer(d, "errors", s->errors);
bencode_dictionary_add_integer(d, "packets", atomic64_get(&s->packets));
bencode_dictionary_add_integer(d, "bytes", atomic64_get(&s->bytes));
bencode_dictionary_add_integer(d, "errors", atomic64_get(&s->errors));
if (!totals)
return;
totals->packets += s->packets;
totals->bytes += s->bytes;
totals->errors += s->errors;
atomic64_add_na(&totals->packets, atomic64_get(&s->packets));
atomic64_add_na(&totals->bytes, atomic64_get(&s->bytes));
atomic64_add_na(&totals->errors, atomic64_get(&s->errors));
}
static void ng_stats_endpoint(bencode_item_t *dict, const struct endpoint *ep) {
@ -772,7 +766,7 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
if (ps->crypto.params.crypto_suite)
bencode_dictionary_add_string(dict, "crypto suite",
ps->crypto.params.crypto_suite->name);
bencode_dictionary_add_integer(dict, "last packet", ps->last_packet);
bencode_dictionary_add_integer(dict, "last packet", atomic64_get(&ps->last_packet));
flags = bencode_dictionary_add_list(dict, "flags");
@ -788,8 +782,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
BF_PS("media handover", MEDIA_HANDOVER);
stats:
if (totals->last_packet < ps->last_packet)
totals->last_packet = ps->last_packet;
if (totals->last_packet < atomic64_get(&ps->last_packet))
totals->last_packet = atomic64_get(&ps->last_packet);
/* XXX distinguish between input and output */
s = &totals->totals[0];


+ 25
- 22
daemon/cli.c View File

@ -25,36 +25,39 @@ static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer
static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
struct timeval avg;
u_int64_t num_sessions;
mutex_lock(&m->totalstats_lock);
mutex_lock(&m->totalstats.total_average_lock);
avg = m->totalstats.total_average_call_dur;
num_sessions = m->totalstats.total_managed_sess;
mutex_unlock(&m->totalstats.total_average_lock);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started);
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :%llu\n", (unsigned long long)m->totalstats.total_managed_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions);
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :%llu\n",(unsigned long long)m->totalstats.total_timeout_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via SILENT_TIMEOUT :%llu\n",(unsigned long long)m->totalstats.total_silent_timeout_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via SILENT_TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_silent_timeout_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total regular terminated sessions :%llu\n",(unsigned long long)m->totalstats.total_regular_term_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total regular terminated sessions :"UINT64F"\n",atomic64_get(&m->totalstats.total_regular_term_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total forced terminated sessions :%llu\n",(unsigned long long)m->totalstats.total_forced_term_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total forced terminated sessions :"UINT64F"\n",atomic64_get(&m->totalstats.total_forced_term_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packets :%llu\n",(unsigned long long)m->totalstats.total_relayed_packets);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packets :"UINT64F"\n",atomic64_get(&m->totalstats.total_relayed_packets));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packet errors :%llu\n",(unsigned long long)m->totalstats.total_relayed_errors);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total relayed packet errors :"UINT64F"\n",atomic64_get(&m->totalstats.total_relayed_errors));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of streams with no relayed packets :%llu\n", (unsigned long long)m->totalstats.total_nopacket_relayed_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of streams with no relayed packets :"UINT64F"\n", atomic64_get(&m->totalstats.total_nopacket_relayed_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of 1-way streams :%llu\n",(unsigned long long)m->totalstats.total_oneway_stream_sess);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total number of 1-way streams :"UINT64F"\n",atomic64_get(&m->totalstats.total_oneway_stream_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",avg.tv_sec,avg.tv_usec);
ADJUSTLEN(printlen,outbufend,replybuffer);
mutex_unlock(&m->totalstats_lock);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n",
@ -148,15 +151,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
continue;
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e, %llu last_packet\n",
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
ADJUSTLEN(printlen,outbufend,replybuffer);
}
}
@ -309,7 +312,7 @@ next:
sprintf(replybuffer, "Could currently not accept CLI commands. Reason:%s\n", strerror(errno));
goto cleanup;
}
ilog(LOG_INFO, "Accept error:%s\n", strerror(errno));
ilog(LOG_INFO, "Accept error:%s", strerror(errno));
goto next;
}
@ -319,15 +322,15 @@ next:
readbytes = read(nfd, inbuf+inlen, MAXINPUT);
if (readbytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s\n", strerror(errno));
ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s", strerror(errno));
goto cleanup;
}
ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s\n", strerror(errno));
ilog(LOG_INFO, "Could currently not read CLI commands. Reason:%s", strerror(errno));
}
inlen += readbytes;
} while (readbytes > 0);
ilog(LOG_INFO, "Got CLI command:%s\n",inbuf);
ilog(LOG_INFO, "Got CLI command:%s",inbuf);
static const char* LIST = "list";
static const char* TERMINATE = "terminate";


+ 30
- 17
daemon/graphite.c View File

@ -87,23 +87,36 @@ int send_graphite_data() {
char data_to_send[8192]; memset(&data_to_send,0,8192);
char* ptr = data_to_send;
mutex_lock(&cm->totalstats_lock);
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)g_now); ptr += rc;
ZERO(cm->totalstats_interval);
mutex_unlock(&cm->totalstats_lock);
struct totalstats ts;
/* atomically copy values to stack and reset to zero */
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_timeout_sess);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_silent_timeout_sess);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_regular_term_sess);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_forced_term_sess);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_relayed_packets);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_relayed_errors);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_nopacket_relayed_sess);
atomic64_local_copy_zero_struct(&ts, &cm->totalstats_interval, total_oneway_stream_sess);
mutex_lock(&cm->totalstats_interval.total_average_lock);
ts.total_average_call_dur = cm->totalstats_interval.total_average_call_dur;
ts.total_managed_sess = cm->totalstats_interval.total_managed_sess;
ZERO(ts.total_average_call_dur);
ZERO(ts.total_managed_sess);
mutex_unlock(&cm->totalstats_interval.total_average_lock);
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) ts.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %lu %llu\n",hostname, ts.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_regular_term_sess),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_errors),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_packets),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_silent_timeout_sess),(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_timeout_sess),(unsigned long long)g_now); ptr += rc;
rc = write(graphite_sock, data_to_send, strlen(data_to_send));
if (rc<0) {


+ 1
- 1
daemon/main.c View File

@ -30,7 +30,7 @@
#define REDIS_MODULE_VERSION "redis/6"
#define REDIS_MODULE_VERSION "redis/7"


+ 1
- 1
daemon/sdp.c View File

@ -910,7 +910,7 @@ int sdp_parse(str *body, GQueue *sessions) {
return 0;
error:
ilog(LOG_WARNING, "Error parsing SDP at offset %li: %s", b - body->s, errstr);
ilog(LOG_WARNING, "Error parsing SDP at offset %li: %s", (long) (b - body->s), errstr);
sdp_free(sessions);
return -1;
}


Loading…
Cancel
Save