Browse Source

TT#30405 create a packet handling context struct for convenience

Change-Id: I0f3e0f66bf138147f265f7ee8d95028d9301359b
changes/08/18508/10
Richard Fuchs 8 years ago
parent
commit
fb729e3d12
1 changed files with 223 additions and 227 deletions
  1. +223
    -227
      daemon/media_socket.c

+ 223
- 227
daemon/media_socket.c View File

@ -55,6 +55,27 @@ struct intf_rr {
GQueue logical_intfs; GQueue logical_intfs;
struct logical_intf *singular; // set iff only one is present in the list - no lock needed struct logical_intf *singular; // set iff only one is present in the list - no lock needed
}; };
struct packet_handler_ctx {
// inputs:
str s; // raw input packet
endpoint_t fsin; // source address of received packet
struct timeval tv; // timestamp when packet was received
struct stream_fd *sfd; // fd which received the packet
struct call *call; // sfd->call
struct packet_stream *stream; // sfd->stream
struct call_media *media; // stream->media
struct packet_stream *sink; // where to send output packets to (forward destination)
rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt
struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP)
struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp
int rtcp; // true if this is an RTCP packet
// verdicts:
int update; // true if Redis info needs to be updated
int unkernelize; // true if stream ought to be removed from kernel
int kernelize; // true if stream can be kernelized
};
static void determine_handler(struct packet_stream *in, const struct packet_stream *out); static void determine_handler(struct packet_stream *in, const struct packet_stream *out);
@ -1169,24 +1190,21 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
// returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed; // returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed;
// 1 = same as 0, but stream can be kernelized // 1 = same as 0, but stream can be kernelized
static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *stream = sfd->stream;
struct call_media *media = stream->media;
if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) {
mutex_lock(&stream->in_lock);
int ret = dtls(stream, s, fsin);
mutex_unlock(&stream->in_lock);
static int media_demux_protocols(struct packet_handler_ctx *phc) {
if (MEDIA_ISSET(phc->media, DTLS) && is_dtls(&phc->s)) {
mutex_lock(&phc->stream->in_lock);
int ret = dtls(phc->stream, &phc->s, &phc->fsin);
mutex_unlock(&phc->stream->in_lock);
if (!ret) if (!ret)
return 0; return 0;
} }
if (media->ice_agent && is_stun(s)) {
int stun_ret = stun(s, sfd, fsin);
if (phc->media->ice_agent && is_stun(&phc->s)) {
int stun_ret = stun(&phc->s, phc->sfd, &phc->fsin);
if (!stun_ret) if (!stun_ret)
return 0; return 0;
if (stun_ret == 1) { if (stun_ret == 1) {
call_media_state_machine(media);
call_media_state_machine(phc->media);
return 1; return 1;
} }
else /* not an stun packet */ else /* not an stun packet */
@ -1199,34 +1217,34 @@ static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endp
#if RTP_LOOP_PROTECT #if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_stream *stream, const str *s) {
mutex_lock(&stream->in_lock);
static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->stream->in_lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) { for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (stream->lp_buf[i].len != s->len)
if (phc->stream->lp_buf[i].len != phc->s.len)
continue; continue;
if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)))
if (memcmp(phc->stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)))
continue; continue;
__C_DBG("packet dupe"); __C_DBG("packet dupe");
if (stream->lp_count >= RTP_LOOP_MAX_COUNT) {
if (phc->stream->lp_count >= RTP_LOOP_MAX_COUNT) {
ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet "
"to avoid potential loop", RTP_LOOP_MAX_COUNT); "to avoid potential loop", RTP_LOOP_MAX_COUNT);
mutex_unlock(&stream->in_lock);
mutex_unlock(&phc->stream->in_lock);
return -1; return -1;
} }
stream->lp_count++;
phc->stream->lp_count++;
goto loop_ok; goto loop_ok;
} }
/* not a dupe */ /* not a dupe */
stream->lp_count = 0;
stream->lp_buf[stream->lp_idx].len = s->len;
memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT));
stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS;
phc->stream->lp_count = 0;
phc->stream->lp_buf[phc->stream->lp_idx].len = phc->s.len;
memcpy(phc->stream->lp_buf[phc->stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->stream->lp_idx = (phc->stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok: loop_ok:
mutex_unlock(&stream->in_lock);
mutex_unlock(&phc->stream->in_lock);
return 0; return 0;
} }
@ -1234,269 +1252,253 @@ loop_ok:
// returns: true/false (is RTCP or not)
// in_srtp and out_srtp are set to point to the SRTP contexts to use // in_srtp and out_srtp are set to point to the SRTP contexts to use
// sink_p is set to where to forward the packet to
static int media_packet_rtcp_demux(const str *s, struct packet_stream *stream,
struct packet_stream **in_srtp_p, struct packet_stream **out_srtp_p,
struct packet_stream **sink_p)
// sink is set to where to forward the packet to
static void media_packet_rtcp_demux(struct packet_handler_ctx *phc)
{ {
struct call_media *media = stream->media;
int rtcp = 0;
*in_srtp_p = stream;
*sink_p = stream->rtp_sink;
if (!*sink_p && PS_ISSET(stream, RTCP)) {
*sink_p = stream->rtcp_sink;
rtcp = 1;
phc->in_srtp = phc->stream;
phc->sink = phc->stream->rtp_sink;
if (!phc->sink && PS_ISSET(phc->stream, RTCP)) {
phc->sink = phc->stream->rtcp_sink;
phc->rtcp = 1;
} }
else if (stream->rtcp_sink) {
int muxed_rtcp = rtcp_demux(s, media);
else if (phc->stream->rtcp_sink) {
int muxed_rtcp = rtcp_demux(&phc->s, phc->media);
if (muxed_rtcp == 2) { if (muxed_rtcp == 2) {
*sink_p = stream->rtcp_sink;
rtcp = 1;
*in_srtp_p = stream->rtcp_sibling; // use RTCP SRTP context
phc->sink = phc->stream->rtcp_sink;
phc->rtcp = 1;
phc->in_srtp = phc->stream->rtcp_sibling; // use RTCP SRTP context
} }
} }
*out_srtp_p = *sink_p;
if (rtcp && *sink_p && (*sink_p)->rtcp_sibling)
*out_srtp_p = (*sink_p)->rtcp_sibling; // use RTCP SRTP context
return rtcp;
phc->out_srtp = phc->sink;
if (phc->rtcp && phc->sink && phc->sink->rtcp_sibling)
phc->out_srtp = phc->sink->rtcp_sibling; // use RTCP SRTP context
} }
static void media_packet_rtp(const str *s, struct packet_stream *stream, struct packet_stream *in_srtp,
struct packet_stream *out_srtp, int rtcp,
struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p)
static void media_packet_rtp(struct packet_handler_ctx *phc)
{ {
struct call_media *media = stream->media;
struct call *call = media->call;
struct rtp_header *rtp_h; struct rtp_header *rtp_h;
struct rtcp_packet *rtcp_h; struct rtcp_packet *rtcp_h;
if (G_UNLIKELY(!media->protocol))
if (G_UNLIKELY(!phc->media->protocol))
return; return;
if (G_UNLIKELY(!media->protocol->rtp))
if (G_UNLIKELY(!phc->media->protocol->rtp))
return; return;
if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash);
if (G_LIKELY(!phc->rtcp && !rtp_payload(&rtp_h, NULL, &phc->s))) {
if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, rtp_h->ssrc, &phc->ssrc_in,
&phc->ssrc_out, phc->call->ssrc_hash);
// check the payload type // check the payload type
int i = (rtp_h->m_pt & 0x7f); int i = (rtp_h->m_pt & 0x7f);
if (G_LIKELY(*ssrc_in_p))
(*ssrc_in_p)->parent->payload_type = i;
if (G_LIKELY(phc->ssrc_in))
phc->ssrc_in->parent->payload_type = i;
// XXX convert to array? or keep last pointer? // XXX convert to array? or keep last pointer?
struct rtp_stats *rtp_s = g_hash_table_lookup(stream->rtp_stats, &i);
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &i);
if (!rtp_s) { if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", i); "RTP packet with unknown payload type %u received", i);
atomic64_inc(&stream->stats.errors);
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors); atomic64_inc(&rtpe_statsps.errors);
} }
else { else {
atomic64_inc(&rtp_s->packets); atomic64_inc(&rtp_s->packets);
atomic64_add(&rtp_s->bytes, s->len);
atomic64_add(&rtp_s->bytes, phc->s.len);
} }
} }
else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash);
else if (phc->rtcp && !rtcp_payload(&rtcp_h, NULL, &phc->s)) {
if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, rtcp_h->ssrc, &phc->ssrc_in,
&phc->ssrc_out, phc->call->ssrc_hash);
} }
} }
static int media_packet_decrypt(str *s, int rtcp, struct packet_stream *in_srtp, struct packet_stream *sink,
struct stream_fd *sfd, const endpoint_t *fsin, const struct timeval *tv,
struct ssrc_ctx *ssrc_in, rewrite_func *rwf_out_p)
static int media_packet_decrypt(struct packet_handler_ctx *phc)
{ {
rewrite_func rwf_in;
mutex_lock(&in_srtp->in_lock);
determine_handler(in_srtp, sink);
mutex_lock(&phc->in_srtp->in_lock);
determine_handler(phc->in_srtp, phc->sink);
// XXX use an array with index instead of if/else // XXX use an array with index instead of if/else
if (G_LIKELY(!rtcp)) {
rwf_in = in_srtp->handler->in->rtp;
*rwf_out_p = in_srtp->handler->out->rtp;
if (G_LIKELY(!phc->rtcp)) {
phc->decrypt_func = phc->in_srtp->handler->in->rtp;
phc->encrypt_func = phc->in_srtp->handler->out->rtp;
} }
else { else {
rwf_in = in_srtp->handler->in->rtcp;
*rwf_out_p = in_srtp->handler->out->rtcp;
phc->decrypt_func = phc->in_srtp->handler->in->rtcp;
phc->encrypt_func = phc->in_srtp->handler->out->rtcp;
} }
/* return values are: 0 = forward packet, -1 = error/dont forward, /* return values are: 0 = forward packet, -1 = error/dont forward,
* 1 = forward and push update to redis */ * 1 = forward and push update to redis */
int ret = 0; int ret = 0;
if (rwf_in)
ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in);
if (phc->decrypt_func)
ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->sfd, &phc->fsin, &phc->tv, phc->ssrc_in);
mutex_unlock(&in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->in_lock);
if (ret == 1) {
phc->update = 1;
ret = 0;
}
return ret; return ret;
} }
static int media_packet_encrypt(str *s, int rtcp, struct packet_stream *out_srtp, struct packet_stream *sink,
struct ssrc_ctx *ssrc_out, rewrite_func rwf_out)
static int media_packet_encrypt(struct packet_handler_ctx *phc)
{ {
if (!rwf_out)
if (!phc->encrypt_func)
return 0; return 0;
mutex_lock(&out_srtp->out_lock);
mutex_lock(&phc->out_srtp->out_lock);
int ret = 0;
if (rwf_out)
ret = rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out);
int ret = phc->encrypt_func(&phc->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out);
mutex_unlock(&out_srtp->out_lock);
mutex_unlock(&phc->out_srtp->out_lock);
if (ret == 1) {
phc->update = 1;
ret = 0;
}
return ret; return ret;
} }
// returns: 0 = OK, forward packet; -1 = drop packet; 1 = OK, forward, but also update info
// 2 = OK, we are ready to kernelize; 3 = same as 1, but also update info
static int media_packet_address_check(struct packet_stream *stream, struct packet_stream *sink,
struct stream_fd *sfd, const endpoint_t *fsin)
// returns: 0 = OK, forward packet; -1 = drop packet
static int media_packet_address_check(struct packet_handler_ctx *phc)
{ {
struct call_media *media = stream->media;
struct call *call = stream->call;
struct endpoint endpoint; struct endpoint endpoint;
int unk = 0, ret;
int ret = 0;
mutex_lock(&stream->in_lock);
mutex_lock(&phc->stream->in_lock);
/* we're OK to (potentially) use the source address of this packet as destination /* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */ * in the other direction. */
/* if the other side hasn't been signalled yet, just forward the packet */ /* if the other side hasn't been signalled yet, just forward the packet */
if (!PS_ISSET(stream, FILLED)) {
__C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
ret = 0;
if (!PS_ISSET(phc->stream, FILLED)) {
__C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
goto out; goto out;
} }
/* do not pay attention to source addresses of incoming packets for asymmetric streams */ /* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(media, ASYMMETRIC))
PS_SET(stream, CONFIRMED);
if (MEDIA_ISSET(phc->media, ASYMMETRIC))
PS_SET(phc->stream, CONFIRMED);
/* confirm sink for unidirectional streams in order to kernelize */ /* confirm sink for unidirectional streams in order to kernelize */
if (MEDIA_ISSET(media, UNIDIRECTIONAL)) {
PS_SET(sink, CONFIRMED);
}
if (MEDIA_ISSET(phc->media, UNIDIRECTIONAL))
PS_SET(phc->sink, CONFIRMED);
/* if we have already updated the endpoint in the past ... */ /* if we have already updated the endpoint in the past ... */
if (PS_ISSET(stream, CONFIRMED)) {
if (PS_ISSET(phc->stream, CONFIRMED)) {
/* see if we need to compare the source address with the known endpoint */ /* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = *fsin;
mutex_lock(&stream->out_lock);
if (PS_ISSET2(phc->stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->fsin;
mutex_lock(&phc->stream->out_lock);
int tmp = memcmp(&endpoint, &stream->endpoint, sizeof(endpoint));
if (tmp && PS_ISSET(stream, MEDIA_HANDOVER)) {
int tmp = memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint));
if (tmp && PS_ISSET(phc->stream, MEDIA_HANDOVER)) {
/* out_lock remains locked */ /* out_lock remains locked */
ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(fsin));
unk = 1;
ret = 1;
stream->endpoint = *fsin;
ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->fsin));
phc->unkernelize = 1;
phc->update = 1;
phc->stream->endpoint = phc->fsin;
goto update_addr; goto update_addr;
} }
mutex_unlock(&stream->out_lock);
mutex_unlock(&phc->stream->out_lock);
if (tmp && PS_ISSET(stream, STRICT_SOURCE)) {
if (tmp && PS_ISSET(phc->stream, STRICT_SOURCE)) {
ilog(LOG_INFO, "Drop due to strict-source attribute; got %s:%d, expected %s:%d", ilog(LOG_INFO, "Drop due to strict-source attribute; got %s:%d, expected %s:%d",
sockaddr_print_buf(&endpoint.address), endpoint.port, sockaddr_print_buf(&endpoint.address), endpoint.port,
sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
atomic64_inc(&stream->stats.errors);
sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
atomic64_inc(&phc->stream->stats.errors);
ret = -1; ret = -1;
goto out; goto out;
} }
} }
ret = 1;
phc->kernelize = 1;
goto out; goto out;
} }
/* wait at least 3 seconds after last signal before committing to a particular /* wait at least 3 seconds after last signal before committing to a particular
* endpoint address */ * endpoint address */
ret = 0;
if (!call->last_signal || rtpe_now.tv_sec <= call->last_signal + 3)
if (!phc->call->last_signal || rtpe_now.tv_sec <= phc->call->last_signal + 3)
goto update_peerinfo; goto update_peerinfo;
ret = 2;
phc->kernelize = 1;
phc->update = 1;
ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(fsin));
ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->fsin));
PS_SET(stream, CONFIRMED);
PS_SET(phc->stream, CONFIRMED);
update_peerinfo: update_peerinfo:
mutex_lock(&stream->out_lock);
endpoint = stream->endpoint;
stream->endpoint = *fsin;
if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint)))
ret |= 1; // either 0 or 2 -> makes it 1 or 3
mutex_lock(&phc->stream->out_lock);
endpoint = phc->stream->endpoint;
phc->stream->endpoint = phc->fsin;
if (memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint)))
phc->update = 1;
update_addr: update_addr:
mutex_unlock(&stream->out_lock);
mutex_unlock(&phc->stream->out_lock);
/* check the destination address of the received packet against what we think our /* check the destination address of the received packet against what we think our
* local interface to use is */ * local interface to use is */
if (stream->selected_sfd && sfd != stream->selected_sfd) {
ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&sfd->socket.local));
stream->selected_sfd = sfd;
ret |= 1; // 0 or 2 -> 1 or 3
if (phc->stream->selected_sfd && phc->sfd != phc->stream->selected_sfd) {
ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->sfd->socket.local));
phc->stream->selected_sfd = phc->sfd;
phc->update = 1;
} }
out: out:
if (unk)
__stream_unconfirm(stream);
mutex_unlock(&stream->in_lock);
if (unk) {
stream_unconfirm(stream->rtp_sink);
stream_unconfirm(stream->rtcp_sink);
}
mutex_unlock(&phc->stream->in_lock);
return ret; return ret;
} }
static void media_packet_kernel_check(struct packet_stream *stream, struct packet_stream *sink) {
if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) {
__C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (PS_ISSET(phc->stream, NO_KERNEL_SUPPORT)) {
__C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->stream->endpoint.address), phc->stream->endpoint.port);
return; return;
} }
if (!PS_ISSET(stream, CONFIRMED)) {
__C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
if (!PS_ISSET(phc->stream, CONFIRMED)) {
__C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
return; return;
} }
if (!sink) {
__C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
if (!phc->sink) {
__C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
return; return;
} }
if (!PS_ISSET(sink, CONFIRMED)) {
__C_DBG("sink not CONFIRMED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
if (!PS_ISSET(phc->sink, CONFIRMED)) {
__C_DBG("sink not CONFIRMED for stream %s:%d",
sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
return; return;
} }
if (!PS_ISSET(sink, FILLED)) {
__C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
if (!PS_ISSET(phc->sink, FILLED)) {
__C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
return; return;
} }
kernelize(stream);
kernelize(phc->stream);
} }
/* called lock-free */ /* called lock-free */
static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) {
static int stream_packet(struct packet_handler_ctx *phc) {
/** /**
* Incoming packets: * Incoming packets:
* - sfd->socket.local: the local IP/port on which the packet arrived * - sfd->socket.local: the local IP/port on which the packet arrived
@ -1517,129 +1519,123 @@ static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin,
*/ */
/* TODO move the above comments to the data structure definitions, if the above /* TODO move the above comments to the data structure definitions, if the above
* always holds true */ * always holds true */
// XXX collect all these vars in a struct to pass around
struct packet_stream *stream,
*sink = NULL,
*in_srtp, *out_srtp;
struct call_media *media;
int ret = 0, update = 0, handler_ret = 0, rtcp = 0;
struct call *call;
rewrite_func rwf_out;
struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL;
int ret = 0, handler_ret = 0;
call = sfd->call;
phc->call = phc->sfd->call;
rwlock_lock_r(&call->master_lock);
rwlock_lock_r(&phc->call->master_lock);
stream = sfd->stream;
if (G_UNLIKELY(!stream))
goto unlock_out;
__C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
phc->stream = phc->sfd->stream;
if (G_UNLIKELY(!phc->stream))
goto out;
__C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
media = stream->media;
phc->media = phc->stream->media;
if (!stream->selected_sfd)
goto unlock_out;
if (!phc->stream->selected_sfd)
goto out;
int stun_ret = media_demux_protocols(sfd, s, fsin);
int stun_ret = media_demux_protocols(phc);
if (stun_ret == 0) // packet processed if (stun_ret == 0) // packet processed
goto unlock_out;
goto out;
if (stun_ret == 1) { if (stun_ret == 1) {
media_packet_kernel_check(stream, sink);
media_packet_kernel_check(phc);
goto drop; goto drop;
} }
#if RTP_LOOP_PROTECT #if RTP_LOOP_PROTECT
if (MEDIA_ISSET(media, LOOP_CHECK)) {
if (media_loop_detect(stream, s))
goto unlock_out;
if (MEDIA_ISSET(phc->media, LOOP_CHECK)) {
if (media_loop_detect(phc))
goto out;
} }
#endif #endif
// this sets in_srtp, out_srtp, and sink
rtcp = media_packet_rtcp_demux(s, stream, &in_srtp, &out_srtp, &sink);
// this sets rtcp, in_srtp, out_srtp, and sink
media_packet_rtcp_demux(phc);
// this set ssrc_in and ssrc_out // this set ssrc_in and ssrc_out
media_packet_rtp(s, stream, in_srtp, out_srtp, rtcp, &ssrc_in, &ssrc_out);
media_packet_rtp(phc);
/* do we have somewhere to forward it to? */ /* do we have somewhere to forward it to? */
if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) {
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin));
atomic64_inc(&stream->stats.errors);
if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp
|| !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd))
{
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->fsin));
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors); atomic64_inc(&rtpe_statsps.errors);
goto unlock_out;
goto out;
} }
handler_ret = media_packet_decrypt(s, rtcp, in_srtp, sink, sfd, fsin, tv, ssrc_in, &rwf_out);
handler_ret = media_packet_decrypt(phc);
// If recording pcap dumper is set, then we record the call. // If recording pcap dumper is set, then we record the call.
if (call->recording)
dump_packet(call->recording, stream, s);
if (phc->call->recording)
dump_packet(phc->call->recording, phc->stream, &phc->s);
if (G_LIKELY(handler_ret >= 0)) if (G_LIKELY(handler_ret >= 0))
handler_ret = media_packet_encrypt(s, rtcp, out_srtp, in_srtp, ssrc_out, rwf_out);
handler_ret = media_packet_encrypt(phc);
if (handler_ret > 0) {
unkernelize(stream);
update = 1;
}
if (phc->update) // for RTCP packet index updates
unkernelize(phc->stream);
int address_check = media_packet_address_check(stream, sink, sfd, fsin);
if (address_check == -1)
int address_check = media_packet_address_check(phc);
if (address_check)
goto drop; goto drop;
if ((address_check & 1))
update = 1;
if ((address_check & 2))
media_packet_kernel_check(stream, sink);
if (phc->kernelize)
media_packet_kernel_check(phc);
mutex_lock(&sink->out_lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint))
mutex_lock(&phc->sink->out_lock);
if (!phc->sink->advertised_endpoint.port
|| (is_addr_unspecified(&phc->sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&phc->sink->advertised_endpoint))
|| handler_ret < 0) || handler_ret < 0)
{
mutex_unlock(&phc->sink->out_lock);
goto drop; goto drop;
}
ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint);
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port);
ret = socket_sendto(&phc->sink->selected_sfd->socket, phc->s.s, phc->s.len, &phc->sink->endpoint);
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address),
phc->sink->endpoint.port);
mutex_unlock(&sink->out_lock);
mutex_unlock(&phc->sink->out_lock);
if (ret == -1) { if (ret == -1) {
ret = -errno; ret = -errno;
ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno));
atomic64_inc(&stream->stats.errors);
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors); atomic64_inc(&rtpe_statsps.errors);
goto out; goto out;
} }
sink = NULL;
drop: drop:
if (sink)
mutex_unlock(&sink->out_lock);
ret = 0; ret = 0;
atomic64_inc(&stream->stats.packets);
atomic64_add(&stream->stats.bytes, s->len);
atomic64_set(&stream->last_packet, rtpe_now.tv_sec);
atomic64_inc(&phc->stream->stats.packets);
atomic64_add(&phc->stream->stats.bytes, phc->s.len);
atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec);
atomic64_inc(&rtpe_statsps.packets); atomic64_inc(&rtpe_statsps.packets);
atomic64_add(&rtpe_statsps.bytes, s->len);
atomic64_add(&rtpe_statsps.bytes, phc->s.len);
out: out:
if (ret == 0 && update)
ret = 1;
if (phc->unkernelize) {
stream_unconfirm(phc->stream);
stream_unconfirm(phc->stream->rtp_sink);
stream_unconfirm(phc->stream->rtcp_sink);
}
unlock_out:
rwlock_unlock_r(&call->master_lock);
rwlock_unlock_r(&phc->call->master_lock);
return ret; return ret;
} }
@ -1651,9 +1647,6 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
int ret, iters; int ret, iters;
int update = 0; int update = 0;
struct call *ca; struct call *ca;
str s;
endpoint_t ep;
struct timeval tv;
if (sfd->socket.fd != fd) if (sfd->socket.fd != fd)
goto out; goto out;
@ -1669,8 +1662,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
} }
#endif #endif
struct packet_handler_ctx phc = { 0, };
phc.sfd = sfd;
ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE,
&ep, &tv);
&phc.fsin, &phc.tv);
if (ret < 0) { if (ret < 0) {
if (errno == EINTR) if (errno == EINTR)
@ -1683,11 +1679,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
if (ret >= MAX_RTP_PACKET_SIZE) if (ret >= MAX_RTP_PACKET_SIZE)
ilog(LOG_WARNING, "UDP packet possibly truncated"); ilog(LOG_WARNING, "UDP packet possibly truncated");
str_init_len(&s, buf + RTP_BUFFER_HEAD_ROOM, ret);
ret = stream_packet(sfd, &s, &ep, &tv);
str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret);
ret = stream_packet(&phc);
if (G_UNLIKELY(ret < 0)) if (G_UNLIKELY(ret < 0))
ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret)); ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret));
else if (ret == 1)
else if (phc.update)
update = 1; update = 1;
} }


Loading…
Cancel
Save