From fb729e3d124139136f59a7a5911d8c60383cfae6 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 19 Jan 2018 09:48:20 -0500 Subject: [PATCH] TT#30405 create a packet handling context struct for convenience Change-Id: I0f3e0f66bf138147f265f7ee8d95028d9301359b --- daemon/media_socket.c | 450 +++++++++++++++++++++--------------------- 1 file changed, 223 insertions(+), 227 deletions(-) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 2bd8ba2a3..ef57a2820 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -55,6 +55,27 @@ struct intf_rr { GQueue logical_intfs; struct logical_intf *singular; // set iff only one is present in the list - no lock needed }; +struct packet_handler_ctx { + // inputs: + str s; // raw input packet + endpoint_t fsin; // source address of received packet + struct timeval tv; // timestamp when packet was received + struct stream_fd *sfd; // fd which received the packet + + struct call *call; // sfd->call + struct packet_stream *stream; // sfd->stream + struct call_media *media; // stream->media + struct packet_stream *sink; // where to send output packets to (forward destination) + rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt + struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP) + struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp + int rtcp; // true if this is an RTCP packet + + // verdicts: + int update; // true if Redis info needs to be updated + int unkernelize; // true if stream ought to be removed from kernel + int kernelize; // true if stream can be kernelized +}; static void determine_handler(struct packet_stream *in, const struct packet_stream *out); @@ -1169,24 +1190,21 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o // returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed; // 1 = same as 0, but stream can be kernelized -static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) { - struct packet_stream *stream = sfd->stream; - struct call_media *media = stream->media; - - if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) { - mutex_lock(&stream->in_lock); - int ret = dtls(stream, s, fsin); - mutex_unlock(&stream->in_lock); +static int media_demux_protocols(struct packet_handler_ctx *phc) { + if (MEDIA_ISSET(phc->media, DTLS) && is_dtls(&phc->s)) { + mutex_lock(&phc->stream->in_lock); + int ret = dtls(phc->stream, &phc->s, &phc->fsin); + mutex_unlock(&phc->stream->in_lock); if (!ret) return 0; } - if (media->ice_agent && is_stun(s)) { - int stun_ret = stun(s, sfd, fsin); + if (phc->media->ice_agent && is_stun(&phc->s)) { + int stun_ret = stun(&phc->s, phc->sfd, &phc->fsin); if (!stun_ret) return 0; if (stun_ret == 1) { - call_media_state_machine(media); + call_media_state_machine(phc->media); return 1; } else /* not an stun packet */ @@ -1199,34 +1217,34 @@ static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endp #if RTP_LOOP_PROTECT // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet -static int media_loop_detect(struct packet_stream *stream, const str *s) { - mutex_lock(&stream->in_lock); +static int media_loop_detect(struct packet_handler_ctx *phc) { + mutex_lock(&phc->stream->in_lock); for (int i = 0; i < RTP_LOOP_PACKETS; i++) { - if (stream->lp_buf[i].len != s->len) + if (phc->stream->lp_buf[i].len != phc->s.len) continue; - if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT))) + if (memcmp(phc->stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT))) continue; __C_DBG("packet dupe"); - if (stream->lp_count >= RTP_LOOP_MAX_COUNT) { + if (phc->stream->lp_count >= RTP_LOOP_MAX_COUNT) { ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " "to avoid potential loop", RTP_LOOP_MAX_COUNT); - mutex_unlock(&stream->in_lock); + mutex_unlock(&phc->stream->in_lock); return -1; } - stream->lp_count++; + phc->stream->lp_count++; goto loop_ok; } /* not a dupe */ - stream->lp_count = 0; - stream->lp_buf[stream->lp_idx].len = s->len; - memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)); - stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS; + phc->stream->lp_count = 0; + phc->stream->lp_buf[phc->stream->lp_idx].len = phc->s.len; + memcpy(phc->stream->lp_buf[phc->stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)); + phc->stream->lp_idx = (phc->stream->lp_idx + 1) % RTP_LOOP_PACKETS; loop_ok: - mutex_unlock(&stream->in_lock); + mutex_unlock(&phc->stream->in_lock); return 0; } @@ -1234,269 +1252,253 @@ loop_ok: -// returns: true/false (is RTCP or not) // in_srtp and out_srtp are set to point to the SRTP contexts to use -// sink_p is set to where to forward the packet to -static int media_packet_rtcp_demux(const str *s, struct packet_stream *stream, - struct packet_stream **in_srtp_p, struct packet_stream **out_srtp_p, - struct packet_stream **sink_p) +// sink is set to where to forward the packet to +static void media_packet_rtcp_demux(struct packet_handler_ctx *phc) { - struct call_media *media = stream->media; - int rtcp = 0; - - *in_srtp_p = stream; - *sink_p = stream->rtp_sink; - if (!*sink_p && PS_ISSET(stream, RTCP)) { - *sink_p = stream->rtcp_sink; - rtcp = 1; + phc->in_srtp = phc->stream; + phc->sink = phc->stream->rtp_sink; + if (!phc->sink && PS_ISSET(phc->stream, RTCP)) { + phc->sink = phc->stream->rtcp_sink; + phc->rtcp = 1; } - else if (stream->rtcp_sink) { - int muxed_rtcp = rtcp_demux(s, media); + else if (phc->stream->rtcp_sink) { + int muxed_rtcp = rtcp_demux(&phc->s, phc->media); if (muxed_rtcp == 2) { - *sink_p = stream->rtcp_sink; - rtcp = 1; - *in_srtp_p = stream->rtcp_sibling; // use RTCP SRTP context + phc->sink = phc->stream->rtcp_sink; + phc->rtcp = 1; + phc->in_srtp = phc->stream->rtcp_sibling; // use RTCP SRTP context } } - *out_srtp_p = *sink_p; - if (rtcp && *sink_p && (*sink_p)->rtcp_sibling) - *out_srtp_p = (*sink_p)->rtcp_sibling; // use RTCP SRTP context - - return rtcp; + phc->out_srtp = phc->sink; + if (phc->rtcp && phc->sink && phc->sink->rtcp_sibling) + phc->out_srtp = phc->sink->rtcp_sibling; // use RTCP SRTP context } -static void media_packet_rtp(const str *s, struct packet_stream *stream, struct packet_stream *in_srtp, - struct packet_stream *out_srtp, int rtcp, - struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p) +static void media_packet_rtp(struct packet_handler_ctx *phc) { - struct call_media *media = stream->media; - struct call *call = media->call; struct rtp_header *rtp_h; struct rtcp_packet *rtcp_h; - if (G_UNLIKELY(!media->protocol)) + if (G_UNLIKELY(!phc->media->protocol)) return; - if (G_UNLIKELY(!media->protocol->rtp)) + if (G_UNLIKELY(!phc->media->protocol->rtp)) return; - if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) { - if (G_LIKELY(out_srtp != NULL)) - __stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash); + if (G_LIKELY(!phc->rtcp && !rtp_payload(&rtp_h, NULL, &phc->s))) { + if (G_LIKELY(phc->out_srtp != NULL)) + __stream_ssrc(phc->in_srtp, phc->out_srtp, rtp_h->ssrc, &phc->ssrc_in, + &phc->ssrc_out, phc->call->ssrc_hash); // check the payload type int i = (rtp_h->m_pt & 0x7f); - if (G_LIKELY(*ssrc_in_p)) - (*ssrc_in_p)->parent->payload_type = i; + if (G_LIKELY(phc->ssrc_in)) + phc->ssrc_in->parent->payload_type = i; // XXX convert to array? or keep last pointer? - struct rtp_stats *rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); + struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &i); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, "RTP packet with unknown payload type %u received", i); - atomic64_inc(&stream->stats.errors); + atomic64_inc(&phc->stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); } else { atomic64_inc(&rtp_s->packets); - atomic64_add(&rtp_s->bytes, s->len); + atomic64_add(&rtp_s->bytes, phc->s.len); } } - else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) { - if (G_LIKELY(out_srtp != NULL)) - __stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash); + else if (phc->rtcp && !rtcp_payload(&rtcp_h, NULL, &phc->s)) { + if (G_LIKELY(phc->out_srtp != NULL)) + __stream_ssrc(phc->in_srtp, phc->out_srtp, rtcp_h->ssrc, &phc->ssrc_in, + &phc->ssrc_out, phc->call->ssrc_hash); } } -static int media_packet_decrypt(str *s, int rtcp, struct packet_stream *in_srtp, struct packet_stream *sink, - struct stream_fd *sfd, const endpoint_t *fsin, const struct timeval *tv, - struct ssrc_ctx *ssrc_in, rewrite_func *rwf_out_p) +static int media_packet_decrypt(struct packet_handler_ctx *phc) { - rewrite_func rwf_in; - - mutex_lock(&in_srtp->in_lock); - determine_handler(in_srtp, sink); + mutex_lock(&phc->in_srtp->in_lock); + determine_handler(phc->in_srtp, phc->sink); // XXX use an array with index instead of if/else - if (G_LIKELY(!rtcp)) { - rwf_in = in_srtp->handler->in->rtp; - *rwf_out_p = in_srtp->handler->out->rtp; + if (G_LIKELY(!phc->rtcp)) { + phc->decrypt_func = phc->in_srtp->handler->in->rtp; + phc->encrypt_func = phc->in_srtp->handler->out->rtp; } else { - rwf_in = in_srtp->handler->in->rtcp; - *rwf_out_p = in_srtp->handler->out->rtcp; + phc->decrypt_func = phc->in_srtp->handler->in->rtcp; + phc->encrypt_func = phc->in_srtp->handler->out->rtcp; } /* return values are: 0 = forward packet, -1 = error/dont forward, * 1 = forward and push update to redis */ int ret = 0; - if (rwf_in) - ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in); + if (phc->decrypt_func) + ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->sfd, &phc->fsin, &phc->tv, phc->ssrc_in); - mutex_unlock(&in_srtp->in_lock); + mutex_unlock(&phc->in_srtp->in_lock); + if (ret == 1) { + phc->update = 1; + ret = 0; + } return ret; } -static int media_packet_encrypt(str *s, int rtcp, struct packet_stream *out_srtp, struct packet_stream *sink, - struct ssrc_ctx *ssrc_out, rewrite_func rwf_out) +static int media_packet_encrypt(struct packet_handler_ctx *phc) { - if (!rwf_out) + if (!phc->encrypt_func) return 0; - mutex_lock(&out_srtp->out_lock); + mutex_lock(&phc->out_srtp->out_lock); - int ret = 0; - if (rwf_out) - ret = rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out); + int ret = phc->encrypt_func(&phc->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out); - mutex_unlock(&out_srtp->out_lock); + mutex_unlock(&phc->out_srtp->out_lock); + if (ret == 1) { + phc->update = 1; + ret = 0; + } return ret; } -// returns: 0 = OK, forward packet; -1 = drop packet; 1 = OK, forward, but also update info -// 2 = OK, we are ready to kernelize; 3 = same as 1, but also update info -static int media_packet_address_check(struct packet_stream *stream, struct packet_stream *sink, - struct stream_fd *sfd, const endpoint_t *fsin) +// returns: 0 = OK, forward packet; -1 = drop packet +static int media_packet_address_check(struct packet_handler_ctx *phc) { - struct call_media *media = stream->media; - struct call *call = stream->call; struct endpoint endpoint; - int unk = 0, ret; + int ret = 0; - mutex_lock(&stream->in_lock); + mutex_lock(&phc->stream->in_lock); /* we're OK to (potentially) use the source address of this packet as destination * in the other direction. */ /* if the other side hasn't been signalled yet, just forward the packet */ - if (!PS_ISSET(stream, FILLED)) { - __C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - ret = 0; + if (!PS_ISSET(phc->stream, FILLED)) { + __C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); goto out; } /* do not pay attention to source addresses of incoming packets for asymmetric streams */ - if (MEDIA_ISSET(media, ASYMMETRIC)) - PS_SET(stream, CONFIRMED); + if (MEDIA_ISSET(phc->media, ASYMMETRIC)) + PS_SET(phc->stream, CONFIRMED); /* confirm sink for unidirectional streams in order to kernelize */ - if (MEDIA_ISSET(media, UNIDIRECTIONAL)) { - PS_SET(sink, CONFIRMED); - } + if (MEDIA_ISSET(phc->media, UNIDIRECTIONAL)) + PS_SET(phc->sink, CONFIRMED); /* if we have already updated the endpoint in the past ... */ - if (PS_ISSET(stream, CONFIRMED)) { + if (PS_ISSET(phc->stream, CONFIRMED)) { /* see if we need to compare the source address with the known endpoint */ - if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - endpoint = *fsin; - mutex_lock(&stream->out_lock); + if (PS_ISSET2(phc->stream, STRICT_SOURCE, MEDIA_HANDOVER)) { + endpoint = phc->fsin; + mutex_lock(&phc->stream->out_lock); - int tmp = memcmp(&endpoint, &stream->endpoint, sizeof(endpoint)); - if (tmp && PS_ISSET(stream, MEDIA_HANDOVER)) { + int tmp = memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint)); + if (tmp && PS_ISSET(phc->stream, MEDIA_HANDOVER)) { /* out_lock remains locked */ - ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(fsin)); - unk = 1; - ret = 1; - stream->endpoint = *fsin; + ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->fsin)); + phc->unkernelize = 1; + phc->update = 1; + phc->stream->endpoint = phc->fsin; goto update_addr; } - mutex_unlock(&stream->out_lock); + mutex_unlock(&phc->stream->out_lock); - if (tmp && PS_ISSET(stream, STRICT_SOURCE)) { + if (tmp && PS_ISSET(phc->stream, STRICT_SOURCE)) { ilog(LOG_INFO, "Drop due to strict-source attribute; got %s:%d, expected %s:%d", sockaddr_print_buf(&endpoint.address), endpoint.port, - sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); - atomic64_inc(&stream->stats.errors); + sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); + atomic64_inc(&phc->stream->stats.errors); ret = -1; goto out; } } - ret = 1; + phc->kernelize = 1; goto out; } /* wait at least 3 seconds after last signal before committing to a particular * endpoint address */ - ret = 0; - if (!call->last_signal || rtpe_now.tv_sec <= call->last_signal + 3) + if (!phc->call->last_signal || rtpe_now.tv_sec <= phc->call->last_signal + 3) goto update_peerinfo; - ret = 2; + phc->kernelize = 1; + phc->update = 1; - ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(fsin)); + ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->fsin)); - PS_SET(stream, CONFIRMED); + PS_SET(phc->stream, CONFIRMED); update_peerinfo: - mutex_lock(&stream->out_lock); - endpoint = stream->endpoint; - stream->endpoint = *fsin; - if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint))) - ret |= 1; // either 0 or 2 -> makes it 1 or 3 + mutex_lock(&phc->stream->out_lock); + endpoint = phc->stream->endpoint; + phc->stream->endpoint = phc->fsin; + if (memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint))) + phc->update = 1; update_addr: - mutex_unlock(&stream->out_lock); + mutex_unlock(&phc->stream->out_lock); /* check the destination address of the received packet against what we think our * local interface to use is */ - if (stream->selected_sfd && sfd != stream->selected_sfd) { - ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&sfd->socket.local)); - stream->selected_sfd = sfd; - ret |= 1; // 0 or 2 -> 1 or 3 + if (phc->stream->selected_sfd && phc->sfd != phc->stream->selected_sfd) { + ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->sfd->socket.local)); + phc->stream->selected_sfd = phc->sfd; + phc->update = 1; } out: - if (unk) - __stream_unconfirm(stream); - - mutex_unlock(&stream->in_lock); - - if (unk) { - stream_unconfirm(stream->rtp_sink); - stream_unconfirm(stream->rtcp_sink); - } + mutex_unlock(&phc->stream->in_lock); return ret; } -static void media_packet_kernel_check(struct packet_stream *stream, struct packet_stream *sink) { - if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) { - __C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); +static void media_packet_kernel_check(struct packet_handler_ctx *phc) { + if (PS_ISSET(phc->stream, NO_KERNEL_SUPPORT)) { + __C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->stream->endpoint.address), phc->stream->endpoint.port); return; } - if (!PS_ISSET(stream, CONFIRMED)) { - __C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + if (!PS_ISSET(phc->stream, CONFIRMED)) { + __C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); return; } - if (!sink) { - __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + if (!phc->sink) { + __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); return; } - if (!PS_ISSET(sink, CONFIRMED)) { - __C_DBG("sink not CONFIRMED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + if (!PS_ISSET(phc->sink, CONFIRMED)) { + __C_DBG("sink not CONFIRMED for stream %s:%d", + sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); return; } - if (!PS_ISSET(sink, FILLED)) { - __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + if (!PS_ISSET(phc->sink, FILLED)) { + __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); return; } - kernelize(stream); + kernelize(phc->stream); } /* called lock-free */ -static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) { +static int stream_packet(struct packet_handler_ctx *phc) { /** * Incoming packets: * - sfd->socket.local: the local IP/port on which the packet arrived @@ -1517,129 +1519,123 @@ static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, */ /* TODO move the above comments to the data structure definitions, if the above * always holds true */ - // XXX collect all these vars in a struct to pass around - struct packet_stream *stream, - *sink = NULL, - *in_srtp, *out_srtp; - struct call_media *media; - int ret = 0, update = 0, handler_ret = 0, rtcp = 0; - struct call *call; - rewrite_func rwf_out; - struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL; + int ret = 0, handler_ret = 0; - call = sfd->call; + phc->call = phc->sfd->call; - rwlock_lock_r(&call->master_lock); + rwlock_lock_r(&phc->call->master_lock); - stream = sfd->stream; - if (G_UNLIKELY(!stream)) - goto unlock_out; - __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); + phc->stream = phc->sfd->stream; + if (G_UNLIKELY(!phc->stream)) + goto out; + __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), + phc->stream->endpoint.port); - media = stream->media; + phc->media = phc->stream->media; - if (!stream->selected_sfd) - goto unlock_out; + if (!phc->stream->selected_sfd) + goto out; - int stun_ret = media_demux_protocols(sfd, s, fsin); + int stun_ret = media_demux_protocols(phc); if (stun_ret == 0) // packet processed - goto unlock_out; + goto out; if (stun_ret == 1) { - media_packet_kernel_check(stream, sink); + media_packet_kernel_check(phc); goto drop; } #if RTP_LOOP_PROTECT - if (MEDIA_ISSET(media, LOOP_CHECK)) { - if (media_loop_detect(stream, s)) - goto unlock_out; + if (MEDIA_ISSET(phc->media, LOOP_CHECK)) { + if (media_loop_detect(phc)) + goto out; } #endif - // this sets in_srtp, out_srtp, and sink - rtcp = media_packet_rtcp_demux(s, stream, &in_srtp, &out_srtp, &sink); + // this sets rtcp, in_srtp, out_srtp, and sink + media_packet_rtcp_demux(phc); // this set ssrc_in and ssrc_out - media_packet_rtp(s, stream, in_srtp, out_srtp, rtcp, &ssrc_in, &ssrc_out); + media_packet_rtp(phc); /* do we have somewhere to forward it to? */ - if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) { - ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin)); - atomic64_inc(&stream->stats.errors); + if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp + || !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd)) + { + ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->fsin)); + atomic64_inc(&phc->stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); - goto unlock_out; + goto out; } - handler_ret = media_packet_decrypt(s, rtcp, in_srtp, sink, sfd, fsin, tv, ssrc_in, &rwf_out); + handler_ret = media_packet_decrypt(phc); // If recording pcap dumper is set, then we record the call. - if (call->recording) - dump_packet(call->recording, stream, s); + if (phc->call->recording) + dump_packet(phc->call->recording, phc->stream, &phc->s); if (G_LIKELY(handler_ret >= 0)) - handler_ret = media_packet_encrypt(s, rtcp, out_srtp, in_srtp, ssrc_out, rwf_out); + handler_ret = media_packet_encrypt(phc); - if (handler_ret > 0) { - unkernelize(stream); - update = 1; - } + if (phc->update) // for RTCP packet index updates + unkernelize(phc->stream); - int address_check = media_packet_address_check(stream, sink, sfd, fsin); - if (address_check == -1) + int address_check = media_packet_address_check(phc); + if (address_check) goto drop; - if ((address_check & 1)) - update = 1; - if ((address_check & 2)) - media_packet_kernel_check(stream, sink); + if (phc->kernelize) + media_packet_kernel_check(phc); - mutex_lock(&sink->out_lock); - if (!sink->advertised_endpoint.port - || (is_addr_unspecified(&sink->advertised_endpoint.address) - && !is_trickle_ice_address(&sink->advertised_endpoint)) + mutex_lock(&phc->sink->out_lock); + + if (!phc->sink->advertised_endpoint.port + || (is_addr_unspecified(&phc->sink->advertised_endpoint.address) + && !is_trickle_ice_address(&phc->sink->advertised_endpoint)) || handler_ret < 0) + { + mutex_unlock(&phc->sink->out_lock); goto drop; + } - ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint); - __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port); + ret = socket_sendto(&phc->sink->selected_sfd->socket, phc->s.s, phc->s.len, &phc->sink->endpoint); + __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address), + phc->sink->endpoint.port); - mutex_unlock(&sink->out_lock); + mutex_unlock(&phc->sink->out_lock); if (ret == -1) { ret = -errno; ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - atomic64_inc(&stream->stats.errors); + atomic64_inc(&phc->stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); goto out; } - sink = NULL; - drop: - if (sink) - mutex_unlock(&sink->out_lock); ret = 0; - atomic64_inc(&stream->stats.packets); - atomic64_add(&stream->stats.bytes, s->len); - atomic64_set(&stream->last_packet, rtpe_now.tv_sec); + atomic64_inc(&phc->stream->stats.packets); + atomic64_add(&phc->stream->stats.bytes, phc->s.len); + atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec); atomic64_inc(&rtpe_statsps.packets); - atomic64_add(&rtpe_statsps.bytes, s->len); + atomic64_add(&rtpe_statsps.bytes, phc->s.len); out: - if (ret == 0 && update) - ret = 1; + if (phc->unkernelize) { + stream_unconfirm(phc->stream); + stream_unconfirm(phc->stream->rtp_sink); + stream_unconfirm(phc->stream->rtcp_sink); + } -unlock_out: - rwlock_unlock_r(&call->master_lock); + rwlock_unlock_r(&phc->call->master_lock); return ret; } @@ -1651,9 +1647,6 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { int ret, iters; int update = 0; struct call *ca; - str s; - endpoint_t ep; - struct timeval tv; if (sfd->socket.fd != fd) goto out; @@ -1669,8 +1662,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { } #endif + struct packet_handler_ctx phc = { 0, }; + phc.sfd = sfd; + ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, - &ep, &tv); + &phc.fsin, &phc.tv); if (ret < 0) { if (errno == EINTR) @@ -1683,11 +1679,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { if (ret >= MAX_RTP_PACKET_SIZE) ilog(LOG_WARNING, "UDP packet possibly truncated"); - str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret); - ret = stream_packet(sfd, &s, &ep, &tv); + str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret); + ret = stream_packet(&phc); if (G_UNLIKELY(ret < 0)) ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret)); - else if (ret == 1) + else if (phc.update) update = 1; }