Browse Source

Merge branch 'dev-fmetz'

Conflicts:
	daemon/call.c
	daemon/call.h
	daemon/cli.c
	daemon/main.c
	kernel-module/xt_RTPENGINE.c
pull/101/head
Frederic-Philippe Metz 11 years ago
parent
commit
a9e4c023e4
8 changed files with 244 additions and 31 deletions
  1. +87
    -20
      daemon/call.c
  2. +4
    -3
      daemon/call.h
  3. +34
    -1
      daemon/cli.c
  4. +0
    -1
      daemon/kernel.c
  5. +1
    -0
      daemon/main.c
  6. +15
    -0
      kernel-module/rtpengine_config.h
  7. +99
    -5
      kernel-module/xt_RTPENGINE.c
  8. +4
    -1
      kernel-module/xt_RTPENGINE.h

+ 87
- 20
daemon/call.c View File

@ -33,7 +33,7 @@
#include "rtp.h"
#include "call_interfaces.h"
#include "ice.h"
#include "rtpengine_config.h"
@ -1401,6 +1401,14 @@ static void callmaster_timer(void *ptr) {
if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets))
atomic64_set(&ps->last_packet, poller_now);
#if (RE_HAS_MEASUREDELAY)
mutex_lock(&m->statspslock);
ps->stats.delay_min = m->statsps.delay_min = ke->stats.delay_min;
ps->stats.delay_avg = m->statsps.delay_avg = ke->stats.delay_avg;
ps->stats.delay_max = m->statsps.delay_max = ke->stats.delay_max;
mutex_unlock(&m->statspslock);
#endif
atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes);
atomic64_set(&ps->kernel_stats.packets, ke->stats.packets);
atomic64_set(&ps->kernel_stats.errors, ke->stats.errors);
@ -1420,6 +1428,14 @@ static void callmaster_timer(void *ptr) {
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
#if (RE_HAS_MEASUREDELAY)
mutex_lock(&m->statspslock);
ps->kernel_stats.delay_min = ke->stats.delay_min;
ps->kernel_stats.delay_avg = ke->stats.delay_avg;
ps->kernel_stats.delay_max = ke->stats.delay_max;
mutex_unlock(&m->statspslock);
#endif
update = 0;
sink = packet_stream_sink(ps);
@ -2731,25 +2747,76 @@ void call_destroy(struct call *c) {
if (_log_facility_cdr) {
const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp";
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet));
if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet));
} else {
#if (RE_HAS_MEASUREDELAY)
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_delay_min=%llu.%09llu, "
"ml%i_midx%u_%s_delay_avg=%llu.%09llu, "
"ml%i_midx%u_%s_delay_max=%llu.%09llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_min.tv_sec, (unsigned long long) ps->stats.delay_min.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_avg.tv_sec, (unsigned long long) ps->stats.delay_avg.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_max.tv_sec, (unsigned long long) ps->stats.delay_max.tv_nsec);
#else
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet));
#endif
}
}
ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, "


+ 4
- 3
daemon/call.h View File

@ -210,13 +210,13 @@ struct transport_protocol {
};
extern const struct transport_protocol transport_protocols[];
struct stats {
atomic64 packets;
atomic64 bytes;
atomic64 errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
};
struct totalstats {
@ -452,6 +452,7 @@ struct callmaster {
/* XXX rework these */
struct stats statsps; /* per second stats, running timer */
struct stats stats; /* copied from statsps once a second */
mutex_t statspslock;
struct totalstats totalstats;
struct totalstats totalstats_interval;
/* control_ng_stats stuff */


+ 34
- 1
daemon/cli.c View File

@ -14,6 +14,7 @@
#include "call.h"
#include "cli.h"
#include "rtpengine_config.h"
static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer ...\n";
@ -149,6 +150,37 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
if (PS_ISSET(ps, FALLBACK_RTCP))
continue;
#if (RE_HAS_MEASUREDELAY)
if (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
} else {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet, %llu.%09llu delay_min, %llu.%09llu delay_avg, %llu.%09llu delay_max\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet),
(unsigned long long) ps->stats.delay_min.tv_sec,
(unsigned long long) ps->stats.delay_min.tv_nsec,
(unsigned long long) ps->stats.delay_avg.tv_sec,
(unsigned long long) ps->stats.delay_avg.tv_nsec,
(unsigned long long) ps->stats.delay_max.tv_sec,
(unsigned long long) ps->stats.delay_max.tv_nsec);
}
#else
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index,
@ -158,7 +190,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
atomic64_get(&ps->last_packet));
#endif
ADJUSTLEN(printlen,outbufend,replybuffer);
}
}


+ 0
- 1
daemon/kernel.c View File

@ -102,7 +102,6 @@ int kernel_del_stream(int fd, u_int16_t p) {
return -1;
}
GList *kernel_list(unsigned int id) {
char str[64];
int fd;


+ 1
- 0
daemon/main.c View File

@ -643,6 +643,7 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL);
thread_create_detach(poller_timer_loop, ctx.p);
if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m);
thread_create_detach(ice_thread_run, NULL);


+ 15
- 0
kernel-module/rtpengine_config.h View File

@ -0,0 +1,15 @@
/*
* rtpengine_config.h
*
* Description: Config file with preprocessor config makros
*
* Created on: Mar 18, 2015
* Author: fmetz
*/
#ifndef RTPENGINE_CONFIG_H_
#define RTPENGINE_CONFIG_H_
#define RE_HAS_MEASUREDELAY 1
#endif /* RTPENGINE_CONFIG_H_ */

+ 99
- 5
kernel-module/xt_RTPENGINE.c View File

@ -29,6 +29,8 @@
#include "xt_RTPENGINE.h"
#endif
#include "rtpengine_config.h"
MODULE_LICENSE("GPL");
@ -147,6 +149,9 @@ struct rtpengine_stats_a {
atomic64_t packets;
atomic64_t bytes;
atomic64_t errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
};
struct rtpengine_rtp_stats_a {
atomic64_t packets;
@ -854,6 +859,9 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
op.stats.packets = atomic64_read(&g->stats.packets);
op.stats.bytes = atomic64_read(&g->stats.bytes);
op.stats.errors = atomic64_read(&g->stats.errors);
op.stats.delay_min = g->stats.delay_min;
op.stats.delay_max = g->stats.delay_max;
op.stats.delay_avg = g->stats.delay_avg;
for (i = 0; i < g->target.num_payload_types; i++) {
op.rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets);
@ -882,10 +890,6 @@ err:
return err;
}
static int proc_list_open(struct inode *i, struct file *f) {
int err;
struct seq_file *p;
@ -1474,6 +1478,9 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
atomic64_set(&g->stats.packets, atomic64_read(&og->stats.packets));
atomic64_set(&g->stats.bytes, atomic64_read(&og->stats.bytes));
atomic64_set(&g->stats.errors, atomic64_read(&og->stats.errors));
g->stats.delay_min = og->stats.delay_min;
g->stats.delay_max = og->stats.delay_max;
g->stats.delay_avg = og->stats.delay_avg;
for (j = 0; j < NUM_PAYLOAD_TYPES; j++) {
atomic64_set(&g->rtp_stats[j].packets, atomic64_read(&og->rtp_stats[j].packets));
@ -1816,6 +1823,7 @@ drop:
static int send_proxy_packet(struct sk_buff *skb, struct re_address *src, struct re_address *dst, unsigned char tos) {
if (src->family != dst->family)
goto drop;
@ -2184,7 +2192,48 @@ static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rt
return match - tg->payload_types;
}
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) {
static void re_timespec_subtract (struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec - (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec - (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_multiply(struct timespec *result, const struct timespec *a, const long long multiplier) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec * multiplier;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_devide(struct timespec *result, const struct timespec *a, const long devisor) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec / devisor;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_add(struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec + (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec + (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
/* Return negative, zero, positive if A < B, A == B, A > B, respectively.
Assume the nanosecond components are in range, or close to it. */
static int re_timespec_cmp (struct timespec *a, struct timespec *b)
{
return (a->tv_sec < b->tv_sec ? -1
: a->tv_sec > b->tv_sec ? 1
: a->tv_nsec - b->tv_nsec);
}
#if (RE_HAS_MEASUREDELAY)
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, struct timespec *starttime) {
#else
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) {
#endif
struct udphdr *uh;
struct rtpengine_target *g;
struct sk_buff *skb2;
@ -2194,6 +2243,10 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
struct rtp_parsed rtp;
u_int64_t pkt_idx = 0, pkt_idx_u;
#if (RE_HAS_MEASUREDELAY)
struct timespec endtime, delay;
#endif
skb_reset_transport_header(skb);
uh = udp_hdr(skb);
skb_pull(skb, sizeof(*uh));
@ -2308,6 +2361,29 @@ out:
if (rtp_pt_idx >= 0) {
atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets);
atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes);
#if (RE_HAS_MEASUREDELAY)
getnstimeofday(&endtime);
re_timespec_subtract(&delay,&endtime, starttime);
if (atomic64_read(&g->stats.packets)==1) {
g->stats.delay_min=delay;
g->stats.delay_avg=delay;
g->stats.delay_max=delay;
} else {
if (re_timespec_cmp(&g->stats.delay_min,&delay)>0) {
g->stats.delay_min = delay;
}
if (re_timespec_cmp(&g->stats.delay_max,&delay)<0) {
g->stats.delay_max = delay;
}
re_timespec_multiply(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets)-1);
re_timespec_add(&g->stats.delay_avg,&g->stats.delay_avg,&delay);
re_timespec_devide(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets));
}
#endif
}
else if (rtp_pt_idx == -2)
/* not RTP */ ;
@ -2345,6 +2421,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
struct rtpengine_table *t;
struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id);
if (!t)
goto skip;
@ -2363,7 +2444,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
src.family = AF_INET;
src.u.ipv4 = ih->saddr;
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime);
#else
return rtpengine46(skb, t, &src);
#endif
skip2:
kfree_skb(skb);
@ -2387,6 +2472,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
struct rtpengine_table *t;
struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id);
if (!t)
goto skip;
@ -2405,7 +2495,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
src.family = AF_INET6;
memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6));
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime);
#else
return rtpengine46(skb, t, &src);
#endif
skip2:
kfree_skb(skb);


+ 4
- 1
kernel-module/xt_RTPENGINE.h View File

@ -15,6 +15,9 @@ struct rtpengine_stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
};
struct rtpengine_rtp_stats {
u_int64_t packets;
@ -99,7 +102,7 @@ struct rtpengine_message {
MMG_NOOP = 1,
MMG_ADD,
MMG_DEL,
MMG_UPDATE,
MMG_UPDATE
} cmd;
struct rtpengine_target_info target;


Loading…
Cancel
Save