Browse Source

First attempt to seperate more stats and cdr away from the core running code

changes/85/12185/1
Frederic-Philippe Metz 9 years ago
committed by Richard Fuchs
parent
commit
8f01e780a8
8 changed files with 558 additions and 442 deletions
  1. +1
    -1
      daemon/Makefile
  2. +23
    -374
      daemon/call.c
  3. +4
    -66
      daemon/call.h
  4. +224
    -0
      daemon/cdr.c
  5. +15
    -0
      daemon/cdr.h
  6. +3
    -1
      daemon/log.c
  7. +208
    -0
      daemon/statistics.c
  8. +80
    -0
      daemon/statistics.h

+ 1
- 1
daemon/Makefile View File

@ -38,7 +38,7 @@ include ../lib/lib.Makefile
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \
media_socket.c rtcp_xr.c homer.c recording.c
media_socket.c rtcp_xr.c homer.c recording.c statistics.c cdr.c
LIBSRCS= loglib.c auxlib.c rtplib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)


+ 23
- 374
daemon/call.c View File

@ -38,7 +38,8 @@
#include "log_funcs.h"
#include "recording.h"
#include "rtplib.h"
#include "cdr.h"
#include "statistics.h"
/* also serves as array index for callstream->peers[] */
@ -113,37 +114,8 @@ const struct transport_protocol transport_protocols[] = {
};
const int num_transport_protocols = G_N_ELEMENTS(transport_protocols);
static const char * const __term_reason_texts[] = {
[TIMEOUT] = "TIMEOUT",
[REGULAR] = "REGULAR",
[FORCED] = "FORCED",
[SILENT_TIMEOUT] = "SILENT_TIMEOUT",
[FINAL_TIMEOUT] = "FINAL_TIMEOUT",
};
static const char * const __tag_type_texts[] = {
[FROM_TAG] = "FROM_TAG",
[TO_TAG] = "TO_TAG",
};
static const char *const __opmode_texts[] = {
[OP_OFFER] = "offer",
[OP_ANSWER] = "answer",
};
static const char * get_term_reason_text(enum termination_reason t) {
return get_enum_array_text(__term_reason_texts, t, "UNKNOWN");
}
const char * get_tag_type_text(enum tag_type t) {
return get_enum_array_text(__tag_type_texts, t, "UNKNOWN");
}
const char *get_opmode_text(enum call_opmode m) {
return get_enum_array_text(__opmode_texts, m, "other");
}
/* ********** */
static void __monologue_destroy(struct call_monologue *monologue);
static int monologue_destroy(struct call_monologue *ml);
@ -1744,62 +1716,6 @@ error_intf:
return ERROR_NO_FREE_LOGS;
}
static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) {
struct timeval dp, oa;
mutex_lock(&s->total_average_lock);
// new average = ((old average * old num sessions) + datapoint) / new num sessions
// ... but this will overflow when num sessions becomes very large
// timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess);
// timeval_add(&t, &t, add);
// s->total_managed_sess++;
// timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess);
// alternative:
// new average = old average + (datapoint / new num sessions) - (old average / new num sessions)
s->total_managed_sess++;
timeval_divide(&dp, add, s->total_managed_sess);
timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess);
timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp);
timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa);
mutex_unlock(&s->total_average_lock);
}
static void timeval_totalstats_interval_call_duration_add(struct totalstats *s,
struct timeval *call_start, struct timeval *call_stop,
struct timeval *interval_start, int interval_dur_s) {
/* work with graphite interval start val which might be changed elsewhere in the code*/
struct timeval real_iv_start = *interval_start;
struct timeval call_duration;
struct timeval *call_start_in_iv = call_start;
/* in case graphite interval needs to be the previous one */
if (timercmp(&real_iv_start, call_stop, >)) {
struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL };
timeval_subtract(&real_iv_start, interval_start, &graph_dur);
}
if (timercmp(&real_iv_start, call_start, >))
call_start_in_iv = &real_iv_start;
/* this should never happen and is here for sanitization of output */
if (timercmp(call_start_in_iv, call_stop, >)) {
ilog(LOG_ERR, "Call start seems to exceed call stop");
return;
}
timeval_subtract(&call_duration, call_stop, call_start_in_iv);
mutex_lock(&s->total_calls_duration_lock);
timeval_add(&s->total_calls_duration_interval,
&s->total_calls_duration_interval, &call_duration);
mutex_unlock(&s->total_calls_duration_lock);
}
static int __rtp_stats_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
@ -1812,7 +1728,7 @@ static int __rtp_stats_sort(const void *ap, const void *bp) {
return 0;
}
static const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) {
const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) {
struct packet_stream *ps;
GList *values;
struct rtp_stats *rtp_s;
@ -1882,12 +1798,10 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
return res;
}
#define CDRBUFREMAINDER cdrbufend-cdrbufcur
/* called lock-free, but must hold a reference to the call */
void call_destroy(struct call *c) {
struct callmaster *m;
struct packet_stream *ps=0, *ps2=0;
struct packet_stream *ps=0;
struct stream_fd *sfd;
struct poller *p;
GList *l;
@ -1895,14 +1809,6 @@ void call_destroy(struct call *c) {
struct call_monologue *ml;
struct call_media *md;
GList *k, *o;
struct timeval tim_result_duration;
static const int CDRBUFLENGTH = 4096*2;
char cdrbuffer[CDRBUFLENGTH];
char* cdrbufcur = cdrbuffer;
char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1;
int cdrlinecnt = 0;
int printlen=0;
int found = 0;
const struct rtp_payload_type *rtp_pt;
if (!c) {
@ -1924,68 +1830,21 @@ void call_destroy(struct call *c) {
obj_put(c);
if (IS_FOREIGN_CALL(c)) {
atomic64_dec(&m->stats.foreign_sessions);
}
if(!IS_FOREIGN_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
statistics_update_foreignown_dec(c);
if (!IS_FOREIGN_CALL(c)) {
if (IS_OWN_CALL(c)) {
redis_delete(c, m->conf.redis_write);
}
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */
if (!IS_FOREIGN_CALL(c)) {
ilog(LOG_INFO, "Final packet stats:");
/* CDRs and statistics */
if (_log_facility_cdr) {
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
if (IS_OWN_CALL(c)) {
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
if (!ml->terminated.tv_sec) {
gettimeofday(&ml->terminated, NULL);
ml->term_reason = UNKNOWN;
}
timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started);
if (_log_facility_cdr) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_start_time=%ld.%06lu, "
"ml%i_end_time=%ld.%06ld, "
"ml%i_duration=%ld.%06ld, "
"ml%i_termination=%s, "
"ml%i_local_tag=%s, "
"ml%i_local_tag_type=%s, "
"ml%i_remote_tag=%s, ",
cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec,
cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec,
cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec,
cdrlinecnt, get_term_reason_text(ml->term_reason),
cdrlinecnt, ml->tag.s,
cdrlinecnt, get_tag_type_text(ml->tagtype),
cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)");
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
ilog(LOG_INFO, "--- Tag '"STR_FORMAT"', created "
"%u:%02u ago for branch '"STR_FORMAT"', in dialogue with '"STR_FORMAT"'",
STR_FMT(&ml->tag),
@ -1994,10 +1853,10 @@ void call_destroy(struct call *c) {
STR_FMT(&ml->viabranch),
ml->active_dialogue ? ml->active_dialogue->tag.len : 6,
ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)");
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
rtp_pt = __rtp_stats_codec(md);
#define MLL_PREFIX "------ Media #%u ("STR_FORMAT" over %s) using " /* media log line prefix */
#define MLL_COMMON /* common args */ \
@ -2009,119 +1868,16 @@ void call_destroy(struct call *c) {
else
ilog(LOG_INFO, MLL_PREFIX STR_FORMAT, MLL_COMMON,
STR_FMT(&rtp_pt->encoding_with_params));
/* add PayloadType(codec) info in CDR logging */
if (_log_facility_cdr && rtp_pt) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
} else if (_log_facility_cdr && !rtp_pt) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, ");
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
for (o = md->streams.head; o; o = o->next) {
ps = o->data;
if (PS_ISSET(ps, FALLBACK_RTCP))
continue;
char *addr = sockaddr_print_buf(&ps->endpoint.address);
char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0";
if (_log_facility_cdr) {
const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp";
if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol,
(ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
} else {
#if (RE_HAS_MEASUREDELAY)
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", "
"ml%i_midx%u_%s_delay_min=%.9f, "
"ml%i_midx%u_%s_delay_avg=%.9f, "
"ml%i_midx%u_%s_delay_max=%.9f, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
#else
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol,
(ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
#endif
}
}
ilog(LOG_INFO, "--------- Port %15s:%-5u <> %15s:%-5u%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", local_addr,
(unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
@ -2132,117 +1888,18 @@ void call_destroy(struct call *c) {
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
atomic64_add(&m->totalstats.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats_interval.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats.total_relayed_errors,
atomic64_get(&ps->stats.errors));
atomic64_add(&m->totalstats_interval.total_relayed_errors,
atomic64_get(&ps->stats.errors));
}
ice_shutdown(&md->ice_agent);
}
if (_log_facility_cdr)
++cdrlinecnt;
}
}
// --- for statistics getting one way stream or no relay at all
int total_nopacket_relayed_sess = 0;
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
// --- go through partner ml and search the RTP
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
for (o = md->streams.head; o; o = o->next) {
ps = o->data;
if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) {
// --- only RTP is interesting
found = 1;
break;
}
}
if (found) { break; }
}
found = 0;
if (ml->active_dialogue) {
// --- go through partner ml and search the RTP
for (k = ml->active_dialogue->medias.head; k; k = k->next) {
md = k->data;
for (o = md->streams.head; o; o = o->next) {
ps2 = o->data;
if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) {
// --- only RTP is interesting
found = 1;
break;
}
}
if (found) { break; }
}
}
statistics_update_totals(m,ps);
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0 && !IS_FOREIGN_CALL(c)){
if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
}
}
else {
total_nopacket_relayed_sess++;
}
}
}
if (!IS_FOREIGN_CALL(c)) {
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
}
if (c->monologues.head) {
ml = c->monologues.head->data;
if (!IS_FOREIGN_CALL(c)) {
if (ml->term_reason==TIMEOUT) {
atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess);
} else if (ml->term_reason==SILENT_TIMEOUT) {
atomic64_inc(&m->totalstats.total_silent_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess);
} else if (ml->term_reason==REGULAR) {
atomic64_inc(&m->totalstats.total_regular_term_sess);
atomic64_inc(&m->totalstats_interval.total_regular_term_sess);
} else if (ml->term_reason==FORCED) {
atomic64_inc(&m->totalstats.total_forced_term_sess);
atomic64_inc(&m->totalstats_interval.total_forced_term_sess);
ice_shutdown(&md->ice_agent);
}
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(
&m->totalstats_interval, &ml->started, &ml->terminated,
&m->latest_graphite_interval_start,
m->conf.graphite_interval);
}
if (ml->term_reason==FINAL_TIMEOUT) {
atomic64_inc(&m->totalstats.total_final_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_final_timeout_sess);
}
}
statistics_update_oneway(c);
if (_log_facility_cdr)
/* log it */
cdrlog(cdrbuffer);
cdr_update_entry(c);
for (l = c->streams.head; l; l = l->next) {
ps = l->data;
@ -2394,19 +2051,11 @@ restart:
}
g_hash_table_insert(m->callhash, &c->callid, obj_get(c));
if (type == CT_OWN_CALL) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_max = MAX(
m->totalstats_interval.managed_sess_max,
g_hash_table_size(m->callhash)
- atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
else if (type == CT_FOREIGN_CALL) { /* foreign call*/
c->foreign_call = 1;
atomic64_inc(&m->stats.foreign_sessions);
atomic64_inc(&m->totalstats.total_foreign_sessions);
}
if (type == CT_FOREIGN_CALL) /* foreign call*/
c->foreign_call = 1;
statistics_update_foreignown_inc(m,c);
rwlock_lock_w(&c->master_lock);
rwlock_unlock_w(&m->hashlock);
}


+ 4
- 66
daemon/call.h View File

@ -19,6 +19,7 @@
#include "socket.h"
#include "media_socket.h"
#include "recording.h"
#include "statistics.h"
#define UNDEFINED ((unsigned int) -1)
#define TRUNCATED " ... Output truncated. Increase Output Buffer ... \n"
@ -109,6 +110,7 @@ enum call_type {
#endif
#define IS_FOREIGN_CALL(c) (c->foreign_call)
#define IS_OWN_CALL(c) !IS_FOREIGN_CALL(c)
/* flags shared by several of the structs below */
#define SHARED_FLAG_IMPLICIT_RTCP 0x00000001
@ -232,54 +234,6 @@ struct transport_protocol {
};
extern const struct transport_protocol transport_protocols[];
struct stats {
atomic64 packets;
atomic64 bytes;
atomic64 errors;
u_int64_t delay_min;
u_int64_t delay_avg;
u_int64_t delay_max;
u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */
atomic64 foreign_sessions; // unresponsible via redis notification
};
struct request_time {
mutex_t lock;
u_int64_t count;
struct timeval time_min, time_max, time_avg;
};
struct totalstats {
time_t started;
atomic64 total_timeout_sess;
atomic64 total_foreign_sessions;
atomic64 total_rejected_sess;
atomic64 total_silent_timeout_sess;
atomic64 total_final_timeout_sess;
atomic64 total_regular_term_sess;
atomic64 total_forced_term_sess;
atomic64 total_relayed_packets;
atomic64 total_relayed_errors;
atomic64 total_nopacket_relayed_sess;
atomic64 total_oneway_stream_sess;
u_int64_t foreign_sessions;
u_int64_t own_sessions;
u_int64_t total_sessions;
mutex_t total_average_lock; /* for these two below */
u_int64_t total_managed_sess;
struct timeval total_average_call_dur;
mutex_t managed_sess_lock; /* for these below */
u_int64_t managed_sess_max; /* per graphite interval statistic */
u_int64_t managed_sess_min; /* per graphite interval statistic */
mutex_t total_calls_duration_lock; /* for these two below */
struct timeval total_calls_duration_interval;
struct request_time offer, answer, delete;
};
struct stream_params {
unsigned int index; /* starting with 1 */
@ -314,14 +268,7 @@ struct loop_protector {
unsigned char buf[RTP_LOOP_PROTECT];
};
struct rtp_stats {
unsigned int payload_type;
atomic64 packets;
atomic64 bytes;
atomic64 kernel_packets;
atomic64 kernel_bytes;
atomic64 in_tos_tclass;
};
struct packet_stream {
mutex_t in_lock,
@ -502,13 +449,6 @@ struct callmaster {
struct homer_sender *homer;
};
struct call_stats {
time_t last_packet;
struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */
};
struct callmaster *callmaster_new(struct poller *);
void callmaster_get_all_calls(struct callmaster *m, GQueue *q);
struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
@ -548,13 +488,11 @@ void __rtp_stats_update(GHashTable *dst, GHashTable *src);
const char *get_tag_type_text(enum tag_type t);
const char *get_opmode_text(enum call_opmode);
const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m);
#include "str.h"
#include "rtp.h"
INLINE void *call_malloc(struct call *c, size_t l) {
void *ret;
mutex_lock(&c->buffer_lock);


+ 224
- 0
daemon/cdr.c View File

@ -0,0 +1,224 @@
#include <inttypes.h>
#include "rtplib.h"
#include "cdr.h"
#include "call.h"
#include "poller.h"
#include "str.h"
#define CDRBUFREMAINDER cdrbufend-cdrbufcur
static const char * const __term_reason_texts[] = {
[TIMEOUT] = "TIMEOUT",
[REGULAR] = "REGULAR",
[FORCED] = "FORCED",
[SILENT_TIMEOUT] = "SILENT_TIMEOUT",
[FINAL_TIMEOUT] = "FINAL_TIMEOUT",
};
static const char * const __tag_type_texts[] = {
[FROM_TAG] = "FROM_TAG",
[TO_TAG] = "TO_TAG",
};
static const char *const __opmode_texts[] = {
[OP_OFFER] = "offer",
[OP_ANSWER] = "answer",
};
const char * get_tag_type_text(enum tag_type t) {
return get_enum_array_text(__tag_type_texts, t, "UNKNOWN");
}
const char *get_opmode_text(enum call_opmode m) {
return get_enum_array_text(__opmode_texts, m, "other");
}
static const char * get_term_reason_text(enum termination_reason t) {
return get_enum_array_text(__term_reason_texts, t, "UNKNOWN");
}
void cdr_update_entry(struct call* c) {
GList *l;
struct call_monologue *ml;
struct timeval tim_result_duration;
int printlen=0;
int cdrlinecnt = 0;
static const int CDRBUFLENGTH = 4096*2;
char cdrbuffer[CDRBUFLENGTH];
char* cdrbufcur = cdrbuffer;
char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1;
struct call_media *md;
GList *k, *o;
const struct rtp_payload_type *rtp_pt;
struct packet_stream *ps=0;
if (IS_OWN_CALL(c)) {
/* CDRs and statistics */
if (_log_facility_cdr) {
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
if (!ml->terminated.tv_sec) {
gettimeofday(&ml->terminated, NULL);
ml->term_reason = UNKNOWN;
}
timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started);
if (_log_facility_cdr) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_start_time=%ld.%06lu, "
"ml%i_end_time=%ld.%06ld, "
"ml%i_duration=%ld.%06ld, "
"ml%i_termination=%s, "
"ml%i_local_tag=%s, "
"ml%i_local_tag_type=%s, "
"ml%i_remote_tag=%s, ",
cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec,
cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec,
cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec,
cdrlinecnt, get_term_reason_text(ml->term_reason),
cdrlinecnt, ml->tag.s,
cdrlinecnt, get_tag_type_text(ml->tagtype),
cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)");
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
rtp_pt = __rtp_stats_codec(md);
/* add PayloadType(codec) info in CDR logging */
if (_log_facility_cdr && rtp_pt) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
} else if (_log_facility_cdr && !rtp_pt) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, ");
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
}
for (o = md->streams.head; o; o = o->next) {
ps = o->data;
if (PS_ISSET(ps, FALLBACK_RTCP))
continue;
char *addr = sockaddr_print_buf(&ps->endpoint.address);
char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0";
if (_log_facility_cdr) {
const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp";
if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol,
(ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
} else {
#if (RE_HAS_MEASUREDELAY)
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", "
"ml%i_midx%u_%s_delay_min=%.9f, "
"ml%i_midx%u_%s_delay_avg=%.9f, "
"ml%i_midx%u_%s_delay_max=%.9f, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000,
cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
#else
printlen = snprintf(cdrbufcur, CDRBUFREMAINDER,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_ip=%s, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, local_addr,
cdrlinecnt, md->index, protocol,
(ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
ps->stats.in_tos_tclass);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
#endif
}
}
}
}
if (_log_facility_cdr)
++cdrlinecnt;
}
}
/* log it */
cdrlog(cdrbuffer);
}

+ 15
- 0
daemon/cdr.h View File

@ -0,0 +1,15 @@
/*
* cdr.h
*
* Created on: Mar 14, 2017
* Author: fmetz
*/
#ifndef CDR_H_
#define CDR_H_
struct call;
void cdr_update_entry(struct call* c);
#endif /* CDR_H_ */

+ 3
- 1
daemon/log.c View File

@ -58,7 +58,9 @@ void __ilog(int prio, const char *fmt, ...) {
}
void cdrlog(const char* cdrbuffer) {
syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer);
if (_log_facility_cdr) {
syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer);
}
}


+ 208
- 0
daemon/statistics.c View File

@ -0,0 +1,208 @@
#include "call.h"
#include "statistics.h"
static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) {
struct timeval dp, oa;
mutex_lock(&s->total_average_lock);
// new average = ((old average * old num sessions) + datapoint) / new num sessions
// ... but this will overflow when num sessions becomes very large
// timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess);
// timeval_add(&t, &t, add);
// s->total_managed_sess++;
// timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess);
// alternative:
// new average = old average + (datapoint / new num sessions) - (old average / new num sessions)
s->total_managed_sess++;
timeval_divide(&dp, add, s->total_managed_sess);
timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess);
timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp);
timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa);
mutex_unlock(&s->total_average_lock);
}
static void timeval_totalstats_interval_call_duration_add(struct totalstats *s,
struct timeval *call_start, struct timeval *call_stop,
struct timeval *interval_start, int interval_dur_s) {
/* work with graphite interval start val which might be changed elsewhere in the code*/
struct timeval real_iv_start = *interval_start;
struct timeval call_duration;
struct timeval *call_start_in_iv = call_start;
/* in case graphite interval needs to be the previous one */
if (timercmp(&real_iv_start, call_stop, >)) {
struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL };
timeval_subtract(&real_iv_start, interval_start, &graph_dur);
}
if (timercmp(&real_iv_start, call_start, >))
call_start_in_iv = &real_iv_start;
/* this should never happen and is here for sanitization of output */
if (timercmp(call_start_in_iv, call_stop, >)) {
ilog(LOG_ERR, "Call start seems to exceed call stop");
return;
}
timeval_subtract(&call_duration, call_stop, call_start_in_iv);
mutex_lock(&s->total_calls_duration_lock);
timeval_add(&s->total_calls_duration_interval,
&s->total_calls_duration_interval, &call_duration);
mutex_unlock(&s->total_calls_duration_lock);
}
void statistics_update_totals(struct callmaster* m, struct packet_stream *ps) {
atomic64_add(&m->totalstats.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats_interval.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats.total_relayed_errors,
atomic64_get(&ps->stats.errors));
atomic64_add(&m->totalstats_interval.total_relayed_errors,
atomic64_get(&ps->stats.errors));
}
void statistics_update_foreignown_dec(struct call* c) {
struct callmaster *m;
m = c->callmaster;
if (IS_FOREIGN_CALL(c)) {
atomic64_dec(&m->stats.foreign_sessions);
}
if(IS_OWN_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
}
void statistics_update_foreignown_inc(struct callmaster *m, struct call* c) {
if (IS_OWN_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_max = MAX(
m->totalstats_interval.managed_sess_max,
g_hash_table_size(m->callhash)
- atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
else if (IS_FOREIGN_CALL(c)) { /* foreign call*/
atomic64_inc(&m->stats.foreign_sessions);
atomic64_inc(&m->totalstats.total_foreign_sessions);
}
}
void statistics_update_oneway(struct call* c) {
struct callmaster *m;
struct packet_stream *ps=0, *ps2=0;
struct call_monologue *ml;
struct call_media *md;
GList *k, *o;
int found = 0;
GList *l;
struct timeval tim_result_duration;
m = c->callmaster;
// --- for statistics getting one way stream or no relay at all
int total_nopacket_relayed_sess = 0;
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
// --- go through partner ml and search the RTP
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
for (o = md->streams.head; o; o = o->next) {
ps = o->data;
if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) {
// --- only RTP is interesting
found = 1;
break;
}
}
if (found) { break; }
}
found = 0;
if (ml->active_dialogue) {
// --- go through partner ml and search the RTP
for (k = ml->active_dialogue->medias.head; k; k = k->next) {
md = k->data;
for (o = md->streams.head; o; o = o->next) {
ps2 = o->data;
if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) {
// --- only RTP is interesting
found = 1;
break;
}
}
if (found) { break; }
}
}
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0 && IS_OWN_CALL(c)){
if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
}
}
else {
total_nopacket_relayed_sess++;
}
}
}
if (IS_OWN_CALL(c)) {
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
}
if (c->monologues.head) {
ml = c->monologues.head->data;
if (IS_OWN_CALL(c)) {
if (ml->term_reason==TIMEOUT) {
atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess);
} else if (ml->term_reason==SILENT_TIMEOUT) {
atomic64_inc(&m->totalstats.total_silent_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess);
} else if (ml->term_reason==REGULAR) {
atomic64_inc(&m->totalstats.total_regular_term_sess);
atomic64_inc(&m->totalstats_interval.total_regular_term_sess);
} else if (ml->term_reason==FORCED) {
atomic64_inc(&m->totalstats.total_forced_term_sess);
atomic64_inc(&m->totalstats_interval.total_forced_term_sess);
}
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(
&m->totalstats_interval, &ml->started, &ml->terminated,
&m->latest_graphite_interval_start,
m->conf.graphite_interval);
}
if (ml->term_reason==FINAL_TIMEOUT) {
atomic64_inc(&m->totalstats.total_final_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_final_timeout_sess);
}
}
}

+ 80
- 0
daemon/statistics.h View File

@ -0,0 +1,80 @@
#ifndef STATISTICS_H_
#define STATISTICS_H_
#include "call.h"
struct stats {
atomic64 packets;
atomic64 bytes;
atomic64 errors;
u_int64_t delay_min;
u_int64_t delay_avg;
u_int64_t delay_max;
u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */
atomic64 foreign_sessions; // unresponsible via redis notification
};
struct request_time {
mutex_t lock;
u_int64_t count;
struct timeval time_min, time_max, time_avg;
};
struct totalstats {
time_t started;
atomic64 total_timeout_sess;
atomic64 total_foreign_sessions;
atomic64 total_rejected_sess;
atomic64 total_silent_timeout_sess;
atomic64 total_final_timeout_sess;
atomic64 total_regular_term_sess;
atomic64 total_forced_term_sess;
atomic64 total_relayed_packets;
atomic64 total_relayed_errors;
atomic64 total_nopacket_relayed_sess;
atomic64 total_oneway_stream_sess;
u_int64_t foreign_sessions;
u_int64_t own_sessions;
u_int64_t total_sessions;
mutex_t total_average_lock; /* for these two below */
u_int64_t total_managed_sess;
struct timeval total_average_call_dur;
mutex_t managed_sess_lock; /* for these below */
u_int64_t managed_sess_max; /* per graphite interval statistic */
u_int64_t managed_sess_min; /* per graphite interval statistic */
mutex_t total_calls_duration_lock; /* for these two below */
struct timeval total_calls_duration_interval;
struct request_time offer, answer, delete;
};
struct rtp_stats {
unsigned int payload_type;
atomic64 packets;
atomic64 bytes;
atomic64 kernel_packets;
atomic64 kernel_bytes;
atomic64 in_tos_tclass;
};
struct call_stats {
time_t last_packet;
struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */
};
struct callmaster;
void statistics_update_oneway(struct call *);
void statistics_update_foreignown_dec(struct call *);
void statistics_update_foreignown_inc(struct callmaster *m, struct call* c);
void statistics_update_totals(struct callmaster *, struct packet_stream *) ;
#endif /* STATISTICS_H_ */

Loading…
Cancel
Save