diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 60b5042bf..2bd8ba2a3 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -838,7 +838,7 @@ static void stream_fd_closed(int fd, void *p, uintptr_t u) { /* returns: 0 = not a muxed stream, 1 = muxed, RTP, 2 = muxed, RTCP */ -static int rtcp_demux(str *s, struct call_media *media) { +static int rtcp_demux(const str *s, struct call_media *media) { if (!MEDIA_ISSET(media, RTCP_MUX)) return 0; return rtcp_demux_is_rtcp(s) ? 2 : 1; @@ -1167,223 +1167,207 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o } -/* XXX split this function into pieces */ -/* called lock-free */ -static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) { -/** - * Incoming packets: - * - sfd->socket.local: the local IP/port on which the packet arrived - * - sfd->stream->endpoint: adjusted/learned IP/port from where the packet - * was sent - * - sfd->stream->advertised_endpoint: the unadjusted IP/port from where the - * packet was sent. These are the values present in the SDP - * - * Outgoing packets: - * - sfd->stream->rtp_sink->endpoint: the destination IP/port - * - sfd->stream->selected_sfd->socket.local: the local source IP/port for the - * outgoing packet - * - * If the rtpengine runs behind a NAT and local addresses are configured with - * different advertised endpoints, the SDP would not contain the address from - * `...->socket.local`, but rather from `sfd->local_intf->spec->address.advertised` - * (of type `sockaddr_t`). The port will be the same. - */ -/* TODO move the above comments to the data structure definitions, if the above - * always holds true */ - struct packet_stream *stream, - *sink = NULL, - *in_srtp, *out_srtp; - struct call_media *media; - int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0, - unk = 0; - int i; - struct call *call; - /*unsigned char cc;*/ - struct endpoint endpoint; - rewrite_func rwf_in, rwf_out; - //struct local_intf *loc_addr; - struct rtp_header *rtp_h; - struct rtcp_packet *rtcp_h; - struct rtp_stats *rtp_s; - struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL; - - call = sfd->call; - - rwlock_lock_r(&call->master_lock); - - stream = sfd->stream; - if (!stream) - goto unlock_out; - __C_DBG("Try to Kernelizing media stream: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - - - media = stream->media; - - if (!stream->selected_sfd) - goto unlock_out; - - - /* demux other protocols running on this port */ +// returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed; +// 1 = same as 0, but stream can be kernelized +static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) { + struct packet_stream *stream = sfd->stream; + struct call_media *media = stream->media; if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) { mutex_lock(&stream->in_lock); - ret = dtls(stream, s, fsin); + int ret = dtls(stream, s, fsin); mutex_unlock(&stream->in_lock); if (!ret) - goto unlock_out; + return 0; } if (media->ice_agent && is_stun(s)) { - stun_ret = stun(s, sfd, fsin); + int stun_ret = stun(s, sfd, fsin); if (!stun_ret) - goto unlock_out; + return 0; if (stun_ret == 1) { call_media_state_machine(media); - mutex_lock(&stream->in_lock); /* for the jump */ - goto kernel_check; + return 1; } else /* not an stun packet */ - stun_ret = 0; + ; } + return -1; +} -#if RTP_LOOP_PROTECT - if (MEDIA_ISSET(media, LOOP_CHECK)) { - mutex_lock(&stream->in_lock); - for (i = 0; i < RTP_LOOP_PACKETS; i++) { - if (stream->lp_buf[i].len != s->len) - continue; - if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT))) - continue; - __C_DBG("packet dupe"); - if (stream->lp_count >= RTP_LOOP_MAX_COUNT) { - ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " - "to avoid potential loop", RTP_LOOP_MAX_COUNT); - goto done; - } +#if RTP_LOOP_PROTECT +// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet +static int media_loop_detect(struct packet_stream *stream, const str *s) { + mutex_lock(&stream->in_lock); - stream->lp_count++; - goto loop_ok; + for (int i = 0; i < RTP_LOOP_PACKETS; i++) { + if (stream->lp_buf[i].len != s->len) + continue; + if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT))) + continue; + + __C_DBG("packet dupe"); + if (stream->lp_count >= RTP_LOOP_MAX_COUNT) { + ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " + "to avoid potential loop", RTP_LOOP_MAX_COUNT); + mutex_unlock(&stream->in_lock); + return -1; } - /* not a dupe */ - stream->lp_count = 0; - stream->lp_buf[stream->lp_idx].len = s->len; - memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)); - stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS; -loop_ok: - mutex_unlock(&stream->in_lock); + stream->lp_count++; + goto loop_ok; } + + /* not a dupe */ + stream->lp_count = 0; + stream->lp_buf[stream->lp_idx].len = s->len; + memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)); + stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS; +loop_ok: + mutex_unlock(&stream->in_lock); + + return 0; +} #endif - /* demux RTCP */ - in_srtp = stream; - sink = stream->rtp_sink; - if (!sink && PS_ISSET(stream, RTCP)) { - sink = stream->rtcp_sink; +// returns: true/false (is RTCP or not) +// in_srtp and out_srtp are set to point to the SRTP contexts to use +// sink_p is set to where to forward the packet to +static int media_packet_rtcp_demux(const str *s, struct packet_stream *stream, + struct packet_stream **in_srtp_p, struct packet_stream **out_srtp_p, + struct packet_stream **sink_p) +{ + struct call_media *media = stream->media; + int rtcp = 0; + + *in_srtp_p = stream; + *sink_p = stream->rtp_sink; + if (!*sink_p && PS_ISSET(stream, RTCP)) { + *sink_p = stream->rtcp_sink; rtcp = 1; } else if (stream->rtcp_sink) { - muxed_rtcp = rtcp_demux(s, media); + int muxed_rtcp = rtcp_demux(s, media); if (muxed_rtcp == 2) { - sink = stream->rtcp_sink; + *sink_p = stream->rtcp_sink; rtcp = 1; - in_srtp = stream->rtcp_sibling; + *in_srtp_p = stream->rtcp_sibling; // use RTCP SRTP context } } - out_srtp = sink; - if (rtcp && sink && sink->rtcp_sibling) - out_srtp = sink->rtcp_sibling; - + *out_srtp_p = *sink_p; + if (rtcp && *sink_p && (*sink_p)->rtcp_sibling) + *out_srtp_p = (*sink_p)->rtcp_sibling; // use RTCP SRTP context - /* RTP/RTCP specifics */ + return rtcp; +} - if (G_LIKELY(media->protocol && media->protocol->rtp)) { - if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) { - if (G_LIKELY(out_srtp != NULL)) - __stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, &ssrc_in, &ssrc_out, call->ssrc_hash); - // check the payload type - i = (rtp_h->m_pt & 0x7f); - if (G_LIKELY(ssrc_in)) - ssrc_in->parent->payload_type = i; +static void media_packet_rtp(const str *s, struct packet_stream *stream, struct packet_stream *in_srtp, + struct packet_stream *out_srtp, int rtcp, + struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p) +{ + struct call_media *media = stream->media; + struct call *call = media->call; + struct rtp_header *rtp_h; + struct rtcp_packet *rtcp_h; - // XXX convert to array? or keep last pointer? - rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); - if (!rtp_s) { - ilog(LOG_WARNING | LOG_FLAG_LIMIT, - "RTP packet with unknown payload type %u received", i); - atomic64_inc(&stream->stats.errors); - atomic64_inc(&rtpe_statsps.errors); - } + if (G_UNLIKELY(!media->protocol)) + return; + if (G_UNLIKELY(!media->protocol->rtp)) + return; - else { - atomic64_inc(&rtp_s->packets); - atomic64_add(&rtp_s->bytes, s->len); - } + if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) { + if (G_LIKELY(out_srtp != NULL)) + __stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash); + + // check the payload type + int i = (rtp_h->m_pt & 0x7f); + if (G_LIKELY(*ssrc_in_p)) + (*ssrc_in_p)->parent->payload_type = i; + + // XXX convert to array? or keep last pointer? + struct rtp_stats *rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); + if (!rtp_s) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, + "RTP packet with unknown payload type %u received", i); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&rtpe_statsps.errors); } - else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) { - if (G_LIKELY(out_srtp != NULL)) - __stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, &ssrc_in, &ssrc_out, call->ssrc_hash); + + else { + atomic64_inc(&rtp_s->packets); + atomic64_add(&rtp_s->bytes, s->len); } } - - /* do we have somewhere to forward it to? */ - - if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) { - ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin)); - atomic64_inc(&stream->stats.errors); - atomic64_inc(&rtpe_statsps.errors); - goto unlock_out; + else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) { + if (G_LIKELY(out_srtp != NULL)) + __stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash); } +} - /* transcoding stuff, in and out */ +static int media_packet_decrypt(str *s, int rtcp, struct packet_stream *in_srtp, struct packet_stream *sink, + struct stream_fd *sfd, const endpoint_t *fsin, const struct timeval *tv, + struct ssrc_ctx *ssrc_in, rewrite_func *rwf_out_p) +{ + rewrite_func rwf_in; mutex_lock(&in_srtp->in_lock); - determine_handler(in_srtp, sink); + // XXX use an array with index instead of if/else if (G_LIKELY(!rtcp)) { rwf_in = in_srtp->handler->in->rtp; - rwf_out = in_srtp->handler->out->rtp; + *rwf_out_p = in_srtp->handler->out->rtp; } else { rwf_in = in_srtp->handler->in->rtcp; - rwf_out = in_srtp->handler->out->rtcp; + *rwf_out_p = in_srtp->handler->out->rtcp; } - mutex_lock(&out_srtp->out_lock); - /* return values are: 0 = forward packet, -1 = error/dont forward, * 1 = forward and push update to redis */ - if (rwf_in) { - handler_ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in); - } + int ret = 0; + if (rwf_in) + ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in); - // If recording pcap dumper is set, then we record the call. - if (call->recording) { - dump_packet(call->recording, stream, s); - } + mutex_unlock(&in_srtp->in_lock); - if (G_LIKELY(handler_ret >= 0)) { - if (rwf_out) - handler_ret += rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out); - } + return ret; +} - if (handler_ret > 0) { - __unkernelize(stream); - update = 1; - } +static int media_packet_encrypt(str *s, int rtcp, struct packet_stream *out_srtp, struct packet_stream *sink, + struct ssrc_ctx *ssrc_out, rewrite_func rwf_out) +{ + if (!rwf_out) + return 0; + + mutex_lock(&out_srtp->out_lock); + + int ret = 0; + if (rwf_out) + ret = rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out); mutex_unlock(&out_srtp->out_lock); - mutex_unlock(&in_srtp->in_lock); + + return ret; +} + - /* endpoint address handling */ +// returns: 0 = OK, forward packet; -1 = drop packet; 1 = OK, forward, but also update info +// 2 = OK, we are ready to kernelize; 3 = same as 1, but also update info +static int media_packet_address_check(struct packet_stream *stream, struct packet_stream *sink, + struct stream_fd *sfd, const endpoint_t *fsin) +{ + struct call_media *media = stream->media; + struct call *call = stream->call; + struct endpoint endpoint; + int unk = 0, ret; mutex_lock(&stream->in_lock); @@ -1392,7 +1376,8 @@ loop_ok: /* if the other side hasn't been signalled yet, just forward the packet */ if (!PS_ISSET(stream, FILLED)) { __C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + ret = 0; + goto out; } /* do not pay attention to source addresses of incoming packets for asymmetric streams */ @@ -1416,6 +1401,8 @@ loop_ok: /* out_lock remains locked */ ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(fsin)); unk = 1; + ret = 1; + stream->endpoint = *fsin; goto update_addr; } @@ -1426,29 +1413,33 @@ loop_ok: sockaddr_print_buf(&endpoint.address), endpoint.port, sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); atomic64_inc(&stream->stats.errors); - goto drop; + ret = -1; + goto out; } } - goto kernel_check; + ret = 1; + goto out; } /* wait at least 3 seconds after last signal before committing to a particular * endpoint address */ + ret = 0; if (!call->last_signal || rtpe_now.tv_sec <= call->last_signal + 3) goto update_peerinfo; + ret = 2; + ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(fsin)); PS_SET(stream, CONFIRMED); - update = 1; update_peerinfo: mutex_lock(&stream->out_lock); -update_addr: endpoint = stream->endpoint; stream->endpoint = *fsin; if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint))) - update = 1; + ret |= 1; // either 0 or 2 -> makes it 1 or 3 +update_addr: mutex_unlock(&stream->out_lock); /* check the destination address of the received packet against what we think our @@ -1456,50 +1447,168 @@ update_addr: if (stream->selected_sfd && sfd != stream->selected_sfd) { ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&sfd->socket.local)); stream->selected_sfd = sfd; - update = 1; + ret |= 1; // 0 or 2 -> 1 or 3 } +out: + if (unk) + __stream_unconfirm(stream); + + mutex_unlock(&stream->in_lock); -kernel_check: + if (unk) { + stream_unconfirm(stream->rtp_sink); + stream_unconfirm(stream->rtcp_sink); + } + + return ret; +} + + +static void media_packet_kernel_check(struct packet_stream *stream, struct packet_stream *sink) { if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) { __C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + return; } if (!PS_ISSET(stream, CONFIRMED)) { __C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + return; } if (!sink) { __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + return; } if (!PS_ISSET(sink, CONFIRMED)) { __C_DBG("sink not CONFIRMED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + return; } if (!PS_ISSET(sink, FILLED)) { __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - goto forward; + return; } kernelize(stream); +} -forward: - if (sink) - mutex_lock(&sink->out_lock); - if (!sink - || !sink->advertised_endpoint.port +/* called lock-free */ +static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) { +/** + * Incoming packets: + * - sfd->socket.local: the local IP/port on which the packet arrived + * - sfd->stream->endpoint: adjusted/learned IP/port from where the packet + * was sent + * - sfd->stream->advertised_endpoint: the unadjusted IP/port from where the + * packet was sent. These are the values present in the SDP + * + * Outgoing packets: + * - sfd->stream->rtp_sink->endpoint: the destination IP/port + * - sfd->stream->selected_sfd->socket.local: the local source IP/port for the + * outgoing packet + * + * If the rtpengine runs behind a NAT and local addresses are configured with + * different advertised endpoints, the SDP would not contain the address from + * `...->socket.local`, but rather from `sfd->local_intf->spec->address.advertised` + * (of type `sockaddr_t`). The port will be the same. + */ +/* TODO move the above comments to the data structure definitions, if the above + * always holds true */ + // XXX collect all these vars in a struct to pass around + struct packet_stream *stream, + *sink = NULL, + *in_srtp, *out_srtp; + struct call_media *media; + int ret = 0, update = 0, handler_ret = 0, rtcp = 0; + struct call *call; + rewrite_func rwf_out; + struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL; + + call = sfd->call; + + rwlock_lock_r(&call->master_lock); + + stream = sfd->stream; + if (G_UNLIKELY(!stream)) + goto unlock_out; + __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + + + media = stream->media; + + if (!stream->selected_sfd) + goto unlock_out; + + + int stun_ret = media_demux_protocols(sfd, s, fsin); + if (stun_ret == 0) // packet processed + goto unlock_out; + if (stun_ret == 1) { + media_packet_kernel_check(stream, sink); + goto drop; + } + + +#if RTP_LOOP_PROTECT + if (MEDIA_ISSET(media, LOOP_CHECK)) { + if (media_loop_detect(stream, s)) + goto unlock_out; + } +#endif + + + // this sets in_srtp, out_srtp, and sink + rtcp = media_packet_rtcp_demux(s, stream, &in_srtp, &out_srtp, &sink); + + // this set ssrc_in and ssrc_out + media_packet_rtp(s, stream, in_srtp, out_srtp, rtcp, &ssrc_in, &ssrc_out); + + + /* do we have somewhere to forward it to? */ + + if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) { + ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin)); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&rtpe_statsps.errors); + goto unlock_out; + } + + + handler_ret = media_packet_decrypt(s, rtcp, in_srtp, sink, sfd, fsin, tv, ssrc_in, &rwf_out); + + // If recording pcap dumper is set, then we record the call. + if (call->recording) + dump_packet(call->recording, stream, s); + + if (G_LIKELY(handler_ret >= 0)) + handler_ret = media_packet_encrypt(s, rtcp, out_srtp, in_srtp, ssrc_out, rwf_out); + + if (handler_ret > 0) { + unkernelize(stream); + update = 1; + } + + + int address_check = media_packet_address_check(stream, sink, sfd, fsin); + if (address_check == -1) + goto drop; + if ((address_check & 1)) + update = 1; + if ((address_check & 2)) + media_packet_kernel_check(stream, sink); + + + mutex_lock(&sink->out_lock); + + if (!sink->advertised_endpoint.port || (is_addr_unspecified(&sink->advertised_endpoint.address) && !is_trickle_ice_address(&sink->advertised_endpoint)) - || stun_ret || handler_ret < 0) + || handler_ret < 0) goto drop; - // s is my packet? ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint); __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port); @@ -1529,16 +1638,6 @@ out: if (ret == 0 && update) ret = 1; -#if RTP_LOOP_PROTECT -done: -#endif - if (unk) - __stream_unconfirm(stream); - mutex_unlock(&stream->in_lock); - if (unk) { - stream_unconfirm(stream->rtp_sink); - stream_unconfirm(stream->rtcp_sink); - } unlock_out: rwlock_unlock_r(&call->master_lock); diff --git a/daemon/stun.c b/daemon/stun.c index dfecde7cd..77e09c08e 100644 --- a/daemon/stun.c +++ b/daemon/stun.c @@ -409,7 +409,7 @@ static void stun_error_len(struct stream_fd *sfd, const endpoint_t *sin, -static int check_fingerprint(str *msg, struct stun_attrs *attrs) { +static int check_fingerprint(const str *msg, struct stun_attrs *attrs) { int len; u_int32_t crc; @@ -422,7 +422,7 @@ static int check_fingerprint(str *msg, struct stun_attrs *attrs) { return 0; } -static int check_auth(str *msg, struct stun_attrs *attrs, struct call_media *media, int dst, int src) { +static int check_auth(const str *msg, struct stun_attrs *attrs, struct call_media *media, int dst, int src) { u_int16_t lenX; char digest[20]; str ufrag[2]; @@ -553,7 +553,7 @@ static int __stun_error(struct stream_fd *sfd, const endpoint_t *sin, * * call is locked in R */ -int stun(str *b, struct stream_fd *sfd, const endpoint_t *sin) { +int stun(const str *b, struct stream_fd *sfd, const endpoint_t *sin) { struct header *req = (void *) b->s; int msglen, method, class; str attr_str; diff --git a/daemon/stun.h b/daemon/stun.h index 640d1392c..08488a08b 100644 --- a/daemon/stun.h +++ b/daemon/stun.h @@ -49,7 +49,7 @@ INLINE int is_stun(const str *s) { } -int stun(str *, struct stream_fd *, const endpoint_t *); +int stun(const str *, struct stream_fd *, const endpoint_t *); int stun_binding_request(const endpoint_t *dst, u_int32_t transaction[3], str *pwd, str ufrags[2], int controlling, u_int64_t tiebreaker, u_int32_t priority,