Browse Source

MT#18599 use kernel socket receive timestamping

Change-Id: I6fde08be7942df7986eea39d2b233ce60d1fa862
changes/48/5548/1
Richard Fuchs 10 years ago
parent
commit
29545e5434
7 changed files with 78 additions and 22 deletions
  1. +9
    -8
      daemon/homer.c
  2. +2
    -1
      daemon/homer.h
  3. +7
    -5
      daemon/media_socket.c
  4. +4
    -2
      daemon/rtcp.c
  5. +1
    -1
      daemon/rtcp.h
  6. +51
    -5
      daemon/socket.c
  7. +4
    -0
      daemon/socket.h

+ 9
- 8
daemon/homer.c View File

@ -39,7 +39,8 @@ struct homer_sender {
static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst);
static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *);
// state handlers
static int __established(struct homer_sender *hs);
@ -198,7 +199,7 @@ struct homer_sender *homer_sender_new(const endpoint_t *ep, int protocol, int ca
// takes over the GString
int homer_send(struct homer_sender *hs, GString *s, const str *id, const endpoint_t *src,
const endpoint_t *dst)
const endpoint_t *dst, const struct timeval *tv)
{
if (!hs)
goto out;
@ -209,7 +210,7 @@ int homer_send(struct homer_sender *hs, GString *s, const str *id, const endpoin
ilog(LOG_DEBUG, "JSON to send to Homer: '"STR_FORMAT"'", G_STR_FMT(s));
if (send_hepv3(s, id, hs->capture_id, src, dst))
if (send_hepv3(s, id, hs->capture_id, src, dst, tv))
goto out;
mutex_lock(&hs->lock);
@ -316,7 +317,9 @@ typedef struct hep_generic hep_generic_t;
#define PROTO_RTCP_JSON 0x05
// modifies the GString in place
static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst) {
static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *tv)
{
struct hep_generic *hg=NULL;
void* buffer;
@ -327,7 +330,6 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t
//hep_chunk_t authkey_chunk;
hep_chunk_t correlation_chunk;
//static int errors = 0;
struct timeval now;
hg = malloc(sizeof(struct hep_generic));
memset(hg, 0, sizeof(struct hep_generic));
@ -395,18 +397,17 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t
hg->dst_port.chunk.length = htons(sizeof(hg->dst_port));
gettimeofday(&now, NULL); // XXX replace with timestamp from actual packet
/* TIMESTAMP SEC */
hg->time_sec.chunk.vendor_id = htons(0x0000);
hg->time_sec.chunk.type_id = htons(0x0009);
hg->time_sec.data = htonl(now.tv_sec);
hg->time_sec.data = htonl(tv->tv_sec);
hg->time_sec.chunk.length = htons(sizeof(hg->time_sec));
/* TIMESTAMP USEC */
hg->time_usec.chunk.vendor_id = htons(0x0000);
hg->time_usec.chunk.type_id = htons(0x000a);
hg->time_usec.data = htonl(now.tv_usec);
hg->time_usec.data = htonl(tv->tv_usec);
hg->time_usec.chunk.length = htons(sizeof(hg->time_usec));
/* Protocol TYPE */


+ 2
- 1
daemon/homer.h View File

@ -8,7 +8,8 @@ struct homer_sender;
struct homer_sender *homer_sender_new(const endpoint_t *, int, int);
int homer_send(struct homer_sender *, GString *, const str *, const endpoint_t *, const endpoint_t *);
int homer_send(struct homer_sender *, GString *, const str *, const endpoint_t *, const endpoint_t *,
const struct timeval *tv);
#endif

+ 7
- 5
daemon/media_socket.c View File

@ -537,7 +537,7 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc
static int get_port6(socket_t *r, unsigned int port, struct intf_spec *spec) {
if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr))
return -1;
socket_timestamping(r);
return 0;
}
@ -1013,7 +1013,7 @@ noop:
/* XXX split this function into pieces */
/* called lock-free */
static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin) {
static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) {
struct packet_stream *stream,
*sink = NULL,
*in_srtp, *out_srtp;
@ -1174,7 +1174,7 @@ loop_ok:
handler_ret = rwf_in(s, in_srtp);
if (handler_ret >= 0) {
if (rtcp)
parse_and_log_rtcp_report(sfd, s, fsin);
parse_and_log_rtcp_report(sfd, s, fsin, tv);
if (rwf_out)
handler_ret += rwf_out(s, out_srtp);
}
@ -1358,6 +1358,7 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
struct call *ca;
str s;
endpoint_t ep;
struct timeval tv;
if (sfd->socket.fd != fd)
goto out;
@ -1373,7 +1374,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
}
#endif
ret = socket_recvfrom(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, &ep);
ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE,
&ep, &tv);
if (ret < 0) {
if (errno == EINTR)
@ -1387,7 +1389,7 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
ilog(LOG_WARNING, "UDP packet possibly truncated");
str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret);
ret = stream_packet(sfd, &s, &ep);
ret = stream_packet(sfd, &s, &ep, &tv);
if (ret < 0) {
ilog(LOG_WARNING, "Write error on RTP socket: %s", strerror(-ret));
call_destroy(sfd->call);


+ 4
- 2
daemon/rtcp.c View File

@ -605,7 +605,9 @@ static void print_rtcp_list_end(GString *json) {
}
}
void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const endpoint_t *src) {
void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const endpoint_t *src,
const struct timeval *tv)
{
GString *log;
str iter_s, comp_s;
@ -728,7 +730,7 @@ void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *ori_s, const en
if (json) {
str_sanitize(json);
g_string_append(json, " }");
homer_send(cm->homer, json, &c->callid, src, &sfd->socket.local);
homer_send(cm->homer, json, &c->callid, src, &sfd->socket.local, tv);
json = NULL;
}


+ 1
- 1
daemon/rtcp.h View File

@ -99,6 +99,6 @@ int rtcp_savp2avp(str *, struct crypto_context *);
int rtcp_demux_is_rtcp(const str *);
void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *, const endpoint_t *);
void parse_and_log_rtcp_report(struct stream_fd *sfd, const str *, const endpoint_t *, const struct timeval *);
#endif

+ 51
- 5
daemon/socket.c View File

@ -20,6 +20,7 @@ static int __ip4_is_specified(const sockaddr_t *a);
static int __ip6_is_specified(const sockaddr_t *a);
static int __ip_bind(socket_t *s, unsigned int, const sockaddr_t *);
static int __ip_connect(socket_t *s, const endpoint_t *);
static int __ip_timestamping(socket_t *s);
static int __ip4_sockaddr2endpoint(endpoint_t *, const void *);
static int __ip6_sockaddr2endpoint(endpoint_t *, const void *);
static int __ip4_endpoint2sockaddr(void *, const endpoint_t *);
@ -27,6 +28,7 @@ static int __ip6_endpoint2sockaddr(void *, const endpoint_t *);
static int __ip4_addrport2sockaddr(void *, const sockaddr_t *, unsigned int);
static int __ip6_addrport2sockaddr(void *, const sockaddr_t *, unsigned int);
static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep);
static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *);
static ssize_t __ip_sendmsg(socket_t *s, struct msghdr *mh, const endpoint_t *ep);
static ssize_t __ip_sendto(socket_t *s, const void *buf, size_t len, const endpoint_t *ep);
static int __ip4_tos(socket_t *, unsigned int);
@ -64,7 +66,9 @@ static struct socket_family __socket_families[__SF_LAST] = {
.addrport2sockaddr = __ip4_addrport2sockaddr,
.bind = __ip_bind,
.connect = __ip_connect,
.timestamping = __ip_timestamping,
.recvfrom = __ip_recvfrom,
.recvfrom_ts = __ip_recvfrom_ts,
.sendmsg = __ip_sendmsg,
.sendto = __ip_sendto,
.tos = __ip4_tos,
@ -89,7 +93,9 @@ static struct socket_family __socket_families[__SF_LAST] = {
.addrport2sockaddr = __ip6_addrport2sockaddr,
.bind = __ip_bind,
.connect = __ip_connect,
.timestamping = __ip_timestamping,
.recvfrom = __ip_recvfrom,
.recvfrom_ts = __ip_recvfrom_ts,
.sendmsg = __ip_sendmsg,
.sendto = __ip_sendto,
.tos = __ip6_tos,
@ -236,18 +242,52 @@ static int __ip_connect(socket_t *s, const endpoint_t *ep) {
}
return 0;
}
static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep) {
static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *tv) {
ssize_t ret;
struct sockaddr_storage sin;
socklen_t sinlen;
sinlen = s->family->sockaddr_size;
ret = recvfrom(s->fd, buf, len, 0, (void *) &sin, &sinlen);
struct msghdr msg;
struct iovec iov;
char ctrl[32];
struct cmsghdr *cm;
ZERO(msg);
msg.msg_name = &sin;
msg.msg_namelen = s->family->sockaddr_size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = ctrl;
msg.msg_controllen = sizeof(ctrl);
ZERO(iov);
iov.iov_base = buf;
iov.iov_len = len;
ret = recvmsg(s->fd, &msg, 0);
if (ret < 0)
return ret;
s->family->sockaddr2endpoint(ep, &sin);
if (tv) {
for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) {
*tv = *((struct timeval *) CMSG_DATA(cm));
tv = NULL;
}
}
if (G_UNLIKELY(tv)) {
ilog(LOG_WARNING, "No receive timestamp received from kernel");
ZERO(*tv);
}
}
if (G_UNLIKELY((msg.msg_flags & MSG_TRUNC)))
ilog(LOG_WARNING, "Kernel indicates that data was truncated");
if (G_UNLIKELY((msg.msg_flags & MSG_CTRUNC)))
ilog(LOG_WARNING, "Kernel indicates that ancillary data was truncated");
return ret;
}
static ssize_t __ip_recvfrom(socket_t *s, void *buf, size_t len, endpoint_t *ep) {
return __ip_recvfrom_ts(s, buf, len, ep, NULL);
}
static ssize_t __ip_sendmsg(socket_t *s, struct msghdr *mh, const endpoint_t *ep) {
struct sockaddr_storage sin;
@ -280,6 +320,12 @@ static int __ip_error(socket_t *s) {
return -1;
return optval;
}
static int __ip_timestamping(socket_t *s) {
int one = 1;
if (setsockopt(s->fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)))
return -1;
return 0;
}
static void __ip4_endpoint2kernel(struct re_address *ra, const endpoint_t *ep) {
ZERO(*ra);
ra->family = AF_INET;


+ 4
- 0
daemon/socket.h View File

@ -59,7 +59,9 @@ struct socket_family {
int (*addrport2sockaddr)(void *, const sockaddr_t *, unsigned int);
int (*bind)(socket_t *, unsigned int, const sockaddr_t *);
int (*connect)(socket_t *, const endpoint_t *);
int (*timestamping)(socket_t *);
ssize_t (*recvfrom)(socket_t *, void *, size_t, endpoint_t *);
ssize_t (*recvfrom_ts)(socket_t *, void *, size_t, endpoint_t *, struct timeval *);
ssize_t (*sendmsg)(socket_t *, struct msghdr *, const endpoint_t *);
ssize_t (*sendto)(socket_t *, const void *, size_t, const endpoint_t *);
int (*tos)(socket_t *, unsigned int);
@ -145,9 +147,11 @@ INLINE int is_addr_unspecified(const sockaddr_t *a) {
return !a->family->is_specified(a);
}
#define socket_recvfrom(s,a...) (s)->family->recvfrom((s), a)
#define socket_recvfrom_ts(s,a...) (s)->family->recvfrom_ts((s), a)
#define socket_sendmsg(s,a...) (s)->family->sendmsg((s), a)
#define socket_sendto(s,a...) (s)->family->sendto((s), a)
#define socket_error(s) (s)->family->error((s))
#define socket_timestamping(s) (s)->family->timestamping((s))
INLINE ssize_t socket_sendiov(socket_t *s, const struct iovec *v, unsigned int len, const endpoint_t *dst) {
struct msghdr mh;
ZERO(mh);


Loading…
Cancel
Save