diff --git a/daemon/call.c b/daemon/call.c index ceb1258bf..737002610 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -33,7 +33,7 @@ #include "rtp.h" #include "call_interfaces.h" #include "ice.h" - +#include "rtpengine_config.h" @@ -1401,6 +1401,14 @@ static void callmaster_timer(void *ptr) { if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets)) atomic64_set(&ps->last_packet, poller_now); +#if (RE_HAS_MEASUREDELAY) + mutex_lock(&m->statspslock); + ps->stats.delay_min = m->statsps.delay_min = ke->stats.delay_min; + ps->stats.delay_avg = m->statsps.delay_avg = ke->stats.delay_avg; + ps->stats.delay_max = m->statsps.delay_max = ke->stats.delay_max; + mutex_unlock(&m->statspslock); +#endif + atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes); atomic64_set(&ps->kernel_stats.packets, ke->stats.packets); atomic64_set(&ps->kernel_stats.errors, ke->stats.errors); @@ -1420,6 +1428,14 @@ static void callmaster_timer(void *ptr) { atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); } +#if (RE_HAS_MEASUREDELAY) + mutex_lock(&m->statspslock); + ps->kernel_stats.delay_min = ke->stats.delay_min; + ps->kernel_stats.delay_avg = ke->stats.delay_avg; + ps->kernel_stats.delay_max = ke->stats.delay_max; + mutex_unlock(&m->statspslock); +#endif + update = 0; sink = packet_stream_sink(ps); @@ -2731,25 +2747,76 @@ void call_destroy(struct call *c) { if (_log_facility_cdr) { const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; - cdrbufcur += sprintf(cdrbufcur, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet)); + + if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { + cdrbufcur += sprintf(cdrbufcur, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet)); + } else { +#if (RE_HAS_MEASUREDELAY) + cdrbufcur += sprintf(cdrbufcur, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_delay_min=%llu.%09llu, " + "ml%i_midx%u_%s_delay_avg=%llu.%09llu, " + "ml%i_midx%u_%s_delay_max=%llu.%09llu, ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_min.tv_sec, (unsigned long long) ps->stats.delay_min.tv_nsec, + cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_avg.tv_sec, (unsigned long long) ps->stats.delay_avg.tv_nsec, + cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_max.tv_sec, (unsigned long long) ps->stats.delay_max.tv_nsec); +#else + cdrbufcur += sprintf(cdrbufcur, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet)); +#endif + } } ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, " diff --git a/daemon/call.h b/daemon/call.h index f70d7e032..840ed825d 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -210,13 +210,13 @@ struct transport_protocol { }; extern const struct transport_protocol transport_protocols[]; - - - struct stats { atomic64 packets; atomic64 bytes; atomic64 errors; + struct timespec delay_min; + struct timespec delay_avg; + struct timespec delay_max; }; struct totalstats { @@ -452,6 +452,7 @@ struct callmaster { /* XXX rework these */ struct stats statsps; /* per second stats, running timer */ struct stats stats; /* copied from statsps once a second */ + mutex_t statspslock; struct totalstats totalstats; struct totalstats totalstats_interval; /* control_ng_stats stuff */ diff --git a/daemon/cli.c b/daemon/cli.c index 711812531..37bca6ac4 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -14,6 +14,7 @@ #include "call.h" #include "cli.h" +#include "rtpengine_config.h" static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer ...\n"; @@ -149,6 +150,37 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m if (PS_ISSET(ps, FALLBACK_RTCP)) continue; +#if (RE_HAS_MEASUREDELAY) + if (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { + printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " + ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n", + md->index, + (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port, + (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", + atomic64_get(&ps->stats.packets), + atomic64_get(&ps->stats.bytes), + atomic64_get(&ps->stats.errors), + atomic64_get(&ps->last_packet)); + } else { + printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " + ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet, %llu.%09llu delay_min, %llu.%09llu delay_avg, %llu.%09llu delay_max\n", + md->index, + (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port, + (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", + atomic64_get(&ps->stats.packets), + atomic64_get(&ps->stats.bytes), + atomic64_get(&ps->stats.errors), + atomic64_get(&ps->last_packet), + (unsigned long long) ps->stats.delay_min.tv_sec, + (unsigned long long) ps->stats.delay_min.tv_nsec, + (unsigned long long) ps->stats.delay_avg.tv_sec, + (unsigned long long) ps->stats.delay_avg.tv_nsec, + (unsigned long long) ps->stats.delay_max.tv_sec, + (unsigned long long) ps->stats.delay_max.tv_nsec); + } +#else printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n", md->index, @@ -158,7 +190,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m atomic64_get(&ps->stats.packets), atomic64_get(&ps->stats.bytes), atomic64_get(&ps->stats.errors), - atomic64_get(&ps->last_packet)); + atomic64_get(&ps->last_packet)); +#endif ADJUSTLEN(printlen,outbufend,replybuffer); } } diff --git a/daemon/kernel.c b/daemon/kernel.c index 670f5496e..e6cbf9219 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -102,7 +102,6 @@ int kernel_del_stream(int fd, u_int16_t p) { return -1; } - GList *kernel_list(unsigned int id) { char str[64]; int fd; diff --git a/daemon/main.c b/daemon/main.c index 6d3724893..d51d78bec 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -643,6 +643,7 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(poller_timer_loop, ctx.p); + if (graphite_ip) thread_create_detach(graphite_loop, ctx.m); thread_create_detach(ice_thread_run, NULL); diff --git a/kernel-module/rtpengine_config.h b/kernel-module/rtpengine_config.h new file mode 100644 index 000000000..13188e85c --- /dev/null +++ b/kernel-module/rtpengine_config.h @@ -0,0 +1,15 @@ +/* + * rtpengine_config.h + * + * Description: Config file with preprocessor config makros + * + * Created on: Mar 18, 2015 + * Author: fmetz + */ + +#ifndef RTPENGINE_CONFIG_H_ +#define RTPENGINE_CONFIG_H_ + +#define RE_HAS_MEASUREDELAY 1 + +#endif /* RTPENGINE_CONFIG_H_ */ diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 713c34997..f7dd76b89 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -29,6 +29,8 @@ #include "xt_RTPENGINE.h" #endif +#include "rtpengine_config.h" + MODULE_LICENSE("GPL"); @@ -147,6 +149,9 @@ struct rtpengine_stats_a { atomic64_t packets; atomic64_t bytes; atomic64_t errors; + struct timespec delay_min; + struct timespec delay_avg; + struct timespec delay_max; }; struct rtpengine_rtp_stats_a { atomic64_t packets; @@ -854,6 +859,9 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t op.stats.packets = atomic64_read(&g->stats.packets); op.stats.bytes = atomic64_read(&g->stats.bytes); op.stats.errors = atomic64_read(&g->stats.errors); + op.stats.delay_min = g->stats.delay_min; + op.stats.delay_max = g->stats.delay_max; + op.stats.delay_avg = g->stats.delay_avg; for (i = 0; i < g->target.num_payload_types; i++) { op.rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets); @@ -882,10 +890,6 @@ err: return err; } - - - - static int proc_list_open(struct inode *i, struct file *f) { int err; struct seq_file *p; @@ -1474,6 +1478,9 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i atomic64_set(&g->stats.packets, atomic64_read(&og->stats.packets)); atomic64_set(&g->stats.bytes, atomic64_read(&og->stats.bytes)); atomic64_set(&g->stats.errors, atomic64_read(&og->stats.errors)); + g->stats.delay_min = og->stats.delay_min; + g->stats.delay_max = og->stats.delay_max; + g->stats.delay_avg = og->stats.delay_avg; for (j = 0; j < NUM_PAYLOAD_TYPES; j++) { atomic64_set(&g->rtp_stats[j].packets, atomic64_read(&og->rtp_stats[j].packets)); @@ -1816,6 +1823,7 @@ drop: static int send_proxy_packet(struct sk_buff *skb, struct re_address *src, struct re_address *dst, unsigned char tos) { + if (src->family != dst->family) goto drop; @@ -2184,7 +2192,48 @@ static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rt return match - tg->payload_types; } -static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) { +static void re_timespec_subtract (struct timespec *result, const struct timespec *a, const struct timespec *b) { + long long nanoseconds=0; + nanoseconds = ((long)a->tv_sec - (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec - (long long)b->tv_nsec); + result->tv_sec = nanoseconds/(long long)1000000000; + result->tv_nsec = nanoseconds%(long long)1000000000; +} + +static void re_timespec_multiply(struct timespec *result, const struct timespec *a, const long long multiplier) { + long long nanoseconds=0; + nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec * multiplier; + result->tv_sec = nanoseconds/(long long)1000000000; + result->tv_nsec = nanoseconds%(long long)1000000000; +} + +static void re_timespec_devide(struct timespec *result, const struct timespec *a, const long devisor) { + long long nanoseconds=0; + nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec / devisor; + result->tv_sec = nanoseconds/(long long)1000000000; + result->tv_nsec = nanoseconds%(long long)1000000000; +} + +static void re_timespec_add(struct timespec *result, const struct timespec *a, const struct timespec *b) { + long long nanoseconds=0; + nanoseconds = ((long)a->tv_sec + (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec + (long long)b->tv_nsec); + result->tv_sec = nanoseconds/(long long)1000000000; + result->tv_nsec = nanoseconds%(long long)1000000000; +} + +/* Return negative, zero, positive if A < B, A == B, A > B, respectively. + Assume the nanosecond components are in range, or close to it. */ +static int re_timespec_cmp (struct timespec *a, struct timespec *b) +{ + return (a->tv_sec < b->tv_sec ? -1 + : a->tv_sec > b->tv_sec ? 1 + : a->tv_nsec - b->tv_nsec); +} + +#if (RE_HAS_MEASUREDELAY) + static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, struct timespec *starttime) { +#else + static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) { +#endif struct udphdr *uh; struct rtpengine_target *g; struct sk_buff *skb2; @@ -2194,6 +2243,10 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct rtp_parsed rtp; u_int64_t pkt_idx = 0, pkt_idx_u; +#if (RE_HAS_MEASUREDELAY) + struct timespec endtime, delay; +#endif + skb_reset_transport_header(skb); uh = udp_hdr(skb); skb_pull(skb, sizeof(*uh)); @@ -2308,6 +2361,29 @@ out: if (rtp_pt_idx >= 0) { atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets); atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes); + +#if (RE_HAS_MEASUREDELAY) + getnstimeofday(&endtime); + + re_timespec_subtract(&delay,&endtime, starttime); + + if (atomic64_read(&g->stats.packets)==1) { + g->stats.delay_min=delay; + g->stats.delay_avg=delay; + g->stats.delay_max=delay; + } else { + if (re_timespec_cmp(&g->stats.delay_min,&delay)>0) { + g->stats.delay_min = delay; + } + if (re_timespec_cmp(&g->stats.delay_max,&delay)<0) { + g->stats.delay_max = delay; + } + + re_timespec_multiply(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets)-1); + re_timespec_add(&g->stats.delay_avg,&g->stats.delay_avg,&delay); + re_timespec_devide(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets)); + } +#endif } else if (rtp_pt_idx == -2) /* not RTP */ ; @@ -2345,6 +2421,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para struct rtpengine_table *t; struct re_address src; +#if (RE_HAS_MEASUREDELAY) + struct timespec starttime; + getnstimeofday(&starttime); +#endif + t = get_table(pinfo->id); if (!t) goto skip; @@ -2363,7 +2444,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para src.family = AF_INET; src.u.ipv4 = ih->saddr; +#if (RE_HAS_MEASUREDELAY) + return rtpengine46(skb, t, &src, &starttime); +#else return rtpengine46(skb, t, &src); +#endif skip2: kfree_skb(skb); @@ -2387,6 +2472,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para struct rtpengine_table *t; struct re_address src; +#if (RE_HAS_MEASUREDELAY) + struct timespec starttime; + getnstimeofday(&starttime); +#endif + t = get_table(pinfo->id); if (!t) goto skip; @@ -2405,7 +2495,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para src.family = AF_INET6; memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6)); +#if (RE_HAS_MEASUREDELAY) + return rtpengine46(skb, t, &src, &starttime); +#else return rtpengine46(skb, t, &src); +#endif skip2: kfree_skb(skb); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 361a56fb0..fac7e33c9 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -15,6 +15,9 @@ struct rtpengine_stats { u_int64_t packets; u_int64_t bytes; u_int64_t errors; + struct timespec delay_min; + struct timespec delay_avg; + struct timespec delay_max; }; struct rtpengine_rtp_stats { u_int64_t packets; @@ -99,7 +102,7 @@ struct rtpengine_message { MMG_NOOP = 1, MMG_ADD, MMG_DEL, - MMG_UPDATE, + MMG_UPDATE } cmd; struct rtpengine_target_info target;