Browse Source

Merge branch 'master' of git://github.com/1and1/rtpengine into rfuchs/1and1-master

pull/114/head
Richard Fuchs 11 years ago
parent
commit
5785bea2c9
22 changed files with 1061 additions and 125 deletions
  1. +11
    -0
      README.md
  2. +2
    -1
      daemon/Makefile
  3. +106
    -20
      daemon/call.c
  4. +6
    -3
      daemon/call.h
  5. +34
    -1
      daemon/cli.c
  6. +117
    -18
      daemon/graphite.c
  7. +1
    -0
      daemon/graphite.h
  8. +0
    -1
      daemon/kernel.c
  9. +10
    -0
      daemon/log.c
  10. +2
    -0
      daemon/log.h
  11. +22
    -7
      daemon/main.c
  12. +72
    -3
      daemon/rtcp.c
  13. +79
    -1
      daemon/rtcp.h
  14. +160
    -0
      daemon/rtcp_xr.c
  15. +229
    -0
      daemon/rtcp_xr.h
  16. +3
    -1
      debian/ngcp-rtpengine-daemon.default
  17. +2
    -0
      debian/ngcp-rtpengine-daemon.init
  18. +10
    -0
      el/rtpengine.init
  19. +15
    -0
      kernel-module/rtpengine_config.h
  20. +108
    -7
      kernel-module/xt_RTPENGINE.c
  21. +5
    -1
      kernel-module/xt_RTPENGINE.h
  22. +67
    -61
      utils/rtpengine-ctl

+ 11
- 0
README.md View File

@ -177,6 +177,7 @@ option and which are reproduced below:
-L, --log-level=INT Mask log priorities above this level -L, --log-level=INT Mask log priorities above this level
--log-facility=daemon|local0|... Syslog facility to use for logging --log-facility=daemon|local0|... Syslog facility to use for logging
--log-facility-cdr=local0|... Syslog facility to use for logging CDRs --log-facility-cdr=local0|... Syslog facility to use for logging CDRs
--log-facility-rtcp=local0|... Syslog facility to use for logging RTCP data (take care of traffic amount)
-E, --log-stderr Log on stderr instead of syslog -E, --log-stderr Log on stderr instead of syslog
-x, --xmlrpc-format=INT XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only -x, --xmlrpc-format=INT XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only
--num-threads=INT Number of worker threads to create --num-threads=INT Number of worker threads to create
@ -185,6 +186,7 @@ option and which are reproduced below:
--dtls-passive Always prefer DTLS passive role --dtls-passive Always prefer DTLS passive role
-g, --graphite=[IP46:]PORT TCP address of graphite statistics server -g, --graphite=[IP46:]PORT TCP address of graphite statistics server
-w, --graphite-interval=INT Graphite data statistics send interval -w, --graphite-interval=INT Graphite data statistics send interval
--graphite-prefix=STRING Graphite prefix for every line
Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local
IP address through `--interface`, and at least one of the `--listen-...` options must be given. IP address through `--interface`, and at least one of the `--listen-...` options must be given.
@ -321,6 +323,11 @@ The options are described in more detail below.
Same as --log-facility with the difference that only CDRs are written to this log facility. Same as --log-facility with the difference that only CDRs are written to this log facility.
* --log-facilty-rtcp=daemon|local0|...|local7|...
Same as --log-facility with the difference that only RTCP data is written to this log facility.
Be careful with this parameter since there may be a lot of information written to it.
* -E, --log-stderr * -E, --log-stderr
Log to stderr instead of syslog. Only useful in combination with `--foreground`. Log to stderr instead of syslog. Only useful in combination with `--foreground`.
@ -363,6 +370,10 @@ The options are described in more detail below.
Interval of the time when information is sent to the graphite server. Interval of the time when information is sent to the graphite server.
* --graphite-prefix
Add a prefix for every graphite line.
A typical command line (enabling both UDP and NG protocols) thus may look like: A typical command line (enabling both UDP and NG protocols) thus may look like:
/usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \ /usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \


+ 2
- 1
daemon/Makefile View File

@ -63,7 +63,8 @@ endif
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c rtcp_xr.c
OBJS= $(SRCS:.c=.o) OBJS= $(SRCS:.c=.o)


+ 106
- 20
daemon/call.c View File

@ -16,6 +16,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <time.h> #include <time.h>
#include <sys/time.h> #include <sys/time.h>
#include <inttypes.h>
#include "poller.h" #include "poller.h"
#include "aux.h" #include "aux.h"
@ -33,7 +34,7 @@
#include "rtp.h" #include "rtp.h"
#include "call_interfaces.h" #include "call_interfaces.h"
#include "ice.h" #include "ice.h"
#include "rtpengine_config.h"
@ -702,6 +703,9 @@ loop_ok:
if (!sink && PS_ISSET(stream, RTCP)) { if (!sink && PS_ISSET(stream, RTCP)) {
sink = stream->rtcp_sink; sink = stream->rtcp_sink;
rtcp = 1; rtcp = 1;
if (_log_facility_rtcp) {
parse_and_log_rtcp_report(sfd, s->s, s->len);
}
} }
else if (stream->rtcp_sink) { else if (stream->rtcp_sink) {
muxed_rtcp = rtcp_demux(s, media); muxed_rtcp = rtcp_demux(s, media);
@ -1413,12 +1417,25 @@ static void callmaster_timer(void *ptr) {
DS(bytes); DS(bytes);
DS(errors); DS(errors);
if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets)) if (ke->stats.packets != atomic64_get(&ps->kernel_stats.packets))
atomic64_set(&ps->last_packet, poller_now); atomic64_set(&ps->last_packet, poller_now);
atomic64_set(&ps->stats.in_tos_tclass, ke->stats.in_tos);
atomic64_set(&m->statsps.in_tos_tclass, ke->stats.in_tos);
#if (RE_HAS_MEASUREDELAY)
mutex_lock(&m->statspslock);
ps->stats.delay_min = m->statsps.delay_min = ke->stats.delay_min;
ps->stats.delay_avg = m->statsps.delay_avg = ke->stats.delay_avg;
ps->stats.delay_max = m->statsps.delay_max = ke->stats.delay_max;
mutex_unlock(&m->statspslock);
#endif
atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes); atomic64_set(&ps->kernel_stats.bytes, ke->stats.bytes);
atomic64_set(&ps->kernel_stats.packets, ke->stats.packets); atomic64_set(&ps->kernel_stats.packets, ke->stats.packets);
atomic64_set(&ps->kernel_stats.errors, ke->stats.errors); atomic64_set(&ps->kernel_stats.errors, ke->stats.errors);
atomic64_set(&ps->kernel_stats.in_tos_tclass, ke->stats.in_tos);
for (j = 0; j < ke->target.num_payload_types; j++) { for (j = 0; j < ke->target.num_payload_types; j++) {
pt = ke->target.payload_types[j]; pt = ke->target.payload_types[j];
@ -1435,6 +1452,14 @@ static void callmaster_timer(void *ptr) {
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
} }
#if (RE_HAS_MEASUREDELAY)
mutex_lock(&m->statspslock);
ps->kernel_stats.delay_min = ke->stats.delay_min;
ps->kernel_stats.delay_avg = ke->stats.delay_avg;
ps->kernel_stats.delay_max = ke->stats.delay_max;
mutex_unlock(&m->statspslock);
#endif
update = 0; update = 0;
sink = packet_stream_sink(ps); sink = packet_stream_sink(ps);
@ -2796,25 +2821,86 @@ void call_destroy(struct call *c) {
if (_log_facility_cdr) { if (_log_facility_cdr) {
const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp";
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet));
if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.in_tos_tclass));
} else {
#if (RE_HAS_MEASUREDELAY)
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", "
"ml%i_midx%u_%s_delay_min=%llu.%09llu, "
"ml%i_midx%u_%s_delay_avg=%llu.%09llu, "
"ml%i_midx%u_%s_delay_max=%llu.%09llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.in_tos_tclass),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_min.tv_sec, (unsigned long long) ps->stats.delay_min.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_avg.tv_sec, (unsigned long long) ps->stats.delay_avg.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_max.tv_sec, (unsigned long long) ps->stats.delay_max.tv_nsec);
#else
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets="UINT64F", "
"ml%i_midx%u_%s_relayed_bytes="UINT64F", "
"ml%i_midx%u_%s_relayed_errors="UINT64F", "
"ml%i_midx%u_%s_last_packet="UINT64F", "
"ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.packets),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.bytes),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->stats.in_tos_tclass));
#endif
}
} }
ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, " ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, "


+ 6
- 3
daemon/call.h View File

@ -210,13 +210,14 @@ struct transport_protocol {
}; };
extern const struct transport_protocol transport_protocols[]; extern const struct transport_protocol transport_protocols[];
struct stats { struct stats {
atomic64 packets; atomic64 packets;
atomic64 bytes; atomic64 bytes;
atomic64 errors; atomic64 errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
atomic64 in_tos_tclass;
}; };
struct totalstats { struct totalstats {
@ -284,6 +285,7 @@ struct rtp_stats {
atomic64 bytes; atomic64 bytes;
atomic64 kernel_packets; atomic64 kernel_packets;
atomic64 kernel_bytes; atomic64 kernel_bytes;
atomic64 in_tos_tclass;
}; };
struct packet_stream { struct packet_stream {
@ -452,6 +454,7 @@ struct callmaster {
/* XXX rework these */ /* XXX rework these */
struct stats statsps; /* per second stats, running timer */ struct stats statsps; /* per second stats, running timer */
struct stats stats; /* copied from statsps once a second */ struct stats stats; /* copied from statsps once a second */
mutex_t statspslock;
struct totalstats totalstats; struct totalstats totalstats;
struct totalstats totalstats_interval; struct totalstats totalstats_interval;
/* control_ng_stats stuff */ /* control_ng_stats stuff */


+ 34
- 1
daemon/cli.c View File

@ -14,6 +14,7 @@
#include "call.h" #include "call.h"
#include "cli.h" #include "cli.h"
#include "rtpengine_config.h"
static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer ...\n"; static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer ...\n";
@ -151,6 +152,37 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
if (PS_ISSET(ps, FALLBACK_RTCP)) if (PS_ISSET(ps, FALLBACK_RTCP))
continue; continue;
#if (RE_HAS_MEASUREDELAY)
if (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
} else {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet, %llu.%09llu delay_min, %llu.%09llu delay_avg, %llu.%09llu delay_max\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet),
(unsigned long long) ps->stats.delay_min.tv_sec,
(unsigned long long) ps->stats.delay_min.tv_nsec,
(unsigned long long) ps->stats.delay_avg.tv_sec,
(unsigned long long) ps->stats.delay_avg.tv_nsec,
(unsigned long long) ps->stats.delay_max.tv_sec,
(unsigned long long) ps->stats.delay_max.tv_nsec);
}
#else
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n", ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index, md->index,
@ -160,7 +192,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
atomic64_get(&ps->stats.packets), atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes), atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors), atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
atomic64_get(&ps->last_packet));
#endif
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
} }
} }


+ 117
- 18
daemon/graphite.c View File

@ -9,21 +9,58 @@
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <string.h>
#include "log.h" #include "log.h"
#include "call.h" #include "call.h"
#include "graphite.h"
static int graphite_sock=-1; static int graphite_sock=-1;
static int connectinprogress=0;
static u_int32_t graphite_ipaddress; static u_int32_t graphite_ipaddress;
static int graphite_port=0; static int graphite_port=0;
static struct callmaster* cm=0; static struct callmaster* cm=0;
//struct totalstats totalstats_prev; //struct totalstats totalstats_prev;
static time_t next_run; static time_t next_run;
// HEAD: static time_t g_now, next_run;
static char* graphite_prefix = NULL;
void set_prefix(char* prefix) {
graphite_prefix = prefix;
}
/**
* Set a file descriptor to blocking or non-blocking mode.
*
* @param fd The file descriptor
* @param blocking 0:non-blocking mode, 1:blocking mode
*
* @return 1:success, 0:failure.
**/
int fd_set_blocking(int fd, int blocking) {
/* Save the current flags */
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
return 0;
if (blocking)
flags &= ~O_NONBLOCK;
else
flags |= O_NONBLOCK;
return fcntl(fd, F_SETFL, flags) != -1;
}
int connect_to_graphite_server(u_int32_t ipaddress, int port) { int connect_to_graphite_server(u_int32_t ipaddress, int port) {
if (graphite_sock>0)
close(graphite_sock);
graphite_sock=-1; graphite_sock=-1;
//int reconnect=0;
int rc=0; int rc=0;
struct sockaddr_in sin; struct sockaddr_in sin;
memset(&sin,0,sizeof(sin)); memset(&sin,0,sizeof(sin));
@ -32,9 +69,9 @@ int connect_to_graphite_server(u_int32_t ipaddress, int port) {
graphite_ipaddress = ipaddress; graphite_ipaddress = ipaddress;
graphite_port = port; graphite_port = port;
rc = graphite_sock = socket(AF_INET, SOCK_STREAM,0);
if(rc<0) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.");
graphite_sock = socket(AF_INET, SOCK_STREAM,0);
if(graphite_sock<0) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.Reason:%s\n",strerror(errno));
return -1; return -1;
} }
@ -48,18 +85,26 @@ int connect_to_graphite_server(u_int32_t ipaddress, int port) {
goto error; goto error;
} }
rc = fd_set_blocking(graphite_sock,0);
if (!rc) {
ilog(LOG_ERROR,"Could not set the socket to nonblocking.");
goto error;
}
struct in_addr ip; struct in_addr ip;
ip.s_addr = graphite_ipaddress; ip.s_addr = graphite_ipaddress;
ilog(LOG_INFO, "Connecting to graphite server %s at port:%i with fd:%i",inet_ntoa(ip),graphite_port,graphite_sock); ilog(LOG_INFO, "Connecting to graphite server %s at port:%i with fd:%i",inet_ntoa(ip),graphite_port,graphite_sock);
rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin)); rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin));
if (rc==-1) { if (rc==-1) {
ilog(LOG_ERROR, "Connection could not be established. Trying again next time of graphite-interval.");
ilog(LOG_WARN, "Connection information:%s\n",strerror(errno));
if (errno==EINPROGRESS) {
connectinprogress=1;
return 0;
}
goto error; goto error;
} }
ilog(LOG_INFO, "Graphite server connected.");
return graphite_sock;
return 0;
error: error:
close(graphite_sock); close(graphite_sock);
@ -86,7 +131,6 @@ int send_graphite_data() {
char data_to_send[8192]; char data_to_send[8192];
char* ptr = data_to_send; char* ptr = data_to_send;
struct totalstats ts; struct totalstats ts;
/* atomically copy values to stack and reset to zero */ /* atomically copy values to stack and reset to zero */
@ -102,20 +146,31 @@ int send_graphite_data() {
mutex_lock(&cm->totalstats_interval.total_average_lock); mutex_lock(&cm->totalstats_interval.total_average_lock);
ts.total_average_call_dur = cm->totalstats_interval.total_average_call_dur; ts.total_average_call_dur = cm->totalstats_interval.total_average_call_dur;
ts.total_managed_sess = cm->totalstats_interval.total_managed_sess; ts.total_managed_sess = cm->totalstats_interval.total_managed_sess;
ZERO(ts.total_average_call_dur);
ZERO(ts.total_managed_sess);
ZERO(cm->totalstats_interval.total_average_call_dur);
ZERO(cm->totalstats_interval.total_managed_sess);
mutex_unlock(&cm->totalstats_interval.total_average_lock); mutex_unlock(&cm->totalstats_interval.total_average_lock);
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) ts.total_average_call_dur.tv_sec,(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) ts.total_average_call_dur.tv_sec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %lu %llu\n",hostname, ts.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %lu %llu\n",hostname, ts.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
rc = write(graphite_sock, data_to_send, ptr - data_to_send); rc = write(graphite_sock, data_to_send, ptr - data_to_send);
@ -133,29 +188,73 @@ error:
void graphite_loop_run(struct callmaster* callmaster, int seconds) { void graphite_loop_run(struct callmaster* callmaster, int seconds) {
int rc=0; int rc=0;
fd_set wfds;
FD_ZERO(&wfds);
struct timeval tv;
int optval=0;
socklen_t optlen=sizeof(optval);
if (connectinprogress && graphite_sock>0) {
FD_SET(graphite_sock,&wfds);
tv.tv_sec = 0;
tv.tv_usec = 1000000;
rc = select (graphite_sock+1, NULL, &wfds, NULL, &tv);
if ((rc == -1) && (errno == EINTR)) {
ilog(LOG_ERROR,"Error on the socket.");
close(graphite_sock);
graphite_sock=-1;connectinprogress=0;
return;
} else if (rc==0) {
// timeout
return;
} else {
if (!FD_ISSET(graphite_sock,&wfds)) {
ilog(LOG_WARN,"fd active but not the graphite fd.");
close(graphite_sock);
graphite_sock=-1;connectinprogress=0;
return;
}
rc = getsockopt(graphite_sock, SOL_SOCKET, SO_ERROR, &optval, &optlen);
if (rc) ilog(LOG_ERROR,"getsockopt failure.");
if (optval != 0) {
ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock, strerror(optval));
close(graphite_sock);
graphite_sock=-1;connectinprogress=0;
return;
}
ilog(LOG_INFO, "Graphite server connected.");
connectinprogress=0;
next_run=0; // fake next run to skip sleep after reconnect
}
}
gettimeofday(&g_now, NULL); gettimeofday(&g_now, NULL);
if (g_now.tv_sec < next_run)
goto sleep;
if (g_now.tv_sec < next_run) {
usleep(100000);
return;
}
next_run = g_now.tv_sec + seconds; next_run = g_now.tv_sec + seconds;
if (!cm) if (!cm)
cm = callmaster; cm = callmaster;
if (graphite_sock < 0) {
if (graphite_sock < 0 && !connectinprogress) {
rc = connect_to_graphite_server(graphite_ipaddress, graphite_port); rc = connect_to_graphite_server(graphite_ipaddress, graphite_port);
if (rc) {
close(graphite_sock);
graphite_sock=-1;
}
} }
if (rc>=0) {
if (graphite_sock>0 && !connectinprogress) {
rc = send_graphite_data(); rc = send_graphite_data();
if (rc<0) { if (rc<0) {
ilog(LOG_ERROR,"Sending graphite data failed."); ilog(LOG_ERROR,"Sending graphite data failed.");
} }
} }
sleep:
usleep(100000);
} }
void graphite_loop(void *d) { void graphite_loop(void *d) {
@ -166,7 +265,7 @@ void graphite_loop(void *d) {
cm->conf.graphite_interval=1; cm->conf.graphite_interval=1;
} }
connect_to_graphite_server(cm->conf.graphite_ip,graphite_port);
connect_to_graphite_server(cm->conf.graphite_ip,cm->conf.graphite_port);
while (!g_shutdown) while (!g_shutdown)
graphite_loop_run(cm,cm->conf.graphite_interval); // time in seconds graphite_loop_run(cm,cm->conf.graphite_interval); // time in seconds


+ 1
- 0
daemon/graphite.h View File

@ -13,6 +13,7 @@
int connect_to_graphite_server(u_int32_t ipaddress, int port); int connect_to_graphite_server(u_int32_t ipaddress, int port);
int send_graphite_data(); int send_graphite_data();
void graphite_loop_run(struct callmaster* cm, int seconds); void graphite_loop_run(struct callmaster* cm, int seconds);
void set_prefix(char* prefix);
void graphite_loop(void *d); void graphite_loop(void *d);
#endif /* GRAPHITE_H_ */ #endif /* GRAPHITE_H_ */

+ 0
- 1
daemon/kernel.c View File

@ -102,7 +102,6 @@ int kernel_del_stream(int fd, u_int16_t p) {
return -1; return -1;
} }
GList *kernel_list(unsigned int id) { GList *kernel_list(unsigned int id) {
char str[64]; char str[64];
int fd; int fd;


+ 10
- 0
daemon/log.c View File

@ -67,6 +67,7 @@ static const char* const prio_str[] = {
gboolean _log_stderr = 0; gboolean _log_stderr = 0;
int _log_facility = LOG_DAEMON; int _log_facility = LOG_DAEMON;
int _log_facility_cdr = 0; int _log_facility_cdr = 0;
int _log_facility_rtcp = 0;
static GHashTable *__log_limiter; static GHashTable *__log_limiter;
@ -204,6 +205,7 @@ static unsigned int log_limiter_entry_hash(const void *p) {
const struct log_limiter_entry *lle = p; const struct log_limiter_entry *lle = p;
return g_str_hash(lle->msg) ^ g_str_hash(lle->prefix); return g_str_hash(lle->msg) ^ g_str_hash(lle->prefix);
} }
static int log_limiter_entry_equal(const void *a, const void *b) { static int log_limiter_entry_equal(const void *a, const void *b) {
const struct log_limiter_entry *A = a, *B = b; const struct log_limiter_entry *A = a, *B = b;
if (!g_str_equal(A->msg, B->msg)) if (!g_str_equal(A->msg, B->msg))
@ -213,6 +215,14 @@ static int log_limiter_entry_equal(const void *a, const void *b) {
return 1; return 1;
} }
void rtcplog(const char* cdrbuffer) {
int previous;
int mask = LOG_MASK (LOG_INFO);
previous = setlogmask(mask);
syslog(LOG_INFO | _log_facility_rtcp, "%s", cdrbuffer);
setlogmask(previous);
}
void log_init() { void log_init() {
mutex_init(&__log_limiter_lock); mutex_init(&__log_limiter_lock);
__log_limiter = g_hash_table_new(log_limiter_entry_hash, log_limiter_entry_equal); __log_limiter = g_hash_table_new(log_limiter_entry_hash, log_limiter_entry_equal);


+ 2
- 0
daemon/log.h View File

@ -31,6 +31,7 @@ struct log_info {
extern gboolean _log_stderr; extern gboolean _log_stderr;
extern int _log_facility; extern int _log_facility;
extern int _log_facility_cdr; extern int _log_facility_cdr;
extern int _log_facility_rtcp;
typedef struct _fac_code { typedef struct _fac_code {
@ -69,6 +70,7 @@ void __ilog(int prio, const char *fmt, ...)__attribute__ ((format (printf, 2, 3)
#endif #endif
void cdrlog(const char* cdrbuffer); void cdrlog(const char* cdrbuffer);
void rtcplog(const char* cdrbuffer);
#include "obj.h" #include "obj.h"


+ 22
- 7
daemon/main.c View File

@ -257,9 +257,11 @@ static void options(int *argc, char ***argv) {
char *listenngs = NULL; char *listenngs = NULL;
char *listencli = NULL; char *listencli = NULL;
char *graphitep = NULL; char *graphitep = NULL;
char *graphite_prefix_s = NULL;
char *redisps = NULL; char *redisps = NULL;
char *log_facility_s = NULL; char *log_facility_s = NULL;
char *log_facility_cdr_s = NULL;
char *log_facility_cdr_s = NULL;
char *log_facility_rtcp_s = NULL;
int version = 0; int version = 0;
int sip_source = 0; int sip_source = 0;
@ -274,6 +276,7 @@ static void options(int *argc, char ***argv) {
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" },
{ "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, { "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"},
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
@ -287,6 +290,7 @@ static void options(int *argc, char ***argv) {
{ "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" }, { "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" },
{ "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"},
{ "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"}, { "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"},
{ "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"},
{ "log-stderr", 'E', 0, G_OPTION_ARG_NONE, &_log_stderr, "Log on stderr instead of syslog", NULL }, { "log-stderr", 'E', 0, G_OPTION_ARG_NONE, &_log_stderr, "Log on stderr instead of syslog", NULL },
{ "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" }, { "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads to create", "INT" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads to create", "INT" },
@ -340,6 +344,9 @@ static void options(int *argc, char ***argv) {
die("Invalid IP or port (--graphite)"); die("Invalid IP or port (--graphite)");
} }
if (graphite_prefix_s)
set_prefix(graphite_prefix_s);
if (tos < 0 || tos > 255) if (tos < 0 || tos > 255)
die("Invalid TOS value"); die("Invalid TOS value");
@ -369,12 +376,19 @@ static void options(int *argc, char ***argv) {
} }
} }
if (log_facility_cdr_s) {
if (!parse_log_facility(log_facility_cdr_s, &_log_facility_cdr)) {
print_available_log_facilities();
die ("Invalid log facility for CDR '%s' (--log-facility-cdr)\n", log_facility_cdr_s);
}
}
if (log_facility_cdr_s) {
if (!parse_log_facility(log_facility_cdr_s, &_log_facility_cdr)) {
print_available_log_facilities();
die ("Invalid log facility for CDR '%s' (--log-facility-cdr)\n", log_facility_cdr_s);
}
}
if (log_facility_rtcp_s) {
if (!parse_log_facility(log_facility_rtcp_s, &_log_facility_rtcp)) {
print_available_log_facilities();
die ("Invalid log facility for RTCP '%s' (--log-facility-rtcp)\n", log_facility_rtcp_s);
}
}
if (_log_stderr) { if (_log_stderr) {
write_log = log_to_stderr; write_log = log_to_stderr;
@ -638,6 +652,7 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL); thread_create_detach(sighandler, NULL);
thread_create_detach(poller_timer_loop, ctx.p); thread_create_detach(poller_timer_loop, ctx.p);
if (graphite_ip) if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m); thread_create_detach(graphite_loop, ctx.m);
thread_create_detach(ice_thread_run, NULL); thread_create_detach(ice_thread_run, NULL);


+ 72
- 3
daemon/rtcp.c View File

@ -11,6 +11,7 @@
#include "log.h" #include "log.h"
#include "rtp.h" #include "rtp.h"
#include "crypto.h" #include "crypto.h"
#include "rtcp_xr.h"
@ -26,9 +27,6 @@
#define SRTCP_R_LENGTH 6 #define SRTCP_R_LENGTH 6
#endif #endif
#define RTCP_PT_SR 200 /* sender report */ #define RTCP_PT_SR 200 /* sender report */
#define RTCP_PT_RR 201 /* receiver report */ #define RTCP_PT_RR 201 /* receiver report */
#define RTCP_PT_SDES 202 /* source description */ #define RTCP_PT_SDES 202 /* source description */
@ -36,6 +34,7 @@
#define RTCP_PT_APP 204 /* application specific */ #define RTCP_PT_APP 204 /* application specific */
#define RTCP_PT_RTPFB 205 /* transport layer feedback message (RTP/AVPF) */ #define RTCP_PT_RTPFB 205 /* transport layer feedback message (RTP/AVPF) */
#define RTCP_PT_PSFB 206 /* payload-specific feedback message (RTP/AVPF) */ #define RTCP_PT_PSFB 206 /* payload-specific feedback message (RTP/AVPF) */
#define RTCP_PT_XR 207
#define SDES_TYPE_END 0 #define SDES_TYPE_END 0
#define SDES_TYPE_CNAME 1 #define SDES_TYPE_CNAME 1
@ -481,3 +480,73 @@ int rtcp_demux_is_rtcp(const str *s) {
return 0; return 0;
return 1; return 1;
} }
void print_rtcp_common(char** cdrbufcur, const pjmedia_rtcp_common *common) {
*cdrbufcur += sprintf(*cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ",
common->version,
common->p,
common->count,
common->pt,
common->length,
ntohl(common->ssrc));
}
void print_rtcp_sr(char** cdrbufcur, const pjmedia_rtcp_sr* sr) {
*cdrbufcur += sprintf(*cdrbufcur,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, sender_bytes=%u, ",
ntohl(sr->ntp_sec),
ntohl(sr->ntp_frac),
ntohl(sr->rtp_ts),
ntohl(sr->sender_pcount),
ntohl(sr->sender_bcount));
}
void print_rtcp_rr(char** cdrbufcur, const pjmedia_rtcp_rr* rr) {
/* Get packet loss */
u_int32_t packet_loss=0;
packet_loss = (rr->total_lost_2 << 16) +
(rr->total_lost_1 << 8) +
rr->total_lost_0;
*cdrbufcur += sprintf(*cdrbufcur,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ",
ntohl(rr->ssrc),
ntohl(rr->fract_lost),
ntohl(packet_loss),
ntohl(rr->last_seq),
ntohl(rr->jitter),
ntohl(rr->lsr),
ntohl(rr->dlsr));
}
void parse_and_log_rtcp_report(struct stream_fd *sfd, const void *pkt, long size) {
static const int CDRBUFLENGTH = 1024*1024*1; // 1 MB
char cdrbuffer[CDRBUFLENGTH]; memset(&cdrbuffer,0,CDRBUFLENGTH);
char* cdrbufcur = cdrbuffer;
pjmedia_rtcp_common *common = (pjmedia_rtcp_common*) pkt;
const pjmedia_rtcp_rr *rr = NULL;
const pjmedia_rtcp_sr *sr = NULL;
cdrbufcur += sprintf(cdrbufcur,"[%s] ",sfd->stream->call->callid);
print_rtcp_common(&cdrbufcur,common);
/* Parse RTCP */
if (common->pt == RTCP_PT_SR) {
sr = (pjmedia_rtcp_sr*) (((char*)pkt) + sizeof(pjmedia_rtcp_common));
print_rtcp_sr(&cdrbufcur,sr);
if (common->count > 0 && size >= (sizeof(pjmedia_rtcp_sr_pkt))) {
rr = (pjmedia_rtcp_rr*)(((char*)pkt) + (sizeof(pjmedia_rtcp_common)
+ sizeof(pjmedia_rtcp_sr)));
print_rtcp_rr(&cdrbufcur,rr);
}
} else if (common->pt == RTCP_PT_RR && common->count > 0) {
rr = (pjmedia_rtcp_rr*)(((char*)pkt) + sizeof(pjmedia_rtcp_common));
print_rtcp_rr(&cdrbufcur,rr);
} else if (common->pt == RTCP_PT_XR) {
pjmedia_rtcp_xr_rx_rtcp_xr(&cdrbufcur, pkt, size);
}
rtcplog(cdrbuffer);
}

+ 79
- 1
daemon/rtcp.h View File

@ -2,6 +2,7 @@
#define _RTCP_H_ #define _RTCP_H_
#include "str.h" #include "str.h"
#include "call.h"
struct crypto_context; struct crypto_context;
@ -19,6 +20,83 @@ struct rtcp_packet {
u_int32_t ssrc; u_int32_t ssrc;
} __attribute__ ((packed)); } __attribute__ ((packed));
/**
* RTCP sender report.
*/
typedef struct pjmedia_rtcp_sr
{
u_int32_t ntp_sec; /**< NTP time, seconds part. */
u_int32_t ntp_frac; /**< NTP time, fractions part. */
u_int32_t rtp_ts; /**< RTP timestamp. */
u_int32_t sender_pcount; /**< Sender packet cound. */
u_int32_t sender_bcount; /**< Sender octet/bytes count. */
} pjmedia_rtcp_sr;
/**
* RTCP receiver report.
*/
typedef struct pjmedia_rtcp_rr
{
u_int32_t ssrc; /**< SSRC identification. */
#if defined(PJ_IS_BIG_ENDIAN) && PJ_IS_BIG_ENDIAN!=0
u_int32_t fract_lost:8; /**< Fraction lost. */
u_int32_t total_lost_2:8; /**< Total lost, bit 16-23. */
u_int32_t total_lost_1:8; /**< Total lost, bit 8-15. */
u_int32_t total_lost_0:8; /**< Total lost, bit 0-7. */
#else
u_int32_t fract_lost:8; /**< Fraction lost. */
u_int32_t total_lost_2:8; /**< Total lost, bit 0-7. */
u_int32_t total_lost_1:8; /**< Total lost, bit 8-15. */
u_int32_t total_lost_0:8; /**< Total lost, bit 16-23. */
#endif
u_int32_t last_seq; /**< Last sequence number. */
u_int32_t jitter; /**< Jitter. */
u_int32_t lsr; /**< Last SR. */
u_int32_t dlsr; /**< Delay since last SR. */
} pjmedia_rtcp_rr;
/**
* RTCP common header.
*/
typedef struct pjmedia_rtcp_common
{
#if defined(PJ_IS_BIG_ENDIAN) && PJ_IS_BIG_ENDIAN!=0
unsigned version:2; /**< packet type */
unsigned p:1; /**< padding flag */
unsigned count:5; /**< varies by payload type */
unsigned pt:8; /**< payload type */
#else
unsigned count:5; /**< varies by payload type */
unsigned p:1; /**< padding flag */
unsigned version:2; /**< packet type */
unsigned pt:8; /**< payload type */
#endif
unsigned length:16; /**< packet length */
u_int32_t ssrc; /**< SSRC identification */
} pjmedia_rtcp_common;
/**
* This structure declares default RTCP packet (SR) that is sent by pjmedia.
* Incoming RTCP packet may have different format, and must be parsed
* manually by application.
*/
typedef struct pjmedia_rtcp_sr_pkt
{
pjmedia_rtcp_common common; /**< Common header. */
pjmedia_rtcp_sr sr; /**< Sender report. */
pjmedia_rtcp_rr rr; /**< variable-length list */
} pjmedia_rtcp_sr_pkt;
/**
* This structure declares RTCP RR (Receiver Report) packet.
*/
typedef struct pjmedia_rtcp_rr_pkt
{
pjmedia_rtcp_common common; /**< Common header. */
pjmedia_rtcp_rr rr; /**< variable-length list */
} pjmedia_rtcp_rr_pkt;
int rtcp_avpf2avp(str *); int rtcp_avpf2avp(str *);
@ -27,6 +105,6 @@ int rtcp_savp2avp(str *, struct crypto_context *);
int rtcp_demux_is_rtcp(const str *); int rtcp_demux_is_rtcp(const str *);
void parse_and_log_rtcp_report(struct stream_fd *sfd, const void *pkt, long size);
#endif #endif

+ 160
- 0
daemon/rtcp_xr.c View File

@ -0,0 +1,160 @@
/*
* rtcp_xr.c
*
* Created on: Mar 29, 2015
* Author: fmetz
*/
#include <stdio.h>
#include <arpa/inet.h>
#include "rtcp_xr.h"
/* RTCP XR payload type */
#define RTCP_XR 207
/* RTCP XR block types */
#define BT_LOSS_RLE 1
#define BT_DUP_RLE 2
#define BT_RCPT_TIMES 3
#define BT_RR_TIME 4
#define BT_DLRR 5
#define BT_STATS 6
#define BT_VOIP_METRICS 7
void print_rtcp_xr_common(char* cdrbufcur,const pjmedia_rtcp_xr_pkt *rtcp_xr) {
cdrbufcur += sprintf(cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ",
rtcp_xr->common.version,
rtcp_xr->common.p,
rtcp_xr->common.count,
rtcp_xr->common.pt,
rtcp_xr->common.length,
ntohl(rtcp_xr->common.ssrc));
}
void print_rtcp_xr_rb_header(char* cdrbufcur,const pjmedia_rtcp_xr_rb_header *rb_header) {
cdrbufcur += sprintf(cdrbufcur,"rb_header_blocktype=%u, rb_header_blockspecdata=%u, rb_header_blocklength=%u, ",
rb_header->bt,
rb_header->specific,
ntohs(rb_header->length));
}
void print_rtcp_xr_rb_rr_time(char* cdrbufcur,const pjmedia_rtcp_xr_rb_rr_time *rb_rr_time) {
print_rtcp_xr_rb_header(cdrbufcur,&rb_rr_time->header);
cdrbufcur += sprintf(cdrbufcur,"rb_rr_time_ntp_sec=%u, rb_rr_time_ntp_frac=%u, ",
ntohl(rb_rr_time->ntp_sec),
ntohl(rb_rr_time->ntp_frac));
}
void print_rtcp_xr_rb_dlrr(char* cdrbufcur,const pjmedia_rtcp_xr_rb_dlrr *rb_dlrr) {
print_rtcp_xr_rb_header(cdrbufcur,&rb_dlrr->header);
cdrbufcur += sprintf(cdrbufcur,"rb_dlrr_ssrc=%u, rb_dlrr_lrr=%u, rb_dlrr_dlrr=%u, ",
ntohl(rb_dlrr->item.ssrc),
ntohl(rb_dlrr->item.lrr),
ntohl(rb_dlrr->item.dlrr));
}
void print_rtcp_xr_rb_stats(char* cdrbufcur,const pjmedia_rtcp_xr_rb_stats *rb_stats) {
print_rtcp_xr_rb_header(cdrbufcur,&rb_stats->header);
cdrbufcur += sprintf(cdrbufcur,"rb_stats_ssrc=%u, rb_stats_begin_seq=%u, rb_stats_end_seq=%u, rb_stats_lost_packets=%u, rb_stats_duplicate_packets=%u,"
"rb_stats_jitter_min=%u, rb_stats_jitter_max=%u, rb_stats_jitter_mean=%u, rb_stats_jitter_deviation=%u,"
"rb_stats_toh_min=%u, rb_stats_toh_max=%u, rb_stats_toh_mean=%u, rb_stats_toh_deviation=%u, ",
ntohl(rb_stats->ssrc),
ntohs(rb_stats->begin_seq),
ntohl(rb_stats->end_seq),
ntohl(rb_stats->lost),
ntohl(rb_stats->dup),
ntohl(rb_stats->jitter_min),
ntohl(rb_stats->jitter_max),
ntohl(rb_stats->jitter_mean),
ntohl(rb_stats->jitter_dev),
ntohl(rb_stats->toh_min),
ntohl(rb_stats->toh_max),
ntohl(rb_stats->toh_mean),
ntohl(rb_stats->toh_dev));
}
void print_rtcp_xr_rb_voip_mtc(char* cdrbufcur,const pjmedia_rtcp_xr_rb_voip_mtc *rb_voip_mtc) {
print_rtcp_xr_rb_header(cdrbufcur,&rb_voip_mtc->header);
cdrbufcur += sprintf(cdrbufcur,"rb_voip_mtc_ssrc=%u, rb_voip_mtc_loss_rate=%u, rb_voip_mtc_discard_rate=%u, rb_voip_mtc_burst_den=%u, "
"rb_voip_mtc_gap_den=%u, rb_voip_mtc_burst_dur=%u, rb_voip_mtc_gap_dur=%u, rb_voip_mtc_rnd_trip_delay=%u, "
"rb_voip_mtc_end_sys_delay=%u, rb_voip_mtc_signal_lvl=%u, rb_voip_mtc_noise_lvl=%u, rb_voip_mtc_rerl=%u, "
"rb_voip_mtc_gmin=%u, rb_voip_mtc_r_factor=%u, rb_voip_mtc_ext_r_factor=%u, rb_voip_mtc_mos_lq=%u, "
"rb_voip_mtc_mos_cq=%u, rb_voip_mtc_rx_config=%u, rb_voip_mtc_jb_nom=%u, rb_voip_mtc_jb_max=%u, "
"rb_voip_mtc_jb_abs_max=%u, ",
ntohl(rb_voip_mtc->ssrc),
rb_voip_mtc->loss_rate,
rb_voip_mtc->discard_rate,
rb_voip_mtc->burst_den,
rb_voip_mtc->gap_den,
ntohs(rb_voip_mtc->burst_dur),
ntohs(rb_voip_mtc->gap_dur),
ntohs(rb_voip_mtc->rnd_trip_delay),
ntohs(rb_voip_mtc->end_sys_delay),
rb_voip_mtc->signal_lvl,
rb_voip_mtc->noise_lvl,
rb_voip_mtc->rerl,
rb_voip_mtc->gmin,
rb_voip_mtc->r_factor,
rb_voip_mtc->ext_r_factor,
rb_voip_mtc->mos_lq,
rb_voip_mtc->mos_cq,
rb_voip_mtc->rx_config,
ntohs(rb_voip_mtc->jb_nom),
ntohs(rb_voip_mtc->jb_max),
ntohs(rb_voip_mtc->jb_abs_max));
}
void pjmedia_rtcp_xr_rx_rtcp_xr(char* cdrbufcur, const void *pkt, size_t size) {
const pjmedia_rtcp_xr_pkt *rtcp_xr = (pjmedia_rtcp_xr_pkt*) pkt;
const pjmedia_rtcp_xr_rb_rr_time *rb_rr_time = NULL;
const pjmedia_rtcp_xr_rb_dlrr *rb_dlrr = NULL;
const pjmedia_rtcp_xr_rb_stats *rb_stats = NULL;
const pjmedia_rtcp_xr_rb_voip_mtc *rb_voip_mtc = NULL;
const pjmedia_rtcp_xr_rb_header *rb_hdr = (pjmedia_rtcp_xr_rb_header*)
rtcp_xr->buf;
unsigned pkt_len, rb_len;
if (rtcp_xr->common.pt != RTCP_XR)
return;
print_rtcp_xr_common(cdrbufcur,rtcp_xr);
pkt_len = ntohs((u_int16_t)rtcp_xr->common.length);
if ((pkt_len + 1) > (size / 4))
return;
/* Parse report rpt_types */
while ((int32_t*)rb_hdr < (int32_t*)pkt + pkt_len)
{
rb_len = ntohs((u_int16_t)rb_hdr->length);
/* Just skip any block with length == 0 (no report content) */
if (rb_len) {
switch (rb_hdr->bt) {
case BT_RR_TIME:
rb_rr_time = (pjmedia_rtcp_xr_rb_rr_time*) rb_hdr;
print_rtcp_xr_rb_rr_time(cdrbufcur,rb_rr_time);
break;
case BT_DLRR:
rb_dlrr = (pjmedia_rtcp_xr_rb_dlrr*) rb_hdr;
print_rtcp_xr_rb_dlrr(cdrbufcur,rb_dlrr);
break;
case BT_STATS:
rb_stats = (pjmedia_rtcp_xr_rb_stats*) rb_hdr;
print_rtcp_xr_rb_stats(cdrbufcur,rb_stats);
break;
case BT_VOIP_METRICS:
rb_voip_mtc = (pjmedia_rtcp_xr_rb_voip_mtc*) rb_hdr;
print_rtcp_xr_rb_voip_mtc(cdrbufcur,rb_voip_mtc);
break;
default:
break;
}
}
rb_hdr = (pjmedia_rtcp_xr_rb_header*)
((int32_t*)rb_hdr + rb_len + 1);
}
}

+ 229
- 0
daemon/rtcp_xr.h View File

@ -0,0 +1,229 @@
/*
* rtcp_xr.h
*
* Created on: Mar 29, 2015
* Author: fmetz
*/
#ifndef RTCP_XR_H_
#define RTCP_XR_H_
#include <stdint.h>
#include <sys/types.h>
/**
* @defgroup PJMED_RTCP_XR RTCP Extended Report (XR) - RFC 3611
* @ingroup PJMEDIA_SESSION
* @brief RTCP XR extension to RTCP session
* @{
*
* PJMEDIA implements subsets of RTCP XR specification (RFC 3611) to monitor
* the quality of the real-time media (audio/video) transmission.
*/
/**
* Enumeration of report types of RTCP XR. Useful for user to enable varying
* combinations of RTCP XR report blocks.
*/
typedef enum {
PJMEDIA_RTCP_XR_LOSS_RLE = (1 << 0),
PJMEDIA_RTCP_XR_DUP_RLE = (1 << 1),
PJMEDIA_RTCP_XR_RCPT_TIMES = (1 << 2),
PJMEDIA_RTCP_XR_RR_TIME = (1 << 3),
PJMEDIA_RTCP_XR_DLRR = (1 << 4),
PJMEDIA_RTCP_XR_STATS = (1 << 5),
PJMEDIA_RTCP_XR_VOIP_METRICS = (1 << 6)
} pjmedia_rtcp_xr_type;
/**
* Enumeration of info need to be updated manually to RTCP XR. Most info
* could be updated automatically each time RTP received.
*/
typedef enum {
PJMEDIA_RTCP_XR_INFO_SIGNAL_LVL = 1,
PJMEDIA_RTCP_XR_INFO_NOISE_LVL = 2,
PJMEDIA_RTCP_XR_INFO_RERL = 3,
PJMEDIA_RTCP_XR_INFO_R_FACTOR = 4,
PJMEDIA_RTCP_XR_INFO_MOS_LQ = 5,
PJMEDIA_RTCP_XR_INFO_MOS_CQ = 6,
PJMEDIA_RTCP_XR_INFO_CONF_PLC = 7,
PJMEDIA_RTCP_XR_INFO_CONF_JBA = 8,
PJMEDIA_RTCP_XR_INFO_CONF_JBR = 9,
PJMEDIA_RTCP_XR_INFO_JB_NOM = 10,
PJMEDIA_RTCP_XR_INFO_JB_MAX = 11,
PJMEDIA_RTCP_XR_INFO_JB_ABS_MAX = 12
} pjmedia_rtcp_xr_info;
/**
* Enumeration of PLC types definitions for RTCP XR report.
*/
typedef enum {
PJMEDIA_RTCP_XR_PLC_UNK = 0,
PJMEDIA_RTCP_XR_PLC_DIS = 1,
PJMEDIA_RTCP_XR_PLC_ENH = 2,
PJMEDIA_RTCP_XR_PLC_STD = 3
} pjmedia_rtcp_xr_plc_type;
/**
* Enumeration of jitter buffer types definitions for RTCP XR report.
*/
typedef enum {
PJMEDIA_RTCP_XR_JB_UNKNOWN = 0,
PJMEDIA_RTCP_XR_JB_FIXED = 2,
PJMEDIA_RTCP_XR_JB_ADAPTIVE = 3
} pjmedia_rtcp_xr_jb_type;
#pragma pack(1)
/**
* This type declares RTCP XR Report Header.
*/
typedef struct pjmedia_rtcp_xr_rb_header
{
u_int8_t bt; /**< Block type. */
u_int8_t specific; /**< Block specific data. */
u_int16_t length; /**< Block length. */
} pjmedia_rtcp_xr_rb_header;
/**
* This type declares RTCP XR Receiver Reference Time Report Block.
*/
typedef struct pjmedia_rtcp_xr_rb_rr_time
{
pjmedia_rtcp_xr_rb_header header; /**< Block header. */
u_int32_t ntp_sec; /**< NTP time, seconds part. */
u_int32_t ntp_frac; /**< NTP time, fractions part. */
} pjmedia_rtcp_xr_rb_rr_time;
/**
* This type declares RTCP XR DLRR Report Sub-block
*/
typedef struct pjmedia_rtcp_xr_rb_dlrr_item
{
u_int32_t ssrc; /**< receiver SSRC */
u_int32_t lrr; /**< last receiver report */
u_int32_t dlrr; /**< delay since last receiver
report */
} pjmedia_rtcp_xr_rb_dlrr_item;
/**
* This type declares RTCP XR DLRR Report Block
*/
typedef struct pjmedia_rtcp_xr_rb_dlrr
{
pjmedia_rtcp_xr_rb_header header; /**< Block header. */
pjmedia_rtcp_xr_rb_dlrr_item item; /**< Block contents,
variable length list */
} pjmedia_rtcp_xr_rb_dlrr;
/**
* This type declares RTCP XR Statistics Summary Report Block
*/
typedef struct pjmedia_rtcp_xr_rb_stats
{
pjmedia_rtcp_xr_rb_header header; /**< Block header. */
u_int32_t ssrc; /**< Receiver SSRC */
u_int16_t begin_seq; /**< Begin RTP sequence reported */
u_int16_t end_seq; /**< End RTP sequence reported */
u_int32_t lost; /**< Number of packet lost in this
interval */
u_int32_t dup; /**< Number of duplicated packet in
this interval */
u_int32_t jitter_min; /**< Minimum jitter in this interval */
u_int32_t jitter_max; /**< Maximum jitter in this interval */
u_int32_t jitter_mean; /**< Average jitter in this interval */
u_int32_t jitter_dev; /**< Jitter deviation in this
interval */
u_int32_t toh_min:8; /**< Minimum ToH in this interval */
u_int32_t toh_max:8; /**< Maximum ToH in this interval */
u_int32_t toh_mean:8; /**< Average ToH in this interval */
u_int32_t toh_dev:8; /**< ToH deviation in this interval */
} pjmedia_rtcp_xr_rb_stats;
/**
* This type declares RTCP XR VoIP Metrics Report Block
*/
typedef struct pjmedia_rtcp_xr_rb_voip_mtc
{
pjmedia_rtcp_xr_rb_header header; /**< Block header. */
u_int32_t ssrc; /**< Receiver SSRC */
u_int8_t loss_rate; /**< Packet loss rate */
u_int8_t discard_rate; /**< Packet discarded rate */
u_int8_t burst_den; /**< Burst density */
u_int8_t gap_den; /**< Gap density */
u_int16_t burst_dur; /**< Burst duration */
u_int16_t gap_dur; /**< Gap duration */
u_int16_t rnd_trip_delay;/**< Round trip delay */
u_int16_t end_sys_delay; /**< End system delay */
u_int8_t signal_lvl; /**< Signal level */
u_int8_t noise_lvl; /**< Noise level */
u_int8_t rerl; /**< Residual Echo Return Loss */
u_int8_t gmin; /**< The gap threshold */
u_int8_t r_factor; /**< Voice quality metric carried
over this RTP session */
u_int8_t ext_r_factor; /**< Voice quality metric carried
outside of this RTP session*/
u_int8_t mos_lq; /**< Mean Opinion Score for
Listening Quality */
u_int8_t mos_cq; /**< Mean Opinion Score for
Conversation Quality */
u_int8_t rx_config; /**< Receiver configuration */
u_int8_t reserved2; /**< Not used */
u_int16_t jb_nom; /**< Current delay by jitter
buffer */
u_int16_t jb_max; /**< Maximum delay by jitter
buffer */
u_int16_t jb_abs_max; /**< Maximum possible delay by
jitter buffer */
} pjmedia_rtcp_xr_rb_voip_mtc;
/**
* Constant of RTCP-XR content size.
*/
#define PJMEDIA_RTCP_XR_BUF_SIZE \
sizeof(pjmedia_rtcp_xr_rb_rr_time) + \
sizeof(pjmedia_rtcp_xr_rb_dlrr) + \
sizeof(pjmedia_rtcp_xr_rb_stats) + \
sizeof(pjmedia_rtcp_xr_rb_voip_mtc)
/**
* This structure declares RTCP XR (Extended Report) packet.
*/
typedef struct pjmedia_rtcp_xr_pkt
{
struct {
#if defined(PJ_IS_BIG_ENDIAN) && PJ_IS_BIG_ENDIAN!=0
unsigned version:2; /**< packet type */
unsigned p:1; /**< padding flag */
unsigned count:5; /**< varies by payload type */
unsigned pt:8; /**< payload type */
#else
unsigned count:5; /**< varies by payload type */
unsigned p:1; /**< padding flag */
unsigned version:2; /**< packet type */
unsigned pt:8; /**< payload type */
#endif
unsigned length:16; /**< packet length */
u_int32_t ssrc; /**< SSRC identification */
} common;
int8_t buf[PJMEDIA_RTCP_XR_BUF_SIZE];
/**< Content buffer */
} pjmedia_rtcp_xr_pkt;
/**
* This function is called internally by RTCP session when it receives
* incoming RTCP XR packets.
*
* @param rtcp_pkt The received RTCP XR packet.
* @param size Size of the incoming packet.
*/
void pjmedia_rtcp_xr_rx_rtcp_xr( char* cdrbufcur, const void *rtcp_pkt, size_t size);
#endif /* RTCP_XR_H_ */

+ 3
- 1
debian/ngcp-rtpengine-daemon.default View File

@ -21,7 +21,9 @@ TABLE=0
# LOG_LEVEL=6 # LOG_LEVEL=6
# LOG_FACILITY=daemon # LOG_FACILITY=daemon
# LOG_FACILITY_CDR=daemon # LOG_FACILITY_CDR=daemon
# LOG_FACILITY_RTCP=daemon
# NUM_THREADS=5 # NUM_THREADS=5
# DELETE_DELAY=30 # DELETE_DELAY=30
# GRAPHITE=9006 # GRAPHITE=9006
# GRAPHITE_INTERVAL=60
# GRAPHITE_INTERVAL=60
# GRAPHITE_PREFIX=myownprefix

+ 2
- 0
debian/ngcp-rtpengine-daemon.init View File

@ -70,10 +70,12 @@ OPTIONS="$OPTIONS --table=$TABLE"
[ -z "$LOG_LEVEL" ] || OPTIONS="$OPTIONS --log-level=$LOG_LEVEL" [ -z "$LOG_LEVEL" ] || OPTIONS="$OPTIONS --log-level=$LOG_LEVEL"
[ -z "$LOG_FACILITY" ] || OPTIONS="$OPTIONS --log-facility=$LOG_FACILITY" [ -z "$LOG_FACILITY" ] || OPTIONS="$OPTIONS --log-facility=$LOG_FACILITY"
[ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR" [ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR"
[ -z "$LOG_FACILITY_RTCP" ] || OPTIONS="$OPTIONS --log-facility-rtcp=$LOG_FACILITY_RTCP"
[ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS" [ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS"
[ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY" [ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY"
[ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE" [ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE"
[ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL" [ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL"
[ -z "$GRAPHITE_PREFIX" ] || OPTIONS="$OPTIONS --graphite-prefix=$GRAPHITE_PREFIX"
if test "$FORK" = "no" ; then if test "$FORK" = "no" ; then
OPTIONS="$OPTIONS --foreground" OPTIONS="$OPTIONS --foreground"
fi fi


+ 10
- 0
el/rtpengine.init View File

@ -153,10 +153,20 @@ build_opts() {
OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL" OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL"
fi fi
if [[ -n "$GRAPHITE_PREFIX" ]]
then
OPTS+=" --graphite-prefix=$GRAPHITE_PREFIX"
fi
if [[ -n "$LOG_FACILITY_CDR" ]] if [[ -n "$LOG_FACILITY_CDR" ]]
then then
OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR" OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR"
fi fi
if [[ -n "$LOG_FACILITY_RTCP" ]]
then
OPTS+=" --log-facility-rtcp=$LOG_FACILITY_RTCP"
fi
} }
start() { start() {


+ 15
- 0
kernel-module/rtpengine_config.h View File

@ -0,0 +1,15 @@
/*
* rtpengine_config.h
*
* Description: Config file with preprocessor config makros
*
* Created on: Mar 18, 2015
* Author: fmetz
*/
#ifndef RTPENGINE_CONFIG_H_
#define RTPENGINE_CONFIG_H_
#define RE_HAS_MEASUREDELAY 1
#endif /* RTPENGINE_CONFIG_H_ */

+ 108
- 7
kernel-module/xt_RTPENGINE.c View File

@ -29,6 +29,8 @@
#include "xt_RTPENGINE.h" #include "xt_RTPENGINE.h"
#endif #endif
#include "rtpengine_config.h"
MODULE_LICENSE("GPL"); MODULE_LICENSE("GPL");
@ -147,6 +149,10 @@ struct rtpengine_stats_a {
atomic64_t packets; atomic64_t packets;
atomic64_t bytes; atomic64_t bytes;
atomic64_t errors; atomic64_t errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
atomic_t in_tos;
}; };
struct rtpengine_rtp_stats_a { struct rtpengine_rtp_stats_a {
atomic64_t packets; atomic64_t packets;
@ -854,6 +860,10 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
op.stats.packets = atomic64_read(&g->stats.packets); op.stats.packets = atomic64_read(&g->stats.packets);
op.stats.bytes = atomic64_read(&g->stats.bytes); op.stats.bytes = atomic64_read(&g->stats.bytes);
op.stats.errors = atomic64_read(&g->stats.errors); op.stats.errors = atomic64_read(&g->stats.errors);
op.stats.delay_min = g->stats.delay_min;
op.stats.delay_max = g->stats.delay_max;
op.stats.delay_avg = g->stats.delay_avg;
op.stats.in_tos = atomic64_read(&g->stats.in_tos);
for (i = 0; i < g->target.num_payload_types; i++) { for (i = 0; i < g->target.num_payload_types; i++) {
op.rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets); op.rtp_stats[i].packets = atomic64_read(&g->rtp_stats[i].packets);
@ -882,10 +892,6 @@ err:
return err; return err;
} }
static int proc_list_open(struct inode *i, struct file *f) { static int proc_list_open(struct inode *i, struct file *f) {
int err; int err;
struct seq_file *p; struct seq_file *p;
@ -1474,6 +1480,9 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
atomic64_set(&g->stats.packets, atomic64_read(&og->stats.packets)); atomic64_set(&g->stats.packets, atomic64_read(&og->stats.packets));
atomic64_set(&g->stats.bytes, atomic64_read(&og->stats.bytes)); atomic64_set(&g->stats.bytes, atomic64_read(&og->stats.bytes));
atomic64_set(&g->stats.errors, atomic64_read(&og->stats.errors)); atomic64_set(&g->stats.errors, atomic64_read(&og->stats.errors));
g->stats.delay_min = og->stats.delay_min;
g->stats.delay_max = og->stats.delay_max;
g->stats.delay_avg = og->stats.delay_avg;
for (j = 0; j < NUM_PAYLOAD_TYPES; j++) { for (j = 0; j < NUM_PAYLOAD_TYPES; j++) {
atomic64_set(&g->rtp_stats[j].packets, atomic64_read(&og->rtp_stats[j].packets)); atomic64_set(&g->rtp_stats[j].packets, atomic64_read(&og->rtp_stats[j].packets));
@ -1816,6 +1825,7 @@ drop:
static int send_proxy_packet(struct sk_buff *skb, struct re_address *src, struct re_address *dst, unsigned char tos) { static int send_proxy_packet(struct sk_buff *skb, struct re_address *src, struct re_address *dst, unsigned char tos) {
if (src->family != dst->family) if (src->family != dst->family)
goto drop; goto drop;
@ -2184,7 +2194,48 @@ static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rt
return match - tg->payload_types; return match - tg->payload_types;
} }
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) {
static void re_timespec_subtract (struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec - (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec - (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_multiply(struct timespec *result, const struct timespec *a, const long long multiplier) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec * multiplier;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_devide(struct timespec *result, const struct timespec *a, const long devisor) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec / devisor;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_add(struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec + (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec + (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
/* Return negative, zero, positive if A < B, A == B, A > B, respectively.
Assume the nanosecond components are in range, or close to it. */
static int re_timespec_cmp (struct timespec *a, struct timespec *b)
{
return (a->tv_sec < b->tv_sec ? -1
: a->tv_sec > b->tv_sec ? 1
: a->tv_nsec - b->tv_nsec);
}
#if (RE_HAS_MEASUREDELAY)
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, struct timespec *starttime, u_int8_t in_tos) {
#else
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, u_int8_t in_tos) {
#endif
struct udphdr *uh; struct udphdr *uh;
struct rtpengine_target *g; struct rtpengine_target *g;
struct sk_buff *skb2; struct sk_buff *skb2;
@ -2194,6 +2245,10 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
struct rtp_parsed rtp; struct rtp_parsed rtp;
u_int64_t pkt_idx = 0, pkt_idx_u; u_int64_t pkt_idx = 0, pkt_idx_u;
#if (RE_HAS_MEASUREDELAY)
struct timespec endtime, delay;
#endif
skb_reset_transport_header(skb); skb_reset_transport_header(skb);
uh = udp_hdr(skb); uh = udp_hdr(skb);
skb_pull(skb, sizeof(*uh)); skb_pull(skb, sizeof(*uh));
@ -2299,6 +2354,10 @@ not_rtp:
err = send_proxy_packet(skb, &g->target.src_addr, &g->target.dst_addr, g->target.tos); err = send_proxy_packet(skb, &g->target.src_addr, &g->target.dst_addr, g->target.tos);
out: out:
if (atomic64_read(&g->stats.packets)==0)
atomic_set(&g->stats.in_tos,in_tos);
if (err) if (err)
atomic64_inc(&g->stats.errors); atomic64_inc(&g->stats.errors);
else { else {
@ -2308,6 +2367,29 @@ out:
if (rtp_pt_idx >= 0) { if (rtp_pt_idx >= 0) {
atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets); atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets);
atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes); atomic64_add(datalen, &g->rtp_stats[rtp_pt_idx].bytes);
#if (RE_HAS_MEASUREDELAY)
getnstimeofday(&endtime);
re_timespec_subtract(&delay,&endtime, starttime);
if (atomic64_read(&g->stats.packets)==1) {
g->stats.delay_min=delay;
g->stats.delay_avg=delay;
g->stats.delay_max=delay;
} else {
if (re_timespec_cmp(&g->stats.delay_min,&delay)>0) {
g->stats.delay_min = delay;
}
if (re_timespec_cmp(&g->stats.delay_max,&delay)<0) {
g->stats.delay_max = delay;
}
re_timespec_multiply(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets)-1);
re_timespec_add(&g->stats.delay_avg,&g->stats.delay_avg,&delay);
re_timespec_devide(&g->stats.delay_avg,&g->stats.delay_avg,atomic64_read(&g->stats.packets));
}
#endif
} }
else if (rtp_pt_idx == -2) else if (rtp_pt_idx == -2)
/* not RTP */ ; /* not RTP */ ;
@ -2345,6 +2427,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
struct rtpengine_table *t; struct rtpengine_table *t;
struct re_address src; struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id); t = get_table(pinfo->id);
if (!t) if (!t)
goto skip; goto skip;
@ -2363,7 +2450,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
src.family = AF_INET; src.family = AF_INET;
src.u.ipv4 = ih->saddr; src.u.ipv4 = ih->saddr;
return rtpengine46(skb, t, &src);
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime, (u_int8_t)ih->tos);
#else
return rtpengine46(skb, t, &src, (u_int8_t)ih->tos);
#endif
skip2: skip2:
kfree_skb(skb); kfree_skb(skb);
@ -2387,6 +2478,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
struct rtpengine_table *t; struct rtpengine_table *t;
struct re_address src; struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id); t = get_table(pinfo->id);
if (!t) if (!t)
goto skip; goto skip;
@ -2397,6 +2493,7 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
skb_reset_network_header(skb); skb_reset_network_header(skb);
ih = ipv6_hdr(skb); ih = ipv6_hdr(skb);
skb_pull(skb, sizeof(*ih)); skb_pull(skb, sizeof(*ih));
if (ih->nexthdr != IPPROTO_UDP) if (ih->nexthdr != IPPROTO_UDP)
goto skip2; goto skip2;
@ -2405,7 +2502,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
src.family = AF_INET6; src.family = AF_INET6;
memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6)); memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6));
return rtpengine46(skb, t, &src);
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime, ipv6_get_dsfield(ih));
#else
return rtpengine46(skb, t, &src, ipv6_get_dsfield(ih));
#endif
skip2: skip2:
kfree_skb(skb); kfree_skb(skb);


+ 5
- 1
kernel-module/xt_RTPENGINE.h View File

@ -15,6 +15,10 @@ struct rtpengine_stats {
u_int64_t packets; u_int64_t packets;
u_int64_t bytes; u_int64_t bytes;
u_int64_t errors; u_int64_t errors;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
u_int8_t in_tos;
}; };
struct rtpengine_rtp_stats { struct rtpengine_rtp_stats {
u_int64_t packets; u_int64_t packets;
@ -99,7 +103,7 @@ struct rtpengine_message {
MMG_NOOP = 1, MMG_NOOP = 1,
MMG_ADD, MMG_ADD,
MMG_DEL, MMG_DEL,
MMG_UPDATE,
MMG_UPDATE
} cmd; } cmd;
struct rtpengine_target_info target; struct rtpengine_target_info target;


+ 67
- 61
utils/rtpengine-ctl View File

@ -1,70 +1,76 @@
#!/bin/bash
#
#!/usr/bin/perl
host=127.0.0.1
port=9900
error_rc=255
use IO::Socket::INET;
prgname=${0##*/}
prgdir=${0%$prgname}
$num_args = $#ARGV + 1;
if ($num_args == 0) {
showusage();
exit;
}
# auto-flush on socket
$| = 1;
my $argumentstring = "";
my $ip = "127.0.0.1";
my $port = "9900";
showusage() {
echo ""
echo " $0 [ -ip <ipaddress> -port <port> ] <command>"
echo ""
echo " Supported commands are:"
echo ""
echo " list [ numsessions | sessions | session <callid> ]"
echo " numsessions : prints the number of sessions"
echo " sessions : print one-liner session information"
echo " session <callid> : print detail about one session"
echo " totals : print total statistics (does not include current sessions)"
echo ""
echo " terminate [ all | <callid> ]"
echo " all : terminates all current sessions"
echo " <callid> : session is immediately terminated"
echo ""
echo ""
echo " Return Value:"
echo " 0 on success with ouput from server side, other values for failure."
echo ""
exit 0
for (my $argnum=0; $argnum <= $#ARGV; $argnum++) {
if ($ARGV[$argnum] eq "-ip") {
die "No argument after -ip\n" unless $argnum+1<=$#ARGV;
$argnum = $argnum+1;
$ip = $ARGV[$argnum];
} elsif ($ARGV[$argnum] eq "-port") {
die "No argument after -port\n" unless $argnum+1<=$#ARGV;
$argnum = $argnum+1;
$port = $ARGV[$argnum];
} else {
$argumentstring .= "$ARGV[$argnum] ";
}
} }
if [ $# -eq 0 ]; then showusage; fi
# create a connecting socket
my $socket = new IO::Socket::INET (
PeerHost => $ip,
PeerPort => $port,
Proto => 'tcp',
);
die "Cannot connect to the rtpengine $!\n" unless $socket;
$argumentstring = trim($argumentstring);
my $size = $socket->send($argumentstring);
# notify server that request has been sent
shutdown($socket, 1);
# receive a response of up to 10MB
my $response = "";
$socket->recv($response, 1024*1024*10);
print $response;
command -v nc 2>&1 >/dev/null
if [ $? -ne 0 ]; then
echo "Error: $0 requires netcat to be installed."
exit 0
fi
$socket->close();
while [ $# -gt 0 ]; do
case $1 in
"-?"|"-help"|"-h")
showusage
;;
"-ip")
shift
if [ $# -gt 0 ]; then
host=$1
else
echo "Missing parameter for option '-ip'" >&2
fi
;;
"-port")
shift
if [ $# -gt 0 ]; then
port=$1
else
echo "Missing parameter for option '-port'" >&2
fi
;;
*)
varargs="$varargs $1"
esac
shift
done
sub showusage {
print "\n";
print " rtpengine-ctl [ -ip <ipaddress> -port <port> ] <command>\n";
print "\n";
print " Supported commands are:\n";
print "\n";
print " list [ numsessions | sessions | session <callid> | totals ]\n";
print " numsessions : prints the number of sessions\n";
print " sessions : print one-liner session information\n";
print " session <callid> : print detail about one session\n";
print " totals : print total statistics\n";
print "\n";
print " terminate [ all | <callid> ]\n";
print " all : terminates all current sessions\n";
print " <callid> : session is immediately terminated\n";
print "\n";
print "\n";
print " Return Value:\n";
print " 0 on success with ouput from server side, other values for failure.\n";
print "\n";
}
echo -n ${varargs} | nc ${host} ${port}
sub trim { my $s = shift; $s =~ s/^\s+|\s+$//g; return $s };

Loading…
Cancel
Save