From 133e6304b3f57b90b8f3ccb2f8f4e625ba80e735 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 22 Mar 2016 14:10:01 -0400 Subject: [PATCH] MT#18599 support sending RTCP stats to Homer via HEP Change-Id: Icaf28f28c910318726e446a1a2ad4c7ee5e79f85 --- README.md | 50 ++- daemon/Makefile | 2 +- daemon/call.h | 3 + daemon/graphite.c | 10 +- daemon/homer.c | 549 +++++++++++++++++++++++++++ daemon/homer.h | 14 + daemon/main.c | 44 ++- daemon/media_socket.c | 4 +- daemon/rtcp.c | 89 ++++- daemon/rtcp.h | 2 +- daemon/rtcp_xr.c | 34 +- daemon/rtcp_xr.h | 4 +- daemon/socket.c | 37 +- daemon/socket.h | 17 +- debian/ngcp-rtpengine-daemon.default | 3 + debian/ngcp-rtpengine-daemon.init | 3 + 16 files changed, 777 insertions(+), 88 deletions(-) create mode 100644 daemon/homer.c create mode 100644 daemon/homer.h diff --git a/README.md b/README.md index d06929d84..6e7465840 100644 --- a/README.md +++ b/README.md @@ -158,9 +158,12 @@ option and which are reproduced below: -F, --no-fallback Only start when kernel module is available -i, --interface=[NAME/]IP[!IP] Local interface for RTP -l, --listen-tcp=[IP:]PORT TCP port to listen on - -c, --listen-cli=[IP46:]PORT TCP port to listen on, CLI (command line interface) -u, --listen-udp=[IP46:]PORT UDP port to listen on -n, --listen-ng=[IP46:]PORT UDP port to listen on, NG protocol + -c, --listen-cli=[IP46:]PORT TCP port to listen on, CLI (command line interface) + -g, --graphite=IP46:PORT TCP address of graphite statistics server + -G, --graphite-interval=INT Graphite data statistics send interval + --graphite-prefix=STRING Graphite prefix for every line -T, --tos=INT TOS value to set on streams -o, --timeout=SECS RTP timeout -s, --silent-timeout=SECS RTP timeout for muted @@ -184,10 +187,10 @@ option and which are reproduced below: -d, --delete-delay Delay for deleting a session from memory. --sip-source Use SIP source address by default --dtls-passive Always prefer DTLS passive role - -g, --graphite=[IP46:]PORT TCP address of graphite statistics server - -G, --graphite-interval=INT Graphite data statistics send interval - --graphite-prefix=STRING Graphite prefix for every line --max-sessions=INT Limit the number of maximum concurrent sessions + --homer=IP46:PORT Address of Homer server for RTCP stats + --homer-protocol=udp|tcp Transport protocol for Homer (default udp) + --homer-id=INT 'Capture ID' to use within the HEP protocol Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local IP address through `--interface`, and at least one of the `--listen-...` options must be given. @@ -277,6 +280,18 @@ The options are described in more detail below. TCP ip and port to listen for the CLI (command line interface). +* -g, --graphite + + Address of the graphite statistics server. + +* -w, --graphite-interval + + Interval of the time when information is sent to the graphite server. + +* --graphite-prefix + + Add a prefix for every graphite line. + * -t, --tos Takes an integer as argument and if given, specifies the TOS value that should be set in outgoing @@ -415,18 +430,6 @@ The options are described in more detail below. Selects the internal format of the XMLRPC callback message for B2BUA call teardown. 0 is for SEMS, 1 is for a generic format containing the call-ID only. -* -g, --graphite - - Address of the graphite statistics server. - -* -w, --graphite-interval - - Interval of the time when information is sent to the graphite server. - -* --graphite-prefix - - Add a prefix for every graphite line. - * --max-sessions Limit the number of maximum concurrent sessions. Set at startup via MAX_SESSIONS in config file. Set at runtime via rtpengine-ctl util. @@ -436,6 +439,21 @@ The options are described in more detail below. Disable feature: 'rtpengine-ctl set maxsessions -1' By default, the feature is disabled (i.e. maxsessions == -1). +* --homer + + Enables sending the decoded contents of RTCP packets to a Homer SIP capture server. The transport + is HEP version 3 and payload format is JSON. This argument takes an IP address and a port number + as value. + +* --homer-protocol + + Can be either "udp" or "tcp" with "udp" being the default. + +* --homer-id + + The HEP protocol used by Homer contains a "capture ID" used to distinguish different sources + of capture data. This ID can be specified using this argument. + A typical command line (enabling both UDP and NG protocols) thus may look like: diff --git a/daemon/Makefile b/daemon/Makefile index 0b1f1b27b..8fa3d06e7 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -65,7 +65,7 @@ endif SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c rtcp_xr.c + media_socket.c rtcp_xr.c homer.c OBJS= $(SRCS:.c=.o) diff --git a/daemon/call.h b/daemon/call.h index 64cf0b8e8..321424c20 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -206,6 +206,7 @@ struct sdp_ng_flags; struct local_interface; struct call_monologue; struct ice_agent; +struct homer_sender; typedef bencode_buffer_t call_buffer_t; @@ -478,6 +479,8 @@ struct callmaster { struct callmaster_config conf; struct timeval latest_graphite_interval_start; + + struct homer_sender *homer; }; struct call_stats { diff --git a/daemon/graphite.c b/daemon/graphite.c index dd798b0ea..f461940e7 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -240,8 +240,6 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon fd_set wfds; FD_ZERO(&wfds); struct timeval tv; - int optval=0; - socklen_t optlen=sizeof(optval); // sanity checks if (!cm) { @@ -275,10 +273,10 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon connection_state = STATE_DISCONNECTED; return; } - rc = getsockopt(graphite_sock.fd, SOL_SOCKET, SO_ERROR, &optval, &optlen); - if (rc) ilog(LOG_ERROR,"getsockopt failure."); - if (optval != 0) { - ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd, strerror(optval)); + rc = socket_error(&graphite_sock); + if (rc < 0) ilog(LOG_ERROR,"getsockopt failure."); + if (rc != 0) { + ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd, strerror(rc)); close_socket(&graphite_sock); connection_state = STATE_DISCONNECTED; return; diff --git a/daemon/homer.c b/daemon/homer.c new file mode 100644 index 000000000..2f53502ec --- /dev/null +++ b/daemon/homer.c @@ -0,0 +1,549 @@ +#include "homer.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "log.h" +#include "aux.h" + + + + +struct homer_sender { + mutex_t lock; + + endpoint_t endpoint; + int protocol; + int capture_id; + socket_t socket; + time_t retry; + + GQueue send_queue; + GString *partial; + + int (*state)(struct homer_sender *); +}; + + + + +static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst); + +// state handlers +static int __established(struct homer_sender *hs); +static int __in_progress(struct homer_sender *hs); +static int __no_socket(struct homer_sender *hs); + + + + +static void __reset(struct homer_sender *hs) { + close_socket(&hs->socket); + hs->state = __no_socket; + hs->retry = time(NULL) + 30; + + // discard partially written packet + if (hs->partial) + g_string_free(hs->partial, TRUE); + hs->partial = NULL; +} + +static int __attempt_send(struct homer_sender *hs, GString *gs) { + int ret; + + ret = write(hs->socket.fd, gs->str, gs->len); + if (ret == gs->len) { + // full write + g_string_free(gs, TRUE); + return 0; + } + if (ret < 0) { + if (errno != EWOULDBLOCK && errno != EAGAIN) { + ilog(LOG_ERR, "Write error to Homer at %s: %s", + endpoint_print_buf(&hs->endpoint), strerror(errno)); + __reset(hs); + return 1; + } + ilog(LOG_DEBUG, "Home write blocked"); + // XXX use poller for blocked writes? + return 2; + } + // partial write + ilog(LOG_DEBUG, "Home write blocked (partial write)"); + g_string_erase(gs, 0, ret); + return 3; +} + +static int __established(struct homer_sender *hs) { + char buf[16]; + int ret; + GString *gs; + + // test connection with a dummy read + ret = read(hs->socket.fd, buf, sizeof(buf)); + if (ret < 0) { + if (errno != EWOULDBLOCK && errno != EAGAIN) { + ilog(LOG_ERR, "Connection error from Homer at %s: %s", + endpoint_print_buf(&hs->endpoint), strerror(errno)); + __reset(hs); + return -1; + } + } + // XXX handle return data from Homer? + + if (hs->partial) { + ilog(LOG_DEBUG, "dequeue partial packet to Homer"); + ret = __attempt_send(hs, hs->partial); + if (ret == 3 || ret == 2) // partial write or not sent at all + return 0; + if (ret == 1) // write error, takes care of deleting hs->partial + return -1; + // ret == 0 -> sent OK, drop through to unqueue + g_string_free(hs->partial, TRUE); + hs->partial = NULL; + } + + // unqueue as much as we can + while ((gs = g_queue_pop_head(&hs->send_queue))) { + ilog(LOG_DEBUG, "dequeue send queue to Homer"); + ret = __attempt_send(hs, gs); + if (ret == 0) // everything sent OK + continue; + if (ret == 3) { // partial write + hs->partial = gs; + return 0; + } + g_queue_push_head(&hs->send_queue, gs); + if (ret == 1) // write error + return -1; + // ret == 2 -> blocked + return 0; + } + + // everything unqueued + return 0; +} + +static int __check_conn(struct homer_sender *hs, int ret) { + if (ret == 0) { + ilog(LOG_INFO, "Connection to Homer at %s has been established", + endpoint_print_buf(&hs->endpoint)); + hs->state = __established; + return hs->state(hs); + } + if (ret == 1) { + ilog(LOG_DEBUG, "connection to Homer is in progress"); + hs->state = __in_progress; + return 0; + } + + ilog(LOG_ERR, "Failed to connect to Homer at %s: %s", + endpoint_print_buf(&hs->endpoint), strerror(errno)); + + __reset(hs); + return -1; +} + +static int __in_progress(struct homer_sender *hs) { + int ret; + + ilog(LOG_DEBUG, "connection to Homer is in progress - checking"); + + ret = connect_socket_retry(&hs->socket); + return __check_conn(hs, ret); +} + +static int __no_socket(struct homer_sender *hs) { + int ret; + + if (hs->retry > time(NULL)) + return 0; + + ilog(LOG_INFO, "Connecting to Homer at %s", endpoint_print_buf(&hs->endpoint)); + + ret = connect_socket_nb(&hs->socket, hs->protocol, &hs->endpoint); + return __check_conn(hs, ret); +} + +struct homer_sender *homer_sender_new(const endpoint_t *ep, int protocol, int capture_id) { + struct homer_sender *ret; + + if (is_addr_unspecified(&ep->address)) + return NULL; + + ret = malloc(sizeof(*ret)); + ZERO(*ret); + mutex_init(&ret->lock); + ret->endpoint = *ep; + ret->protocol = protocol; + ret->capture_id = capture_id; + ret->retry = time(NULL); + + ret->state = __no_socket; + + return ret; +} + +// takes over the GString +int homer_send(struct homer_sender *hs, GString *s, const str *id, const endpoint_t *src, + const endpoint_t *dst) +{ + if (!hs) + goto out; + if (!s) + goto out; + if (!s->len) // empty write, shouldn't happen + goto out; + + if (send_hepv3(s, id, hs->capture_id, src, dst)) + goto out; + + mutex_lock(&hs->lock); + // XXX limit size of send queue + g_queue_push_tail(&hs->send_queue, s); + hs->state(hs); + mutex_unlock(&hs->lock); + + goto done; + +out: + if (s) + g_string_free(s, TRUE); +done: + return 0; +} + + + + +// from captagent transport_hep.[ch] + +struct hep_chunk { + u_int16_t vendor_id; + u_int16_t type_id; + u_int16_t length; +} __attribute__((packed)); + +typedef struct hep_chunk hep_chunk_t; + +struct hep_chunk_uint8 { + hep_chunk_t chunk; + u_int8_t data; +} __attribute__((packed)); + +typedef struct hep_chunk_uint8 hep_chunk_uint8_t; + +struct hep_chunk_uint16 { + hep_chunk_t chunk; + u_int16_t data; +} __attribute__((packed)); + +typedef struct hep_chunk_uint16 hep_chunk_uint16_t; + +struct hep_chunk_uint32 { + hep_chunk_t chunk; + u_int32_t data; +} __attribute__((packed)); + +typedef struct hep_chunk_uint32 hep_chunk_uint32_t; + +struct hep_chunk_str { + hep_chunk_t chunk; + char *data; +} __attribute__((packed)); + +typedef struct hep_chunk_str hep_chunk_str_t; + +struct hep_chunk_ip4 { + hep_chunk_t chunk; + struct in_addr data; +} __attribute__((packed)); + +typedef struct hep_chunk_ip4 hep_chunk_ip4_t; + +struct hep_chunk_ip6 { + hep_chunk_t chunk; + struct in6_addr data; +} __attribute__((packed)); + +typedef struct hep_chunk_ip6 hep_chunk_ip6_t; + +struct hep_ctrl { + char id[4]; + u_int16_t length; +} __attribute__((packed)); + +typedef struct hep_ctrl hep_ctrl_t; + +struct hep_chunk_payload { + hep_chunk_t chunk; + char *data; +} __attribute__((packed)); + +typedef struct hep_chunk_payload hep_chunk_payload_t; + +/* Structure of HEP */ + +struct hep_generic { + hep_ctrl_t header; + hep_chunk_uint8_t ip_family; + hep_chunk_uint8_t ip_proto; + hep_chunk_uint16_t src_port; + hep_chunk_uint16_t dst_port; + hep_chunk_uint32_t time_sec; + hep_chunk_uint32_t time_usec; + hep_chunk_uint8_t proto_t; + hep_chunk_uint32_t capt_id; +} __attribute__((packed)); + +typedef struct hep_generic hep_generic_t; + +#define PROTO_RTCP_JSON 0x05 + +// modifies the GString in place +static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst) { + + struct hep_generic *hg=NULL; + void* buffer; + unsigned int buflen=0, iplen=0,tlen=0; + hep_chunk_ip4_t src_ip4, dst_ip4; + hep_chunk_ip6_t src_ip6, dst_ip6; + hep_chunk_t payload_chunk; + //hep_chunk_t authkey_chunk; + hep_chunk_t correlation_chunk; + //static int errors = 0; + struct timeval now; + + hg = malloc(sizeof(struct hep_generic)); + memset(hg, 0, sizeof(struct hep_generic)); + + + /* header set */ + memcpy(hg->header.id, "\x48\x45\x50\x33", 4); + + /* IP proto */ + hg->ip_family.chunk.vendor_id = htons(0x0000); + hg->ip_family.chunk.type_id = htons(0x0001); + hg->ip_family.data = src->address.family->af; + hg->ip_family.chunk.length = htons(sizeof(hg->ip_family)); + + /* Proto ID */ + hg->ip_proto.chunk.vendor_id = htons(0x0000); + hg->ip_proto.chunk.type_id = htons(0x0002); + hg->ip_proto.data = IPPROTO_UDP; + hg->ip_proto.chunk.length = htons(sizeof(hg->ip_proto)); + + + /* IPv4 */ + if(hg->ip_family.data == AF_INET) { + /* SRC IP */ + src_ip4.chunk.vendor_id = htons(0x0000); + src_ip4.chunk.type_id = htons(0x0003); + src_ip4.data = src->address.u.ipv4; + src_ip4.chunk.length = htons(sizeof(src_ip4)); + + /* DST IP */ + dst_ip4.chunk.vendor_id = htons(0x0000); + dst_ip4.chunk.type_id = htons(0x0004); + dst_ip4.data = dst->address.u.ipv4; + dst_ip4.chunk.length = htons(sizeof(dst_ip4)); + + iplen = sizeof(dst_ip4) + sizeof(src_ip4); + } + /* IPv6 */ + else if(hg->ip_family.data == AF_INET6) { + /* SRC IPv6 */ + src_ip6.chunk.vendor_id = htons(0x0000); + src_ip6.chunk.type_id = htons(0x0005); + src_ip6.data = src->address.u.ipv6; + src_ip6.chunk.length = htons(sizeof(src_ip6)); + + /* DST IPv6 */ + dst_ip6.chunk.vendor_id = htons(0x0000); + dst_ip6.chunk.type_id = htons(0x0006); + dst_ip6.data = dst->address.u.ipv6; + dst_ip6.chunk.length = htons(sizeof(dst_ip6)); + + iplen = sizeof(dst_ip6) + sizeof(src_ip6); + } + + /* SRC PORT */ + hg->src_port.chunk.vendor_id = htons(0x0000); + hg->src_port.chunk.type_id = htons(0x0007); + hg->src_port.data = htons(src->port); + hg->src_port.chunk.length = htons(sizeof(hg->src_port)); + + /* DST PORT */ + hg->dst_port.chunk.vendor_id = htons(0x0000); + hg->dst_port.chunk.type_id = htons(0x0008); + hg->dst_port.data = htons(dst->port); + hg->dst_port.chunk.length = htons(sizeof(hg->dst_port)); + + + gettimeofday(&now, NULL); // XXX replace with timestamp from actual packet + /* TIMESTAMP SEC */ + hg->time_sec.chunk.vendor_id = htons(0x0000); + hg->time_sec.chunk.type_id = htons(0x0009); + hg->time_sec.data = htonl(now.tv_sec); + hg->time_sec.chunk.length = htons(sizeof(hg->time_sec)); + + + /* TIMESTAMP USEC */ + hg->time_usec.chunk.vendor_id = htons(0x0000); + hg->time_usec.chunk.type_id = htons(0x000a); + hg->time_usec.data = htonl(now.tv_usec); + hg->time_usec.chunk.length = htons(sizeof(hg->time_usec)); + + /* Protocol TYPE */ + hg->proto_t.chunk.vendor_id = htons(0x0000); + hg->proto_t.chunk.type_id = htons(0x000b); + hg->proto_t.data = PROTO_RTCP_JSON; + hg->proto_t.chunk.length = htons(sizeof(hg->proto_t)); + + /* Capture ID */ + hg->capt_id.chunk.vendor_id = htons(0x0000); + hg->capt_id.chunk.type_id = htons(0x000c); + hg->capt_id.data = capt_id; + hg->capt_id.chunk.length = htons(sizeof(hg->capt_id)); + + /* Payload */ + payload_chunk.vendor_id = htons(0x0000); + payload_chunk.type_id = 0 ? htons(0x0010) : htons(0x000f); + payload_chunk.length = htons(sizeof(payload_chunk) + s->len); + + tlen = sizeof(struct hep_generic) + s->len + iplen + sizeof(hep_chunk_t); + +#if 0 + /* auth key */ + if(profile_transport[idx].capt_password != NULL) { + + tlen += sizeof(hep_chunk_t); + /* Auth key */ + authkey_chunk.vendor_id = htons(0x0000); + authkey_chunk.type_id = htons(0x000e); + authkey_chunk.length = htons(sizeof(authkey_chunk) + strlen(profile_transport[idx].capt_password)); + tlen += strlen(profile_transport[idx].capt_password); + } +#endif + + /* correlation key */ + //if(rcinfo->correlation_id.s && rcinfo->correlation_id.len > 0) { + + tlen += sizeof(hep_chunk_t); + /* Correlation key */ + correlation_chunk.vendor_id = htons(0x0000); + correlation_chunk.type_id = htons(0x0011); + correlation_chunk.length = htons(sizeof(correlation_chunk) + id->len); + tlen += id->len; + //} + + /* total */ + hg->header.length = htons(tlen); + + buffer = (void*)malloc(tlen); + if (buffer==0){ + ilog(LOG_ERR, "ERROR: out of memory"); + free(hg); + return -1; + } + + memcpy((void*) buffer, hg, sizeof(struct hep_generic)); + buflen = sizeof(struct hep_generic); + + /* IPv4 */ + if(hg->ip_family.data == AF_INET) { + /* SRC IP */ + memcpy((void*) buffer+buflen, &src_ip4, sizeof(struct hep_chunk_ip4)); + buflen += sizeof(struct hep_chunk_ip4); + + memcpy((void*) buffer+buflen, &dst_ip4, sizeof(struct hep_chunk_ip4)); + buflen += sizeof(struct hep_chunk_ip4); + } + /* IPv6 */ + else if(hg->ip_family.data == AF_INET6) { + /* SRC IPv6 */ + memcpy((void*) buffer+buflen, &src_ip4, sizeof(struct hep_chunk_ip6)); + buflen += sizeof(struct hep_chunk_ip6); + + memcpy((void*) buffer+buflen, &dst_ip6, sizeof(struct hep_chunk_ip6)); + buflen += sizeof(struct hep_chunk_ip6); + } + +#if 0 + /* AUTH KEY CHUNK */ + if(profile_transport[idx].capt_password != NULL) { + + memcpy((void*) buffer+buflen, &authkey_chunk, sizeof(struct hep_chunk)); + buflen += sizeof(struct hep_chunk); + + /* Now copying payload self */ + memcpy((void*) buffer+buflen, profile_transport[idx].capt_password, strlen(profile_transport[idx].capt_password)); + buflen+=strlen(profile_transport[idx].capt_password); + } +#endif + + /* Correlation KEY CHUNK */ + //if(rcinfo->correlation_id.s && rcinfo->correlation_id.len > 0) { + + memcpy((void*) buffer+buflen, &correlation_chunk, sizeof(struct hep_chunk)); + buflen += sizeof(struct hep_chunk); + + /* Now copying payload self */ + memcpy((void*) buffer+buflen, id->s, id->len); + buflen+= id->len; + //} + + /* PAYLOAD CHUNK */ + memcpy((void*) buffer+buflen, &payload_chunk, sizeof(struct hep_chunk)); + buflen += sizeof(struct hep_chunk); + + /* Now copying payload self */ + memcpy((void*) buffer+buflen, s->str, s->len); + buflen+=s->len; + +#if 0 + /* make sleep after 100 errors */ + if(errors > 50) { + LERR( "HEP server is down... retrying after sleep..."); + if(!profile_transport[idx].usessl) { + sleep(2); + if(init_hepsocket_blocking(idx)) { + profile_transport[idx].initfails++; + } + + errors=0; + } +#ifdef USE_SSL + else { + sleep(2); + + if(initSSL(idx)) profile_transport[idx].initfails++; + + errors=0; + } +#endif /* USE SSL */ + + } + + /* send this packet out of our socket */ + if(send_data(buffer, buflen, idx)) { + errors++; + stats.errors_total++; + } +#endif + + g_string_truncate(s, 0); + g_string_append_len(s, buffer, buflen); + + /* FREE */ + if(buffer) free(buffer); + if(hg) free(hg); + + return 0; +} diff --git a/daemon/homer.h b/daemon/homer.h new file mode 100644 index 000000000..c819004d4 --- /dev/null +++ b/daemon/homer.h @@ -0,0 +1,14 @@ +#ifndef __HOMER_H__ +#define __HOMER_H__ + +#include "socket.h" + + +struct homer_sender; + + +struct homer_sender *homer_sender_new(const endpoint_t *, int, int); +int homer_send(struct homer_sender *, GString *, const str *, const endpoint_t *, const endpoint_t *); + + +#endif diff --git a/daemon/main.c b/daemon/main.c index 4b1b56f50..3c9080ef2 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -29,6 +29,7 @@ #include "ice.h" #include "socket.h" #include "media_socket.h" +#include "homer.h" @@ -54,13 +55,16 @@ static mutex_t *openssl_locks; static char *pidfile; static gboolean foreground; static GQueue interfaces = G_QUEUE_INIT; -endpoint_t tcp_listen_ep; -endpoint_t udp_listen_ep; -endpoint_t ng_listen_ep; -endpoint_t cli_listen_ep; -endpoint_t graphite_ep; -endpoint_t redis_ep; -endpoint_t redis_write_ep; +static endpoint_t tcp_listen_ep; +static endpoint_t udp_listen_ep; +static endpoint_t ng_listen_ep; +static endpoint_t cli_listen_ep; +static endpoint_t graphite_ep; +static endpoint_t redis_ep; +static endpoint_t redis_write_ep; +static endpoint_t homer_ep; +static int homer_protocol = SOCK_DGRAM; +static int homer_id = 2001; static int tos; static int table = -1; static int no_fallback; @@ -246,7 +250,7 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth if (l < 0) return -1; *db = l; - if (endpoint_parse_any(ep, str)) + if (endpoint_parse_any_full(ep, str)) return -1; return 0; } @@ -270,6 +274,8 @@ static void options(int *argc, char ***argv) { char *log_facility_rtcp_s = NULL; int version = 0; int sip_source = 0; + char *homerp = NULL; + char *homerproto = NULL; GOptionEntry e[] = { { "version", 'v', 0, G_OPTION_ARG_NONE, &version, "Print build time and exit", NULL }, @@ -280,7 +286,7 @@ static void options(int *argc, char ***argv) { { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, - { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, + { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46:PORT" }, { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, { "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"}, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, @@ -307,6 +313,9 @@ static void options(int *argc, char ***argv) { { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, { "max-sessions", 0, 0, G_OPTION_ARG_INT, &max_sessions, "Limit of maximum number of sessions", "INT" }, + { "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46:PORT"}, + { "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" }, + { "homer-id", 0, 0, G_OPTION_ARG_STRING, &homer_id, "'Capture ID' to use within the HEP protocol", "INT" }, { NULL, } }; @@ -350,13 +359,26 @@ static void options(int *argc, char ***argv) { die("Invalid IP or port (--listen-cli)"); } - if (graphitep) {if (endpoint_parse_any(&graphite_ep, graphitep)) + if (graphitep) {if (endpoint_parse_any_full(&graphite_ep, graphitep)) die("Invalid IP or port (--graphite)"); } if (graphite_prefix_s) set_prefix(graphite_prefix_s); + if (homerp) { + if (endpoint_parse_any_full(&homer_ep, homerp)) + die("Invalid IP or port (--homer)"); + } + if (homerproto) { + if (!strcmp(homerproto, "tcp")) + homer_protocol = SOCK_STREAM; + else if (!strcmp(homerproto, "udp")) + homer_protocol = SOCK_DGRAM; + else + die("Invalid protocol (--homer-protocol)"); + } + if (tos < 0 || tos > 255) die("Invalid TOS value"); @@ -614,6 +636,8 @@ no_kernel: daemonize(); wpidfile(); + ctx->m->homer = homer_sender_new(&homer_ep, homer_protocol, homer_id); + if (mc.redis) { // start redis restore timer gettimeofday(&redis_start, NULL); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 4d719171b..3986f18fc 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1173,8 +1173,8 @@ loop_ok: if (rwf_in) handler_ret = rwf_in(s, in_srtp); if (handler_ret >= 0) { - if (rtcp && _log_facility_rtcp) - parse_and_log_rtcp_report(sfd, s->s, s->len); + if (rtcp) + parse_and_log_rtcp_report(sfd, s, fsin); if (rwf_out) handler_ret += rwf_out(s, out_srtp); } diff --git a/daemon/rtcp.c b/daemon/rtcp.c index bf226c853..56ee0761f 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -12,6 +12,7 @@ #include "rtp.h" #include "crypto.h" #include "rtcp_xr.h" +#include "homer.h" @@ -474,8 +475,9 @@ int rtcp_demux_is_rtcp(const str *s) { return 1; } -void print_rtcp_common(char** cdrbufcur, const pjmedia_rtcp_common *common) { - *cdrbufcur += sprintf(*cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ", +static void print_rtcp_common(char** cdrbufcur, const pjmedia_rtcp_common *common) { + if (*cdrbufcur) + *cdrbufcur += sprintf(*cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ", common->version, common->p, common->count, @@ -484,23 +486,34 @@ void print_rtcp_common(char** cdrbufcur, const pjmedia_rtcp_common *common) { ntohl(common->ssrc)); } -void print_rtcp_sr(char** cdrbufcur, const pjmedia_rtcp_sr* sr) { - *cdrbufcur += sprintf(*cdrbufcur,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, sender_bytes=%u, ", +static void print_rtcp_sr(char** cdrbufcur, const pjmedia_rtcp_sr* sr, GString *json) { + if (*cdrbufcur) + *cdrbufcur += sprintf(*cdrbufcur,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, sender_bytes=%u, ", ntohl(sr->ntp_sec), ntohl(sr->ntp_frac), ntohl(sr->rtp_ts), ntohl(sr->sender_pcount), ntohl(sr->sender_bcount)); + + if (json) + g_string_append_printf(json, "\"sender_information\":{\"ntp_timestamp_sec\":%u," + "\"ntp_timestamp_usec\":%u,\"octets\":%u,\"rtp_timestamp\":%u, \"packets\":%u},", + ntohl(sr->ntp_sec), + ntohl(sr->ntp_frac), + ntohl(sr->sender_bcount), + ntohl(sr->rtp_ts), + ntohl(sr->sender_pcount)); } -void print_rtcp_rr(char** cdrbufcur, const pjmedia_rtcp_rr* rr) { +void print_rtcp_rr(char** cdrbufcur, const pjmedia_rtcp_rr* rr, pjmedia_rtcp_common *common, GString *json) { /* Get packet loss */ u_int32_t packet_loss=0; packet_loss = (rr->total_lost_2 << 16) + (rr->total_lost_1 << 8) + rr->total_lost_0; - *cdrbufcur += sprintf(*cdrbufcur,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ", + if (*cdrbufcur) + *cdrbufcur += sprintf(*cdrbufcur,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ", ntohl(rr->ssrc), rr->fract_lost, packet_loss, @@ -508,47 +521,81 @@ void print_rtcp_rr(char** cdrbufcur, const pjmedia_rtcp_rr* rr) { ntohl(rr->jitter), ntohl(rr->lsr), ntohl(rr->dlsr)); + + if (json) + g_string_append_printf(json, "\"ssrc\":%u,\"type\":%u, \"report_blocks\":[{\"source_ssrc\":%u," + "\"highest_seq_no\":%u,\"fraction_lost\":%u,\"ia_jitter\":%u," + "\"packets_lost\":%u,\"lsr\":%u,\"dlsr\":%u}],\"report_count\":1,", + ntohl(rr->ssrc), + common->pt, + ntohl(rr->ssrc), + ntohl(rr->last_seq), + rr->fract_lost, + ntohl(rr->jitter), + packet_loss, + ntohl(rr->lsr), + ntohl(rr->dlsr)); } -void parse_and_log_rtcp_report(struct stream_fd *sfd, const void *pkt, long size) { +void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *s, const endpoint_t *src) { + // XXX replace with GString static const int CDRBUFLENGTH = 1024*1024*1; // 1 MB char cdrbuffer[CDRBUFLENGTH]; char* cdrbufcur = cdrbuffer; - pjmedia_rtcp_common *common = (pjmedia_rtcp_common*) pkt; + pjmedia_rtcp_common *common = (pjmedia_rtcp_common*) s->s; const pjmedia_rtcp_rr *rr = NULL; const pjmedia_rtcp_sr *sr = NULL; + GString *json; + struct call *c = sfd->call; + struct callmaster *cm = c->callmaster; + + if (s->len < sizeof(*common)) + return; - cdrbufcur += sprintf(cdrbufcur,"["STR_FORMAT"] ", STR_FMT(&sfd->stream->call->callid)); + cdrbufcur = _log_facility_rtcp ? cdrbuffer : NULL; + json = cm->homer ? g_string_new("{ ") : NULL; - if (size < sizeof(*common)) + // anything to do? + if (!cdrbufcur && !json) return; + if (cdrbufcur) + cdrbufcur += sprintf(cdrbufcur,"["STR_FORMAT"] ", STR_FMT(&sfd->stream->call->callid)); + print_rtcp_common(&cdrbufcur,common); /* Parse RTCP */ if (common->pt == RTCP_PT_SR) { - if (size < (sizeof(*common) + sizeof(*sr))) + if (s->len < (sizeof(*common) + sizeof(*sr))) return; - sr = (pjmedia_rtcp_sr*) (((char*)pkt) + sizeof(pjmedia_rtcp_common)); + sr = (pjmedia_rtcp_sr*) ((s->s) + sizeof(pjmedia_rtcp_common)); - print_rtcp_sr(&cdrbufcur,sr); + print_rtcp_sr(&cdrbufcur,sr, json); - if (common->count > 0 && size >= (sizeof(pjmedia_rtcp_sr_pkt))) { - rr = (pjmedia_rtcp_rr*)(((char*)pkt) + (sizeof(pjmedia_rtcp_common) + if (common->count > 0 && s->len >= (sizeof(pjmedia_rtcp_sr_pkt))) { + rr = (pjmedia_rtcp_rr*)((s->s) + (sizeof(pjmedia_rtcp_common) + sizeof(pjmedia_rtcp_sr))); - print_rtcp_rr(&cdrbufcur,rr); + print_rtcp_rr(&cdrbufcur,rr, common, json); } } else if (common->pt == RTCP_PT_RR && common->count > 0) { - if (size < (sizeof(*common) + sizeof(*rr))) + if (s->len < (sizeof(*common) + sizeof(*rr))) return; - rr = (pjmedia_rtcp_rr*)(((char*)pkt) + sizeof(pjmedia_rtcp_common)); - print_rtcp_rr(&cdrbufcur,rr); + rr = (pjmedia_rtcp_rr*)((s->s) + sizeof(pjmedia_rtcp_common)); + print_rtcp_rr(&cdrbufcur,rr, common, json); } else if (common->pt == RTCP_PT_XR) { - pjmedia_rtcp_xr_rx_rtcp_xr(cdrbufcur, pkt, size); + pjmedia_rtcp_xr_rx_rtcp_xr(&cdrbufcur, s); + } + // XXX parse/support additional RTCP types + + if (cdrbufcur) + rtcplog(cdrbuffer); + + if (json) { + g_string_append(json, " }"); + homer_send(cm->homer, json, &c->callid, src, &sfd->socket.local); } - rtcplog(cdrbuffer); } diff --git a/daemon/rtcp.h b/daemon/rtcp.h index 96f88fa8c..2233a28e7 100644 --- a/daemon/rtcp.h +++ b/daemon/rtcp.h @@ -105,6 +105,6 @@ int rtcp_savp2avp(str *, struct crypto_context *); int rtcp_demux_is_rtcp(const str *); -void parse_and_log_rtcp_report(struct stream_fd *sfd, const void *pkt, long size); +void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *, const endpoint_t *); #endif diff --git a/daemon/rtcp_xr.c b/daemon/rtcp_xr.c index 39e1a7764..a87cf161d 100644 --- a/daemon/rtcp_xr.c +++ b/daemon/rtcp_xr.c @@ -21,8 +21,8 @@ #define BT_VOIP_METRICS 7 -void print_rtcp_xr_common(char* cdrbufcur,const pjmedia_rtcp_xr_pkt *rtcp_xr) { - cdrbufcur += sprintf(cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ", +void print_rtcp_xr_common(char** cdrbufcur,const pjmedia_rtcp_xr_pkt *rtcp_xr) { + *cdrbufcur += sprintf(*cdrbufcur,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ", rtcp_xr->common.version, rtcp_xr->common.p, rtcp_xr->common.count, @@ -31,31 +31,31 @@ void print_rtcp_xr_common(char* cdrbufcur,const pjmedia_rtcp_xr_pkt *rtcp_xr) { ntohl(rtcp_xr->common.ssrc)); } -void print_rtcp_xr_rb_header(char* cdrbufcur,const pjmedia_rtcp_xr_rb_header *rb_header) { - cdrbufcur += sprintf(cdrbufcur,"rb_header_blocktype=%u, rb_header_blockspecdata=%u, rb_header_blocklength=%u, ", +void print_rtcp_xr_rb_header(char** cdrbufcur,const pjmedia_rtcp_xr_rb_header *rb_header) { + *cdrbufcur += sprintf(*cdrbufcur,"rb_header_blocktype=%u, rb_header_blockspecdata=%u, rb_header_blocklength=%u, ", rb_header->bt, rb_header->specific, ntohs(rb_header->length)); } -void print_rtcp_xr_rb_rr_time(char* cdrbufcur,const pjmedia_rtcp_xr_rb_rr_time *rb_rr_time) { +void print_rtcp_xr_rb_rr_time(char** cdrbufcur,const pjmedia_rtcp_xr_rb_rr_time *rb_rr_time) { print_rtcp_xr_rb_header(cdrbufcur,&rb_rr_time->header); - cdrbufcur += sprintf(cdrbufcur,"rb_rr_time_ntp_sec=%u, rb_rr_time_ntp_frac=%u, ", + *cdrbufcur += sprintf(*cdrbufcur,"rb_rr_time_ntp_sec=%u, rb_rr_time_ntp_frac=%u, ", ntohl(rb_rr_time->ntp_sec), ntohl(rb_rr_time->ntp_frac)); } -void print_rtcp_xr_rb_dlrr(char* cdrbufcur,const pjmedia_rtcp_xr_rb_dlrr *rb_dlrr) { +void print_rtcp_xr_rb_dlrr(char** cdrbufcur,const pjmedia_rtcp_xr_rb_dlrr *rb_dlrr) { print_rtcp_xr_rb_header(cdrbufcur,&rb_dlrr->header); - cdrbufcur += sprintf(cdrbufcur,"rb_dlrr_ssrc=%u, rb_dlrr_lrr=%u, rb_dlrr_dlrr=%u, ", + *cdrbufcur += sprintf(*cdrbufcur,"rb_dlrr_ssrc=%u, rb_dlrr_lrr=%u, rb_dlrr_dlrr=%u, ", ntohl(rb_dlrr->item.ssrc), ntohl(rb_dlrr->item.lrr), ntohl(rb_dlrr->item.dlrr)); } -void print_rtcp_xr_rb_stats(char* cdrbufcur,const pjmedia_rtcp_xr_rb_stats *rb_stats) { +void print_rtcp_xr_rb_stats(char** cdrbufcur,const pjmedia_rtcp_xr_rb_stats *rb_stats) { print_rtcp_xr_rb_header(cdrbufcur,&rb_stats->header); - cdrbufcur += sprintf(cdrbufcur,"rb_stats_ssrc=%u, rb_stats_begin_seq=%u, rb_stats_end_seq=%u, rb_stats_lost_packets=%u, rb_stats_duplicate_packets=%u," + *cdrbufcur += sprintf(*cdrbufcur,"rb_stats_ssrc=%u, rb_stats_begin_seq=%u, rb_stats_end_seq=%u, rb_stats_lost_packets=%u, rb_stats_duplicate_packets=%u," "rb_stats_jitter_min=%u, rb_stats_jitter_max=%u, rb_stats_jitter_mean=%u, rb_stats_jitter_deviation=%u," "rb_stats_toh_min=%u, rb_stats_toh_max=%u, rb_stats_toh_mean=%u, rb_stats_toh_deviation=%u, ", ntohl(rb_stats->ssrc), @@ -73,9 +73,9 @@ void print_rtcp_xr_rb_stats(char* cdrbufcur,const pjmedia_rtcp_xr_rb_stats *rb_s ntohl(rb_stats->toh_dev)); } -void print_rtcp_xr_rb_voip_mtc(char* cdrbufcur,const pjmedia_rtcp_xr_rb_voip_mtc *rb_voip_mtc) { +void print_rtcp_xr_rb_voip_mtc(char** cdrbufcur,const pjmedia_rtcp_xr_rb_voip_mtc *rb_voip_mtc) { print_rtcp_xr_rb_header(cdrbufcur,&rb_voip_mtc->header); - cdrbufcur += sprintf(cdrbufcur,"rb_voip_mtc_ssrc=%u, rb_voip_mtc_loss_rate=%u, rb_voip_mtc_discard_rate=%u, rb_voip_mtc_burst_den=%u, " + *cdrbufcur += sprintf(*cdrbufcur,"rb_voip_mtc_ssrc=%u, rb_voip_mtc_loss_rate=%u, rb_voip_mtc_discard_rate=%u, rb_voip_mtc_burst_den=%u, " "rb_voip_mtc_gap_den=%u, rb_voip_mtc_burst_dur=%u, rb_voip_mtc_gap_dur=%u, rb_voip_mtc_rnd_trip_delay=%u, " "rb_voip_mtc_end_sys_delay=%u, rb_voip_mtc_signal_lvl=%u, rb_voip_mtc_noise_lvl=%u, rb_voip_mtc_rerl=%u, " "rb_voip_mtc_gmin=%u, rb_voip_mtc_r_factor=%u, rb_voip_mtc_ext_r_factor=%u, rb_voip_mtc_mos_lq=%u, " @@ -104,9 +104,9 @@ void print_rtcp_xr_rb_voip_mtc(char* cdrbufcur,const pjmedia_rtcp_xr_rb_voip_mtc ntohs(rb_voip_mtc->jb_abs_max)); } -void pjmedia_rtcp_xr_rx_rtcp_xr(char* cdrbufcur, const void *pkt, size_t size) { +void pjmedia_rtcp_xr_rx_rtcp_xr(char** cdrbufcur, const str *s) { - const pjmedia_rtcp_xr_pkt *rtcp_xr = (pjmedia_rtcp_xr_pkt*) pkt; + const pjmedia_rtcp_xr_pkt *rtcp_xr = (pjmedia_rtcp_xr_pkt*) s->s; const pjmedia_rtcp_xr_rb_rr_time *rb_rr_time = NULL; const pjmedia_rtcp_xr_rb_dlrr *rb_dlrr = NULL; const pjmedia_rtcp_xr_rb_stats *rb_stats = NULL; @@ -115,7 +115,7 @@ void pjmedia_rtcp_xr_rx_rtcp_xr(char* cdrbufcur, const void *pkt, size_t size) { rtcp_xr->buf; unsigned pkt_len, rb_len; - if (size < sizeof(*rtcp_xr)) + if (s->len < sizeof(*rtcp_xr)) return; if (rtcp_xr->common.pt != RTCP_XR) @@ -125,11 +125,11 @@ void pjmedia_rtcp_xr_rx_rtcp_xr(char* cdrbufcur, const void *pkt, size_t size) { pkt_len = ntohs((u_int16_t)rtcp_xr->common.length); - if ((pkt_len + 1) > (size / 4)) + if ((pkt_len + 1) > (s->len / 4)) return; /* Parse report rpt_types */ - while ((int32_t*)rb_hdr < (int32_t*)pkt + pkt_len) + while ((int32_t*)rb_hdr < (int32_t*)s->s + pkt_len) { rb_len = ntohs((u_int16_t)rb_hdr->length); diff --git a/daemon/rtcp_xr.h b/daemon/rtcp_xr.h index b8cbb5b5d..a90476568 100644 --- a/daemon/rtcp_xr.h +++ b/daemon/rtcp_xr.h @@ -11,6 +11,8 @@ #include #include +#include "str.h" + /** * @defgroup PJMED_RTCP_XR RTCP Extended Report (XR) - RFC 3611 * @ingroup PJMEDIA_SESSION @@ -223,7 +225,7 @@ typedef struct pjmedia_rtcp_xr_pkt * @param rtcp_pkt The received RTCP XR packet. * @param size Size of the incoming packet. */ -void pjmedia_rtcp_xr_rx_rtcp_xr( char* cdrbufcur, const void *rtcp_pkt, size_t size); +void pjmedia_rtcp_xr_rx_rtcp_xr( char** cdrbufcur, const str *s); #endif /* RTCP_XR_H_ */ diff --git a/daemon/socket.c b/daemon/socket.c index 3f69074d6..15f942f6f 100644 --- a/daemon/socket.c +++ b/daemon/socket.c @@ -31,6 +31,7 @@ static ssize_t __ip_sendmsg(socket_t *s, struct msghdr *mh, const endpoint_t *ep static ssize_t __ip_sendto(socket_t *s, const void *buf, size_t len, const endpoint_t *ep); static int __ip4_tos(socket_t *, unsigned int); static int __ip6_tos(socket_t *, unsigned int); +static int __ip_error(socket_t *s); static void __ip4_endpoint2kernel(struct re_address *, const endpoint_t *); static void __ip6_endpoint2kernel(struct re_address *, const endpoint_t *); static void __ip4_kernel2endpoint(endpoint_t *ep, const struct re_address *ra); @@ -67,6 +68,7 @@ static struct socket_family __socket_families[__SF_LAST] = { .sendmsg = __ip_sendmsg, .sendto = __ip_sendto, .tos = __ip4_tos, + .error = __ip_error, .endpoint2kernel = __ip4_endpoint2kernel, .kernel2endpoint = __ip4_kernel2endpoint, }, @@ -91,6 +93,7 @@ static struct socket_family __socket_families[__SF_LAST] = { .sendmsg = __ip_sendmsg, .sendto = __ip_sendto, .tos = __ip6_tos, + .error = __ip_error, .endpoint2kernel = __ip6_endpoint2kernel, .kernel2endpoint = __ip6_kernel2endpoint, }, @@ -270,6 +273,13 @@ static int __ip6_tos(socket_t *s, unsigned int tos) { setsockopt(s->fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof(tos)); return 0; } +static int __ip_error(socket_t *s) { + int optval; + socklen_t optlen = sizeof(optval); + if (getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen)) + return -1; + return optval; +} static void __ip4_endpoint2kernel(struct re_address *ra, const endpoint_t *ep) { ZERO(*ra); ra->family = AF_INET; @@ -478,23 +488,15 @@ fail: return -1; } -int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep) { - sockfamily_t *fam; +int connect_socket_retry(socket_t *r) { int ret = 0; - fam = ep->address.family; - - if (__socket(r, type, fam)) - return -1; - nonblock(r->fd); - if (fam->connect(r, ep)) { - if (errno != EINPROGRESS) + if (r->family->connect(r, &r->remote)) { + if (errno != EINPROGRESS && errno != EALREADY) goto fail; ret = 1; } - r->remote = *ep; - return ret; fail: @@ -502,6 +504,19 @@ fail: return -1; } +int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep) { + sockfamily_t *fam; + + fam = ep->address.family; + + if (__socket(r, type, fam)) + return -1; + nonblock(r->fd); + r->remote = *ep; + + return connect_socket_retry(r); +} + int close_socket(socket_t *r) { if (!r || r->fd == -1) { __C_DBG("close() syscall not called, fd=%d", r->fd); diff --git a/daemon/socket.h b/daemon/socket.h index 771ca68d7..ca2c856e8 100644 --- a/daemon/socket.h +++ b/daemon/socket.h @@ -63,6 +63,7 @@ struct socket_family { ssize_t (*sendmsg)(socket_t *, struct msghdr *, const endpoint_t *); ssize_t (*sendto)(socket_t *, const void *, size_t, const endpoint_t *); int (*tos)(socket_t *, unsigned int); + int (*error)(socket_t *); void (*endpoint2kernel)(struct re_address *, const endpoint_t *); void (*kernel2endpoint)(endpoint_t *, const struct re_address *); }; @@ -146,6 +147,7 @@ INLINE int is_addr_unspecified(const sockaddr_t *a) { #define socket_recvfrom(s,a...) (s)->family->recvfrom((s), a) #define socket_sendmsg(s,a...) (s)->family->sendmsg((s), a) #define socket_sendto(s,a...) (s)->family->sendto((s), a) +#define socket_error(s) (s)->family->error((s)) INLINE ssize_t socket_sendiov(socket_t *s, const struct iovec *v, unsigned int len, const endpoint_t *dst) { struct msghdr mh; ZERO(mh); @@ -174,7 +176,8 @@ void socket_init(void); int open_socket(socket_t *r, int type, unsigned int port, const sockaddr_t *); int connect_socket(socket_t *r, int type, const endpoint_t *ep); -int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); +int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress +int connect_socket_retry(socket_t *r); // retries connect() while in progress int close_socket(socket_t *r); sockfamily_t *get_socket_family_rfc(const str *s); @@ -182,7 +185,7 @@ sockfamily_t *__get_socket_family_enum(enum socket_families); int sockaddr_parse_any(sockaddr_t *dst, const char *src); int sockaddr_parse_any_str(sockaddr_t *dst, const str *src); int sockaddr_parse_str(sockaddr_t *dst, sockfamily_t *fam, const str *src); -int endpoint_parse_any(endpoint_t *, const char *); +int endpoint_parse_any(endpoint_t *, const char *); // address optional void kernel2endpoint(endpoint_t *ep, const struct re_address *ra); unsigned int sockaddr_hash(const sockaddr_t *); @@ -206,6 +209,16 @@ INLINE int endpoint_parse_port_any(endpoint_t *e, const char *p, unsigned int po e->port = port; return sockaddr_parse_any(&e->address, p); } +// address required +INLINE int endpoint_parse_any_full(endpoint_t *d, const char *s) { + int ret; + ret = endpoint_parse_any(d, s); + if (ret) + return ret; + if (is_addr_unspecified(&d->address)) + return -1; + return 0; +} INLINE int ipv46_any_convert(endpoint_t *ep) { if (ep->address.family->af != AF_INET) return 0; diff --git a/debian/ngcp-rtpengine-daemon.default b/debian/ngcp-rtpengine-daemon.default index a296a9c88..3c31e387b 100644 --- a/debian/ngcp-rtpengine-daemon.default +++ b/debian/ngcp-rtpengine-daemon.default @@ -36,3 +36,6 @@ TABLE=0 # GRAPHITE_PREFIX=myownprefix. # MAX_SESSIONS=5000 # CREATE_IPTABLES_CHAIN=no +# HOMER=123.234.345.456:65432 +# HOMER_PROTOCOL=udp +# HOMER_ID=2001 diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index f9a193d2d..46ba93358 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -82,6 +82,9 @@ OPTIONS="$OPTIONS --table=$TABLE" [ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL" [ -z "$GRAPHITE_PREFIX" ] || OPTIONS="$OPTIONS --graphite-prefix=$GRAPHITE_PREFIX" [ -z "$MAX_SESSIONS" ] || OPTIONS="$OPTIONS --max-sessions=$MAX_SESSIONS" +[ -z "$HOMER" ] || OPTIONS="$OPTIONS --homer=$HOMER" +[ -z "$HOMER_PROTOCOL" ] || OPTIONS="$OPTIONS --homer-protocol=$HOMER_PROTOCOL" +[ -z "$HOMER_ID" ] || OPTIONS="$OPTIONS --homer-id=$HOMER_ID" if test "$FORK" = "no" ; then OPTIONS="$OPTIONS --foreground"