From 29545e5434e267932d8674a17c675ba64846e0f9 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 29 Mar 2016 13:19:21 -0400 Subject: [PATCH] MT#18599 use kernel socket receive timestamping Change-Id: I6fde08be7942df7986eea39d2b233ce60d1fa862 --- daemon/homer.c | 17 ++++++------- daemon/homer.h | 3 ++- daemon/media_socket.c | 12 ++++++---- daemon/rtcp.c | 6 +++-- daemon/rtcp.h | 2 +- daemon/socket.c | 56 +++++++++++++++++++++++++++++++++++++++---- daemon/socket.h | 4 ++++ 7 files changed, 78 insertions(+), 22 deletions(-) diff --git a/daemon/homer.c b/daemon/homer.c index 3e1b35615..37442cd17 100644 --- a/daemon/homer.c +++ b/daemon/homer.c @@ -39,7 +39,8 @@ struct homer_sender { -static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst); +static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst, + const struct timeval *); // state handlers static int __established(struct homer_sender *hs); @@ -198,7 +199,7 @@ struct homer_sender *homer_sender_new(const endpoint_t *ep, int protocol, int ca // takes over the GString int homer_send(struct homer_sender *hs, GString *s, const str *id, const endpoint_t *src, - const endpoint_t *dst) + const endpoint_t *dst, const struct timeval *tv) { if (!hs) goto out; @@ -209,7 +210,7 @@ int homer_send(struct homer_sender *hs, GString *s, const str *id, const endpoin ilog(LOG_DEBUG, "JSON to send to Homer: '"STR_FORMAT"'", G_STR_FMT(s)); - if (send_hepv3(s, id, hs->capture_id, src, dst)) + if (send_hepv3(s, id, hs->capture_id, src, dst, tv)) goto out; mutex_lock(&hs->lock); @@ -316,7 +317,9 @@ typedef struct hep_generic hep_generic_t; #define PROTO_RTCP_JSON 0x05 // modifies the GString in place -static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst) { +static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst, + const struct timeval *tv) +{ struct hep_generic *hg=NULL; void* buffer; @@ -327,7 +330,6 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t //hep_chunk_t authkey_chunk; hep_chunk_t correlation_chunk; //static int errors = 0; - struct timeval now; hg = malloc(sizeof(struct hep_generic)); memset(hg, 0, sizeof(struct hep_generic)); @@ -395,18 +397,17 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t hg->dst_port.chunk.length = htons(sizeof(hg->dst_port)); - gettimeofday(&now, NULL); // XXX replace with timestamp from actual packet /* TIMESTAMP SEC */ hg->time_sec.chunk.vendor_id = htons(0x0000); hg->time_sec.chunk.type_id = htons(0x0009); - hg->time_sec.data = htonl(now.tv_sec); + hg->time_sec.data = htonl(tv->tv_sec); hg->time_sec.chunk.length = htons(sizeof(hg->time_sec)); /* TIMESTAMP USEC */ hg->time_usec.chunk.vendor_id = htons(0x0000); hg->time_usec.chunk.type_id = htons(0x000a); - hg->time_usec.data = htonl(now.tv_usec); + hg->time_usec.data = htonl(tv->tv_usec); hg->time_usec.chunk.length = htons(sizeof(hg->time_usec)); /* Protocol TYPE */ diff --git a/daemon/homer.h b/daemon/homer.h index c819004d4..690bc200e 100644 --- a/daemon/homer.h +++ b/daemon/homer.h @@ -8,7 +8,8 @@ struct homer_sender; struct homer_sender *homer_sender_new(const endpoint_t *, int, int); -int homer_send(struct homer_sender *, GString *, const str *, const endpoint_t *, const endpoint_t *); +int homer_send(struct homer_sender *, GString *, const str *, const endpoint_t *, const endpoint_t *, + const struct timeval *tv); #endif diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 3986f18fc..202c1229f 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -537,7 +537,7 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc static int get_port6(socket_t *r, unsigned int port, struct intf_spec *spec) { if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr)) return -1; - + socket_timestamping(r); return 0; } @@ -1013,7 +1013,7 @@ noop: /* XXX split this function into pieces */ /* called lock-free */ -static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin) { +static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) { struct packet_stream *stream, *sink = NULL, *in_srtp, *out_srtp; @@ -1174,7 +1174,7 @@ loop_ok: handler_ret = rwf_in(s, in_srtp); if (handler_ret >= 0) { if (rtcp) - parse_and_log_rtcp_report(sfd, s, fsin); + parse_and_log_rtcp_report(sfd, s, fsin, tv); if (rwf_out) handler_ret += rwf_out(s, out_srtp); } @@ -1358,6 +1358,7 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { struct call *ca; str s; endpoint_t ep; + struct timeval tv; if (sfd->socket.fd != fd) goto out; @@ -1373,7 +1374,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { } #endif - ret = socket_recvfrom(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, &ep); + ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, + &ep, &tv); if (ret < 0) { if (errno == EINTR) @@ -1387,7 +1389,7 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { ilog(LOG_WARNING, "UDP packet possibly truncated"); str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret); - ret = stream_packet(sfd, &s, &ep); + ret = stream_packet(sfd, &s, &ep, &tv); if (ret < 0) { ilog(LOG_WARNING, "Write error on RTP socket: %s", strerror(-ret)); call_destroy(sfd->call); diff --git a/daemon/rtcp.c b/daemon/rtcp.c index d636c7511..5380f8511 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -605,7 +605,9 @@ static void print_rtcp_list_end(GString *json) { } } -void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const endpoint_t *src) { +void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const endpoint_t *src, + const struct timeval *tv) +{ GString *log; str iter_s, comp_s; @@ -728,7 +730,7 @@ void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const en if (json) { str_sanitize(json); g_string_append(json, " }"); - homer_send(cm->homer, json, &c->callid, src, &sfd->socket.local); + homer_send(cm->homer, json, &c->callid, src, &sfd->socket.local, tv); json = NULL; } diff --git a/daemon/rtcp.h b/daemon/rtcp.h index cd5c16431..c266460f9 100644 --- a/daemon/rtcp.h +++ b/daemon/rtcp.h @@ -99,6 +99,6 @@ int rtcp_savp2avp(str *, struct crypto_context *); int rtcp_demux_is_rtcp(const str *); -void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *, const endpoint_t *); +void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *, const endpoint_t *, const struct timeval *); #endif diff --git a/daemon/socket.c b/daemon/socket.c index 15f942f6f..951195e8b 100644 --- a/daemon/socket.c +++ b/daemon/socket.c @@ -20,6 +20,7 @@ static int __ip4_is_specified(const sockaddr_t *a); static int __ip6_is_specified(const sockaddr_t *a); static int __ip_bind(socket_t *s, unsigned int, const sockaddr_t *); static int __ip_connect(socket_t *s, const endpoint_t *); +static int __ip_timestamping(socket_t *s); static int __ip4_sockaddr2endpoint(endpoint_t *, const void *); static int __ip6_sockaddr2endpoint(endpoint_t *, const void *); static int __ip4_endpoint2sockaddr(void *, const endpoint_t *); @@ -27,6 +28,7 @@ static int __ip6_endpoint2sockaddr(void *, const endpoint_t *); static int __ip4_addrport2sockaddr(void *, const sockaddr_t *, unsigned int); static int __ip6_addrport2sockaddr(void *, const sockaddr_t *, unsigned int); static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep); +static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *); static ssize_t __ip_sendmsg(socket_t *s, struct msghdr *mh, const endpoint_t *ep); static ssize_t __ip_sendto(socket_t *s, const void *buf, size_t len, const endpoint_t *ep); static int __ip4_tos(socket_t *, unsigned int); @@ -64,7 +66,9 @@ static struct socket_family __socket_families[__SF_LAST] = { .addrport2sockaddr = __ip4_addrport2sockaddr, .bind = __ip_bind, .connect = __ip_connect, + .timestamping = __ip_timestamping, .recvfrom = __ip_recvfrom, + .recvfrom_ts = __ip_recvfrom_ts, .sendmsg = __ip_sendmsg, .sendto = __ip_sendto, .tos = __ip4_tos, @@ -89,7 +93,9 @@ static struct socket_family __socket_families[__SF_LAST] = { .addrport2sockaddr = __ip6_addrport2sockaddr, .bind = __ip_bind, .connect = __ip_connect, + .timestamping = __ip_timestamping, .recvfrom = __ip_recvfrom, + .recvfrom_ts = __ip_recvfrom_ts, .sendmsg = __ip_sendmsg, .sendto = __ip_sendto, .tos = __ip6_tos, @@ -236,18 +242,52 @@ static int __ip_connect(socket_t *s, const endpoint_t *ep) { } return 0; } -static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep) { +static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *tv) { ssize_t ret; struct sockaddr_storage sin; - socklen_t sinlen; - - sinlen = s->family->sockaddr_size; - ret = recvfrom(s->fd, buf, len, 0, (void *) &sin, &sinlen); + struct msghdr msg; + struct iovec iov; + char ctrl[32]; + struct cmsghdr *cm; + + ZERO(msg); + msg.msg_name = &sin; + msg.msg_namelen = s->family->sockaddr_size; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = ctrl; + msg.msg_controllen = sizeof(ctrl); + ZERO(iov); + iov.iov_base = buf; + iov.iov_len = len; + + ret = recvmsg(s->fd, &msg, 0); if (ret < 0) return ret; s->family->sockaddr2endpoint(ep, &sin); + + if (tv) { + for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) { + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { + *tv = *((struct timeval *) CMSG_DATA(cm)); + tv = NULL; + } + } + if (G_UNLIKELY(tv)) { + ilog(LOG_WARNING, "No receive timestamp received from kernel"); + ZERO(*tv); + } + } + if (G_UNLIKELY((msg.msg_flags & MSG_TRUNC))) + ilog(LOG_WARNING, "Kernel indicates that data was truncated"); + if (G_UNLIKELY((msg.msg_flags & MSG_CTRUNC))) + ilog(LOG_WARNING, "Kernel indicates that ancillary data was truncated"); + return ret; } +static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep) { + return __ip_recvfrom_ts(s, buf, len, ep, NULL); +} static ssize_t __ip_sendmsg(socket_t *s, struct msghdr *mh, const endpoint_t *ep) { struct sockaddr_storage sin; @@ -280,6 +320,12 @@ static int __ip_error(socket_t *s) { return -1; return optval; } +static int __ip_timestamping(socket_t *s) { + int one = 1; + if (setsockopt(s->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one))) + return -1; + return 0; +} static void __ip4_endpoint2kernel(struct re_address *ra, const endpoint_t *ep) { ZERO(*ra); ra->family = AF_INET; diff --git a/daemon/socket.h b/daemon/socket.h index ca2c856e8..aff67bb63 100644 --- a/daemon/socket.h +++ b/daemon/socket.h @@ -59,7 +59,9 @@ struct socket_family { int (*addrport2sockaddr)(void *, const sockaddr_t *, unsigned int); int (*bind)(socket_t *, unsigned int, const sockaddr_t *); int (*connect)(socket_t *, const endpoint_t *); + int (*timestamping)(socket_t *); ssize_t (*recvfrom)(socket_t *, void *, size_t, endpoint_t *); + ssize_t (*recvfrom_ts)(socket_t *, void *, size_t, endpoint_t *, struct timeval *); ssize_t (*sendmsg)(socket_t *, struct msghdr *, const endpoint_t *); ssize_t (*sendto)(socket_t *, const void *, size_t, const endpoint_t *); int (*tos)(socket_t *, unsigned int); @@ -145,9 +147,11 @@ INLINE int is_addr_unspecified(const sockaddr_t *a) { return !a->family->is_specified(a); } #define socket_recvfrom(s,a...) (s)->family->recvfrom((s), a) +#define socket_recvfrom_ts(s,a...) (s)->family->recvfrom_ts((s), a) #define socket_sendmsg(s,a...) (s)->family->sendmsg((s), a) #define socket_sendto(s,a...) (s)->family->sendto((s), a) #define socket_error(s) (s)->family->error((s)) +#define socket_timestamping(s) (s)->family->timestamping((s)) INLINE ssize_t socket_sendiov(socket_t *s, const struct iovec *v, unsigned int len, const endpoint_t *dst) { struct msghdr mh; ZERO(mh);