diff --git a/daemon/call.h b/daemon/call.h index 254065924..a049a59f8 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -342,6 +342,7 @@ struct call_media { GHashTable *codec_handlers; // int payload type -> struct codec_handler // XXX combine this with 'codecs_recv' hash table? volatile struct codec_handler *codec_handler_cache; + struct rtcp_handler *rtcp_handler; int ptime; // either from SDP or overridden diff --git a/daemon/codec.c b/daemon/codec.c index 38886faab..67c461824 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -6,6 +6,7 @@ #include "rtplib.h" #include "codeclib.h" #include "ssrc.h" +#include "rtcp.h" @@ -138,6 +139,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink) NULL, __codec_handler_free); MEDIA_CLEAR(receiver, TRANSCODE); + receiver->rtcp_handler = NULL; // we go through the list of codecs that the receiver supports and compare it // with the list of codecs supported by the sink. if the receiver supports @@ -335,6 +337,8 @@ next: STR_FMT(&pt->encoding)); l = __delete_receiver_codec(receiver, l); } + + receiver->rtcp_handler = rtcp_transcode_handler; } } @@ -368,16 +372,16 @@ void codec_handlers_free(struct call_media *m) { } -void codec_add_raw_packet(struct media_packet *mp, const str *raw) { +void codec_add_raw_packet(struct media_packet *mp) { struct codec_packet *p = g_slice_alloc(sizeof(*p)); - p->s = *raw; + p->s = mp->raw; p->free_func = NULL; g_queue_push_tail(&mp->packets_out, p); } static int handler_func_passthrough(struct codec_handler *h, struct call_media *media, struct media_packet *mp) { - codec_add_raw_packet(mp, &mp->raw); + codec_add_raw_packet(mp); return 0; } @@ -500,7 +504,7 @@ static int __packet_encoded(encoder_t *enc, void *u1, void *u2) { rh->m_pt = ch->handler->dest_pt.payload_type; rh->seq_num = htons(ch->seq_out++); rh->timestamp = htonl(enc->avpkt.pts + ch->ts_out); - rh->ssrc = mp->ssrc_out->ssrc_map_out; + rh->ssrc = htonl(mp->ssrc_in->ssrc_map_out); // add to output queue struct codec_packet *p = g_slice_alloc(sizeof(*p)); diff --git a/daemon/codec.h b/daemon/codec.h index 3e3c6a584..4cdffe3e3 100644 --- a/daemon/codec.h +++ b/daemon/codec.h @@ -36,7 +36,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink) struct codec_handler *codec_handler_get(struct call_media *, int payload_type); void codec_handlers_free(struct call_media *); -void codec_add_raw_packet(struct media_packet *mp, const str *); +void codec_add_raw_packet(struct media_packet *mp); void codec_packet_free(void *); void codec_rtp_payload_types(struct call_media *media, struct call_media *other_media, diff --git a/daemon/media_socket.c b/daemon/media_socket.c index f1021505a..27d0af5d4 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -60,13 +60,7 @@ struct intf_rr { struct packet_handler_ctx { // inputs: str s; // raw input packet - endpoint_t fsin; // source address of received packet - struct timeval tv; // timestamp when packet was received - struct stream_fd *sfd; // fd which received the packet - struct call *call; // sfd->call - struct packet_stream *stream; // sfd->stream - struct call_media *media; // stream->media struct packet_stream *sink; // where to send output packets to (forward destination) rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt rtcp_filter_func *rtcp_filter; @@ -1136,31 +1130,38 @@ noop: static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *out_srtp, u_int32_t ssrc_bs, struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash) { - u_int32_t ssrc = ntohl(ssrc_bs); + u_int32_t in_ssrc = ntohl(ssrc_bs); + u_int32_t out_ssrc; // input direction mutex_lock(&in_srtp->in_lock); (*ssrc_in_p) = in_srtp->ssrc_in; - if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != ssrc)) { + if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) { // SSRC mismatch - get the new entry (*ssrc_in_p) = in_srtp->ssrc_in = - get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_INPUT); + get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT); + + // might have created a new entry, which would have a new random + // ssrc_map_out. we don't need this if we're not transcoding + if (!MEDIA_ISSET(in_srtp->media, TRANSCODE)) + (*ssrc_in_p)->ssrc_map_out = in_ssrc; } mutex_unlock(&in_srtp->in_lock); - if (MEDIA_ISSET(in_srtp->media, TRANSCODE)) - ssrc = (*ssrc_in_p)->ssrc_map_out; - // out direction + out_ssrc = (*ssrc_in_p)->ssrc_map_out; mutex_lock(&out_srtp->out_lock); (*ssrc_out_p) = out_srtp->ssrc_out; - if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != ssrc)) { + if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) { // SSRC mismatch - get the new entry (*ssrc_out_p) = out_srtp->ssrc_out = - get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_OUTPUT); + get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT); + + // reverse SSRC mapping + (*ssrc_out_p)->ssrc_map_out = in_ssrc; } mutex_unlock(&out_srtp->out_lock); @@ -1170,20 +1171,20 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o // returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed; // 1 = same as 0, but stream can be kernelized static int media_demux_protocols(struct packet_handler_ctx *phc) { - if (MEDIA_ISSET(phc->media, DTLS) && is_dtls(&phc->s)) { - mutex_lock(&phc->stream->in_lock); - int ret = dtls(phc->stream, &phc->s, &phc->fsin); - mutex_unlock(&phc->stream->in_lock); + if (MEDIA_ISSET(phc->mp.media, DTLS) && is_dtls(&phc->s)) { + mutex_lock(&phc->mp.stream->in_lock); + int ret = dtls(phc->mp.stream, &phc->s, &phc->mp.fsin); + mutex_unlock(&phc->mp.stream->in_lock); if (!ret) return 0; } - if (phc->media->ice_agent && is_stun(&phc->s)) { - int stun_ret = stun(&phc->s, phc->sfd, &phc->fsin); + if (phc->mp.media->ice_agent && is_stun(&phc->s)) { + int stun_ret = stun(&phc->s, phc->mp.sfd, &phc->mp.fsin); if (!stun_ret) return 0; if (stun_ret == 1) { - call_media_state_machine(phc->media); + call_media_state_machine(phc->mp.media); return 1; } else /* not an stun packet */ @@ -1197,33 +1198,33 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) { #if RTP_LOOP_PROTECT // returns: 0 = ok, proceed; -1 = duplicate detected, drop packet static int media_loop_detect(struct packet_handler_ctx *phc) { - mutex_lock(&phc->stream->in_lock); + mutex_lock(&phc->mp.stream->in_lock); for (int i = 0; i < RTP_LOOP_PACKETS; i++) { - if (phc->stream->lp_buf[i].len != phc->s.len) + if (phc->mp.stream->lp_buf[i].len != phc->s.len) continue; - if (memcmp(phc->stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT))) + if (memcmp(phc->mp.stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT))) continue; __C_DBG("packet dupe"); - if (phc->stream->lp_count >= RTP_LOOP_MAX_COUNT) { + if (phc->mp.stream->lp_count >= RTP_LOOP_MAX_COUNT) { ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet " "to avoid potential loop", RTP_LOOP_MAX_COUNT); - mutex_unlock(&phc->stream->in_lock); + mutex_unlock(&phc->mp.stream->in_lock); return -1; } - phc->stream->lp_count++; + phc->mp.stream->lp_count++; goto loop_ok; } /* not a dupe */ - phc->stream->lp_count = 0; - phc->stream->lp_buf[phc->stream->lp_idx].len = phc->s.len; - memcpy(phc->stream->lp_buf[phc->stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)); - phc->stream->lp_idx = (phc->stream->lp_idx + 1) % RTP_LOOP_PACKETS; + phc->mp.stream->lp_count = 0; + phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].len = phc->s.len; + memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)); + phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS; loop_ok: - mutex_unlock(&phc->stream->in_lock); + mutex_unlock(&phc->mp.stream->in_lock); return 0; } @@ -1235,18 +1236,18 @@ loop_ok: // sink is set to where to forward the packet to static void media_packet_rtcp_demux(struct packet_handler_ctx *phc) { - phc->in_srtp = phc->stream; - phc->sink = phc->stream->rtp_sink; - if (!phc->sink && PS_ISSET(phc->stream, RTCP)) { - phc->sink = phc->stream->rtcp_sink; + phc->in_srtp = phc->mp.stream; + phc->sink = phc->mp.stream->rtp_sink; + if (!phc->sink && PS_ISSET(phc->mp.stream, RTCP)) { + phc->sink = phc->mp.stream->rtcp_sink; phc->rtcp = 1; } - else if (phc->stream->rtcp_sink) { - int muxed_rtcp = rtcp_demux(&phc->s, phc->media); + else if (phc->mp.stream->rtcp_sink) { + int muxed_rtcp = rtcp_demux(&phc->s, phc->mp.media); if (muxed_rtcp == 2) { - phc->sink = phc->stream->rtcp_sink; + phc->sink = phc->mp.stream->rtcp_sink; phc->rtcp = 1; - phc->in_srtp = phc->stream->rtcp_sibling; // use RTCP SRTP context + phc->in_srtp = phc->mp.stream->rtcp_sibling; // use RTCP SRTP context } } phc->out_srtp = phc->sink; @@ -1259,9 +1260,9 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) { phc->payload_type = -1; - if (G_UNLIKELY(!phc->media->protocol)) + if (G_UNLIKELY(!phc->mp.media->protocol)) return; - if (G_UNLIKELY(!phc->media->protocol->rtp)) + if (G_UNLIKELY(!phc->mp.media->protocol->rtp)) return; if (G_LIKELY(!phc->rtcp && !rtp_payload(&phc->mp.rtp, &phc->mp.payload, &phc->s))) { @@ -1269,7 +1270,7 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) if (G_LIKELY(phc->out_srtp != NULL)) __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtp->ssrc, &phc->mp.ssrc_in, - &phc->mp.ssrc_out, phc->call->ssrc_hash); + &phc->mp.ssrc_out, phc->mp.call->ssrc_hash); // check the payload type // XXX redundant between SSRC handling and codec_handler stuff -> combine @@ -1279,11 +1280,11 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) // XXX convert to array? or keep last pointer? // XXX yet another hash table per payload type -> combine - struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type); + struct rtp_stats *rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats, &phc->payload_type); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, "RTP packet with unknown payload type %u received", phc->payload_type); - atomic64_inc(&phc->stream->stats.errors); + atomic64_inc(&phc->mp.stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); } @@ -1295,7 +1296,7 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) else if (phc->rtcp && !rtcp_payload(&phc->mp.rtcp, NULL, &phc->s)) { if (G_LIKELY(phc->out_srtp != NULL)) __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtcp->ssrc, &phc->mp.ssrc_in, - &phc->mp.ssrc_out, phc->call->ssrc_hash); + &phc->mp.ssrc_out, phc->mp.call->ssrc_hash); } } @@ -1320,7 +1321,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) * 1 = forward and push update to redis */ int ret = 0; if (phc->decrypt_func) - ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->sfd, &phc->fsin, &phc->tv, phc->mp.ssrc_in); + ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->mp.sfd, &phc->mp.fsin, &phc->mp.tv, phc->mp.ssrc_in); mutex_unlock(&phc->in_srtp->in_lock); @@ -1361,50 +1362,50 @@ static int media_packet_address_check(struct packet_handler_ctx *phc) struct endpoint endpoint; int ret = 0; - mutex_lock(&phc->stream->in_lock); + mutex_lock(&phc->mp.stream->in_lock); /* we're OK to (potentially) use the source address of this packet as destination * in the other direction. */ /* if the other side hasn't been signalled yet, just forward the packet */ - if (!PS_ISSET(phc->stream, FILLED)) { - __C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + if (!PS_ISSET(phc->mp.stream, FILLED)) { + __C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); goto out; } /* do not pay attention to source addresses of incoming packets for asymmetric streams */ - if (MEDIA_ISSET(phc->media, ASYMMETRIC)) - PS_SET(phc->stream, CONFIRMED); + if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC)) + PS_SET(phc->mp.stream, CONFIRMED); /* confirm sink for unidirectional streams in order to kernelize */ - if (MEDIA_ISSET(phc->media, UNIDIRECTIONAL)) + if (MEDIA_ISSET(phc->mp.media, UNIDIRECTIONAL)) PS_SET(phc->sink, CONFIRMED); /* if we have already updated the endpoint in the past ... */ - if (PS_ISSET(phc->stream, CONFIRMED)) { + if (PS_ISSET(phc->mp.stream, CONFIRMED)) { /* see if we need to compare the source address with the known endpoint */ - if (PS_ISSET2(phc->stream, STRICT_SOURCE, MEDIA_HANDOVER)) { - endpoint = phc->fsin; - mutex_lock(&phc->stream->out_lock); + if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) { + endpoint = phc->mp.fsin; + mutex_lock(&phc->mp.stream->out_lock); - int tmp = memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint)); - if (tmp && PS_ISSET(phc->stream, MEDIA_HANDOVER)) { + int tmp = memcmp(&endpoint, &phc->mp.stream->endpoint, sizeof(endpoint)); + if (tmp && PS_ISSET(phc->mp.stream, MEDIA_HANDOVER)) { /* out_lock remains locked */ - ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->fsin)); + ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->mp.fsin)); phc->unkernelize = 1; phc->update = 1; - phc->stream->endpoint = phc->fsin; + phc->mp.stream->endpoint = phc->mp.fsin; goto update_addr; } - mutex_unlock(&phc->stream->out_lock); + mutex_unlock(&phc->mp.stream->out_lock); - if (tmp && PS_ISSET(phc->stream, STRICT_SOURCE)) { + if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) { ilog(LOG_INFO, "Drop due to strict-source attribute; got %s:%d, expected %s:%d", sockaddr_print_buf(&endpoint.address), endpoint.port, - sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); - atomic64_inc(&phc->stream->stats.errors); + sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); + atomic64_inc(&phc->mp.stream->stats.errors); ret = -1; goto out; } @@ -1415,72 +1416,72 @@ static int media_packet_address_check(struct packet_handler_ctx *phc) /* wait at least 3 seconds after last signal before committing to a particular * endpoint address */ - if (!phc->call->last_signal || rtpe_now.tv_sec <= phc->call->last_signal + 3) + if (!phc->mp.call->last_signal || rtpe_now.tv_sec <= phc->mp.call->last_signal + 3) goto update_peerinfo; phc->kernelize = 1; phc->update = 1; - ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->fsin)); + ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->mp.fsin)); - PS_SET(phc->stream, CONFIRMED); + PS_SET(phc->mp.stream, CONFIRMED); update_peerinfo: - mutex_lock(&phc->stream->out_lock); - endpoint = phc->stream->endpoint; - phc->stream->endpoint = phc->fsin; - if (memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint))) + mutex_lock(&phc->mp.stream->out_lock); + endpoint = phc->mp.stream->endpoint; + phc->mp.stream->endpoint = phc->mp.fsin; + if (memcmp(&endpoint, &phc->mp.stream->endpoint, sizeof(endpoint))) phc->update = 1; update_addr: - mutex_unlock(&phc->stream->out_lock); + mutex_unlock(&phc->mp.stream->out_lock); /* check the destination address of the received packet against what we think our * local interface to use is */ - if (phc->stream->selected_sfd && phc->sfd != phc->stream->selected_sfd) { - ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->sfd->socket.local)); - phc->stream->selected_sfd = phc->sfd; + if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) { + ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->mp.sfd->socket.local)); + phc->mp.stream->selected_sfd = phc->mp.sfd; phc->update = 1; } out: - mutex_unlock(&phc->stream->in_lock); + mutex_unlock(&phc->mp.stream->in_lock); return ret; } static void media_packet_kernel_check(struct packet_handler_ctx *phc) { - if (PS_ISSET(phc->stream, NO_KERNEL_SUPPORT)) { - __C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->stream->endpoint.address), phc->stream->endpoint.port); + if (PS_ISSET(phc->mp.stream, NO_KERNEL_SUPPORT)) { + __C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->mp.stream->endpoint.address), phc->mp.stream->endpoint.port); return; } - if (!PS_ISSET(phc->stream, CONFIRMED)) { - __C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + if (!PS_ISSET(phc->mp.stream, CONFIRMED)) { + __C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); return; } if (!phc->sink) { - __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); return; } if (!PS_ISSET(phc->sink, CONFIRMED)) { __C_DBG("sink not CONFIRMED for stream %s:%d", - sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); return; } if (!PS_ISSET(phc->sink, FILLED)) { - __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); return; } - kernelize(phc->stream); + kernelize(phc->mp.stream); } @@ -1490,13 +1491,14 @@ static int do_rtcp(struct packet_handler_ctx *phc) { // XXX rewrite/consume for transcoding GQueue rtcp_list = G_QUEUE_INIT; - if (rtcp_parse(&rtcp_list, &phc->s, phc->sfd, &phc->fsin, &phc->tv)) + if (rtcp_parse(&rtcp_list, &phc->mp)) goto out; if (phc->rtcp_filter) - if (phc->rtcp_filter(&phc->s, &rtcp_list)) + if (phc->rtcp_filter(&phc->mp, &rtcp_list)) goto out; - codec_add_raw_packet(&phc->mp, &phc->s); + // queue for output + codec_add_raw_packet(&phc->mp); ret = 0; out: @@ -1529,20 +1531,20 @@ static int stream_packet(struct packet_handler_ctx *phc) { * always holds true */ int ret = 0, handler_ret = 0; - phc->call = phc->sfd->call; + phc->mp.call = phc->mp.sfd->call; - rwlock_lock_r(&phc->call->master_lock); + rwlock_lock_r(&phc->mp.call->master_lock); - phc->stream = phc->sfd->stream; - if (G_UNLIKELY(!phc->stream)) + phc->mp.stream = phc->mp.sfd->stream; + if (G_UNLIKELY(!phc->mp.stream)) goto out; - __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address), - phc->stream->endpoint.port); + __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); - phc->media = phc->stream->media; + phc->mp.media = phc->mp.stream->media; - if (!phc->stream->selected_sfd) + if (!phc->mp.stream->selected_sfd) goto out; @@ -1556,7 +1558,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { #if RTP_LOOP_PROTECT - if (MEDIA_ISSET(phc->media, LOOP_CHECK)) { + if (MEDIA_ISSET(phc->mp.media, LOOP_CHECK)) { if (media_loop_detect(phc)) goto out; } @@ -1575,8 +1577,8 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp || !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd)) { - ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->fsin)); - atomic64_inc(&phc->stream->stats.errors); + ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->mp.fsin)); + atomic64_inc(&phc->mp.stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); goto out; } @@ -1585,19 +1587,21 @@ static int stream_packet(struct packet_handler_ctx *phc) { handler_ret = media_packet_decrypt(phc); // If recording pcap dumper is set, then we record the call. - if (phc->call->recording) - dump_packet(phc->call->recording, phc->stream, &phc->s); + if (phc->mp.call->recording) + dump_packet(phc->mp.call->recording, phc->mp.stream, &phc->s); + + // ready to process + + phc->mp.raw = phc->s; - // RTCP detection is further up, so we could make this determination there - if (phc->mp.rtcp) { + if (phc->rtcp) { if (do_rtcp(phc)) goto drop; } else { - struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type); + struct codec_handler *transcoder = codec_handler_get(phc->mp.media, phc->payload_type); // this transfers the packet from 's' to 'packets_out' - phc->mp.raw = phc->s; - if (transcoder->func(transcoder, phc->media, &phc->mp)) + if (transcoder->func(transcoder, phc->mp.media, &phc->mp)) goto drop; } @@ -1605,7 +1609,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { handler_ret = media_packet_encrypt(phc); if (phc->update) // for RTCP packet index updates - unkernelize(phc->stream); + unkernelize(phc->mp.stream); int address_check = media_packet_address_check(phc); @@ -1646,7 +1650,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (ret == -1) { ret = -errno; ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - atomic64_inc(&phc->stream->stats.errors); + atomic64_inc(&phc->mp.stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); goto out; } @@ -1654,20 +1658,20 @@ static int stream_packet(struct packet_handler_ctx *phc) { drop: ret = 0; // XXX separate stats for received/sent - atomic64_inc(&phc->stream->stats.packets); - atomic64_add(&phc->stream->stats.bytes, phc->s.len); - atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec); + atomic64_inc(&phc->mp.stream->stats.packets); + atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len); + atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec); atomic64_inc(&rtpe_statsps.packets); atomic64_add(&rtpe_statsps.bytes, phc->s.len); out: if (phc->unkernelize) { - stream_unconfirm(phc->stream); - stream_unconfirm(phc->stream->rtp_sink); - stream_unconfirm(phc->stream->rtcp_sink); + stream_unconfirm(phc->mp.stream); + stream_unconfirm(phc->mp.stream->rtp_sink); + stream_unconfirm(phc->mp.stream->rtcp_sink); } - rwlock_unlock_r(&phc->call->master_lock); + rwlock_unlock_r(&phc->mp.call->master_lock); g_queue_clear_full(&phc->mp.packets_out, codec_packet_free); @@ -1698,10 +1702,10 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { struct packet_handler_ctx phc; ZERO(phc); - phc.sfd = sfd; + phc.mp.sfd = sfd; ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, - &phc.fsin, &phc.tv); + &phc.mp.fsin, &phc.mp.tv); if (ret < 0) { if (errno == EINTR) diff --git a/daemon/media_socket.h b/daemon/media_socket.h index 0eeab20a4..a4405d6ba 100644 --- a/daemon/media_socket.h +++ b/daemon/media_socket.h @@ -15,7 +15,9 @@ -typedef int rtcp_filter_func(str *, GQueue *); +struct media_packet; + +typedef int rtcp_filter_func(struct media_packet *, GQueue *); @@ -73,6 +75,14 @@ struct stream_fd { }; struct media_packet { str raw; + + endpoint_t fsin; // source address of received packet + struct timeval tv; // timestamp when packet was received + struct stream_fd *sfd; // fd which received the packet + struct call *call; // sfd->call + struct packet_stream *stream; // sfd->stream + struct call_media *media; // stream->media + struct rtp_header *rtp; struct rtcp_packet *rtcp; struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp diff --git a/daemon/rtcp.c b/daemon/rtcp.c index c44ffcba0..00381b68b 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -226,9 +226,7 @@ struct rtcp_chain_element { // context to hold state variables struct rtcp_process_ctx { // input - struct call *call; - struct call_media *media; - const struct timeval *received; + struct media_packet *mp; // handler vars union { @@ -252,10 +250,10 @@ struct rtcp_process_ctx { struct rtcp_handler { void (*init)(struct rtcp_process_ctx *); void (*start)(struct rtcp_process_ctx *, struct call *); - void (*common)(struct rtcp_process_ctx *, const struct rtcp_packet *); - void (*sr)(struct rtcp_process_ctx *, const struct sender_report_packet *); + void (*common)(struct rtcp_process_ctx *, struct rtcp_packet *); + void (*sr)(struct rtcp_process_ctx *, struct sender_report_packet *); void (*rr_list_start)(struct rtcp_process_ctx *, const struct rtcp_packet *); - void (*rr)(struct rtcp_process_ctx *, const struct report_block *); + void (*rr)(struct rtcp_process_ctx *, struct report_block *); void (*rr_list_end)(struct rtcp_process_ctx *); //void (*xr)(struct rtcp_process_ctx *, const struct rtcp_packet *, str *); void (*sdes_list_start)(struct rtcp_process_ctx *, const struct source_description_packet *); @@ -276,6 +274,7 @@ struct rtcp_handlers { const struct rtcp_handler *scratch, *mos, + *transcode, *logging, *homer; }; @@ -283,25 +282,35 @@ struct rtcp_handlers { // log handler function prototypes // scratch area (prepare/parse packet) -static void scratch_common(struct rtcp_process_ctx *, const struct rtcp_packet *); -static void scratch_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); -static void scratch_rr(struct rtcp_process_ctx *, const struct report_block *); +static void scratch_common(struct rtcp_process_ctx *, struct rtcp_packet *); +static void scratch_sr(struct rtcp_process_ctx *, struct sender_report_packet *); +static void scratch_rr(struct rtcp_process_ctx *, struct report_block *); static void scratch_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *); static void scratch_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *); static void scratch_xr_voip_metrics(struct rtcp_process_ctx *, const struct xr_rb_voip_metrics *); // MOS calculation / stats -static void mos_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); -static void mos_rr(struct rtcp_process_ctx *, const struct report_block *); +static void mos_sr(struct rtcp_process_ctx *, struct sender_report_packet *); +static void mos_rr(struct rtcp_process_ctx *, struct report_block *); static void mos_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *); static void mos_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *); static void mos_xr_voip_metrics(struct rtcp_process_ctx *, const struct xr_rb_voip_metrics *); +// RTCP translation for transcoding +static void transcode_common(struct rtcp_process_ctx *, struct rtcp_packet *); +static void transcode_rr(struct rtcp_process_ctx *, struct report_block *); +static void transcode_sr(struct rtcp_process_ctx *, struct sender_report_packet *); + +// wrappers to enable dynamic transcoding +static void transcode_common_wrap(struct rtcp_process_ctx *, struct rtcp_packet *); +static void transcode_rr_wrap(struct rtcp_process_ctx *, struct report_block *); +static void transcode_sr_wrap(struct rtcp_process_ctx *, struct sender_report_packet *); + // homer functions static void homer_init(struct rtcp_process_ctx *); -static void homer_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); +static void homer_sr(struct rtcp_process_ctx *, struct sender_report_packet *); static void homer_rr_list_start(struct rtcp_process_ctx *, const struct rtcp_packet *); -static void homer_rr(struct rtcp_process_ctx *, const struct report_block *); +static void homer_rr(struct rtcp_process_ctx *, struct report_block *); static void homer_rr_list_end(struct rtcp_process_ctx *); static void homer_sdes_list_start(struct rtcp_process_ctx *, const struct source_description_packet *); static void homer_sdes_item(struct rtcp_process_ctx *, const struct sdes_chunk *, const struct sdes_item *, @@ -313,10 +322,10 @@ static void homer_finish(struct rtcp_process_ctx *, struct call *, const endpoin // syslog functions static void logging_init(struct rtcp_process_ctx *); static void logging_start(struct rtcp_process_ctx *, struct call *); -static void logging_common(struct rtcp_process_ctx *, const struct rtcp_packet *); +static void logging_common(struct rtcp_process_ctx *, struct rtcp_packet *); static void logging_sdes_list_start(struct rtcp_process_ctx *, const struct source_description_packet *); -static void logging_sr(struct rtcp_process_ctx *, const struct sender_report_packet *); -static void logging_rr(struct rtcp_process_ctx *, const struct report_block *); +static void logging_sr(struct rtcp_process_ctx *, struct sender_report_packet *); +static void logging_rr(struct rtcp_process_ctx *, struct report_block *); static void logging_xr_rb(struct rtcp_process_ctx *, const struct xr_report_block *); static void logging_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *); static void logging_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *); @@ -343,6 +352,16 @@ static struct rtcp_handler mos_handlers = { .xr_dlrr = mos_xr_dlrr, .xr_voip_metrics = mos_xr_voip_metrics, }; +static struct rtcp_handler transcode_handlers = { + .common = transcode_common, + .rr = transcode_rr, + .sr = transcode_sr, +}; +static struct rtcp_handler transcode_handlers_wrap = { + .common = transcode_common_wrap, + .rr = transcode_rr_wrap, + .sr = transcode_sr_wrap, +}; static struct rtcp_handler log_handlers = { .init = logging_init, .start = logging_start, @@ -374,6 +393,7 @@ static struct rtcp_handler homer_handlers = { static struct rtcp_handlers rtcp_handlers = { .scratch = &scratch_handlers, .mos = &mos_handlers, + .transcode = &transcode_handlers_wrap, // remainder is variable }; @@ -383,11 +403,13 @@ static struct rtcp_handlers rtcp_handlers = { rtcp_handlers.type->func(log_ctx, ##__VA_ARGS__); \ } while (0) // macro to call all function handlers in one go +// order is important #define CAH(func, ...) do { \ - CH(func, scratch, ##__VA_ARGS__); \ - CH(func, mos, ##__VA_ARGS__); \ - CH(func, logging, ##__VA_ARGS__); \ - CH(func, homer, ##__VA_ARGS__); \ + CH(func, scratch, ##__VA_ARGS__); /* first parse out the values into scratch area */ \ + CH(func, mos, ##__VA_ARGS__); /* process for MOS calculation */ \ + CH(func, logging, ##__VA_ARGS__); /* log packets to syslog */ \ + CH(func, homer, ##__VA_ARGS__); /* send contents to homer */ \ + CH(func, transcode, ##__VA_ARGS__); /* translate for transcoding */ \ } while (0) @@ -448,6 +470,11 @@ static const int min_xr_packet_sizes[] = { +struct rtcp_handler *rtcp_transcode_handler = &transcode_handlers; + + + + static struct rtcp_header *rtcp_length_check(str *s, size_t min_len, unsigned int *len_p) { @@ -617,15 +644,12 @@ void rtcp_list_free(GQueue *q) { -int rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const endpoint_t *src, - const struct timeval *tv) -{ +int rtcp_parse(GQueue *q, struct media_packet *mp) { struct rtcp_header *hdr; struct rtcp_chain_element *el; rtcp_handler_func func; - str s = *_s; - struct call *c = sfd->call; - struct call_media *m = sfd->stream->media; + str s = mp->raw; + struct call *c = mp->call; struct rtcp_process_ctx log_ctx_s, *log_ctx; unsigned int len; @@ -633,9 +657,7 @@ int rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const endpoint_t int min_packet_size; ZERO(log_ctx_s); - log_ctx_s.call = c; - log_ctx_s.media = m; - log_ctx_s.received = tv; + log_ctx_s.mp = mp; log_ctx = &log_ctx_s; @@ -687,25 +709,25 @@ next: abort(); } - CAH(finish, c, src, &sfd->socket.local, tv); + CAH(finish, c, &mp->fsin, &mp->sfd->socket.local, &mp->tv); CAH(destroy); return 0; error: - CAH(finish, c, src, &sfd->socket.local, tv); + CAH(finish, c, &mp->fsin, &mp->sfd->socket.local, &mp->tv); CAH(destroy); rtcp_list_free(q); return -1; } -int rtcp_avpf2avp_filter(str *s, GQueue *rtcp_list) { +int rtcp_avpf2avp_filter(struct media_packet *mp, GQueue *rtcp_list) { GList *l; struct rtcp_chain_element *el; void *start; unsigned int removed, left; - left = s->len; + left = mp->raw.len; removed = 0; for (l = rtcp_list->head; l; l = l->next) { el = l->data; @@ -724,8 +746,8 @@ int rtcp_avpf2avp_filter(str *s, GQueue *rtcp_list) { } } - s->len -= removed; - if (!s->len) + mp->raw.len -= removed; + if (!mp->raw.len) return -1; return 0; @@ -884,10 +906,10 @@ static void str_sanitize(GString *s) { -static void scratch_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) { +static void scratch_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) { ctx->scratch_common_ssrc = htonl(common->ssrc); } -static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { +static void scratch_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { ctx->scratch.rr = (struct ssrc_receiver_report) { .from = ctx->scratch_common_ssrc, .ssrc = htonl(rr->ssrc), @@ -899,7 +921,7 @@ static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block * }; ctx->scratch.rr.packets_lost = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2]; } -static void scratch_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { +static void scratch_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { ctx->scratch.sr = (struct ssrc_sender_report) { .ssrc = ctx->scratch_common_ssrc, .ntp_msw = ntohl(sr->ntp_msw), @@ -958,7 +980,7 @@ static void homer_init(struct rtcp_process_ctx *ctx) { ctx->json = g_string_new("{ "); ctx->json_init_len = ctx->json->len; } -static void homer_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { +static void homer_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { g_string_append_printf(ctx->json, "\"sender_information\":{\"ntp_timestamp_sec\":%u," "\"ntp_timestamp_usec\":%u,\"octets\":%u,\"rtp_timestamp\":%u, \"packets\":%u},", ctx->scratch.sr.ntp_msw, @@ -973,7 +995,7 @@ static void homer_rr_list_start(struct rtcp_process_ctx *ctx, const struct rtcp_ common->header.pt, common->header.count); } -static void homer_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { +static void homer_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { g_string_append_printf(ctx->json, "{\"source_ssrc\":%u," "\"highest_seq_no\":%u,\"fraction_lost\":%u,\"ia_jitter\":%u," "\"packets_lost\":%u,\"lsr\":%u,\"dlsr\":%u},", @@ -1056,7 +1078,7 @@ static void logging_start(struct rtcp_process_ctx *ctx, struct call *c) { g_string_append_printf(ctx->log, "["STR_FORMAT"] ", STR_FMT(&c->callid)); ctx->log_init_len = ctx->log->len; } -static void logging_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) { +static void logging_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) { g_string_append_printf(ctx->log,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ", common->header.version, common->header.p, @@ -1073,7 +1095,7 @@ static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct s sdes->header.pt, ntohs(sdes->header.length)); } -static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { +static void logging_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, " \ "sender_bytes=%u, ", ctx->scratch.sr.ntp_msw, @@ -1082,7 +1104,7 @@ static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_ ctx->scratch.sr.packet_count, ctx->scratch.sr.octet_count); } -static void logging_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { +static void logging_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { g_string_append_printf(ctx->log,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ", ctx->scratch.rr.ssrc, rr->fraction_lost, @@ -1179,20 +1201,80 @@ static void logging_destroy(struct rtcp_process_ctx *ctx) { -static void mos_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) { - ssrc_sender_report(ctx->media, &ctx->scratch.sr, ctx->received); +static void mos_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { + ssrc_sender_report(ctx->mp->media, &ctx->scratch.sr, &ctx->mp->tv); } -static void mos_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) { - ssrc_receiver_report(ctx->media, &ctx->scratch.rr, ctx->received); +static void mos_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { + ssrc_receiver_report(ctx->mp->media, &ctx->scratch.rr, &ctx->mp->tv); } static void mos_xr_rr_time(struct rtcp_process_ctx *ctx, const struct xr_rb_rr_time *rr) { - ssrc_receiver_rr_time(ctx->media, &ctx->scratch.xr_rr, ctx->received); + ssrc_receiver_rr_time(ctx->mp->media, &ctx->scratch.xr_rr, &ctx->mp->tv); } static void mos_xr_dlrr(struct rtcp_process_ctx *ctx, const struct xr_rb_dlrr *dlrr) { - ssrc_receiver_dlrr(ctx->media, &ctx->scratch.xr_dlrr, ctx->received); + ssrc_receiver_dlrr(ctx->mp->media, &ctx->scratch.xr_dlrr, &ctx->mp->tv); } static void mos_xr_voip_metrics(struct rtcp_process_ctx *ctx, const struct xr_rb_voip_metrics *rb_voip_mtc) { - ssrc_voip_metrics(ctx->media, &ctx->scratch.xr_vm, ctx->received); + ssrc_voip_metrics(ctx->mp->media, &ctx->scratch.xr_vm, &ctx->mp->tv); +} + + + + +static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) { + assert(ctx->scratch_common_ssrc == ctx->mp->ssrc_in->parent->h.ssrc); + common->ssrc = htonl(ctx->mp->ssrc_in->ssrc_map_out); +} +static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { + assert(ctx->scratch.rr.from == ctx->mp->ssrc_in->parent->h.ssrc); + + // reverse SSRC mapping + struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, ctx->mp->call->ssrc_hash, + SSRC_DIR_OUTPUT); + rr->ssrc = htonl(map_ctx->ssrc_map_out); + +// ilog(LOG_DEBUG, "transcode_rr: from ssrc %x about %x " +// "ssrc_in %x ssrc_in-map %x ssrc_out %x ssrc_out-map %x " +// "map_ctx %x map_ctx-map %x", +// ctx->scratch.rr.from, +// ctx->scratch.rr.ssrc, +// ctx->mp->ssrc_in->parent->h.ssrc, +// ctx->mp->ssrc_in->ssrc_map_out, +// ctx->mp->ssrc_out->parent->h.ssrc, +// ctx->mp->ssrc_out->ssrc_map_out, +// map_ctx->parent->h.ssrc, +// map_ctx->ssrc_map_out); + + // translate ctx->scratch.rr.from to ctx->mp->ssrc_in->ssrc_map_out - done by transcode_common +} +static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { + assert(ctx->scratch.sr.ssrc == ctx->mp->ssrc_in->parent->h.ssrc); + +// ilog(LOG_DEBUG, "transcode_sr: ssrc %x ssrc_in %x ssrc_in-map %x ssrc_out %x ssrc_out-map %x", +// ctx->scratch.sr.ssrc, +// ctx->mp->ssrc_in->parent->h.ssrc, +// ctx->mp->ssrc_in->ssrc_map_out, +// ctx->mp->ssrc_out->parent->h.ssrc, +// ctx->mp->ssrc_out->ssrc_map_out); +// + // translate ctx->scratch.sr.ssrc to ctx->mp->ssrc_in->ssrc_map_out - done by transcode_common +} + + + +static void transcode_common_wrap(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) { + if (!ctx->mp->media->rtcp_handler) + return; + ctx->mp->media->rtcp_handler->common(ctx, common); +} +static void transcode_rr_wrap(struct rtcp_process_ctx *ctx, struct report_block *rr) { + if (!ctx->mp->media->rtcp_handler) + return; + ctx->mp->media->rtcp_handler->rr(ctx, rr); +} +static void transcode_sr_wrap(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { + if (!ctx->mp->media->rtcp_handler) + return; + ctx->mp->media->rtcp_handler->sr(ctx, sr); } diff --git a/daemon/rtcp.h b/daemon/rtcp.h index 480dd8ed3..ecbce3b8d 100644 --- a/daemon/rtcp.h +++ b/daemon/rtcp.h @@ -10,15 +10,25 @@ struct crypto_context; struct rtcp_packet; struct ssrc_ctx; +struct rtcp_handler; +struct rtcp_parse_ctx { + struct call *call; + struct call_media *media; + const struct timeval *received; +}; + + +extern struct rtcp_handler *rtcp_transcode_handler; + int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *); int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *); int rtcp_payload(struct rtcp_packet **out, str *p, const str *s); -int rtcp_parse(GQueue *q, const str *, struct stream_fd *sfd, const endpoint_t *, const struct timeval *); +int rtcp_parse(GQueue *q, struct media_packet *); void rtcp_list_free(GQueue *q); rtcp_filter_func rtcp_avpf2avp_filter; diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 3edce4467..8c6780d67 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -102,7 +102,7 @@ restart: while (G_UNLIKELY(ht->q.length > 20)) { // arbitrary limit struct ssrc_entry *old_ent = g_queue_pop_head(&ht->q); - ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %u) - deleting SSRC %u", + ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %x) - deleting SSRC %x", ssrc, old_ent->ssrc); g_hash_table_remove(ht->ht, &old_ent->ssrc); g_atomic_pointer_set(&ht->cache, NULL); @@ -236,7 +236,7 @@ found:; mutex_unlock(&e->h.lock); rtt -= (long long) delay * 1000000LL / 65536LL; - ilog(LOG_DEBUG, "Calculated round-trip time for %u is %lli us", ssrc, rtt); + ilog(LOG_DEBUG, "Calculated round-trip time for %x is %lli us", ssrc, rtt); if (rtt <= 0 || rtt > 10000000) { ilog(LOG_DEBUG, "Invalid RTT - discarding"); @@ -260,7 +260,7 @@ void ssrc_sender_report(struct call_media *m, const struct ssrc_sender_report *s seri->report = *sr; - ilog(LOG_DEBUG, "SR from %u: RTP TS %u PC %u OC %u NTP TS %u/%u=%f", + ilog(LOG_DEBUG, "SR from %x: RTP TS %u PC %u OC %u NTP TS %u/%u=%f", sr->ssrc, sr->timestamp, sr->packet_count, sr->octet_count, sr->ntp_msw, sr->ntp_lsw, seri->time_item.ntp_ts); @@ -271,7 +271,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor { struct call *c = m->call; - ilog(LOG_DEBUG, "RR from %u about %u: FL %u TL %u HSR %u J %u LSR %u DLSR %u", + ilog(LOG_DEBUG, "RR from %x about %x: FL %u TL %u HSR %u J %u LSR %u DLSR %u", rr->from, rr->ssrc, rr->fraction_lost, rr->packets_lost, rr->high_seq_received, rr->jitter, rr->lsr, rr->dlsr); @@ -299,7 +299,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor goto out_nl; } unsigned int jitter = rpt->clock_rate ? (rr->jitter * 1000 / rpt->clock_rate) : rr->jitter; - ilog(LOG_DEBUG, "Calculated jitter for %u is %u ms", rr->ssrc, jitter); + ilog(LOG_DEBUG, "Calculated jitter for %x is %u ms", rr->ssrc, jitter); ilog(LOG_DEBUG, "Adding opposide side RTT of %u us", other_e->last_rtt); @@ -312,7 +312,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor }; mos_calc(ssb); - ilog(LOG_DEBUG, "Calculated MOS from RR for %u is %.1f", rr->from, (double) ssb->mos / 10.0); + ilog(LOG_DEBUG, "Calculated MOS from RR for %x is %.1f", rr->from, (double) ssb->mos / 10.0); // got a new stats block, add it to reporting ssrc mutex_lock(&other_e->h.lock); @@ -358,7 +358,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r if (!srti) return; - ilog(LOG_DEBUG, "XR RR TIME from %u: NTP TS %u/%u=%f", + ilog(LOG_DEBUG, "XR RR TIME from %x: NTP TS %u/%u=%f", rr->ssrc, rr->ntp_msw, rr->ntp_lsw, srti->time_item.ntp_ts); @@ -368,7 +368,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, const struct timeval *tv) { - ilog(LOG_DEBUG, "XR DLRR from %u about %u: LRR %u DLRR %u", + ilog(LOG_DEBUG, "XR DLRR from %x about %x: LRR %u DLRR %u", dlrr->from, dlrr->ssrc, dlrr->lrr, dlrr->dlrr); @@ -379,7 +379,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics *vm, const struct timeval *tv) { - ilog(LOG_DEBUG, "XR VM from %u about %u: LR %u DR %u BD %u GD %u BDu %u GDu %u RTD %u " + ilog(LOG_DEBUG, "XR VM from %x about %x: LR %u DR %u BD %u GD %u BDu %u GDu %u RTD %u " "ESD %u SL %u NL %u RERL %u GMin %u R %u eR %u MOSL %u MOSC %u RX %u " "JBn %u JBm %u JBam %u", vm->from, vm->ssrc, diff --git a/perl/NGCP/Rtpclient/RTP.pm b/perl/NGCP/Rtpclient/RTP.pm index 7889b8194..5cf0cd3d7 100644 --- a/perl/NGCP/Rtpclient/RTP.pm +++ b/perl/NGCP/Rtpclient/RTP.pm @@ -66,7 +66,7 @@ sub input { my ($vpxcc, $pt, $seq, $ts, $ssrc, $payload) = unpack("CCnNN a*", $packet); $vpxcc == 0x80 or die; - $pt == 0 or die; + #$pt == 0 or die; my $remote = ($self->{other_ssrcs}->{$ssrc} //= { ssrc => $ssrc, diff --git a/perl/NGCP/Rtpengine/Test.pm b/perl/NGCP/Rtpengine/Test.pm index 5808614f2..a439ed219 100644 --- a/perl/NGCP/Rtpengine/Test.pm +++ b/perl/NGCP/Rtpengine/Test.pm @@ -372,10 +372,11 @@ sub _input { $$input eq $exp or die; } $self->{media_packets_received}->[$component]++; - $$input = ''; $self->{client_components}->[$component] and - $self->{client_components}->[$component]->input($exp); + $self->{client_components}->[$component]->input($$input); + + $$input = ''; } sub _timer {