diff --git a/daemon/Makefile b/daemon/Makefile index 9843099a7..d76e8b132 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -38,7 +38,7 @@ include ../lib/lib.Makefile SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c rtcp_xr.c homer.c recording.c + media_socket.c rtcp_xr.c homer.c recording.c statistics.c cdr.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/daemon/call.c b/daemon/call.c index ffa91bc8b..d0bdb3c05 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -38,7 +38,8 @@ #include "log_funcs.h" #include "recording.h" #include "rtplib.h" - +#include "cdr.h" +#include "statistics.h" /* also serves as array index for callstream->peers[] */ @@ -113,37 +114,8 @@ const struct transport_protocol transport_protocols[] = { }; const int num_transport_protocols = G_N_ELEMENTS(transport_protocols); -static const char * const __term_reason_texts[] = { - [TIMEOUT] = "TIMEOUT", - [REGULAR] = "REGULAR", - [FORCED] = "FORCED", - [SILENT_TIMEOUT] = "SILENT_TIMEOUT", - [FINAL_TIMEOUT] = "FINAL_TIMEOUT", -}; -static const char * const __tag_type_texts[] = { - [FROM_TAG] = "FROM_TAG", - [TO_TAG] = "TO_TAG", -}; -static const char *const __opmode_texts[] = { - [OP_OFFER] = "offer", - [OP_ANSWER] = "answer", -}; - -static const char * get_term_reason_text(enum termination_reason t) { - return get_enum_array_text(__term_reason_texts, t, "UNKNOWN"); -} -const char * get_tag_type_text(enum tag_type t) { - return get_enum_array_text(__tag_type_texts, t, "UNKNOWN"); -} -const char *get_opmode_text(enum call_opmode m) { - return get_enum_array_text(__opmode_texts, m, "other"); -} - - /* ********** */ - - static void __monologue_destroy(struct call_monologue *monologue); static int monologue_destroy(struct call_monologue *ml); @@ -1744,62 +1716,6 @@ error_intf: return ERROR_NO_FREE_LOGS; } -static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { - struct timeval dp, oa; - - mutex_lock(&s->total_average_lock); - - // new average = ((old average * old num sessions) + datapoint) / new num sessions - // ... but this will overflow when num sessions becomes very large - - // timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess); - // timeval_add(&t, &t, add); - // s->total_managed_sess++; - // timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess); - - // alternative: - // new average = old average + (datapoint / new num sessions) - (old average / new num sessions) - - s->total_managed_sess++; - timeval_divide(&dp, add, s->total_managed_sess); - timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess); - timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp); - timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa); - - mutex_unlock(&s->total_average_lock); -} - -static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, - struct timeval *call_start, struct timeval *call_stop, - struct timeval *interval_start, int interval_dur_s) { - - /* work with graphite interval start val which might be changed elsewhere in the code*/ - struct timeval real_iv_start = *interval_start; - struct timeval call_duration; - struct timeval *call_start_in_iv = call_start; - - /* in case graphite interval needs to be the previous one */ - if (timercmp(&real_iv_start, call_stop, >)) { - struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL }; - timeval_subtract(&real_iv_start, interval_start, &graph_dur); - } - - if (timercmp(&real_iv_start, call_start, >)) - call_start_in_iv = &real_iv_start; - - /* this should never happen and is here for sanitization of output */ - if (timercmp(call_start_in_iv, call_stop, >)) { - ilog(LOG_ERR, "Call start seems to exceed call stop"); - return; - } - - timeval_subtract(&call_duration, call_stop, call_start_in_iv); - - mutex_lock(&s->total_calls_duration_lock); - timeval_add(&s->total_calls_duration_interval, - &s->total_calls_duration_interval, &call_duration); - mutex_unlock(&s->total_calls_duration_lock); -} static int __rtp_stats_sort(const void *ap, const void *bp) { const struct rtp_stats *a = ap, *b = bp; @@ -1812,7 +1728,7 @@ static int __rtp_stats_sort(const void *ap, const void *bp) { return 0; } -static const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) { +const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) { struct packet_stream *ps; GList *values; struct rtp_stats *rtp_s; @@ -1882,12 +1798,10 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, return res; } -#define CDRBUFREMAINDER cdrbufend-cdrbufcur - /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { struct callmaster *m; - struct packet_stream *ps=0, *ps2=0; + struct packet_stream *ps=0; struct stream_fd *sfd; struct poller *p; GList *l; @@ -1895,14 +1809,6 @@ void call_destroy(struct call *c) { struct call_monologue *ml; struct call_media *md; GList *k, *o; - struct timeval tim_result_duration; - static const int CDRBUFLENGTH = 4096*2; - char cdrbuffer[CDRBUFLENGTH]; - char* cdrbufcur = cdrbuffer; - char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1; - int cdrlinecnt = 0; - int printlen=0; - int found = 0; const struct rtp_payload_type *rtp_pt; if (!c) { @@ -1924,68 +1830,21 @@ void call_destroy(struct call *c) { obj_put(c); - if (IS_FOREIGN_CALL(c)) { - atomic64_dec(&m->stats.foreign_sessions); - } - if(!IS_FOREIGN_CALL(c)) { - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, - g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); - } + statistics_update_foreignown_dec(c); - if (!IS_FOREIGN_CALL(c)) { + if (IS_OWN_CALL(c)) { redis_delete(c, m->conf.redis_write); } rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ - if (!IS_FOREIGN_CALL(c)) { - ilog(LOG_INFO, "Final packet stats:"); - - /* CDRs and statistics */ - if (_log_facility_cdr) { - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } + if (IS_OWN_CALL(c)) { for (l = c->monologues.head; l; l = l->next) { ml = l->data; - - if (!ml->terminated.tv_sec) { - gettimeofday(&ml->terminated, NULL); - ml->term_reason = UNKNOWN; - } - - timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); - - if (_log_facility_cdr) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_start_time=%ld.%06lu, " - "ml%i_end_time=%ld.%06ld, " - "ml%i_duration=%ld.%06ld, " - "ml%i_termination=%s, " - "ml%i_local_tag=%s, " - "ml%i_local_tag_type=%s, " - "ml%i_remote_tag=%s, ", - cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec, - cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec, - cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec, - cdrlinecnt, get_term_reason_text(ml->term_reason), - cdrlinecnt, ml->tag.s, - cdrlinecnt, get_tag_type_text(ml->tagtype), - cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } - + ilog(LOG_INFO, "--- Tag '"STR_FORMAT"', created " "%u:%02u ago for branch '"STR_FORMAT"', in dialogue with '"STR_FORMAT"'", STR_FMT(&ml->tag), @@ -1994,10 +1853,10 @@ void call_destroy(struct call *c) { STR_FMT(&ml->viabranch), ml->active_dialogue ? ml->active_dialogue->tag.len : 6, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); - + for (k = ml->medias.head; k; k = k->next) { md = k->data; - + rtp_pt = __rtp_stats_codec(md); #define MLL_PREFIX "------ Media #%u ("STR_FORMAT" over %s) using " /* media log line prefix */ #define MLL_COMMON /* common args */ \ @@ -2009,119 +1868,16 @@ void call_destroy(struct call *c) { else ilog(LOG_INFO, MLL_PREFIX STR_FORMAT, MLL_COMMON, STR_FMT(&rtp_pt->encoding_with_params)); - - /* add PayloadType(codec) info in CDR logging */ - if (_log_facility_cdr && rtp_pt) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } else if (_log_facility_cdr && !rtp_pt) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, "); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } - + for (o = md->streams.head; o; o = o->next) { ps = o->data; - + if (PS_ISSET(ps, FALLBACK_RTCP)) continue; - + char *addr = sockaddr_print_buf(&ps->endpoint.address); char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0"; - - if (_log_facility_cdr) { - const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; - - if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, - (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } else { -#if (RE_HAS_MEASUREDELAY) - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", " - "ml%i_midx%u_%s_delay_min=%.9f, " - "ml%i_midx%u_%s_delay_avg=%.9f, " - "ml%i_midx%u_%s_delay_max=%.9f, ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); -#else - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, - (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); -#endif - } - } - + ilog(LOG_INFO, "--------- Port %15s:%-5u <> %15s:%-5u%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", local_addr, (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), @@ -2132,117 +1888,18 @@ void call_destroy(struct call *c) { atomic64_get(&ps->stats.errors), atomic64_get(&ps->last_packet)); - - - atomic64_add(&m->totalstats.total_relayed_packets, - atomic64_get(&ps->stats.packets)); - atomic64_add(&m->totalstats_interval.total_relayed_packets, - atomic64_get(&ps->stats.packets)); - atomic64_add(&m->totalstats.total_relayed_errors, - atomic64_get(&ps->stats.errors)); - atomic64_add(&m->totalstats_interval.total_relayed_errors, - atomic64_get(&ps->stats.errors)); - } - - ice_shutdown(&md->ice_agent); - } - if (_log_facility_cdr) - ++cdrlinecnt; - } - } - // --- for statistics getting one way stream or no relay at all - int total_nopacket_relayed_sess = 0; - - for (l = c->monologues.head; l; l = l->next) { - ml = l->data; - - // --- go through partner ml and search the RTP - for (k = ml->medias.head; k; k = k->next) { - md = k->data; - - for (o = md->streams.head; o; o = o->next) { - ps = o->data; - if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) { - // --- only RTP is interesting - found = 1; - break; - } - } - if (found) { break; } - } - found = 0; - - if (ml->active_dialogue) { - // --- go through partner ml and search the RTP - for (k = ml->active_dialogue->medias.head; k; k = k->next) { - md = k->data; - - for (o = md->streams.head; o; o = o->next) { - ps2 = o->data; - if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) { - // --- only RTP is interesting - found = 1; - break; - } - } - if (found) { break; } - } - } + statistics_update_totals(m,ps); - if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { - if (atomic64_get(&ps->stats.packets)!=0 && !IS_FOREIGN_CALL(c)){ - if (atomic64_get(&ps->stats.packets)!=0) { - atomic64_inc(&m->totalstats.total_oneway_stream_sess); - atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); } - } - else { - total_nopacket_relayed_sess++; - } - } - } - if (!IS_FOREIGN_CALL(c)) { - atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); - atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); - } - - if (c->monologues.head) { - ml = c->monologues.head->data; - - if (!IS_FOREIGN_CALL(c)) { - if (ml->term_reason==TIMEOUT) { - atomic64_inc(&m->totalstats.total_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_timeout_sess); - } else if (ml->term_reason==SILENT_TIMEOUT) { - atomic64_inc(&m->totalstats.total_silent_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); - } else if (ml->term_reason==REGULAR) { - atomic64_inc(&m->totalstats.total_regular_term_sess); - atomic64_inc(&m->totalstats_interval.total_regular_term_sess); - } else if (ml->term_reason==FORCED) { - atomic64_inc(&m->totalstats.total_forced_term_sess); - atomic64_inc(&m->totalstats_interval.total_forced_term_sess); + ice_shutdown(&md->ice_agent); } - - timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); - timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); - timeval_totalstats_interval_call_duration_add( - &m->totalstats_interval, &ml->started, &ml->terminated, - &m->latest_graphite_interval_start, - m->conf.graphite_interval); - } - - if (ml->term_reason==FINAL_TIMEOUT) { - atomic64_inc(&m->totalstats.total_final_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_final_timeout_sess); } } + statistics_update_oneway(c); - if (_log_facility_cdr) - /* log it */ - cdrlog(cdrbuffer); + cdr_update_entry(c); for (l = c->streams.head; l; l = l->next) { ps = l->data; @@ -2394,19 +2051,11 @@ restart: } g_hash_table_insert(m->callhash, &c->callid, obj_get(c)); - if (type == CT_OWN_CALL) { - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats_interval.managed_sess_max = MAX( - m->totalstats_interval.managed_sess_max, - g_hash_table_size(m->callhash) - - atomic64_get(&m->stats.foreign_sessions)); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); - } - else if (type == CT_FOREIGN_CALL) { /* foreign call*/ - c->foreign_call = 1; - atomic64_inc(&m->stats.foreign_sessions); - atomic64_inc(&m->totalstats.total_foreign_sessions); - } + if (type == CT_FOREIGN_CALL) /* foreign call*/ + c->foreign_call = 1; + + statistics_update_foreignown_inc(m,c); + rwlock_lock_w(&c->master_lock); rwlock_unlock_w(&m->hashlock); } diff --git a/daemon/call.h b/daemon/call.h index a5777cee9..150b2e129 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -19,6 +19,7 @@ #include "socket.h" #include "media_socket.h" #include "recording.h" +#include "statistics.h" #define UNDEFINED ((unsigned int) -1) #define TRUNCATED " ... Output truncated. Increase Output Buffer ... \n" @@ -109,6 +110,7 @@ enum call_type { #endif #define IS_FOREIGN_CALL(c) (c->foreign_call) +#define IS_OWN_CALL(c) !IS_FOREIGN_CALL(c) /* flags shared by several of the structs below */ #define SHARED_FLAG_IMPLICIT_RTCP 0x00000001 @@ -232,54 +234,6 @@ struct transport_protocol { }; extern const struct transport_protocol transport_protocols[]; -struct stats { - atomic64 packets; - atomic64 bytes; - atomic64 errors; - u_int64_t delay_min; - u_int64_t delay_avg; - u_int64_t delay_max; - u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ - atomic64 foreign_sessions; // unresponsible via redis notification -}; - -struct request_time { - mutex_t lock; - u_int64_t count; - struct timeval time_min, time_max, time_avg; -}; - -struct totalstats { - time_t started; - atomic64 total_timeout_sess; - atomic64 total_foreign_sessions; - atomic64 total_rejected_sess; - atomic64 total_silent_timeout_sess; - atomic64 total_final_timeout_sess; - atomic64 total_regular_term_sess; - atomic64 total_forced_term_sess; - atomic64 total_relayed_packets; - atomic64 total_relayed_errors; - atomic64 total_nopacket_relayed_sess; - atomic64 total_oneway_stream_sess; - - u_int64_t foreign_sessions; - u_int64_t own_sessions; - u_int64_t total_sessions; - - mutex_t total_average_lock; /* for these two below */ - u_int64_t total_managed_sess; - struct timeval total_average_call_dur; - - mutex_t managed_sess_lock; /* for these below */ - u_int64_t managed_sess_max; /* per graphite interval statistic */ - u_int64_t managed_sess_min; /* per graphite interval statistic */ - - mutex_t total_calls_duration_lock; /* for these two below */ - struct timeval total_calls_duration_interval; - - struct request_time offer, answer, delete; -}; struct stream_params { unsigned int index; /* starting with 1 */ @@ -314,14 +268,7 @@ struct loop_protector { unsigned char buf[RTP_LOOP_PROTECT]; }; -struct rtp_stats { - unsigned int payload_type; - atomic64 packets; - atomic64 bytes; - atomic64 kernel_packets; - atomic64 kernel_bytes; - atomic64 in_tos_tclass; -}; + struct packet_stream { mutex_t in_lock, @@ -502,13 +449,6 @@ struct callmaster { struct homer_sender *homer; }; -struct call_stats { - time_t last_packet; - struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */ -}; - - - struct callmaster *callmaster_new(struct poller *); void callmaster_get_all_calls(struct callmaster *m, GQueue *q); struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, @@ -548,13 +488,11 @@ void __rtp_stats_update(GHashTable *dst, GHashTable *src); const char *get_tag_type_text(enum tag_type t); const char *get_opmode_text(enum call_opmode); - +const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m); #include "str.h" #include "rtp.h" - - INLINE void *call_malloc(struct call *c, size_t l) { void *ret; mutex_lock(&c->buffer_lock); diff --git a/daemon/cdr.c b/daemon/cdr.c new file mode 100644 index 000000000..0456a2a3c --- /dev/null +++ b/daemon/cdr.c @@ -0,0 +1,224 @@ +#include +#include "rtplib.h" +#include "cdr.h" +#include "call.h" +#include "poller.h" +#include "str.h" + +#define CDRBUFREMAINDER cdrbufend-cdrbufcur + +static const char * const __term_reason_texts[] = { + [TIMEOUT] = "TIMEOUT", + [REGULAR] = "REGULAR", + [FORCED] = "FORCED", + [SILENT_TIMEOUT] = "SILENT_TIMEOUT", + [FINAL_TIMEOUT] = "FINAL_TIMEOUT", +}; +static const char * const __tag_type_texts[] = { + [FROM_TAG] = "FROM_TAG", + [TO_TAG] = "TO_TAG", +}; +static const char *const __opmode_texts[] = { + [OP_OFFER] = "offer", + [OP_ANSWER] = "answer", +}; + +const char * get_tag_type_text(enum tag_type t) { + return get_enum_array_text(__tag_type_texts, t, "UNKNOWN"); +} +const char *get_opmode_text(enum call_opmode m) { + return get_enum_array_text(__opmode_texts, m, "other"); +} + +static const char * get_term_reason_text(enum termination_reason t) { + return get_enum_array_text(__term_reason_texts, t, "UNKNOWN"); +} + +void cdr_update_entry(struct call* c) { + GList *l; + struct call_monologue *ml; + struct timeval tim_result_duration; + int printlen=0; + int cdrlinecnt = 0; + static const int CDRBUFLENGTH = 4096*2; + char cdrbuffer[CDRBUFLENGTH]; + char* cdrbufcur = cdrbuffer; + char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1; + struct call_media *md; + GList *k, *o; + const struct rtp_payload_type *rtp_pt; + struct packet_stream *ps=0; + + if (IS_OWN_CALL(c)) { + + /* CDRs and statistics */ + if (_log_facility_cdr) { + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + if (!ml->terminated.tv_sec) { + gettimeofday(&ml->terminated, NULL); + ml->term_reason = UNKNOWN; + } + + timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); + + if (_log_facility_cdr) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_start_time=%ld.%06lu, " + "ml%i_end_time=%ld.%06ld, " + "ml%i_duration=%ld.%06ld, " + "ml%i_termination=%s, " + "ml%i_local_tag=%s, " + "ml%i_local_tag_type=%s, " + "ml%i_remote_tag=%s, ", + cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec, + cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec, + cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec, + cdrlinecnt, get_term_reason_text(ml->term_reason), + cdrlinecnt, ml->tag.s, + cdrlinecnt, get_tag_type_text(ml->tagtype), + cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (k = ml->medias.head; k; k = k->next) { + md = k->data; + + rtp_pt = __rtp_stats_codec(md); + + /* add PayloadType(codec) info in CDR logging */ + if (_log_facility_cdr && rtp_pt) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } else if (_log_facility_cdr && !rtp_pt) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, "); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (o = md->streams.head; o; o = o->next) { + ps = o->data; + + if (PS_ISSET(ps, FALLBACK_RTCP)) + continue; + + char *addr = sockaddr_print_buf(&ps->endpoint.address); + char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0"; + + if (_log_facility_cdr) { + const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; + + if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } else { +#if (RE_HAS_MEASUREDELAY) + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", " + "ml%i_midx%u_%s_delay_min=%.9f, " + "ml%i_midx%u_%s_delay_avg=%.9f, " + "ml%i_midx%u_%s_delay_max=%.9f, ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); +#else + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); +#endif + } + } + + } + } + if (_log_facility_cdr) + ++cdrlinecnt; + } + } + + /* log it */ + cdrlog(cdrbuffer); + + +} + diff --git a/daemon/cdr.h b/daemon/cdr.h new file mode 100644 index 000000000..6cb7f13f7 --- /dev/null +++ b/daemon/cdr.h @@ -0,0 +1,15 @@ +/* + * cdr.h + * + * Created on: Mar 14, 2017 + * Author: fmetz + */ + +#ifndef CDR_H_ +#define CDR_H_ + +struct call; + +void cdr_update_entry(struct call* c); + +#endif /* CDR_H_ */ diff --git a/daemon/log.c b/daemon/log.c index 450da4d55..e096284ba 100644 --- a/daemon/log.c +++ b/daemon/log.c @@ -58,7 +58,9 @@ void __ilog(int prio, const char *fmt, ...) { } void cdrlog(const char* cdrbuffer) { - syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer); + if (_log_facility_cdr) { + syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer); + } } diff --git a/daemon/statistics.c b/daemon/statistics.c new file mode 100644 index 000000000..db20c3505 --- /dev/null +++ b/daemon/statistics.c @@ -0,0 +1,208 @@ +#include "call.h" +#include "statistics.h" + +static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { + struct timeval dp, oa; + + mutex_lock(&s->total_average_lock); + + // new average = ((old average * old num sessions) + datapoint) / new num sessions + // ... but this will overflow when num sessions becomes very large + + // timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess); + // timeval_add(&t, &t, add); + // s->total_managed_sess++; + // timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess); + + // alternative: + // new average = old average + (datapoint / new num sessions) - (old average / new num sessions) + + s->total_managed_sess++; + timeval_divide(&dp, add, s->total_managed_sess); + timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess); + timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp); + timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa); + + mutex_unlock(&s->total_average_lock); +} + +static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, + struct timeval *call_start, struct timeval *call_stop, + struct timeval *interval_start, int interval_dur_s) { + + /* work with graphite interval start val which might be changed elsewhere in the code*/ + struct timeval real_iv_start = *interval_start; + struct timeval call_duration; + struct timeval *call_start_in_iv = call_start; + + /* in case graphite interval needs to be the previous one */ + if (timercmp(&real_iv_start, call_stop, >)) { + struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL }; + timeval_subtract(&real_iv_start, interval_start, &graph_dur); + } + + if (timercmp(&real_iv_start, call_start, >)) + call_start_in_iv = &real_iv_start; + + /* this should never happen and is here for sanitization of output */ + if (timercmp(call_start_in_iv, call_stop, >)) { + ilog(LOG_ERR, "Call start seems to exceed call stop"); + return; + } + + timeval_subtract(&call_duration, call_stop, call_start_in_iv); + + mutex_lock(&s->total_calls_duration_lock); + timeval_add(&s->total_calls_duration_interval, + &s->total_calls_duration_interval, &call_duration); + mutex_unlock(&s->total_calls_duration_lock); +} + + +void statistics_update_totals(struct callmaster* m, struct packet_stream *ps) { + atomic64_add(&m->totalstats.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats_interval.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats.total_relayed_errors, + atomic64_get(&ps->stats.errors)); + atomic64_add(&m->totalstats_interval.total_relayed_errors, + atomic64_get(&ps->stats.errors)); +} + +void statistics_update_foreignown_dec(struct call* c) { + struct callmaster *m; + + m = c->callmaster; + + if (IS_FOREIGN_CALL(c)) { + atomic64_dec(&m->stats.foreign_sessions); + } + + if(IS_OWN_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, + g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } + +} + +void statistics_update_foreignown_inc(struct callmaster *m, struct call* c) { + if (IS_OWN_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_max = MAX( + m->totalstats_interval.managed_sess_max, + g_hash_table_size(m->callhash) + - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } + else if (IS_FOREIGN_CALL(c)) { /* foreign call*/ + atomic64_inc(&m->stats.foreign_sessions); + atomic64_inc(&m->totalstats.total_foreign_sessions); + } + +} + +void statistics_update_oneway(struct call* c) { + struct callmaster *m; + struct packet_stream *ps=0, *ps2=0; + struct call_monologue *ml; + struct call_media *md; + GList *k, *o; + int found = 0; + GList *l; + struct timeval tim_result_duration; + + m = c->callmaster; + + // --- for statistics getting one way stream or no relay at all + int total_nopacket_relayed_sess = 0; + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + // --- go through partner ml and search the RTP + for (k = ml->medias.head; k; k = k->next) { + md = k->data; + + for (o = md->streams.head; o; o = o->next) { + ps = o->data; + if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) { + // --- only RTP is interesting + found = 1; + break; + } + } + if (found) { break; } + } + found = 0; + + if (ml->active_dialogue) { + // --- go through partner ml and search the RTP + for (k = ml->active_dialogue->medias.head; k; k = k->next) { + md = k->data; + + for (o = md->streams.head; o; o = o->next) { + ps2 = o->data; + if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) { + // --- only RTP is interesting + found = 1; + break; + } + } + if (found) { break; } + } + } + + if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { + if (atomic64_get(&ps->stats.packets)!=0 && IS_OWN_CALL(c)){ + if (atomic64_get(&ps->stats.packets)!=0) { + atomic64_inc(&m->totalstats.total_oneway_stream_sess); + atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); + } + } + else { + total_nopacket_relayed_sess++; + } + } + } + + if (IS_OWN_CALL(c)) { + atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + } + + if (c->monologues.head) { + ml = c->monologues.head->data; + + if (IS_OWN_CALL(c)) { + if (ml->term_reason==TIMEOUT) { + atomic64_inc(&m->totalstats.total_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_timeout_sess); + } else if (ml->term_reason==SILENT_TIMEOUT) { + atomic64_inc(&m->totalstats.total_silent_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); + } else if (ml->term_reason==REGULAR) { + atomic64_inc(&m->totalstats.total_regular_term_sess); + atomic64_inc(&m->totalstats_interval.total_regular_term_sess); + } else if (ml->term_reason==FORCED) { + atomic64_inc(&m->totalstats.total_forced_term_sess); + atomic64_inc(&m->totalstats_interval.total_forced_term_sess); + } + + timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); + timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); + timeval_totalstats_interval_call_duration_add( + &m->totalstats_interval, &ml->started, &ml->terminated, + &m->latest_graphite_interval_start, + m->conf.graphite_interval); + } + + if (ml->term_reason==FINAL_TIMEOUT) { + atomic64_inc(&m->totalstats.total_final_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_final_timeout_sess); + } + } + +} diff --git a/daemon/statistics.h b/daemon/statistics.h new file mode 100644 index 000000000..333a1e7ba --- /dev/null +++ b/daemon/statistics.h @@ -0,0 +1,80 @@ +#ifndef STATISTICS_H_ +#define STATISTICS_H_ + +#include "call.h" + +struct stats { + atomic64 packets; + atomic64 bytes; + atomic64 errors; + u_int64_t delay_min; + u_int64_t delay_avg; + u_int64_t delay_max; + u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ + atomic64 foreign_sessions; // unresponsible via redis notification +}; + + +struct request_time { + mutex_t lock; + u_int64_t count; + struct timeval time_min, time_max, time_avg; +}; + + +struct totalstats { + time_t started; + atomic64 total_timeout_sess; + atomic64 total_foreign_sessions; + atomic64 total_rejected_sess; + atomic64 total_silent_timeout_sess; + atomic64 total_final_timeout_sess; + atomic64 total_regular_term_sess; + atomic64 total_forced_term_sess; + atomic64 total_relayed_packets; + atomic64 total_relayed_errors; + atomic64 total_nopacket_relayed_sess; + atomic64 total_oneway_stream_sess; + + u_int64_t foreign_sessions; + u_int64_t own_sessions; + u_int64_t total_sessions; + + mutex_t total_average_lock; /* for these two below */ + u_int64_t total_managed_sess; + struct timeval total_average_call_dur; + + mutex_t managed_sess_lock; /* for these below */ + u_int64_t managed_sess_max; /* per graphite interval statistic */ + u_int64_t managed_sess_min; /* per graphite interval statistic */ + + mutex_t total_calls_duration_lock; /* for these two below */ + struct timeval total_calls_duration_interval; + + struct request_time offer, answer, delete; +}; + +struct rtp_stats { + unsigned int payload_type; + atomic64 packets; + atomic64 bytes; + atomic64 kernel_packets; + atomic64 kernel_bytes; + atomic64 in_tos_tclass; +}; + + +struct call_stats { + time_t last_packet; + struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */ +}; + + +struct callmaster; + +void statistics_update_oneway(struct call *); +void statistics_update_foreignown_dec(struct call *); +void statistics_update_foreignown_inc(struct callmaster *m, struct call* c); +void statistics_update_totals(struct callmaster *, struct packet_stream *) ; + +#endif /* STATISTICS_H_ */