From eb5de58dfcd8af970414d14f193d46424eed61d1 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 9 Mar 2015 08:31:29 -0400 Subject: [PATCH] further split up call.c and rework socket handling --- daemon/call.c | 1175 +++----------------------------------- daemon/call.h | 15 +- daemon/call_interfaces.c | 8 +- daemon/cli.c | 2 +- daemon/dtls.c | 26 +- daemon/ice.c | 29 +- daemon/media_socket.c | 1095 ++++++++++++++++++++++++++++++++++- daemon/media_socket.h | 20 +- daemon/sdp.c | 28 +- daemon/stun.c | 4 +- 10 files changed, 1250 insertions(+), 1152 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 85ae62842..0e879c83e 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -51,7 +51,6 @@ -typedef int (*rewrite_func)(str *, struct packet_stream *); /* also serves as array index for callstream->peers[] */ struct iterator_helper { @@ -65,16 +64,6 @@ struct xmlrpc_helper { GSList *tags_urls; }; -struct streamhandler_io { - rewrite_func rtp; - rewrite_func rtcp; - int (*kernel)(struct rtpengine_srtp *, struct packet_stream *); -}; -struct streamhandler { - const struct streamhandler_io *in; - const struct streamhandler_io *out; -}; - const struct transport_protocol transport_protocols[] = { [PROTO_RTP_AVP] = { .index = PROTO_RTP_AVP, @@ -146,896 +135,26 @@ const char * get_tag_type_text(enum tag_type t) { return get_enum_array_text(__tag_type_texts, t, "UNKNOWN"); } -static void determine_handler(struct packet_stream *in, const struct packet_stream *out); -static void __call_media_state_machine(struct call_media *m); - -static int __k_null(struct rtpengine_srtp *s, struct packet_stream *); -static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *); -static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *); - -static int call_avp2savp_rtp(str *s, struct packet_stream *); -static int call_savp2avp_rtp(str *s, struct packet_stream *); -static int call_avp2savp_rtcp(str *s, struct packet_stream *); -static int call_savp2avp_rtcp(str *s, struct packet_stream *); -static int call_avpf2avp_rtcp(str *s, struct packet_stream *); -//static int call_avpf2savp_rtcp(str *s, struct packet_stream *); -static int call_savpf2avp_rtcp(str *s, struct packet_stream *); -//static int call_savpf2savp_rtcp(str *s, struct packet_stream *); - -/* ********** */ - -static const struct streamhandler_io __shio_noop = { - .kernel = __k_null, -}; -static const struct streamhandler_io __shio_decrypt = { - .kernel = __k_srtp_decrypt, - .rtp = call_savp2avp_rtp, - .rtcp = call_savp2avp_rtcp, -}; -static const struct streamhandler_io __shio_encrypt = { - .kernel = __k_srtp_encrypt, - .rtp = call_avp2savp_rtp, - .rtcp = call_avp2savp_rtcp, -}; -static const struct streamhandler_io __shio_avpf_strip = { - .kernel = __k_null, - .rtcp = call_avpf2avp_rtcp, -}; -static const struct streamhandler_io __shio_decrypt_avpf_strip = { - .kernel = __k_srtp_decrypt, - .rtp = call_savp2avp_rtp, - .rtcp = call_savpf2avp_rtcp, -}; - -/* ********** */ - -static const struct streamhandler __sh_noop = { - .in = &__shio_noop, - .out = &__shio_noop, -}; -static const struct streamhandler __sh_savp2avp = { - .in = &__shio_decrypt, - .out = &__shio_noop, -}; -static const struct streamhandler __sh_avp2savp = { - .in = &__shio_noop, - .out = &__shio_encrypt, -}; -static const struct streamhandler __sh_avpf2avp = { - .in = &__shio_avpf_strip, - .out = &__shio_noop, -}; -static const struct streamhandler __sh_avpf2savp = { - .in = &__shio_avpf_strip, - .out = &__shio_encrypt, -}; -static const struct streamhandler __sh_savpf2avp = { - .in = &__shio_decrypt_avpf_strip, - .out = &__shio_noop, -}; -static const struct streamhandler __sh_savp2savp = { - .in = &__shio_decrypt, - .out = &__shio_encrypt, -}; -static const struct streamhandler __sh_savpf2savp = { - .in = &__shio_decrypt_avpf_strip, - .out = &__shio_encrypt, -}; - -/* ********** */ - -static const struct streamhandler *__sh_matrix_in_rtp_avp[] = { - [PROTO_RTP_AVP] = &__sh_noop, - [PROTO_RTP_AVPF] = &__sh_noop, - [PROTO_RTP_SAVP] = &__sh_avp2savp, - [PROTO_RTP_SAVPF] = &__sh_avp2savp, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_avp2savp, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_avp2savp, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_in_rtp_avpf[] = { - [PROTO_RTP_AVP] = &__sh_avpf2avp, - [PROTO_RTP_AVPF] = &__sh_noop, - [PROTO_RTP_SAVP] = &__sh_avpf2savp, - [PROTO_RTP_SAVPF] = &__sh_avp2savp, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_avpf2savp, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_avp2savp, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_in_rtp_savp[] = { - [PROTO_RTP_AVP] = &__sh_savp2avp, - [PROTO_RTP_AVPF] = &__sh_savp2avp, - [PROTO_RTP_SAVP] = &__sh_noop, - [PROTO_RTP_SAVPF] = &__sh_noop, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_noop, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_in_rtp_savpf[] = { - [PROTO_RTP_AVP] = &__sh_savpf2avp, - [PROTO_RTP_AVPF] = &__sh_savp2avp, - [PROTO_RTP_SAVP] = &__sh_savpf2savp, - [PROTO_RTP_SAVPF] = &__sh_noop, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savpf2savp, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_in_rtp_savp_recrypt[] = { - [PROTO_RTP_AVP] = &__sh_savp2avp, - [PROTO_RTP_AVPF] = &__sh_savp2avp, - [PROTO_RTP_SAVP] = &__sh_savp2savp, - [PROTO_RTP_SAVPF] = &__sh_savp2savp, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savp2savp, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_savp2savp, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_in_rtp_savpf_recrypt[] = { - [PROTO_RTP_AVP] = &__sh_savpf2avp, - [PROTO_RTP_AVPF] = &__sh_savp2avp, - [PROTO_RTP_SAVP] = &__sh_savpf2savp, - [PROTO_RTP_SAVPF] = &__sh_savp2savp, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savpf2savp, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_savp2savp, - [PROTO_UDPTL] = &__sh_noop, -}; -static const struct streamhandler *__sh_matrix_noop[] = { - [PROTO_RTP_AVP] = &__sh_noop, - [PROTO_RTP_AVPF] = &__sh_noop, - [PROTO_RTP_SAVP] = &__sh_noop, - [PROTO_RTP_SAVPF] = &__sh_noop, - [PROTO_UDP_TLS_RTP_SAVP] = &__sh_noop, - [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, - [PROTO_UDPTL] = &__sh_noop, -}; - -/* ********** */ - -static const struct streamhandler **__sh_matrix[] = { - [PROTO_RTP_AVP] = __sh_matrix_in_rtp_avp, - [PROTO_RTP_AVPF] = __sh_matrix_in_rtp_avpf, - [PROTO_RTP_SAVP] = __sh_matrix_in_rtp_savp, - [PROTO_RTP_SAVPF] = __sh_matrix_in_rtp_savpf, - [PROTO_UDP_TLS_RTP_SAVP] = __sh_matrix_in_rtp_savp, - [PROTO_UDP_TLS_RTP_SAVPF] = __sh_matrix_in_rtp_savpf, - [PROTO_UDPTL] = __sh_matrix_noop, -}; -/* special case for DTLS as we can't pass through SRTP<>SRTP */ -static const struct streamhandler **__sh_matrix_recrypt[] = { - [PROTO_RTP_AVP] = __sh_matrix_in_rtp_avp, - [PROTO_RTP_AVPF] = __sh_matrix_in_rtp_avpf, - [PROTO_RTP_SAVP] = __sh_matrix_in_rtp_savp_recrypt, - [PROTO_RTP_SAVPF] = __sh_matrix_in_rtp_savpf_recrypt, - [PROTO_UDP_TLS_RTP_SAVP] = __sh_matrix_in_rtp_savp_recrypt, - [PROTO_UDP_TLS_RTP_SAVPF] = __sh_matrix_in_rtp_savpf_recrypt, - [PROTO_UDPTL] = __sh_matrix_noop, -}; /* ********** */ -static const struct rtpengine_srtp __res_null = { - .cipher = REC_NULL, - .hmac = REH_NULL, -}; - - - -static void __unkernelize(struct packet_stream *); -static void __stream_unconfirm(struct packet_stream *ps); -static void stream_unconfirm(struct packet_stream *ps); static void __monologue_destroy(struct call_monologue *monologue); static int monologue_destroy(struct call_monologue *ml); -/* called lock-free */ -static void stream_fd_closed(int fd, void *p, uintptr_t u) { - struct stream_fd *sfd = p; - struct call *c; - int i; - socklen_t j; - - assert(sfd->socket.fd == fd); - c = sfd->call; - if (!c) - return; - - j = sizeof(i); - getsockopt(fd, SOL_SOCKET, SO_ERROR, &i, &j); - ilog(LOG_WARNING, "Read error on media socket: %i (%s) -- closing call", i, strerror(i)); - - call_destroy(c); -} - - - - -/* XXX unify this */ -INLINE void __re_address_translate(struct re_address *o, const sockaddr_t *address) { - o->family = address->family->af; - if (o->family == AF_INET) - o->u.ipv4 = address->u.ipv4.s_addr; - else - memcpy(o->u.ipv6, &address->u.ipv6, sizeof(o->u.ipv6)); -} -INLINE void __re_address_translate_ep(struct re_address *o, const endpoint_t *ep) { - __re_address_translate(o, &ep->address); - o->port = ep->port; -} - -static int __rtp_stats_pt_sort(const void *ap, const void *bp) { - const struct rtp_stats *a = ap, *b = bp; - - if (a->payload_type < b->payload_type) - return -1; - if (a->payload_type > b->payload_type) - return 1; - return 0; -} - -/* called with in_lock held */ -void kernelize(struct packet_stream *stream) { - struct rtpengine_target_info reti; - struct call *call = stream->call; - struct callmaster *cm = call->callmaster; - struct packet_stream *sink = NULL; - struct local_intf *ifa; - const char *nk_warn_msg; - - if (PS_ISSET(stream, KERNELIZED)) - return; - if (cm->conf.kernelid < 0) - goto no_kernel; - nk_warn_msg = "interface to kernel module not open"; - if (cm->conf.kernelfd < 0) - goto no_kernel_warn; - if (!PS_ISSET(stream, RTP)) - goto no_kernel; - if (!stream->sfd) - goto no_kernel; - - ilog(LOG_INFO, "Kernelizing media stream"); - - sink = packet_stream_sink(stream); - if (!sink) { - ilog(LOG_WARNING, "Attempt to kernelize stream without sink"); - goto no_kernel; - } - - determine_handler(stream, sink); - - if (is_addr_unspecified(&sink->advertised_endpoint.address) - || !sink->advertised_endpoint.port) - goto no_kernel; - nk_warn_msg = "protocol not supported by kernel module"; - if (!stream->handler->in->kernel - || !stream->handler->out->kernel) - goto no_kernel_warn; - - ZERO(reti); - - if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - mutex_lock(&stream->out_lock); - __re_address_translate_ep(&reti.expected_src, &stream->endpoint); - mutex_unlock(&stream->out_lock); - if (PS_ISSET(stream, STRICT_SOURCE)) - reti.src_mismatch = MSM_DROP; - else if (PS_ISSET(stream, MEDIA_HANDOVER)) - reti.src_mismatch = MSM_PROPAGATE; - } - - mutex_lock(&sink->out_lock); - - reti.target_port = stream->sfd->socket.local.port; - reti.tos = call->tos; - reti.rtcp_mux = MEDIA_ISSET(stream->media, RTCP_MUX); - reti.dtls = MEDIA_ISSET(stream->media, DTLS); - reti.stun = stream->media->ice_agent ? 1 : 0; - - ifa = g_atomic_pointer_get(&sink->media->local_intf); - __re_address_translate_ep(&reti.dst_addr, &sink->endpoint); - __re_address_translate(&reti.src_addr, &ifa->spec->address.addr); - reti.src_addr.port = sink->sfd->socket.local.port; - reti.ssrc = sink->crypto.ssrc; - - stream->handler->in->kernel(&reti.decrypt, stream); - stream->handler->out->kernel(&reti.encrypt, sink); - - mutex_unlock(&sink->out_lock); - - nk_warn_msg = "encryption cipher or HMAC not supported by kernel module"; - if (!reti.encrypt.cipher || !reti.encrypt.hmac) - goto no_kernel_warn; - nk_warn_msg = "decryption cipher or HMAC not supported by kernel module"; - if (!reti.decrypt.cipher || !reti.decrypt.hmac) - goto no_kernel_warn; - - ZERO(stream->kernel_stats); - - if (stream->media->protocol && stream->media->protocol->rtp) { - GList *values, *l; - struct rtp_stats *rs; - - reti.rtp = 1; - values = g_hash_table_get_values(stream->rtp_stats); - values = g_list_sort(values, __rtp_stats_pt_sort); - for (l = values; l; l = l->next) { - if (reti.num_payload_types >= G_N_ELEMENTS(reti.payload_types)) { - ilog(LOG_WARNING, "Too many RTP payload types for kernel module"); - break; - } - rs = l->data; - reti.payload_types[reti.num_payload_types++] = rs->payload_type; - } - } - - kernel_add_stream(cm->conf.kernelfd, &reti, 0); - PS_SET(stream, KERNELIZED); - - return; - -no_kernel_warn: - ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg); -no_kernel: - PS_SET(stream, KERNELIZED); - PS_SET(stream, NO_KERNEL_SUPPORT); -} - - - - -/* returns: 0 = not a muxed stream, 1 = muxed, RTP, 2 = muxed, RTCP */ -static int rtcp_demux(str *s, struct call_media *media) { - if (!MEDIA_ISSET(media, RTCP_MUX)) - return 0; - return rtcp_demux_is_rtcp(s) ? 2 : 1; -} - -static int call_avpf2avp_rtcp(str *s, struct packet_stream *stream) { - return rtcp_avpf2avp(s); -} -static int call_avp2savp_rtp(str *s, struct packet_stream *stream) { - return rtp_avp2savp(s, &stream->crypto); -} -static int call_avp2savp_rtcp(str *s, struct packet_stream *stream) { - return rtcp_avp2savp(s, &stream->crypto); -} -static int call_savp2avp_rtp(str *s, struct packet_stream *stream) { - return rtp_savp2avp(s, &stream->sfd->crypto); -} -static int call_savp2avp_rtcp(str *s, struct packet_stream *stream) { - return rtcp_savp2avp(s, &stream->sfd->crypto); -} -static int call_savpf2avp_rtcp(str *s, struct packet_stream *stream) { - int ret; - ret = rtcp_savp2avp(s, &stream->sfd->crypto); - if (ret < 0) - return ret; - return rtcp_avpf2avp(s); -} - - -static int __k_null(struct rtpengine_srtp *s, struct packet_stream *stream) { - *s = __res_null; - return 0; -} -static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c) { - if (!c->params.crypto_suite) - return -1; - - *s = (struct rtpengine_srtp) { - .cipher = c->params.crypto_suite->kernel_cipher, - .hmac = c->params.crypto_suite->kernel_hmac, - .mki_len = c->params.mki_len, - .last_index = c->last_index, - .auth_tag_len = c->params.crypto_suite->srtp_auth_tag, - }; - if (c->params.mki_len) - memcpy(s->mki, c->params.mki, c->params.mki_len); - memcpy(s->master_key, c->params.master_key, c->params.crypto_suite->master_key_len); - memcpy(s->master_salt, c->params.master_salt, c->params.crypto_suite->master_salt_len); - - if (c->params.session_params.unencrypted_srtp) - s->cipher = REC_NULL; - if (c->params.session_params.unauthenticated_srtp) - s->auth_tag_len = 0; - - return 0; -} -static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *stream) { - return __k_srtp_crypt(s, &stream->crypto); -} -static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *stream) { - return __k_srtp_crypt(s, &stream->sfd->crypto); -} - -/* must be called with call->master_lock held in R, and in->in_lock held */ -static void determine_handler(struct packet_stream *in, const struct packet_stream *out) { - const struct streamhandler **sh_pp, *sh; - const struct streamhandler ***matrix; - - if (in->handler) - return; - if (MEDIA_ISSET(in->media, PASSTHRU)) - goto noop; - - if (!in->media->protocol) - goto err; - if (!out->media->protocol) - goto err; - - matrix = __sh_matrix; - if (MEDIA_ISSET(in->media, DTLS) || MEDIA_ISSET(out->media, DTLS)) - matrix = __sh_matrix_recrypt; - else if (in->media->protocol->srtp && out->media->protocol->srtp - && in->sfd && out->sfd - && (crypto_params_cmp(&in->crypto.params, &out->sfd->crypto.params) - || crypto_params_cmp(&out->crypto.params, &in->sfd->crypto.params))) - matrix = __sh_matrix_recrypt; - - sh_pp = matrix[in->media->protocol->index]; - if (!sh_pp) - goto err; - sh = sh_pp[out->media->protocol->index]; - if (!sh) - goto err; - in->handler = sh; - - return; - -err: - ilog(LOG_WARNING, "Unknown transport protocol encountered"); -noop: - in->handler = &__sh_noop; - return; -} void stream_msg_mh_src(struct packet_stream *ps, struct msghdr *mh) { - struct local_intf *ifa; + const struct local_intf *ifa; - ifa = g_atomic_pointer_get(&ps->media->local_intf); + ifa = ps->selected_sfd->local_intf; msg_mh_src(&ifa->spec->address.addr, mh); } -/* XXX split this function into pieces */ -/* called lock-free */ -static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const sockaddr_t *dst) { - struct packet_stream *stream, - *sink = NULL, - *in_srtp, *out_srtp; - struct call_media *media; - int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0, - unk = 0; - int i; - struct msghdr mh; - struct iovec iov; - unsigned char buf[256]; - struct call *call; - struct callmaster *cm; - /*unsigned char cc;*/ - char *addr; - struct endpoint endpoint; - rewrite_func rwf_in, rwf_out; - struct local_intf *loc_addr; - struct rtp_header *rtp_h; - struct rtp_stats *rtp_s; - - call = sfd->call; - cm = call->callmaster; - addr = endpoint_print_buf(fsin); - - rwlock_lock_r(&call->master_lock); - - stream = sfd->stream; - if (!stream) - goto unlock_out; - - - media = stream->media; - - if (!stream->sfd) - goto unlock_out; - - - /* demux other protocols running on this port */ - - if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) { - mutex_lock(&stream->in_lock); - ret = dtls(stream, s, fsin); - mutex_unlock(&stream->in_lock); - if (!ret) - goto unlock_out; - } - - if (media->ice_agent && is_stun(s)) { - stun_ret = stun(s, stream, fsin, dst); - if (!stun_ret) - goto unlock_out; - if (stun_ret == 1) { - __call_media_state_machine(media); - mutex_lock(&stream->in_lock); /* for the jump */ - goto kernel_check; - } - else /* not an stun packet */ - stun_ret = 0; - } - -#if RTP_LOOP_PROTECT - mutex_lock(&stream->in_lock); - - for (i = 0; i < RTP_LOOP_PACKETS; i++) { - if (stream->lp_buf[i].len != s->len) - continue; - if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT))) - continue; - - __C_DBG("packet dupe"); - if (stream->lp_count >= RTP_LOOP_MAX_COUNT) { - ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " - "to avoid potential loop", RTP_LOOP_MAX_COUNT); - goto done; - } - - stream->lp_count++; - goto loop_ok; - } - - /* not a dupe */ - stream->lp_count = 0; - stream->lp_buf[stream->lp_idx].len = s->len; - memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)); - stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS; -loop_ok: - mutex_unlock(&stream->in_lock); -#endif - - - /* demux RTCP */ - - in_srtp = stream; - sink = stream->rtp_sink; - if (!sink && PS_ISSET(stream, RTCP)) { - sink = stream->rtcp_sink; - rtcp = 1; - } - else if (stream->rtcp_sink) { - muxed_rtcp = rtcp_demux(s, media); - if (muxed_rtcp == 2) { - sink = stream->rtcp_sink; - rtcp = 1; - in_srtp = stream->rtcp_sibling; - } - } - out_srtp = sink; - if (rtcp && sink && sink->rtcp_sibling) - out_srtp = sink->rtcp_sibling; - - - /* stats per RTP payload type */ - - if (media->protocol && media->protocol->rtp && !rtcp && !rtp_payload(&rtp_h, NULL, s)) { - i = (rtp_h->m_pt & 0x7f); - - rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); - if (!rtp_s) { - ilog(LOG_WARNING | LOG_FLAG_LIMIT, - "RTP packet with unknown payload type %u received", i); - atomic64_inc(&stream->stats.errors); - atomic64_inc(&cm->statsps.errors); - } - - else { - atomic64_inc(&rtp_s->packets); - atomic64_add(&rtp_s->bytes, s->len); - } - } - - - /* do we have somewhere to forward it to? */ - - if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) { - ilog(LOG_WARNING, "RTP packet from %s discarded", addr); - atomic64_inc(&stream->stats.errors); - atomic64_inc(&cm->statsps.errors); - goto unlock_out; - } - - - /* transcoding stuff, in and out */ - - mutex_lock(&in_srtp->in_lock); - - determine_handler(in_srtp, sink); - - if (!rtcp) { - rwf_in = in_srtp->handler->in->rtp; - rwf_out = in_srtp->handler->out->rtp; - } - else { - rwf_in = in_srtp->handler->in->rtcp; - rwf_out = in_srtp->handler->out->rtcp; - } - - mutex_lock(&out_srtp->out_lock); - - /* return values are: 0 = forward packet, -1 = error/dont forward, - * 1 = forward and push update to redis and kernel */ - if (rwf_in) - handler_ret = rwf_in(s, in_srtp); - if (handler_ret >= 0) { - if (rtcp && _log_facility_rtcp) - parse_and_log_rtcp_report(sfd, s->s, s->len); - if (rwf_out) - handler_ret += rwf_out(s, out_srtp); - } - - if (handler_ret > 0) { - __unkernelize(stream); - update = 1; - } - - mutex_unlock(&out_srtp->out_lock); - mutex_unlock(&in_srtp->in_lock); - - - /* endpoint address handling */ - - mutex_lock(&stream->in_lock); - - /* we're OK to (potentially) use the source address of this packet as destination - * in the other direction. */ - /* if the other side hasn't been signalled yet, just forward the packet */ - if (!PS_ISSET(stream, FILLED)) - goto forward; - - /* do not pay attention to source addresses of incoming packets for asymmetric streams */ - if (MEDIA_ISSET(media, ASYMMETRIC)) - PS_SET(stream, CONFIRMED); - - /* if we have already updated the endpoint in the past ... */ - if (PS_ISSET(stream, CONFIRMED)) { - /* see if we need to compare the source address with the known endpoint */ - if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - endpoint = *fsin; - mutex_lock(&stream->out_lock); - - int tmp = memcmp(&endpoint, &stream->endpoint, sizeof(endpoint)); - if (tmp && PS_ISSET(stream, MEDIA_HANDOVER)) { - /* out_lock remains locked */ - ilog(LOG_INFO, "Peer address changed to %s", addr); - unk = 1; - goto update_addr; - } - - mutex_unlock(&stream->out_lock); - - if (tmp && PS_ISSET(stream, STRICT_SOURCE)) { - atomic64_inc(&stream->stats.errors); - goto drop; - } - } - goto kernel_check; - } - - /* wait at least 3 seconds after last signal before committing to a particular - * endpoint address */ - if (!call->last_signal || poller_now <= call->last_signal + 3) - goto update_peerinfo; - - ilog(LOG_INFO, "Confirmed peer address as %s", addr); - - PS_SET(stream, CONFIRMED); - update = 1; - -update_peerinfo: - mutex_lock(&stream->out_lock); -update_addr: - endpoint = stream->endpoint; - stream->endpoint = *fsin; - if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint))) - update = 1; - mutex_unlock(&stream->out_lock); - - /* check the destination address of the received packet against what we think our - * local interface to use is */ - loc_addr = g_atomic_pointer_get(&media->local_intf); - if (dst && !sockaddr_eq(dst, &loc_addr->spec->address.addr)) { - // XXX restore this -// struct interface_address *ifa; -// ifa = get_interface_from_address(media->logical_intf, dst); -// if (!ifa) { -// ilog(LOG_ERROR, "No matching local interface for destination address %s found", -// smart_ntop_buf(dst)); -// goto drop; -// } -// if (g_atomic_pointer_compare_and_exchange(&media->local_address, loc_addr, ifa)) { -// ilog(LOG_INFO, "Switching local interface to %s", -// smart_ntop_buf(dst)); -// update = 1; -// } - } - - -kernel_check: - if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) - goto forward; - - if (PS_ISSET(stream, CONFIRMED) && sink && PS_ARESET2(sink, CONFIRMED, FILLED)) - kernelize(stream); - -forward: - if (sink) - mutex_lock(&sink->out_lock); - - if (!sink - || !sink->advertised_endpoint.port - || (is_addr_unspecified(&sink->advertised_endpoint.address) - && !is_trickle_ice_address(&sink->advertised_endpoint)) - || stun_ret || handler_ret < 0) - goto drop; - - ZERO(mh); - mh.msg_control = buf; - mh.msg_controllen = sizeof(buf); - - mutex_unlock(&sink->out_lock); - - stream_msg_mh_src(sink, &mh); - - ZERO(iov); - iov.iov_base = s->s; - iov.iov_len = s->len; - - mh.msg_iov = &iov; - mh.msg_iovlen = 1; - - ret = socket_sendmsg(&sink->sfd->socket, &mh, &sink->endpoint); - - if (ret == -1) { - ret = 0; /* temp for address family mismatches */ - ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - atomic64_inc(&stream->stats.errors); - atomic64_inc(&cm->statsps.errors); - goto out; - } - - sink = NULL; - -drop: - if (sink) - mutex_unlock(&sink->out_lock); - ret = 0; - atomic64_inc(&stream->stats.packets); - atomic64_add(&stream->stats.bytes, s->len); - atomic64_set(&stream->last_packet, poller_now); - atomic64_inc(&cm->statsps.packets); - atomic64_add(&cm->statsps.bytes, s->len); - -out: - if (ret == 0 && update) - ret = 1; - -done: - if (unk) - __stream_unconfirm(stream); - mutex_unlock(&stream->in_lock); - if (unk) { - stream_unconfirm(stream->rtp_sink); - stream_unconfirm(stream->rtcp_sink); - } -unlock_out: - rwlock_unlock_r(&call->master_lock); - - return ret; -} - - - - -static void stream_fd_readable(int fd, void *p, uintptr_t u) { - struct stream_fd *sfd = p; - char buf[RTP_BUFFER_SIZE]; - int ret, iters; - struct sockaddr_in6 sin6_src; - int update = 0; - struct call *ca; - str s; - struct msghdr mh; - struct iovec iov; - char control[128]; - struct cmsghdr *cmh; - struct in6_pktinfo *pi6; - struct in6_addr /*dst_buf,*/ *dst; - //struct in_pktinfo *pi; - endpoint_t ep; - sockaddr_t sa; - - if (sfd->socket.fd != fd) - goto out; - - log_info_stream_fd(sfd); - - for (iters = 0; ; iters++) { -#if MAX_RECV_ITERS - if (iters >= MAX_RECV_ITERS) { - ilog(LOG_ERROR, "Too many packets in UDP receive queue (more than %d), " - "aborting loop. Dropped packets possible", iters); - break; - } -#endif - - ZERO(mh); - mh.msg_name = &sin6_src; - mh.msg_namelen = sizeof(sin6_src); - mh.msg_iov = &iov; - mh.msg_iovlen = 1; - mh.msg_control = control; - mh.msg_controllen = sizeof(control); - iov.iov_base = buf + RTP_BUFFER_HEAD_ROOM; - iov.iov_len = MAX_RTP_PACKET_SIZE; - - ret = recvmsg(fd, &mh, 0); - - if (ret < 0) { - if (errno == EINTR) - continue; - if (errno == EAGAIN || errno == EWOULDBLOCK) - break; - stream_fd_closed(fd, sfd, 0); - goto done; - } - if (ret >= MAX_RTP_PACKET_SIZE) - ilog(LOG_WARNING, "UDP packet possibly truncated"); - - for (cmh = CMSG_FIRSTHDR(&mh); cmh; cmh = CMSG_NXTHDR(&mh, cmh)) { - if (cmh->cmsg_level == IPPROTO_IPV6 && cmh->cmsg_type == IPV6_PKTINFO) { - pi6 = (void *) CMSG_DATA(cmh); - dst = &pi6->ipi6_addr; - goto got_dst; - } - // XXX -// if (cmh->cmsg_level == IPPROTO_IP && cmh->cmsg_type == IP_PKTINFO) { -// pi = (void *) CMSG_DATA(cmh); -// in4_to_6(&dst_buf, pi->ipi_addr.s_addr); -// dst = &dst_buf; -// goto got_dst; -// } - } - - ilog(LOG_WARNING, "No pkt_info present in received UDP packet, cannot handle packet"); - goto done; - -got_dst: - str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret); - // XXX - ep.port = ntohs(sin6_src.sin6_port); - ep.address.u.ipv6 = sin6_src.sin6_addr; - sa.u.ipv6 = *dst; - ret = stream_packet(sfd, &s, &ep, &sa); - if (ret < 0) { - ilog(LOG_WARNING, "Write error on RTP socket: %s", strerror(-ret)); - call_destroy(sfd->call); - goto done; - } - if (ret == 1) - update = 1; - } - -out: - ca = sfd->call ? : NULL; - - if (ca && update) - redis_update(ca, sfd->call->callmaster->conf.redis); -done: - log_info_clear(); -} - - - - /* called with call->master_lock held in R */ static int call_timer_delete_monologues(struct call *c) { @@ -1116,7 +235,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { if (!ps->media) goto next; - sfd = ps->sfd; + sfd = ps->selected_sfd; if (!sfd) goto no_sfd; @@ -1398,7 +517,7 @@ static void callmaster_timer(void *ptr) { rwlock_lock_r(&sfd->call->master_lock); ps = sfd->stream; - if (!ps || ps->sfd != sfd) { + if (!ps || ps->selected_sfd != sfd) { rwlock_unlock_r(&sfd->call->master_lock); goto next; } @@ -1530,73 +649,6 @@ fail: -int __get_consecutive_ports(socket_t *array, int array_len, int wanted_start_port, - const struct call_media *media) -{ - int i, j, cycle = 0; - socket_t *it; - int port; - struct intf_spec *spec; - struct port_pool *pp; - const struct call *c = media->call; - const struct local_intf *lif = g_atomic_pointer_get(&media->local_intf); - - memset(array, -1, sizeof(*array) * array_len); - spec = lif->spec; - pp = &spec->port_pool; - - if (wanted_start_port > 0) - port = wanted_start_port; - else { - port = g_atomic_int_get(&pp->last_used); -#if PORT_RANDOM_MIN && PORT_RANDOM_MAX - port += PORT_RANDOM_MIN + (random() % (PORT_RANDOM_MAX - PORT_RANDOM_MIN)); -#endif - } - - while (1) { - if (!wanted_start_port) { - if (port < pp->min) - port = pp->min; - if ((port & 1)) - port++; - } - - for (i = 0; i < array_len; i++) { - it = &array[i]; - - if (!wanted_start_port && port > pp->max) { - port = 0; - cycle++; - goto release_restart; - } - - if (get_port(it, port++, lif, c)) - goto release_restart; - } - break; - -release_restart: - for (j = 0; j < i; j++) - release_port(&array[j], lif); - - if (cycle >= 2 || wanted_start_port > 0) - goto fail; - } - - /* success */ - g_atomic_int_set(&pp->last_used, port); - - ilog(LOG_DEBUG, "Opened ports %u..%u for media relay", - array[0].local.port, array[array_len - 1].local.port); - return 0; - -fail: - ilog(LOG_ERR, "Failed to get %u consecutive UDP ports for relay", - array_len); - return -1; -} - static void __payload_type_free(void *p) { g_slice_free1(sizeof(struct rtp_payload_type), p); } @@ -1634,51 +686,21 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con return med; } -static void stream_fd_free(void *p) { - struct stream_fd *f = p; - - release_port(&f->socket, f->local_intf); - crypto_cleanup(&f->crypto); - dtls_connection_cleanup(&f->dtls); - - obj_put(f->call); -} - -struct stream_fd *__stream_fd_new(socket_t *fd, struct call_media *media) { - struct call *call = media->call; - struct stream_fd *sfd; - struct poller_item pi; - struct poller *po = call->callmaster->poller; - - sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); - sfd->socket = *fd; - sfd->call = obj_get(call); - sfd->local_intf = g_atomic_pointer_get(&media->local_intf); - call->stream_fds = g_slist_prepend(call->stream_fds, sfd); /* hand over ref */ - - ZERO(pi); - pi.fd = sfd->socket.fd; - pi.obj = &sfd->obj; - pi.readable = stream_fd_readable; - pi.closed = stream_fd_closed; - - poller_add_item(po, &pi); - - return sfd; -} - static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports, const struct endpoint *ep) { GSList *l; struct endpoint_map *em; - socket_t fd_arr[16]; - unsigned int i; struct stream_fd *sfd; + GQueue intf_sockets = G_QUEUE_INIT; + socket_t *sock; + struct intf_list *il, *em_il; for (l = media->endpoint_maps; l; l = l->next) { em = l->data; - if (em->wildcard && em->sfds.length >= num_ports) { + if (em->logical_intf != media->logical_intf) + continue; + if (em->wildcard && em->num_ports >= num_ports) { __C_DBG("found a wildcard endpoint map%s", ep ? " and filling it in" : ""); if (ep) { em->endpoint = *ep; @@ -1695,7 +717,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne } else if (memcmp(&em->endpoint, ep, sizeof(*ep))) continue; - if (em->sfds.length >= num_ports) { + if (em->num_ports >= num_ports) { if (is_addr_unspecified(&em->endpoint.address)) em->endpoint.address = ep->address; return em; @@ -1703,7 +725,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne /* endpoint matches, but not enough ports. flush existing ports * and allocate a new set. */ __C_DBG("endpoint matches, doesn't have enough ports"); - g_queue_clear(&em->sfds); + g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list); goto alloc; } @@ -1713,49 +735,72 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne em->endpoint = *ep; else em->wildcard = 1; - g_queue_init(&em->sfds); + em->logical_intf = media->logical_intf; + em->num_ports = num_ports; + g_queue_init(&em->intf_sfds); media->endpoint_maps = g_slist_prepend(media->endpoint_maps, em); alloc: - if (num_ports > G_N_ELEMENTS(fd_arr)) + if (num_ports > 16) return NULL; - if (__get_consecutive_ports(fd_arr, num_ports, 0, media)) + if (get_consecutive_ports(&intf_sockets, num_ports, media->logical_intf)) return NULL; __C_DBG("allocating stream_fds for %u ports", num_ports); - for (i = 0; i < num_ports; i++) { - sfd = __stream_fd_new(&fd_arr[i], media); - g_queue_push_tail(&em->sfds, sfd); /* not referenced */ + + while ((il = g_queue_pop_head(&intf_sockets))) { + if (il->list.length != num_ports) + goto next_il; + + em_il = g_slice_alloc0(sizeof(*em_il)); + em_il->local_intf = il->local_intf; + g_queue_push_tail(&em->intf_sfds, em_il); + + while ((sock = g_queue_pop_head(&il->list))) { + sfd = stream_fd_new(sock, media->call, il->local_intf); + g_queue_push_tail(&em_il->list, sfd); /* not referenced */ + } + +next_il: + free_socket_intf_list(il); } return em; } -static void __assign_stream_fds(struct call_media *media, GList *sfds) { - GList *l; +static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { + GList *l, *k, *m; struct packet_stream *ps; struct stream_fd *sfd; - int reset = 0; + struct intf_list *il; + int first = 1; - for (l = media->streams.head; l; l = l->next) { - assert(sfds != NULL); - ps = l->data; - sfd = sfds->data; + for (l = intf_sfds->head; l; l = l->next) { + il = l->data; + + for (m = il->list.head, k = media->streams.head; m && k; m = m->next, k = k->next) { + sfd = m->data; + ps = k->data; + + if (first) + g_queue_clear(&ps->sfds); + + g_queue_push_tail(&ps->sfds, sfd); - /* if we switch local ports, we reset crypto params and ICE */ - if (ps->sfd && ps->sfd != sfd) { - dtls_shutdown(ps); - crypto_reset(&ps->sfd->crypto); - reset = 1; + if (!ps->selected_sfd) + ps->selected_sfd = sfd; + + /* XXX: + * check whether previous/currect selected_sfd is actually part of + * current sfds list. + * if selected_sfd changes, take previously selected interface into account. + * handle crypto/dtls resets by moving contexts into sfd struct. + * handle ice resets too. + */ } - ps->sfd = sfd; - sfd->stream = ps; - sfds = sfds->next; + first = 0; } - - if (reset && media->ice_agent) - ice_restart(media->ice_agent); } static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_ports) { @@ -1765,7 +810,7 @@ static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_po if (!em) return -1; - __assign_stream_fds(media, em->sfds.head); + __assign_stream_fds(media, &em->intf_sfds); return 0; } @@ -1830,7 +875,7 @@ static void __fill_stream(struct packet_stream *ps, const struct endpoint *epp, enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { struct call_media *media = ps->media; - if (!ps->sfd) + if (!ps->selected_sfd || !ps->sfds.length) return CSS_SHUTDOWN; if (MEDIA_ISSET(media, ICE) && !ice_has_finished(media)) @@ -1838,7 +883,7 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { if (MEDIA_ISSET(media, DTLS)) { mutex_lock(&ps->in_lock); - if (ps->sfd->dtls.init && !ps->sfd->dtls.connected) { + if (ps->selected_sfd->dtls.init && !ps->selected_sfd->dtls.connected) { dtls(ps, NULL, NULL); mutex_unlock(&ps->in_lock); return CSS_DTLS; @@ -1849,7 +894,7 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { return CSS_RUNNING; } -static void __call_media_state_machine(struct call_media *m) { +void call_media_state_machine(struct call_media *m) { GList *l; for (l = m->streams.head; l; l = l->next) @@ -1861,9 +906,9 @@ static int __init_stream(struct packet_stream *ps) { struct call *call = ps->call; int active; - if (ps->sfd) { + if (ps->selected_sfd) { if (MEDIA_ISSET(media, SDES)) - crypto_init(&ps->sfd->crypto, &media->sdes_in.params); + crypto_init(&ps->selected_sfd->crypto, &media->sdes_in.params); if (MEDIA_ISSET(media, DTLS) && !PS_ISSET(ps, FALLBACK_RTCP)) { active = (PS_ISSET(ps, FILLED) && MEDIA_ISSET(media, SETUP_ACTIVE)); @@ -2165,7 +1210,8 @@ static void __disable_streams(struct call_media *media, unsigned int num_ports) for (l = media->streams.head; l; l = l->next) { ps = l->data; - ps->sfd = NULL; + g_queue_clear(&ps->sfds); + ps->selected_sfd = NULL; } } @@ -2254,7 +1300,7 @@ static void __set_all_tos(struct call *c) { for (l = c->stream_fds; l; l = l->next) { sfd = l->data; - set_tos(sfd->socket.fd, c->tos); + set_tos(&sfd->socket, c->tos); } } @@ -2282,9 +1328,9 @@ static void __tos_change(struct call *call, const struct sdp_ng_flags *flags) { static void __init_interface(struct call_media *media, const str *ifname) { /* we're holding master_lock in W mode here, so we can safely ignore the * atomic ops */ - struct local_intf *ifa = (void *) media->local_intf; + //struct local_intf *ifa = (void *) media->local_intf; - if (!media->logical_intf || !ifa) + if (!media->logical_intf /* || !ifa */) goto get; if (!ifname || !ifname->s) return; @@ -2302,13 +1348,13 @@ get: ilog(LOG_WARNING, "Interface '"STR_FORMAT"' not found, using default", STR_FMT(ifname)); media->logical_intf = get_logical_interface(NULL, media->desired_family); } - media->local_intf = ifa = get_interface_address(media->logical_intf, media->desired_family); - if (!ifa) { - ilog(LOG_WARNING, "No usable address in interface '"STR_FORMAT"' found, using default", - STR_FMT(ifname)); - media->local_intf = ifa = get_any_interface_address(media->logical_intf, media->desired_family); - media->desired_family = ifa->spec->address.addr.family; - } +// media->local_intf = ifa = get_interface_address(media->logical_intf, media->desired_family); +// if (!ifa) { +// ilog(LOG_WARNING, "No usable address in interface '"STR_FORMAT"' found, using default", +// STR_FMT(ifname)); +// media->local_intf = ifa = get_any_interface_address(media->logical_intf, media->desired_family); +// media->desired_family = ifa->spec->address.addr.family; +// } } @@ -2517,7 +1563,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, goto error; __num_media_streams(media, num_ports); - __assign_stream_fds(media, em->sfds.head); + __assign_stream_fds(media, &em->intf_sfds); if (__num_media_streams(other_media, num_ports)) { /* new streams created on OTHER side. normally only happens in @@ -2545,19 +1591,6 @@ error: return -1; } -/* must be called with in_lock held or call->master_lock held in W */ -static void __unkernelize(struct packet_stream *p) { - if (!PS_ISSET(p, KERNELIZED)) - return; - if (PS_ISSET(p, NO_KERNEL_SUPPORT)) - return; - - if (p->call->callmaster->conf.kernelfd >= 0) - kernel_del_stream(p->call->callmaster->conf.kernelfd, p->sfd->socket.local.port); - - PS_CLEAR(p, KERNELIZED); -} - static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { struct timeval dp, oa; @@ -2746,7 +1779,7 @@ void call_destroy(struct call *c) { cdrlinecnt, md->index, protocol, addr, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, - (ps->sfd ? ps->sfd->socket.local.port : 0), + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), cdrlinecnt, md->index, protocol, atomic64_get(&ps->stats.packets), cdrlinecnt, md->index, protocol, @@ -2800,7 +1833,7 @@ void call_destroy(struct call *c) { cdrlinecnt, md->index, protocol, addr, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, - (ps->sfd ? ps->sfd->socket.local.port : 0), + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), cdrlinecnt, md->index, protocol, atomic64_get(&ps->stats.packets), cdrlinecnt, md->index, protocol, @@ -2818,7 +1851,7 @@ void call_destroy(struct call *c) { ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5u%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", - (unsigned int) (ps->sfd ? ps->sfd->socket.local.port : 0), + (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), addr, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", atomic64_get(&ps->stats.packets), @@ -2926,7 +1959,8 @@ void call_destroy(struct call *c) { __unkernelize(ps); dtls_shutdown(ps); - ps->sfd = NULL; + ps->selected_sfd = NULL; + g_queue_clear(&ps->sfds); crypto_cleanup(&ps->crypto); ps->rtp_sink = NULL; @@ -2947,7 +1981,7 @@ void call_destroy(struct call *c) { /* XXX unify and move these */ static int call_stream_address4(char *o, struct packet_stream *ps, enum stream_address_format format, - int *len, struct local_intf *ifa) + int *len, const struct local_intf *ifa) { int l = 0; @@ -2970,7 +2004,7 @@ static int call_stream_address4(char *o, struct packet_stream *ps, enum stream_a } static int call_stream_address6(char *o, struct packet_stream *ps, enum stream_address_format format, - int *len, struct local_intf *ifa) + int *len, const struct local_intf *ifa) { int l = 0; @@ -2994,29 +2028,18 @@ static int call_stream_address6(char *o, struct packet_stream *ps, enum stream_a int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address_format format, - int *len, struct local_intf *ifa) + int *len, const struct local_intf *ifa) { struct packet_stream *sink; sink = packet_stream_sink(ps); + if (!ifa) + ifa = sink->selected_sfd->local_intf; if (ifa->spec->address.addr.family->af == AF_INET) /* XXX fix */ return call_stream_address4(o, sink, format, len, ifa); return call_stream_address6(o, sink, format, len, ifa); } -int call_stream_address(char *o, struct packet_stream *ps, enum stream_address_format format, int *len) { - struct local_intf *ifa; - struct call_media *media; - - media = ps->media; - - ifa = g_atomic_pointer_get(&media->local_intf); - if (!ifa) - return -1; - - return call_stream_address46(o, ps, format, len, ifa); -} - static void __call_free(void *p) { struct call *c = p; @@ -3045,7 +2068,7 @@ static void __call_free(void *p) { while (md->endpoint_maps) { em = md->endpoint_maps->data; md->endpoint_maps = g_slist_delete_link(md->endpoint_maps, md->endpoint_maps); - g_queue_clear(&em->sfds); + g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list); g_slice_free1(sizeof(*em), em); } g_hash_table_destroy(md->rtp_payload_types); @@ -3187,26 +2210,6 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) { g_hash_table_insert(call->viabranches, &ml->viabranch, ml); } -static void __stream_unconfirm(struct packet_stream *ps) { - __unkernelize(ps); - PS_CLEAR(ps, CONFIRMED); - ps->handler = NULL; -} -static void stream_unconfirm(struct packet_stream *ps) { - if (!ps) - return; - mutex_lock(&ps->in_lock); - __stream_unconfirm(ps); - mutex_unlock(&ps->in_lock); -} -static void unkernelize(struct packet_stream *ps) { - if (!ps) - return; - mutex_lock(&ps->in_lock); - __unkernelize(ps); - mutex_unlock(&ps->in_lock); -} - /* must be called with call->master_lock held in W */ static void __monologue_unkernelize(struct call_monologue *monologue) { GList *l, *m; diff --git a/daemon/call.h b/daemon/call.h index 043143e39..b824846cf 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -259,7 +259,9 @@ struct stream_params { struct endpoint_map { struct endpoint endpoint; - GQueue sfds; + unsigned int num_ports; + const struct logical_intf *logical_intf; + GQueue intf_sfds; /* list of struct intf_list - contains stream_fd list */ int wildcard:1; }; @@ -288,7 +290,8 @@ struct packet_stream { struct call *call; /* RO */ unsigned int component; /* RO, starts with 1 */ - struct stream_fd *sfd; /* LOCK: call->master_lock */ + GQueue sfds; /* LOCK: call->master_lock */ + struct stream_fd * volatile selected_sfd; struct packet_stream *rtp_sink; /* LOCK: call->master_lock */ struct packet_stream *rtcp_sink; /* LOCK: call->master_lock */ struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ @@ -329,7 +332,7 @@ struct call_media { /* local_address is protected by call->master_lock in W mode, but may * still be modified if the lock is held in R mode, therefore we use * atomic ops to access it when holding an R lock. */ - const volatile struct local_intf *local_intf; + //const volatile struct local_intf *local_intf; struct ice_agent *ice_agent; @@ -449,7 +452,6 @@ struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct stream_fd *__stream_fd_new(socket_t *fd, struct call_media *); -int __get_consecutive_ports(socket_t *array, int array_len, int wanted_start_port, const struct call_media *); struct packet_stream *__packet_stream_new(struct call *call); @@ -463,12 +465,11 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay); void call_destroy(struct call *); enum call_stream_state call_stream_state_machine(struct packet_stream *); +void call_media_state_machine(struct call_media *m); void call_media_unkernelize(struct call_media *media); -void kernelize(struct packet_stream *); -int call_stream_address(char *, struct packet_stream *, enum stream_address_format, int *); int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address_format format, - int *len, struct local_intf *ifa); + int *len, const struct local_intf *ifa); const struct transport_protocol *transport_protocol(const str *s); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index a89183900..ee02b9bf0 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -31,7 +31,7 @@ static int call_stream_address_gstring(GString *o, struct packet_stream *ps, enu int len, ret; char buf[64]; /* 64 bytes ought to be enough for anybody */ - ret = call_stream_address(buf, ps, format, &len); + ret = call_stream_address46(buf, ps, format, &len, NULL); g_string_append_len(o, buf, len); return ret; } @@ -66,7 +66,7 @@ found: if (format == SAF_TCP) call_stream_address_gstring(o, ps, format); - port = ps->sfd ? ps->sfd->socket.local.port : 0; + port = ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0; g_string_append_printf(o, (format == 1) ? "%i " : " %i", port); if (format == SAF_UDP) { @@ -811,8 +811,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps dict = bencode_list_add_dictionary(list); - if (ps->sfd) - bencode_dictionary_add_integer(dict, "local port", ps->sfd->socket.local.port); + if (ps->selected_sfd) + bencode_dictionary_add_integer(dict, "local port", ps->selected_sfd->socket.local.port); ng_stats_endpoint(bencode_dictionary_add_dictionary(dict, "endpoint"), &ps->endpoint); ng_stats_endpoint(bencode_dictionary_add_dictionary(dict, "advertised endpoint"), &ps->advertised_endpoint); diff --git a/daemon/cli.c b/daemon/cli.c index 719b2c5b8..2b876201c 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -184,7 +184,7 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5u%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n", md->index, - (unsigned int) (ps->sfd ? ps->sfd->socket.local.port : 0), + (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", atomic64_get(&ps->stats.packets), diff --git a/daemon/dtls.c b/daemon/dtls.c index 2c9d902f8..c856741ea 100644 --- a/daemon/dtls.c +++ b/daemon/dtls.c @@ -475,12 +475,12 @@ int dtls_connection_init(struct packet_stream *ps, int active, struct dtls_cert struct dtls_connection *d; unsigned long err; - if (!ps || !ps->sfd) + if (!ps || !ps->selected_sfd) return 0; __DBG("dtls_connection_init(%i)", active); - d = &ps->sfd->dtls; + d = &ps->selected_sfd->dtls; if (d->init) { if ((d->active && active) || (!d->active && !active)) @@ -516,7 +516,7 @@ int dtls_connection_init(struct packet_stream *ps, int active, struct dtls_cert if (!d->r_bio || !d->w_bio) goto error; - SSL_set_app_data(d->ssl, ps->sfd); /* XXX obj reference here? */ + SSL_set_app_data(d->ssl, ps->selected_sfd); /* XXX obj reference here? */ SSL_set_bio(d->ssl, d->r_bio, d->w_bio); SSL_set_mode(d->ssl, SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); @@ -606,15 +606,15 @@ found: if (d->active) { /* we're the client */ crypto_init(&ps->crypto, &client); - crypto_init(&ps->sfd->crypto, &server); + crypto_init(&ps->selected_sfd->crypto, &server); } else { /* we're the server */ crypto_init(&ps->crypto, &server); - crypto_init(&ps->sfd->crypto, &client); + crypto_init(&ps->selected_sfd->crypto, &client); } - crypto_dump_keys(&ps->crypto, &ps->sfd->crypto); + crypto_dump_keys(&ps->crypto, &ps->selected_sfd->crypto); return 0; @@ -635,12 +635,12 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) { struct msghdr mh; struct iovec iov; - if (!ps || !ps->sfd) + if (!ps || !ps->selected_sfd) return 0; if (!MEDIA_ISSET(ps->media, DTLS)) return 0; - d = &ps->sfd->dtls; + d = &ps->selected_sfd->dtls; if (s) __DBG("dtls packet input: len %u %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", @@ -664,7 +664,7 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) { ret = try_connect(d); if (ret == -1) { - ilog(LOG_ERROR, "DTLS error on local port %u", ps->sfd->socket.local.port); + ilog(LOG_ERROR, "DTLS error on local port %u", ps->selected_sfd->socket.local.port); /* fatal error */ dtls_connection_cleanup(d); return 0; @@ -717,7 +717,7 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) { stream_msg_mh_src(ps, &mh); - socket_sendmsg(&ps->sfd->socket, &mh, fsin); + socket_sendmsg(&ps->selected_sfd->socket, &mh, fsin); return 0; } @@ -726,12 +726,12 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) { void dtls_shutdown(struct packet_stream *ps) { struct dtls_connection *d; - if (!ps || !ps->sfd) + if (!ps || !ps->selected_sfd) return; __DBG("dtls_shutdown"); - d = &ps->sfd->dtls; + d = &ps->selected_sfd->dtls; if (!d->init) return; @@ -748,7 +748,7 @@ void dtls_shutdown(struct packet_stream *ps) { } crypto_reset(&ps->crypto); - crypto_reset(&ps->sfd->crypto); + crypto_reset(&ps->selected_sfd->crypto); } void dtls_connection_cleanup(struct dtls_connection *c) { diff --git a/daemon/ice.c b/daemon/ice.c index 46452632f..c1b875118 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -620,7 +620,7 @@ static void __do_ice_check(struct ice_candidate_pair *pair) { stun_binding_request(&pair->remote_candidate->endpoint, transact, &ag->pwd[0], ag->ufrag, AGENT_ISSET(ag, CONTROLLING), tie_breaker, - prio, &pair->local_intf->spec->address.addr, &ps->sfd->socket, + prio, &pair->local_intf->spec->address.addr, &ps->selected_sfd->socket, PAIR_ISSET(pair, TO_USE)); } @@ -711,7 +711,7 @@ static void __do_ice_checks(struct ice_agent *ag) { /* skip dead streams */ ps = pair->packet_stream; - if (!ps || !ps->sfd) + if (!ps || !ps->selected_sfd) continue; if (PAIR_ISSET(pair, FAILED)) continue; @@ -972,10 +972,11 @@ found: static int __check_valid(struct ice_agent *ag) { struct call_media *media = ag->media; struct packet_stream *ps; - GList *l, *k; + GList *l, *k, *m; GQueue all_compos; struct ice_candidate_pair *pair; - const struct local_intf *ifa; +// const struct local_intf *ifa; + struct stream_fd *sfd; __get_complete_valid_pairs(&all_compos, ag); @@ -988,12 +989,13 @@ static int __check_valid(struct ice_agent *ag) { ilog(LOG_DEBUG, "ICE completed, using pair "PAIR_FORMAT, PAIR_FMT(pair)); AGENT_SET(ag, COMPLETED); - ifa = g_atomic_pointer_get(&media->local_intf); - if (ifa != pair->local_intf - && g_atomic_pointer_compare_and_exchange(&media->local_intf, ifa, - pair->local_intf)) - ilog(LOG_INFO, "ICE negotiated: local interface %s", - sockaddr_print_buf(&pair->local_intf->spec->address.addr)); +// XXX restore this +// ifa = g_atomic_pointer_get(&media->local_intf); +// if (ifa != pair->local_intf +// && g_atomic_pointer_compare_and_exchange(&media->local_intf, ifa, +// pair->local_intf)) +// ilog(LOG_INFO, "ICE negotiated: local interface %s", +// sockaddr_print_buf(&pair->local_intf->spec->address.addr)); for (l = media->streams.head, k = all_compos.head; l && k; l = l->next, k = k->next) { ps = l->data; @@ -1006,6 +1008,13 @@ static int __check_valid(struct ice_agent *ag) { ps->endpoint = pair->remote_candidate->endpoint; } mutex_unlock(&ps->out_lock); + + for (m = ps->sfds.head; m; m = m->next) { + sfd = m->data; + if (sfd->local_intf != pair->local_intf) + continue; + ps->selected_sfd = sfd; + } } call_media_unkernelize(media); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 13e521a9c..365fa4e89 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2,9 +2,206 @@ #include #include #include +#include #include "str.h" #include "ice.h" #include "socket.h" +#include "redis.h" +#include "rtp.h" +#include "ice.h" +#include "stun.h" +#include "kernel.h" +#include "xt_RTPENGINE.h" +#include "rtcp.h" +#include "sdp.h" + + +typedef int (*rewrite_func)(str *, struct packet_stream *); + + +struct streamhandler_io { + rewrite_func rtp; + rewrite_func rtcp; + int (*kernel)(struct rtpengine_srtp *, struct packet_stream *); +}; +struct streamhandler { + const struct streamhandler_io *in; + const struct streamhandler_io *out; +}; + + +static void determine_handler(struct packet_stream *in, const struct packet_stream *out); + +static int __k_null(struct rtpengine_srtp *s, struct packet_stream *); +static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *); +static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *); + +static int call_avp2savp_rtp(str *s, struct packet_stream *); +static int call_savp2avp_rtp(str *s, struct packet_stream *); +static int call_avp2savp_rtcp(str *s, struct packet_stream *); +static int call_savp2avp_rtcp(str *s, struct packet_stream *); +static int call_avpf2avp_rtcp(str *s, struct packet_stream *); +//static int call_avpf2savp_rtcp(str *s, struct packet_stream *); +static int call_savpf2avp_rtcp(str *s, struct packet_stream *); +//static int call_savpf2savp_rtcp(str *s, struct packet_stream *); + + + + + +static const struct streamhandler_io __shio_noop = { + .kernel = __k_null, +}; +static const struct streamhandler_io __shio_decrypt = { + .kernel = __k_srtp_decrypt, + .rtp = call_savp2avp_rtp, + .rtcp = call_savp2avp_rtcp, +}; +static const struct streamhandler_io __shio_encrypt = { + .kernel = __k_srtp_encrypt, + .rtp = call_avp2savp_rtp, + .rtcp = call_avp2savp_rtcp, +}; +static const struct streamhandler_io __shio_avpf_strip = { + .kernel = __k_null, + .rtcp = call_avpf2avp_rtcp, +}; +static const struct streamhandler_io __shio_decrypt_avpf_strip = { + .kernel = __k_srtp_decrypt, + .rtp = call_savp2avp_rtp, + .rtcp = call_savpf2avp_rtcp, +}; + +/* ********** */ + +static const struct streamhandler __sh_noop = { + .in = &__shio_noop, + .out = &__shio_noop, +}; +static const struct streamhandler __sh_savp2avp = { + .in = &__shio_decrypt, + .out = &__shio_noop, +}; +static const struct streamhandler __sh_avp2savp = { + .in = &__shio_noop, + .out = &__shio_encrypt, +}; +static const struct streamhandler __sh_avpf2avp = { + .in = &__shio_avpf_strip, + .out = &__shio_noop, +}; +static const struct streamhandler __sh_avpf2savp = { + .in = &__shio_avpf_strip, + .out = &__shio_encrypt, +}; +static const struct streamhandler __sh_savpf2avp = { + .in = &__shio_decrypt_avpf_strip, + .out = &__shio_noop, +}; +static const struct streamhandler __sh_savp2savp = { + .in = &__shio_decrypt, + .out = &__shio_encrypt, +}; +static const struct streamhandler __sh_savpf2savp = { + .in = &__shio_decrypt_avpf_strip, + .out = &__shio_encrypt, +}; + +/* ********** */ + +static const struct streamhandler *__sh_matrix_in_rtp_avp[] = { + [PROTO_RTP_AVP] = &__sh_noop, + [PROTO_RTP_AVPF] = &__sh_noop, + [PROTO_RTP_SAVP] = &__sh_avp2savp, + [PROTO_RTP_SAVPF] = &__sh_avp2savp, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_avp2savp, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_avp2savp, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_in_rtp_avpf[] = { + [PROTO_RTP_AVP] = &__sh_avpf2avp, + [PROTO_RTP_AVPF] = &__sh_noop, + [PROTO_RTP_SAVP] = &__sh_avpf2savp, + [PROTO_RTP_SAVPF] = &__sh_avp2savp, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_avpf2savp, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_avp2savp, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_in_rtp_savp[] = { + [PROTO_RTP_AVP] = &__sh_savp2avp, + [PROTO_RTP_AVPF] = &__sh_savp2avp, + [PROTO_RTP_SAVP] = &__sh_noop, + [PROTO_RTP_SAVPF] = &__sh_noop, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_noop, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_in_rtp_savpf[] = { + [PROTO_RTP_AVP] = &__sh_savpf2avp, + [PROTO_RTP_AVPF] = &__sh_savp2avp, + [PROTO_RTP_SAVP] = &__sh_savpf2savp, + [PROTO_RTP_SAVPF] = &__sh_noop, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savpf2savp, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_in_rtp_savp_recrypt[] = { + [PROTO_RTP_AVP] = &__sh_savp2avp, + [PROTO_RTP_AVPF] = &__sh_savp2avp, + [PROTO_RTP_SAVP] = &__sh_savp2savp, + [PROTO_RTP_SAVPF] = &__sh_savp2savp, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savp2savp, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_savp2savp, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_in_rtp_savpf_recrypt[] = { + [PROTO_RTP_AVP] = &__sh_savpf2avp, + [PROTO_RTP_AVPF] = &__sh_savp2avp, + [PROTO_RTP_SAVP] = &__sh_savpf2savp, + [PROTO_RTP_SAVPF] = &__sh_savp2savp, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_savpf2savp, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_savp2savp, + [PROTO_UDPTL] = &__sh_noop, +}; +static const struct streamhandler *__sh_matrix_noop[] = { + [PROTO_RTP_AVP] = &__sh_noop, + [PROTO_RTP_AVPF] = &__sh_noop, + [PROTO_RTP_SAVP] = &__sh_noop, + [PROTO_RTP_SAVPF] = &__sh_noop, + [PROTO_UDP_TLS_RTP_SAVP] = &__sh_noop, + [PROTO_UDP_TLS_RTP_SAVPF] = &__sh_noop, + [PROTO_UDPTL] = &__sh_noop, +}; + +/* ********** */ + +static const struct streamhandler **__sh_matrix[] = { + [PROTO_RTP_AVP] = __sh_matrix_in_rtp_avp, + [PROTO_RTP_AVPF] = __sh_matrix_in_rtp_avpf, + [PROTO_RTP_SAVP] = __sh_matrix_in_rtp_savp, + [PROTO_RTP_SAVPF] = __sh_matrix_in_rtp_savpf, + [PROTO_UDP_TLS_RTP_SAVP] = __sh_matrix_in_rtp_savp, + [PROTO_UDP_TLS_RTP_SAVPF] = __sh_matrix_in_rtp_savpf, + [PROTO_UDPTL] = __sh_matrix_noop, +}; +/* special case for DTLS as we can't pass through SRTP<>SRTP */ +static const struct streamhandler **__sh_matrix_recrypt[] = { + [PROTO_RTP_AVP] = __sh_matrix_in_rtp_avp, + [PROTO_RTP_AVPF] = __sh_matrix_in_rtp_avpf, + [PROTO_RTP_SAVP] = __sh_matrix_in_rtp_savp_recrypt, + [PROTO_RTP_SAVPF] = __sh_matrix_in_rtp_savpf_recrypt, + [PROTO_UDP_TLS_RTP_SAVP] = __sh_matrix_in_rtp_savp_recrypt, + [PROTO_UDP_TLS_RTP_SAVPF] = __sh_matrix_in_rtp_savpf_recrypt, + [PROTO_UDPTL] = __sh_matrix_noop, +}; + +/* ********** */ + +static const struct rtpengine_srtp __res_null = { + .cipher = REC_NULL, + .hmac = REH_NULL, +}; + @@ -153,14 +350,14 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc /* XXX family specific */ -void set_tos(int fd, unsigned int tos) { +void set_tos(socket_t *sock, unsigned int tos) { unsigned char ctos; ctos = tos; - setsockopt(fd, IPPROTO_IP, IP_TOS, &ctos, sizeof(tos)); + setsockopt(sock->fd, IPPROTO_IP, IP_TOS, &ctos, sizeof(tos)); #ifdef IPV6_TCLASS - setsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof(tos)); + setsockopt(sock->fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof(tos)); #else #warning "Will not set IPv6 traffic class" #endif @@ -168,22 +365,21 @@ void set_tos(int fd, unsigned int tos) { /* XXX family specific? unify? */ -static int get_port6(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c) { - if (open_socket(r, SOCK_DGRAM, port, &lif->spec->address.addr)) +/* XXX set TOS after opening! */ +static int get_port6(socket_t *r, unsigned int port, struct intf_spec *spec) { + if (open_socket(r, SOCK_DGRAM, port, &spec->address.addr)) return -1; - set_tos(r->fd, c->tos); - return 0; } -int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c) { +static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec) { int ret; struct port_pool *pp; __C_DBG("attempting to open port %u", port); - pp = &lif->spec->port_pool; + pp = &spec->port_pool; if (bit_array_set(pp->ports_used, port)) { __C_DBG("port in use"); @@ -191,7 +387,7 @@ int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const } __C_DBG("port locked"); - ret = get_port6(r, port, lif, c); + ret = get_port6(r, port, spec); if (ret) { __C_DBG("couldn't open port"); @@ -202,8 +398,883 @@ int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const return 0; } -void release_port(socket_t *r, const struct local_intf *lif) { +static void release_port(socket_t *r, struct intf_spec *spec) { __C_DBG("releasing port %u", r->local.port); - bit_array_clear(lif->spec->port_pool.ports_used, r->local.port); + bit_array_clear(spec->port_pool.ports_used, r->local.port); close_socket(r); } +static void free_port(socket_t *r, struct intf_spec *spec) { + release_port(r, spec); + g_slice_free1(sizeof(*r), r); +} + + + +/* puts list of socket_t into "out" */ +static int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port, + struct intf_spec *spec) +{ + int i, cycle = 0; + socket_t *sk; + int port; + struct port_pool *pp; + + if (num_ports == 0) + return 0; + + pp = &spec->port_pool; + + if (wanted_start_port > 0) + port = wanted_start_port; + else { + port = g_atomic_int_get(&pp->last_used); +#if PORT_RANDOM_MIN && PORT_RANDOM_MAX + port += PORT_RANDOM_MIN + (random() % (PORT_RANDOM_MAX - PORT_RANDOM_MIN)); +#endif + } + + while (1) { + if (!wanted_start_port) { + if (port < pp->min) + port = pp->min; + if ((port & 1)) + port++; + } + + for (i = 0; i < num_ports; i++) { + sk = g_slice_alloc0(sizeof(*sk)); + g_queue_push_tail(out, sk); + + if (!wanted_start_port && port > pp->max) { + port = 0; + cycle++; + goto release_restart; + } + + if (get_port(sk, port++, spec)) + goto release_restart; + } + break; + +release_restart: + while ((sk = g_queue_pop_head(out))) + free_port(sk, spec); + + if (cycle >= 2 || wanted_start_port > 0) + goto fail; + } + + /* success */ + g_atomic_int_set(&pp->last_used, port); + + ilog(LOG_DEBUG, "Opened ports %u.. on interface %s for media relay", + ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->address.addr)); + return 0; + +fail: + ilog(LOG_ERR, "Failed to get %u consecutive ports on interface %s for media relay", + num_ports, sockaddr_print_buf(&spec->address.addr)); + return -1; +} + +/* puts a list of "struct intf_list" into "out", containing socket_t list */ +int get_consecutive_ports(GQueue *out, unsigned int num_ports, const struct logical_intf *log) { + GList *l; + struct intf_list *il; + const struct local_intf *loc; + + for (l = log->list.head; l; l = l->next) { + loc = l->data; + + il = g_slice_alloc0(sizeof(*il)); + il->local_intf = loc; + g_queue_push_tail(out, il); + if (G_LIKELY(!__get_consecutive_ports(&il->list, num_ports, 0, loc->spec))) + continue; /* success */ + + /* error - clean up */ + g_queue_pop_tail(out); + g_slice_free1(sizeof(*il), il); + continue; + } + + return 0; +} +void free_socket_intf_list(struct intf_list *il) { + socket_t *sock; + + while ((sock = g_queue_pop_head(&il->list))) + free_port(sock, il->local_intf->spec); + g_slice_free1(sizeof(*il), il); +} +void free_intf_list(struct intf_list *il) { + g_queue_clear(&il->list); + g_slice_free1(sizeof(*il), il); +} + + + +/* called lock-free */ +static void stream_fd_closed(int fd, void *p, uintptr_t u) { + struct stream_fd *sfd = p; + struct call *c; + int i; + socklen_t j; + + assert(sfd->socket.fd == fd); + c = sfd->call; + if (!c) + return; + + j = sizeof(i); + getsockopt(fd, SOL_SOCKET, SO_ERROR, &i, &j); + ilog(LOG_WARNING, "Read error on media socket: %i (%s) -- closing call", i, strerror(i)); + + call_destroy(c); +} + + + +/* returns: 0 = not a muxed stream, 1 = muxed, RTP, 2 = muxed, RTCP */ +static int rtcp_demux(str *s, struct call_media *media) { + if (!MEDIA_ISSET(media, RTCP_MUX)) + return 0; + return rtcp_demux_is_rtcp(s) ? 2 : 1; +} + +static int call_avpf2avp_rtcp(str *s, struct packet_stream *stream) { + return rtcp_avpf2avp(s); +} +static int call_avp2savp_rtp(str *s, struct packet_stream *stream) { + return rtp_avp2savp(s, &stream->crypto); +} +static int call_avp2savp_rtcp(str *s, struct packet_stream *stream) { + return rtcp_avp2savp(s, &stream->crypto); +} +static int call_savp2avp_rtp(str *s, struct packet_stream *stream) { + return rtp_savp2avp(s, &stream->selected_sfd->crypto); +} +static int call_savp2avp_rtcp(str *s, struct packet_stream *stream) { + return rtcp_savp2avp(s, &stream->selected_sfd->crypto); +} +static int call_savpf2avp_rtcp(str *s, struct packet_stream *stream) { + int ret; + ret = rtcp_savp2avp(s, &stream->selected_sfd->crypto); + if (ret < 0) + return ret; + return rtcp_avpf2avp(s); +} + + +static int __k_null(struct rtpengine_srtp *s, struct packet_stream *stream) { + *s = __res_null; + return 0; +} +static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c) { + if (!c->params.crypto_suite) + return -1; + + *s = (struct rtpengine_srtp) { + .cipher = c->params.crypto_suite->kernel_cipher, + .hmac = c->params.crypto_suite->kernel_hmac, + .mki_len = c->params.mki_len, + .last_index = c->last_index, + .auth_tag_len = c->params.crypto_suite->srtp_auth_tag, + }; + if (c->params.mki_len) + memcpy(s->mki, c->params.mki, c->params.mki_len); + memcpy(s->master_key, c->params.master_key, c->params.crypto_suite->master_key_len); + memcpy(s->master_salt, c->params.master_salt, c->params.crypto_suite->master_salt_len); + + if (c->params.session_params.unencrypted_srtp) + s->cipher = REC_NULL; + if (c->params.session_params.unauthenticated_srtp) + s->auth_tag_len = 0; + + return 0; +} +static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *stream) { + return __k_srtp_crypt(s, &stream->crypto); +} +static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *stream) { + return __k_srtp_crypt(s, &stream->selected_sfd->crypto); +} + +/* XXX unify this */ +INLINE void __re_address_translate(struct re_address *o, const sockaddr_t *address) { + o->family = address->family->af; + if (o->family == AF_INET) + o->u.ipv4 = address->u.ipv4.s_addr; + else + memcpy(o->u.ipv6, &address->u.ipv6, sizeof(o->u.ipv6)); +} +INLINE void __re_address_translate_ep(struct re_address *o, const endpoint_t *ep) { + __re_address_translate(o, &ep->address); + o->port = ep->port; +} + +static int __rtp_stats_pt_sort(const void *ap, const void *bp) { + const struct rtp_stats *a = ap, *b = bp; + + if (a->payload_type < b->payload_type) + return -1; + if (a->payload_type > b->payload_type) + return 1; + return 0; +} + + +/* called with in_lock held */ +void kernelize(struct packet_stream *stream) { + struct rtpengine_target_info reti; + struct call *call = stream->call; + struct callmaster *cm = call->callmaster; + struct packet_stream *sink = NULL; + const struct local_intf *ifa; + const char *nk_warn_msg; + + if (PS_ISSET(stream, KERNELIZED)) + return; + if (cm->conf.kernelid < 0) + goto no_kernel; + nk_warn_msg = "interface to kernel module not open"; + if (cm->conf.kernelfd < 0) + goto no_kernel_warn; + if (!PS_ISSET(stream, RTP)) + goto no_kernel; + if (!stream->selected_sfd) + goto no_kernel; + + ilog(LOG_INFO, "Kernelizing media stream"); + + sink = packet_stream_sink(stream); + if (!sink) { + ilog(LOG_WARNING, "Attempt to kernelize stream without sink"); + goto no_kernel; + } + + determine_handler(stream, sink); + + if (is_addr_unspecified(&sink->advertised_endpoint.address) + || !sink->advertised_endpoint.port) + goto no_kernel; + nk_warn_msg = "protocol not supported by kernel module"; + if (!stream->handler->in->kernel + || !stream->handler->out->kernel) + goto no_kernel_warn; + + ZERO(reti); + + if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { + mutex_lock(&stream->out_lock); + __re_address_translate_ep(&reti.expected_src, &stream->endpoint); + mutex_unlock(&stream->out_lock); + if (PS_ISSET(stream, STRICT_SOURCE)) + reti.src_mismatch = MSM_DROP; + else if (PS_ISSET(stream, MEDIA_HANDOVER)) + reti.src_mismatch = MSM_PROPAGATE; + } + + mutex_lock(&sink->out_lock); + + reti.target_port = stream->selected_sfd->socket.local.port; + reti.tos = call->tos; + reti.rtcp_mux = MEDIA_ISSET(stream->media, RTCP_MUX); + reti.dtls = MEDIA_ISSET(stream->media, DTLS); + reti.stun = stream->media->ice_agent ? 1 : 0; + + ifa = sink->selected_sfd->local_intf; + __re_address_translate_ep(&reti.dst_addr, &sink->endpoint); + __re_address_translate(&reti.src_addr, &ifa->spec->address.addr); + reti.src_addr.port = sink->selected_sfd->socket.local.port; + reti.ssrc = sink->crypto.ssrc; + + stream->handler->in->kernel(&reti.decrypt, stream); + stream->handler->out->kernel(&reti.encrypt, sink); + + mutex_unlock(&sink->out_lock); + + nk_warn_msg = "encryption cipher or HMAC not supported by kernel module"; + if (!reti.encrypt.cipher || !reti.encrypt.hmac) + goto no_kernel_warn; + nk_warn_msg = "decryption cipher or HMAC not supported by kernel module"; + if (!reti.decrypt.cipher || !reti.decrypt.hmac) + goto no_kernel_warn; + + ZERO(stream->kernel_stats); + + if (stream->media->protocol && stream->media->protocol->rtp) { + GList *values, *l; + struct rtp_stats *rs; + + reti.rtp = 1; + values = g_hash_table_get_values(stream->rtp_stats); + values = g_list_sort(values, __rtp_stats_pt_sort); + for (l = values; l; l = l->next) { + if (reti.num_payload_types >= G_N_ELEMENTS(reti.payload_types)) { + ilog(LOG_WARNING, "Too many RTP payload types for kernel module"); + break; + } + rs = l->data; + reti.payload_types[reti.num_payload_types++] = rs->payload_type; + } + } + + kernel_add_stream(cm->conf.kernelfd, &reti, 0); + PS_SET(stream, KERNELIZED); + + return; + +no_kernel_warn: + ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg); +no_kernel: + PS_SET(stream, KERNELIZED); + PS_SET(stream, NO_KERNEL_SUPPORT); +} + +/* must be called with in_lock held or call->master_lock held in W */ +void __unkernelize(struct packet_stream *p) { + if (!PS_ISSET(p, KERNELIZED)) + return; + if (PS_ISSET(p, NO_KERNEL_SUPPORT)) + return; + + if (p->call->callmaster->conf.kernelfd >= 0) + kernel_del_stream(p->call->callmaster->conf.kernelfd, p->selected_sfd->socket.local.port); + + PS_CLEAR(p, KERNELIZED); +} + + +void __stream_unconfirm(struct packet_stream *ps) { + __unkernelize(ps); + PS_CLEAR(ps, CONFIRMED); + ps->handler = NULL; +} +static void stream_unconfirm(struct packet_stream *ps) { + if (!ps) + return; + mutex_lock(&ps->in_lock); + __stream_unconfirm(ps); + mutex_unlock(&ps->in_lock); +} +void unkernelize(struct packet_stream *ps) { + if (!ps) + return; + mutex_lock(&ps->in_lock); + __unkernelize(ps); + mutex_unlock(&ps->in_lock); +} + + + +/* must be called with call->master_lock held in R, and in->in_lock held */ +static void determine_handler(struct packet_stream *in, const struct packet_stream *out) { + const struct streamhandler **sh_pp, *sh; + const struct streamhandler ***matrix; + + if (in->handler) + return; + if (MEDIA_ISSET(in->media, PASSTHRU)) + goto noop; + + if (!in->media->protocol) + goto err; + if (!out->media->protocol) + goto err; + + matrix = __sh_matrix; + if (MEDIA_ISSET(in->media, DTLS) || MEDIA_ISSET(out->media, DTLS)) + matrix = __sh_matrix_recrypt; + else if (in->media->protocol->srtp && out->media->protocol->srtp + && in->selected_sfd && out->selected_sfd + && (crypto_params_cmp(&in->crypto.params, &out->selected_sfd->crypto.params) + || crypto_params_cmp(&out->crypto.params, &in->selected_sfd->crypto.params))) + matrix = __sh_matrix_recrypt; + + + sh_pp = matrix[in->media->protocol->index]; + if (!sh_pp) + goto err; + sh = sh_pp[out->media->protocol->index]; + if (!sh) + goto err; + in->handler = sh; + + return; + +err: + ilog(LOG_WARNING, "Unknown transport protocol encountered"); +noop: + in->handler = &__sh_noop; + return; +} + + +/* XXX split this function into pieces */ +/* called lock-free */ +static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const sockaddr_t *dst) { + struct packet_stream *stream, + *sink = NULL, + *in_srtp, *out_srtp; + struct call_media *media; + int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0, + unk = 0; + int i; + struct msghdr mh; + struct iovec iov; + unsigned char buf[256]; + struct call *call; + struct callmaster *cm; + /*unsigned char cc;*/ + char *addr; + struct endpoint endpoint; + rewrite_func rwf_in, rwf_out; + //struct local_intf *loc_addr; + struct rtp_header *rtp_h; + struct rtp_stats *rtp_s; + + call = sfd->call; + cm = call->callmaster; + addr = endpoint_print_buf(fsin); + + rwlock_lock_r(&call->master_lock); + + stream = sfd->stream; + if (!stream) + goto unlock_out; + + + media = stream->media; + + if (!stream->selected_sfd) + goto unlock_out; + + + /* demux other protocols running on this port */ + + if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) { + mutex_lock(&stream->in_lock); + ret = dtls(stream, s, fsin); + mutex_unlock(&stream->in_lock); + if (!ret) + goto unlock_out; + } + + if (media->ice_agent && is_stun(s)) { + stun_ret = stun(s, stream, fsin, dst); + if (!stun_ret) + goto unlock_out; + if (stun_ret == 1) { + call_media_state_machine(media); + mutex_lock(&stream->in_lock); /* for the jump */ + goto kernel_check; + } + else /* not an stun packet */ + stun_ret = 0; + } + +#if RTP_LOOP_PROTECT + mutex_lock(&stream->in_lock); + + for (i = 0; i < RTP_LOOP_PACKETS; i++) { + if (stream->lp_buf[i].len != s->len) + continue; + if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT))) + continue; + + __C_DBG("packet dupe"); + if (stream->lp_count >= RTP_LOOP_MAX_COUNT) { + ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " + "to avoid potential loop", RTP_LOOP_MAX_COUNT); + goto done; + } + + stream->lp_count++; + goto loop_ok; + } + + /* not a dupe */ + stream->lp_count = 0; + stream->lp_buf[stream->lp_idx].len = s->len; + memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)); + stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS; +loop_ok: + mutex_unlock(&stream->in_lock); +#endif + + + /* demux RTCP */ + + in_srtp = stream; + sink = stream->rtp_sink; + if (!sink && PS_ISSET(stream, RTCP)) { + sink = stream->rtcp_sink; + rtcp = 1; + } + else if (stream->rtcp_sink) { + muxed_rtcp = rtcp_demux(s, media); + if (muxed_rtcp == 2) { + sink = stream->rtcp_sink; + rtcp = 1; + in_srtp = stream->rtcp_sibling; + } + } + out_srtp = sink; + if (rtcp && sink && sink->rtcp_sibling) + out_srtp = sink->rtcp_sibling; + + + /* stats per RTP payload type */ + + if (media->protocol && media->protocol->rtp && !rtcp && !rtp_payload(&rtp_h, NULL, s)) { + i = (rtp_h->m_pt & 0x7f); + + rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); + if (!rtp_s) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, + "RTP packet with unknown payload type %u received", i); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); + } + + else { + atomic64_inc(&rtp_s->packets); + atomic64_add(&rtp_s->bytes, s->len); + } + } + + + /* do we have somewhere to forward it to? */ + + if (!sink || !sink->selected_sfd || !out_srtp->selected_sfd || !in_srtp->selected_sfd) { + ilog(LOG_WARNING, "RTP packet from %s discarded", addr); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); + goto unlock_out; + } + + + /* transcoding stuff, in and out */ + + mutex_lock(&in_srtp->in_lock); + + determine_handler(in_srtp, sink); + + if (!rtcp) { + rwf_in = in_srtp->handler->in->rtp; + rwf_out = in_srtp->handler->out->rtp; + } + else { + rwf_in = in_srtp->handler->in->rtcp; + rwf_out = in_srtp->handler->out->rtcp; + } + + mutex_lock(&out_srtp->out_lock); + + /* return values are: 0 = forward packet, -1 = error/dont forward, + * 1 = forward and push update to redis */ + if (rwf_in) + handler_ret = rwf_in(s, in_srtp); + if (handler_ret >= 0) { + if (rtcp && _log_facility_rtcp) + parse_and_log_rtcp_report(sfd, s->s, s->len); + if (rwf_out) + handler_ret += rwf_out(s, out_srtp); + } + + if (handler_ret > 0) { + __unkernelize(stream); + update = 1; + } + + mutex_unlock(&out_srtp->out_lock); + mutex_unlock(&in_srtp->in_lock); + + + /* endpoint address handling */ + + mutex_lock(&stream->in_lock); + + /* we're OK to (potentially) use the source address of this packet as destination + * in the other direction. */ + /* if the other side hasn't been signalled yet, just forward the packet */ + if (!PS_ISSET(stream, FILLED)) + goto forward; + + /* do not pay attention to source addresses of incoming packets for asymmetric streams */ + if (MEDIA_ISSET(media, ASYMMETRIC)) + PS_SET(stream, CONFIRMED); + + /* if we have already updated the endpoint in the past ... */ + if (PS_ISSET(stream, CONFIRMED)) { + /* see if we need to compare the source address with the known endpoint */ + if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { + endpoint = *fsin; + mutex_lock(&stream->out_lock); + + int tmp = memcmp(&endpoint, &stream->endpoint, sizeof(endpoint)); + if (tmp && PS_ISSET(stream, MEDIA_HANDOVER)) { + /* out_lock remains locked */ + ilog(LOG_INFO, "Peer address changed to %s", addr); + unk = 1; + goto update_addr; + } + + mutex_unlock(&stream->out_lock); + + if (tmp && PS_ISSET(stream, STRICT_SOURCE)) { + atomic64_inc(&stream->stats.errors); + goto drop; + } + } + goto kernel_check; + } + + /* wait at least 3 seconds after last signal before committing to a particular + * endpoint address */ + if (!call->last_signal || poller_now <= call->last_signal + 3) + goto update_peerinfo; + + ilog(LOG_INFO, "Confirmed peer address as %s", addr); + + PS_SET(stream, CONFIRMED); + update = 1; + +update_peerinfo: + mutex_lock(&stream->out_lock); +update_addr: + endpoint = stream->endpoint; + stream->endpoint = *fsin; + if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint))) + update = 1; + mutex_unlock(&stream->out_lock); + + /* check the destination address of the received packet against what we think our + * local interface to use is */ +// loc_addr = g_atomic_pointer_get(&media->local_intf); +// if (dst && !sockaddr_eq(dst, &loc_addr->spec->address.addr)) { + // XXX restore this +// struct interface_address *ifa; +// ifa = get_interface_from_address(media->logical_intf, dst); +// if (!ifa) { +// ilog(LOG_ERROR, "No matching local interface for destination address %s found", +// smart_ntop_buf(dst)); +// goto drop; +// } +// if (g_atomic_pointer_compare_and_exchange(&media->local_address, loc_addr, ifa)) { +// ilog(LOG_INFO, "Switching local interface to %s", +// smart_ntop_buf(dst)); +// update = 1; +// } +// } + + +kernel_check: + if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) + goto forward; + + if (PS_ISSET(stream, CONFIRMED) && sink && PS_ARESET2(sink, CONFIRMED, FILLED)) + kernelize(stream); + +forward: + if (sink) + mutex_lock(&sink->out_lock); + + if (!sink + || !sink->advertised_endpoint.port + || (is_addr_unspecified(&sink->advertised_endpoint.address) + && !is_trickle_ice_address(&sink->advertised_endpoint)) + || stun_ret || handler_ret < 0) + goto drop; + + ZERO(mh); + mh.msg_control = buf; + mh.msg_controllen = sizeof(buf); + + mutex_unlock(&sink->out_lock); + + stream_msg_mh_src(sink, &mh); + + ZERO(iov); + iov.iov_base = s->s; + iov.iov_len = s->len; + + mh.msg_iov = &iov; + mh.msg_iovlen = 1; + + ret = socket_sendmsg(&sink->selected_sfd->socket, &mh, &sink->endpoint); + + if (ret == -1) { + ret = -errno; + ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); + goto out; + } + + sink = NULL; + +drop: + if (sink) + mutex_unlock(&sink->out_lock); + ret = 0; + atomic64_inc(&stream->stats.packets); + atomic64_add(&stream->stats.bytes, s->len); + atomic64_set(&stream->last_packet, poller_now); + atomic64_inc(&cm->statsps.packets); + atomic64_add(&cm->statsps.bytes, s->len); + +out: + if (ret == 0 && update) + ret = 1; + +done: + if (unk) + __stream_unconfirm(stream); + mutex_unlock(&stream->in_lock); + if (unk) { + stream_unconfirm(stream->rtp_sink); + stream_unconfirm(stream->rtcp_sink); + } +unlock_out: + rwlock_unlock_r(&call->master_lock); + + return ret; +} + + + + +static void stream_fd_readable(int fd, void *p, uintptr_t u) { + struct stream_fd *sfd = p; + char buf[RTP_BUFFER_SIZE]; + int ret, iters; + struct sockaddr_in6 sin6_src; + int update = 0; + struct call *ca; + str s; + struct msghdr mh; + struct iovec iov; + char control[128]; + struct cmsghdr *cmh; + struct in6_pktinfo *pi6; + struct in6_addr /*dst_buf,*/ *dst; + //struct in_pktinfo *pi; + endpoint_t ep; + sockaddr_t sa; + + if (sfd->socket.fd != fd) + goto out; + + log_info_stream_fd(sfd); + + for (iters = 0; ; iters++) { +#if MAX_RECV_ITERS + if (iters >= MAX_RECV_ITERS) { + ilog(LOG_ERROR, "Too many packets in UDP receive queue (more than %d), " + "aborting loop. Dropped packets possible", iters); + break; + } +#endif + + ZERO(mh); + mh.msg_name = &sin6_src; + mh.msg_namelen = sizeof(sin6_src); + mh.msg_iov = &iov; + mh.msg_iovlen = 1; + mh.msg_control = control; + mh.msg_controllen = sizeof(control); + iov.iov_base = buf + RTP_BUFFER_HEAD_ROOM; + iov.iov_len = MAX_RTP_PACKET_SIZE; + + ret = recvmsg(fd, &mh, 0); + + if (ret < 0) { + if (errno == EINTR) + continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + stream_fd_closed(fd, sfd, 0); + goto done; + } + if (ret >= MAX_RTP_PACKET_SIZE) + ilog(LOG_WARNING, "UDP packet possibly truncated"); + + for (cmh = CMSG_FIRSTHDR(&mh); cmh; cmh = CMSG_NXTHDR(&mh, cmh)) { + if (cmh->cmsg_level == IPPROTO_IPV6 && cmh->cmsg_type == IPV6_PKTINFO) { + pi6 = (void *) CMSG_DATA(cmh); + dst = &pi6->ipi6_addr; + goto got_dst; + } + // XXX +// if (cmh->cmsg_level == IPPROTO_IP && cmh->cmsg_type == IP_PKTINFO) { +// pi = (void *) CMSG_DATA(cmh); +// in4_to_6(&dst_buf, pi->ipi_addr.s_addr); +// dst = &dst_buf; +// goto got_dst; +// } + } + + ilog(LOG_WARNING, "No pkt_info present in received UDP packet, cannot handle packet"); + goto done; + +got_dst: + str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret); + // XXX + ep.port = ntohs(sin6_src.sin6_port); + ep.address.u.ipv6 = sin6_src.sin6_addr; + sa.u.ipv6 = *dst; + ret = stream_packet(sfd, &s, &ep, &sa); + if (ret < 0) { + ilog(LOG_WARNING, "Write error on RTP socket: %s", strerror(-ret)); + call_destroy(sfd->call); + goto done; + } + if (ret == 1) + update = 1; + } + +out: + ca = sfd->call ? : NULL; + + if (ca && update) + redis_update(ca, sfd->call->callmaster->conf.redis); +done: + log_info_clear(); +} + + + + +static void stream_fd_free(void *p) { + struct stream_fd *f = p; + + release_port(&f->socket, f->local_intf->spec); + crypto_cleanup(&f->crypto); + dtls_connection_cleanup(&f->dtls); + + obj_put(f->call); +} + +struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif) { + struct stream_fd *sfd; + struct poller_item pi; + struct poller *po = call->callmaster->poller; + + sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); + sfd->socket = *fd; + sfd->call = obj_get(call); + sfd->local_intf = lif; + call->stream_fds = g_slist_prepend(call->stream_fds, sfd); /* hand over ref */ + + ZERO(pi); + pi.fd = sfd->socket.fd; + pi.obj = &sfd->obj; + pi.readable = stream_fd_readable; + pi.closed = stream_fd_closed; + + poller_add_item(po, &pi); + + return sfd; +} diff --git a/daemon/media_socket.h b/daemon/media_socket.h index b781970d7..e427dc13c 100644 --- a/daemon/media_socket.h +++ b/daemon/media_socket.h @@ -49,6 +49,10 @@ struct local_intf { unsigned int preference; /* starting with 0 */ const struct logical_intf *logical; }; +struct intf_list { + const struct local_intf *local_intf; + GQueue list; +}; struct stream_fd { struct obj obj; socket_t socket; /* RO */ @@ -67,14 +71,24 @@ struct logical_intf *get_logical_interface(const str *name, sockfamily_t *fam); struct local_intf *get_interface_address(const struct logical_intf *lif, sockfamily_t *fam); struct local_intf *get_any_interface_address(const struct logical_intf *lif, sockfamily_t *fam); -int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c); -void release_port(socket_t *r, const struct local_intf *); -void set_tos(int fd, unsigned int tos); +//int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c); +//void release_port(socket_t *r, const struct local_intf *); +void set_tos(socket_t *, unsigned int tos); +int get_consecutive_ports(GQueue *out, unsigned int num_ports, const struct logical_intf *log); +struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif); + +void free_intf_list(struct intf_list *il); +void free_socket_intf_list(struct intf_list *il); INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_intf *lif) { return open_socket(r, SOCK_DGRAM, port, &lif->spec->address.addr); } +void kernelize(struct packet_stream *); +void __unkernelize(struct packet_stream *); +void unkernelize(struct packet_stream *); +void __stream_unconfirm(struct packet_stream *); + /* XXX shouldnt be necessary */ INLINE struct local_intf *get_interface_from_address(const struct logical_intf *lif, const sockaddr_t *addr, socktype_t *type) diff --git a/daemon/sdp.c b/daemon/sdp.c index 0b3a22e9c..cfcbda274 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -1400,7 +1400,7 @@ static int replace_media_port(struct sdp_chopper *chop, struct sdp_media *media, if (copy_up_to(chop, port)) return -1; - p = ps->sfd ? ps->sfd->socket.local.port : 0; + p = ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0; chopper_append_printf(chop, "%u", p); if (skip_over(chop, port)) @@ -1415,7 +1415,7 @@ static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_m int cons; struct packet_stream *ps_n; - if (media->port_count == 1 || !ps->sfd) + if (media->port_count == 1 || !ps->selected_sfd) return 0; for (cons = 1; cons < media->port_count; cons++) { @@ -1423,7 +1423,7 @@ static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_m if (!j) goto warn; ps_n = j->data; - if (ps_n->sfd->socket.local.port != ps->sfd->socket.local.port + cons * 2) { + if (ps_n->selected_sfd->socket.local.port != ps->selected_sfd->socket.local.port + cons * 2) { warn: ilog(LOG_WARN, "Failed to handle consecutive ports"); break; @@ -1435,18 +1435,18 @@ warn: return 0; } -static int insert_ice_address(struct sdp_chopper *chop, struct packet_stream *ps, struct local_intf *ifa) { +static int insert_ice_address(struct sdp_chopper *chop, struct packet_stream *ps, const struct local_intf *ifa) { char buf[64]; int len; call_stream_address46(buf, ps, SAF_ICE, &len, ifa); chopper_append_dup(chop, buf, len); - chopper_append_printf(chop, " %u", ps->sfd->socket.local.port); + chopper_append_printf(chop, " %u", ps->selected_sfd->socket.local.port); return 0; } -static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps, struct local_intf *ifa) { +static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps, const struct local_intf *ifa) { char buf[64]; int len; @@ -1454,7 +1454,7 @@ static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps call_stream_address46(buf, ps, SAF_ICE, &len, ifa); chopper_append_dup(chop, buf, len); chopper_append_c(chop, " rport "); - chopper_append_printf(chop, "%u", ps->sfd->socket.local.port); + chopper_append_printf(chop, "%u", ps->selected_sfd->socket.local.port); return 0; } @@ -1480,7 +1480,7 @@ static int replace_network_address(struct sdp_chopper *chop, struct network_addr if (!is_addr_unspecified(&flags->parsed_media_address)) len = sprintf(buf, "%s", sockaddr_print_buf(&flags->parsed_media_address)); else - call_stream_address(buf, ps, SAF_NG, &len); + call_stream_address46(buf, ps, SAF_NG, &len, NULL); chopper_append_dup(chop, buf, len); if (skip_over(chop, &address->address)) @@ -1682,7 +1682,7 @@ out: static void insert_candidate(struct sdp_chopper *chop, struct packet_stream *ps, unsigned int component, unsigned int type_pref, unsigned int local_pref, enum ice_candidate_type type, - struct local_intf *ifa) + const struct local_intf *ifa) { unsigned long priority; @@ -1703,7 +1703,7 @@ static void insert_candidates(struct sdp_chopper *chop, struct packet_stream *rt struct sdp_ng_flags *flags, struct sdp_media *sdp_media) { GList *l; - struct local_intf *ifa; + const struct local_intf *ifa; unsigned int pref; struct call_media *media; const struct logical_intf *lif; @@ -1728,7 +1728,7 @@ static void insert_candidates(struct sdp_chopper *chop, struct packet_stream *rt lif = ag ? ag->logical_intf : media->logical_intf; if (ag && AGENT_ISSET(ag, COMPLETED)) { - ifa = g_atomic_pointer_get(&media->local_intf); + ifa = rtp->selected_sfd->local_intf; insert_candidate(chop, rtp, 1, type_pref, ifa->preference, cand_type, ifa); if (rtcp) /* rtcp-mux only possible in answer */ insert_candidate(chop, rtcp, 2, type_pref, ifa->preference, cand_type, ifa); @@ -1940,7 +1940,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu assert(j->data == ps_rtcp); } - if (!sdp_media->port_num || !ps->sfd) + if (!sdp_media->port_num || !ps->selected_sfd) goto next; if (MEDIA_ARESET2(call_media, SEND, RECV)) @@ -1954,13 +1954,13 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu if (MEDIA_ISSET(call_media, RTCP_MUX) && flags->opmode == OP_ANSWER) { chopper_append_c(chop, "a=rtcp:"); - chopper_append_printf(chop, "%u", ps->sfd->socket.local.port); + chopper_append_printf(chop, "%u", ps->selected_sfd->socket.local.port); chopper_append_c(chop, "\r\na=rtcp-mux\r\n"); ps_rtcp = NULL; } else if (ps_rtcp && !flags->ice_force_relay) { chopper_append_c(chop, "a=rtcp:"); - chopper_append_printf(chop, "%u", ps_rtcp->sfd->socket.local.port); + chopper_append_printf(chop, "%u", ps_rtcp->selected_sfd->socket.local.port); if (!MEDIA_ISSET(call_media, RTCP_MUX)) chopper_append_c(chop, "\r\n"); else diff --git a/daemon/stun.c b/daemon/stun.c index ee441a6df..d7680635f 100644 --- a/daemon/stun.c +++ b/daemon/stun.c @@ -378,7 +378,7 @@ static void stun_error_len(struct packet_stream *ps, const endpoint_t *sin, cons fingerprint(&mh, &fp); output_finish_src(&mh, dst); - socket_sendmsg(&ps->sfd->socket, &mh, sin); + socket_sendmsg(&ps->selected_sfd->socket, &mh, sin); } #define stun_error(ps, sin, dst, req, code, reason) \ @@ -479,7 +479,7 @@ static int stun_binding_success(struct packet_stream *ps, struct header *req, st fingerprint(&mh, &fp); output_finish_src(&mh, dst); - socket_sendmsg(&ps->sfd->socket, &mh, sin); + socket_sendmsg(&ps->selected_sfd->socket, &mh, sin); return 0; }