Browse Source

TT#30405 split stream_packet() into smaller pieces

Change-Id: I89b46e02bfdbedffa63f71ef11b03cbf204ececa
changes/88/18488/9
Richard Fuchs 8 years ago
parent
commit
bc420cbdd0
3 changed files with 288 additions and 189 deletions
  1. +284
    -185
      daemon/media_socket.c
  2. +3
    -3
      daemon/stun.c
  3. +1
    -1
      daemon/stun.h

+ 284
- 185
daemon/media_socket.c View File

@ -838,7 +838,7 @@ static void stream_fd_closed(int fd, void *p, uintptr_t u) {
/* returns: 0 = not a muxed stream, 1 = muxed, RTP, 2 = muxed, RTCP */
static int rtcp_demux(str *s, struct call_media *media) {
static int rtcp_demux(const str *s, struct call_media *media) {
if (!MEDIA_ISSET(media, RTCP_MUX))
return 0;
return rtcp_demux_is_rtcp(s) ? 2 : 1;
@ -1167,223 +1167,207 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
}
/* XXX split this function into pieces */
/* called lock-free */
static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) {
/**
* Incoming packets:
* - sfd->socket.local: the local IP/port on which the packet arrived
* - sfd->stream->endpoint: adjusted/learned IP/port from where the packet
* was sent
* - sfd->stream->advertised_endpoint: the unadjusted IP/port from where the
* packet was sent. These are the values present in the SDP
*
* Outgoing packets:
* - sfd->stream->rtp_sink->endpoint: the destination IP/port
* - sfd->stream->selected_sfd->socket.local: the local source IP/port for the
* outgoing packet
*
* If the rtpengine runs behind a NAT and local addresses are configured with
* different advertised endpoints, the SDP would not contain the address from
* `...->socket.local`, but rather from `sfd->local_intf->spec->address.advertised`
* (of type `sockaddr_t`). The port will be the same.
*/
/* TODO move the above comments to the data structure definitions, if the above
* always holds true */
struct packet_stream *stream,
*sink = NULL,
*in_srtp, *out_srtp;
struct call_media *media;
int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0,
unk = 0;
int i;
struct call *call;
/*unsigned char cc;*/
struct endpoint endpoint;
rewrite_func rwf_in, rwf_out;
//struct local_intf *loc_addr;
struct rtp_header *rtp_h;
struct rtcp_packet *rtcp_h;
struct rtp_stats *rtp_s;
struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL;
call = sfd->call;
rwlock_lock_r(&call->master_lock);
stream = sfd->stream;
if (!stream)
goto unlock_out;
__C_DBG("Try to Kernelizing media stream: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
media = stream->media;
if (!stream->selected_sfd)
goto unlock_out;
/* demux other protocols running on this port */
// returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed;
// 1 = same as 0, but stream can be kernelized
static int media_demux_protocols(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *stream = sfd->stream;
struct call_media *media = stream->media;
if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) {
mutex_lock(&stream->in_lock);
ret = dtls(stream, s, fsin);
int ret = dtls(stream, s, fsin);
mutex_unlock(&stream->in_lock);
if (!ret)
goto unlock_out;
return 0;
}
if (media->ice_agent && is_stun(s)) {
stun_ret = stun(s, sfd, fsin);
int stun_ret = stun(s, sfd, fsin);
if (!stun_ret)
goto unlock_out;
return 0;
if (stun_ret == 1) {
call_media_state_machine(media);
mutex_lock(&stream->in_lock); /* for the jump */
goto kernel_check;
return 1;
}
else /* not an stun packet */
stun_ret = 0;
;
}
return -1;
}
#if RTP_LOOP_PROTECT
if (MEDIA_ISSET(media, LOOP_CHECK)) {
mutex_lock(&stream->in_lock);
for (i = 0; i < RTP_LOOP_PACKETS; i++) {
if (stream->lp_buf[i].len != s->len)
continue;
if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)))
continue;
__C_DBG("packet dupe");
if (stream->lp_count >= RTP_LOOP_MAX_COUNT) {
ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet "
"to avoid potential loop", RTP_LOOP_MAX_COUNT);
goto done;
}
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_stream *stream, const str *s) {
mutex_lock(&stream->in_lock);
stream->lp_count++;
goto loop_ok;
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (stream->lp_buf[i].len != s->len)
continue;
if (memcmp(stream->lp_buf[i].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT)))
continue;
__C_DBG("packet dupe");
if (stream->lp_count >= RTP_LOOP_MAX_COUNT) {
ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet "
"to avoid potential loop", RTP_LOOP_MAX_COUNT);
mutex_unlock(&stream->in_lock);
return -1;
}
/* not a dupe */
stream->lp_count = 0;
stream->lp_buf[stream->lp_idx].len = s->len;
memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT));
stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&stream->in_lock);
stream->lp_count++;
goto loop_ok;
}
/* not a dupe */
stream->lp_count = 0;
stream->lp_buf[stream->lp_idx].len = s->len;
memcpy(stream->lp_buf[stream->lp_idx].buf, s->s, MIN(s->len, RTP_LOOP_PROTECT));
stream->lp_idx = (stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&stream->in_lock);
return 0;
}
#endif
/* demux RTCP */
in_srtp = stream;
sink = stream->rtp_sink;
if (!sink && PS_ISSET(stream, RTCP)) {
sink = stream->rtcp_sink;
// returns: true/false (is RTCP or not)
// in_srtp and out_srtp are set to point to the SRTP contexts to use
// sink_p is set to where to forward the packet to
static int media_packet_rtcp_demux(const str *s, struct packet_stream *stream,
struct packet_stream **in_srtp_p, struct packet_stream **out_srtp_p,
struct packet_stream **sink_p)
{
struct call_media *media = stream->media;
int rtcp = 0;
*in_srtp_p = stream;
*sink_p = stream->rtp_sink;
if (!*sink_p && PS_ISSET(stream, RTCP)) {
*sink_p = stream->rtcp_sink;
rtcp = 1;
}
else if (stream->rtcp_sink) {
muxed_rtcp = rtcp_demux(s, media);
int muxed_rtcp = rtcp_demux(s, media);
if (muxed_rtcp == 2) {
sink = stream->rtcp_sink;
*sink_p = stream->rtcp_sink;
rtcp = 1;
in_srtp = stream->rtcp_sibling;
*in_srtp_p = stream->rtcp_sibling; // use RTCP SRTP context
}
}
out_srtp = sink;
if (rtcp && sink && sink->rtcp_sibling)
out_srtp = sink->rtcp_sibling;
*out_srtp_p = *sink_p;
if (rtcp && *sink_p && (*sink_p)->rtcp_sibling)
*out_srtp_p = (*sink_p)->rtcp_sibling; // use RTCP SRTP context
/* RTP/RTCP specifics */
return rtcp;
}
if (G_LIKELY(media->protocol && media->protocol->rtp)) {
if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, &ssrc_in, &ssrc_out, call->ssrc_hash);
// check the payload type
i = (rtp_h->m_pt & 0x7f);
if (G_LIKELY(ssrc_in))
ssrc_in->parent->payload_type = i;
static void media_packet_rtp(const str *s, struct packet_stream *stream, struct packet_stream *in_srtp,
struct packet_stream *out_srtp, int rtcp,
struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p)
{
struct call_media *media = stream->media;
struct call *call = media->call;
struct rtp_header *rtp_h;
struct rtcp_packet *rtcp_h;
// XXX convert to array? or keep last pointer?
rtp_s = g_hash_table_lookup(stream->rtp_stats, &i);
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", i);
atomic64_inc(&stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
}
if (G_UNLIKELY(!media->protocol))
return;
if (G_UNLIKELY(!media->protocol->rtp))
return;
else {
atomic64_inc(&rtp_s->packets);
atomic64_add(&rtp_s->bytes, s->len);
}
if (G_LIKELY(!rtcp && !rtp_payload(&rtp_h, NULL, s))) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash);
// check the payload type
int i = (rtp_h->m_pt & 0x7f);
if (G_LIKELY(*ssrc_in_p))
(*ssrc_in_p)->parent->payload_type = i;
// XXX convert to array? or keep last pointer?
struct rtp_stats *rtp_s = g_hash_table_lookup(stream->rtp_stats, &i);
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", i);
atomic64_inc(&stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
}
else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, &ssrc_in, &ssrc_out, call->ssrc_hash);
else {
atomic64_inc(&rtp_s->packets);
atomic64_add(&rtp_s->bytes, s->len);
}
}
/* do we have somewhere to forward it to? */
if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) {
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin));
atomic64_inc(&stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
goto unlock_out;
else if (rtcp && !rtcp_payload(&rtcp_h, NULL, s)) {
if (G_LIKELY(out_srtp != NULL))
__stream_ssrc(in_srtp, out_srtp, rtcp_h->ssrc, ssrc_in_p, ssrc_out_p, call->ssrc_hash);
}
}
/* transcoding stuff, in and out */
static int media_packet_decrypt(str *s, int rtcp, struct packet_stream *in_srtp, struct packet_stream *sink,
struct stream_fd *sfd, const endpoint_t *fsin, const struct timeval *tv,
struct ssrc_ctx *ssrc_in, rewrite_func *rwf_out_p)
{
rewrite_func rwf_in;
mutex_lock(&in_srtp->in_lock);
determine_handler(in_srtp, sink);
// XXX use an array with index instead of if/else
if (G_LIKELY(!rtcp)) {
rwf_in = in_srtp->handler->in->rtp;
rwf_out = in_srtp->handler->out->rtp;
*rwf_out_p = in_srtp->handler->out->rtp;
}
else {
rwf_in = in_srtp->handler->in->rtcp;
rwf_out = in_srtp->handler->out->rtcp;
*rwf_out_p = in_srtp->handler->out->rtcp;
}
mutex_lock(&out_srtp->out_lock);
/* return values are: 0 = forward packet, -1 = error/dont forward,
* 1 = forward and push update to redis */
if (rwf_in) {
handler_ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in);
}
int ret = 0;
if (rwf_in)
ret = rwf_in(s, in_srtp, sfd, fsin, tv, ssrc_in);
// If recording pcap dumper is set, then we record the call.
if (call->recording) {
dump_packet(call->recording, stream, s);
}
mutex_unlock(&in_srtp->in_lock);
if (G_LIKELY(handler_ret >= 0)) {
if (rwf_out)
handler_ret += rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out);
}
return ret;
}
if (handler_ret > 0) {
__unkernelize(stream);
update = 1;
}
static int media_packet_encrypt(str *s, int rtcp, struct packet_stream *out_srtp, struct packet_stream *sink,
struct ssrc_ctx *ssrc_out, rewrite_func rwf_out)
{
if (!rwf_out)
return 0;
mutex_lock(&out_srtp->out_lock);
int ret = 0;
if (rwf_out)
ret = rwf_out(s, out_srtp, NULL, NULL, NULL, ssrc_out);
mutex_unlock(&out_srtp->out_lock);
mutex_unlock(&in_srtp->in_lock);
return ret;
}
/* endpoint address handling */
// returns: 0 = OK, forward packet; -1 = drop packet; 1 = OK, forward, but also update info
// 2 = OK, we are ready to kernelize; 3 = same as 1, but also update info
static int media_packet_address_check(struct packet_stream *stream, struct packet_stream *sink,
struct stream_fd *sfd, const endpoint_t *fsin)
{
struct call_media *media = stream->media;
struct call *call = stream->call;
struct endpoint endpoint;
int unk = 0, ret;
mutex_lock(&stream->in_lock);
@ -1392,7 +1376,8 @@ loop_ok:
/* if the other side hasn't been signalled yet, just forward the packet */
if (!PS_ISSET(stream, FILLED)) {
__C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
ret = 0;
goto out;
}
/* do not pay attention to source addresses of incoming packets for asymmetric streams */
@ -1416,6 +1401,8 @@ loop_ok:
/* out_lock remains locked */
ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(fsin));
unk = 1;
ret = 1;
stream->endpoint = *fsin;
goto update_addr;
}
@ -1426,29 +1413,33 @@ loop_ok:
sockaddr_print_buf(&endpoint.address), endpoint.port,
sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
atomic64_inc(&stream->stats.errors);
goto drop;
ret = -1;
goto out;
}
}
goto kernel_check;
ret = 1;
goto out;
}
/* wait at least 3 seconds after last signal before committing to a particular
* endpoint address */
ret = 0;
if (!call->last_signal || rtpe_now.tv_sec <= call->last_signal + 3)
goto update_peerinfo;
ret = 2;
ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(fsin));
PS_SET(stream, CONFIRMED);
update = 1;
update_peerinfo:
mutex_lock(&stream->out_lock);
update_addr:
endpoint = stream->endpoint;
stream->endpoint = *fsin;
if (memcmp(&endpoint, &stream->endpoint, sizeof(endpoint)))
update = 1;
ret |= 1; // either 0 or 2 -> makes it 1 or 3
update_addr:
mutex_unlock(&stream->out_lock);
/* check the destination address of the received packet against what we think our
@ -1456,50 +1447,168 @@ update_addr:
if (stream->selected_sfd && sfd != stream->selected_sfd) {
ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&sfd->socket.local));
stream->selected_sfd = sfd;
update = 1;
ret |= 1; // 0 or 2 -> 1 or 3
}
out:
if (unk)
__stream_unconfirm(stream);
mutex_unlock(&stream->in_lock);
kernel_check:
if (unk) {
stream_unconfirm(stream->rtp_sink);
stream_unconfirm(stream->rtcp_sink);
}
return ret;
}
static void media_packet_kernel_check(struct packet_stream *stream, struct packet_stream *sink) {
if (PS_ISSET(stream, NO_KERNEL_SUPPORT)) {
__C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
return;
}
if (!PS_ISSET(stream, CONFIRMED)) {
__C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
return;
}
if (!sink) {
__C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
return;
}
if (!PS_ISSET(sink, CONFIRMED)) {
__C_DBG("sink not CONFIRMED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
return;
}
if (!PS_ISSET(sink, FILLED)) {
__C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
goto forward;
return;
}
kernelize(stream);
}
forward:
if (sink)
mutex_lock(&sink->out_lock);
if (!sink
|| !sink->advertised_endpoint.port
/* called lock-free */
static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin, const struct timeval *tv) {
/**
* Incoming packets:
* - sfd->socket.local: the local IP/port on which the packet arrived
* - sfd->stream->endpoint: adjusted/learned IP/port from where the packet
* was sent
* - sfd->stream->advertised_endpoint: the unadjusted IP/port from where the
* packet was sent. These are the values present in the SDP
*
* Outgoing packets:
* - sfd->stream->rtp_sink->endpoint: the destination IP/port
* - sfd->stream->selected_sfd->socket.local: the local source IP/port for the
* outgoing packet
*
* If the rtpengine runs behind a NAT and local addresses are configured with
* different advertised endpoints, the SDP would not contain the address from
* `...->socket.local`, but rather from `sfd->local_intf->spec->address.advertised`
* (of type `sockaddr_t`). The port will be the same.
*/
/* TODO move the above comments to the data structure definitions, if the above
* always holds true */
// XXX collect all these vars in a struct to pass around
struct packet_stream *stream,
*sink = NULL,
*in_srtp, *out_srtp;
struct call_media *media;
int ret = 0, update = 0, handler_ret = 0, rtcp = 0;
struct call *call;
rewrite_func rwf_out;
struct ssrc_ctx *ssrc_in = NULL, *ssrc_out = NULL;
call = sfd->call;
rwlock_lock_r(&call->master_lock);
stream = sfd->stream;
if (G_UNLIKELY(!stream))
goto unlock_out;
__C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);
media = stream->media;
if (!stream->selected_sfd)
goto unlock_out;
int stun_ret = media_demux_protocols(sfd, s, fsin);
if (stun_ret == 0) // packet processed
goto unlock_out;
if (stun_ret == 1) {
media_packet_kernel_check(stream, sink);
goto drop;
}
#if RTP_LOOP_PROTECT
if (MEDIA_ISSET(media, LOOP_CHECK)) {
if (media_loop_detect(stream, s))
goto unlock_out;
}
#endif
// this sets in_srtp, out_srtp, and sink
rtcp = media_packet_rtcp_demux(s, stream, &in_srtp, &out_srtp, &sink);
// this set ssrc_in and ssrc_out
media_packet_rtp(s, stream, in_srtp, out_srtp, rtcp, &ssrc_in, &ssrc_out);
/* do we have somewhere to forward it to? */
if (G_UNLIKELY(!sink || !sink->selected_sfd || !out_srtp || !out_srtp->selected_sfd || !in_srtp->selected_sfd)) {
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(fsin));
atomic64_inc(&stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
goto unlock_out;
}
handler_ret = media_packet_decrypt(s, rtcp, in_srtp, sink, sfd, fsin, tv, ssrc_in, &rwf_out);
// If recording pcap dumper is set, then we record the call.
if (call->recording)
dump_packet(call->recording, stream, s);
if (G_LIKELY(handler_ret >= 0))
handler_ret = media_packet_encrypt(s, rtcp, out_srtp, in_srtp, ssrc_out, rwf_out);
if (handler_ret > 0) {
unkernelize(stream);
update = 1;
}
int address_check = media_packet_address_check(stream, sink, sfd, fsin);
if (address_check == -1)
goto drop;
if ((address_check & 1))
update = 1;
if ((address_check & 2))
media_packet_kernel_check(stream, sink);
mutex_lock(&sink->out_lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint))
|| stun_ret || handler_ret < 0)
|| handler_ret < 0)
goto drop;
// s is my packet?
ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint);
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port);
@ -1529,16 +1638,6 @@ out:
if (ret == 0 && update)
ret = 1;
#if RTP_LOOP_PROTECT
done:
#endif
if (unk)
__stream_unconfirm(stream);
mutex_unlock(&stream->in_lock);
if (unk) {
stream_unconfirm(stream->rtp_sink);
stream_unconfirm(stream->rtcp_sink);
}
unlock_out:
rwlock_unlock_r(&call->master_lock);


+ 3
- 3
daemon/stun.c View File

@ -409,7 +409,7 @@ static void stun_error_len(struct stream_fd *sfd, const endpoint_t *sin,
static int check_fingerprint(str *msg, struct stun_attrs *attrs) {
static int check_fingerprint(const str *msg, struct stun_attrs *attrs) {
int len;
u_int32_t crc;
@ -422,7 +422,7 @@ static int check_fingerprint(str *msg, struct stun_attrs *attrs) {
return 0;
}
static int check_auth(str *msg, struct stun_attrs *attrs, struct call_media *media, int dst, int src) {
static int check_auth(const str *msg, struct stun_attrs *attrs, struct call_media *media, int dst, int src) {
u_int16_t lenX;
char digest[20];
str ufrag[2];
@ -553,7 +553,7 @@ static int __stun_error(struct stream_fd *sfd, const endpoint_t *sin,
*
* call is locked in R
*/
int stun(str *b, struct stream_fd *sfd, const endpoint_t *sin) {
int stun(const str *b, struct stream_fd *sfd, const endpoint_t *sin) {
struct header *req = (void *) b->s;
int msglen, method, class;
str attr_str;


+ 1
- 1
daemon/stun.h View File

@ -49,7 +49,7 @@ INLINE int is_stun(const str *s) {
}
int stun(str *, struct stream_fd *, const endpoint_t *);
int stun(const str *, struct stream_fd *, const endpoint_t *);
int stun_binding_request(const endpoint_t *dst, u_int32_t transaction[3], str *pwd,
str ufrags[2], int controlling, u_int64_t tiebreaker, u_int32_t priority,


Loading…
Cancel
Save