diff --git a/daemon/codec.c b/daemon/codec.c index 25d01b566..d1ad62667 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -5,30 +5,42 @@ #include "log.h" #include "rtplib.h" #include "codeclib.h" +#include "ssrc.h" -static codec_handler_func handler_func_stub; +struct codec_ssrc_handler { + struct ssrc_entry h; // must be first + mutex_t lock; + packet_sequencer_t sequencer; + decoder_t *decoder; +}; +struct transcode_packet { + seq_packet_t p; // must be first + unsigned long ts; + str *payload; +}; + + +static codec_handler_func handler_func_passthrough; static codec_handler_func handler_func_transcode; +static struct ssrc_entry *__ssrc_handler_new(void *p); +static void __ssrc_handler_free(struct codec_ssrc_handler *p); + +static void __transcode_packet_free(struct transcode_packet *); + static struct codec_handler codec_handler_stub = { - .rtp_payload_type = -1, - .func = handler_func_stub, + .source_pt.payload_type = -1, + .func = handler_func_passthrough, }; static void __handler_shutdown(struct codec_handler *handler) { - if (handler->decoder) - decoder_close(handler->decoder); - handler->decoder = NULL; -} - -static void __make_stub(struct codec_handler *handler) { - __handler_shutdown(handler); - handler->func = handler_func_stub; + free_ssrc_hash(&handler->ssrc_hash); } static void __codec_handler_free(void *pp) { @@ -37,26 +49,38 @@ static void __codec_handler_free(void *pp) { g_slice_free1(sizeof(*h), h); } +static struct codec_handler *__handler_new(int pt) { + struct codec_handler *handler = g_slice_alloc0(sizeof(*handler)); + handler->source_pt.payload_type = pt; + return handler; +} + +static void __make_passthrough(struct codec_handler *handler) { + __handler_shutdown(handler); + handler->func = handler_func_passthrough; +} + static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_type *source, struct rtp_payload_type *dest) { assert(source->codec_def != NULL); assert(dest->codec_def != NULL); + assert(source->payload_type == handler->source_pt.payload_type); __handler_shutdown(handler); + handler->source_pt = *source; + handler->dest_pt = *dest; handler->func = handler_func_transcode; - handler->decoder = decoder_new_fmt(source->codec_def, source->clock_rate, 1, 0); - if (!handler->decoder) - goto err; + + handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, (ssrc_free_func_t) __ssrc_handler_free, + handler); ilog(LOG_DEBUG, "Created transcode context for '" STR_FORMAT "' -> '" STR_FORMAT "'", STR_FMT(&source->encoding), STR_FMT(&dest->encoding)); return; -err: - __make_stub(handler); } // call must be locked in W @@ -93,9 +117,8 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink) handler = g_hash_table_lookup(receiver->codec_handlers, &pt->payload_type); if (!handler) { ilog(LOG_DEBUG, "Creating codec handler for " STR_FORMAT, STR_FMT(&pt->encoding)); - handler = g_slice_alloc0(sizeof(*handler)); - handler->rtp_payload_type = pt->payload_type; - g_hash_table_insert(receiver->codec_handlers, &handler->rtp_payload_type, + handler = __handler_new(pt->payload_type); + g_hash_table_insert(receiver->codec_handlers, &handler->source_pt.payload_type, handler); } @@ -105,7 +128,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink) // we default to forwarding without transcoding. if (!pref_dest_codec) { ilog(LOG_DEBUG, "No known/supported sink codec for " STR_FORMAT, STR_FMT(&pt->encoding)); - __make_stub(handler); + __make_passthrough(handler); continue; } @@ -113,7 +136,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink) // the sink supports this codec. forward without transcoding. // XXX check format parameters as well ilog(LOG_DEBUG, "Sink supports codec " STR_FORMAT, STR_FMT(&pt->encoding)); - __make_stub(handler); + __make_passthrough(handler); continue; } @@ -132,7 +155,7 @@ struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) goto out; h = g_atomic_pointer_get(&m->codec_handler_cache); - if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->rtp_payload_type == payload_type))) + if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->source_pt.payload_type == payload_type))) return h; h = g_hash_table_lookup(m->codec_handlers, &payload_type); @@ -154,14 +177,102 @@ void codec_handlers_free(struct call_media *m) { } -static int handler_func_stub(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) { +static int handler_func_passthrough(struct codec_handler *h, struct call_media *media, + const struct media_packet *mp, GQueue *out) +{ struct codec_packet *p = g_slice_alloc(sizeof(*p)); - p->s = *s; + p->s = mp->raw; p->free_func = NULL; g_queue_push_tail(out, p); return 0; } -static int handler_func_transcode(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) { + + +static void __transcode_packet_free(struct transcode_packet *p) { + free(p->payload); + g_slice_free1(sizeof(*p), p); +} + +static struct ssrc_entry *__ssrc_handler_new(void *p) { + struct codec_handler *h = p; + struct codec_ssrc_handler *ch = g_slice_alloc0(sizeof(*ch)); + mutex_init(&ch->lock); + packet_sequencer_init(&ch->sequencer, (GDestroyNotify) __transcode_packet_free); + ch->decoder = decoder_new_fmt(h->source_pt.codec_def, h->source_pt.clock_rate, 1, 0); + if (!ch->decoder) + goto err; + return &ch->h; + +err: + __ssrc_handler_free(ch); + return NULL; +} +static void __ssrc_handler_free(struct codec_ssrc_handler *ch) { + packet_sequencer_destroy(&ch->sequencer); + if (ch->decoder) + decoder_close(ch->decoder); + g_slice_free1(sizeof(*ch), ch); +} + +int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) { + //struct codec_ssrc_handler *ch = u1; + + ilog(LOG_DEBUG, "RTP media successfully decoded"); + + av_frame_free(&frame); + return 0; +} + +static int handler_func_transcode(struct codec_handler *h, struct call_media *media, + const struct media_packet *mp, GQueue *out) +{ + if (G_UNLIKELY(!mp->rtp || mp->rtcp)) + return handler_func_passthrough(h, media, mp, out); + + assert((mp->rtp->m_pt & 0x7f) == h->source_pt.payload_type); + + // create new packet and insert it into sequencer queue + + ilog(LOG_DEBUG, "Received RTP packet: SSRC %u, PT %u, seq %u, TS %u", + ntohl(mp->rtp->ssrc), mp->rtp->m_pt, ntohs(mp->rtp->seq_num), + ntohl(mp->rtp->timestamp)); + + struct codec_ssrc_handler *ch = get_ssrc(mp->rtp->ssrc, h->ssrc_hash); + if (G_UNLIKELY(!ch)) + return 0; + + struct transcode_packet *packet = g_slice_alloc0(sizeof(*packet)); + packet->p.seq = ntohs(mp->rtp->seq_num); + packet->payload = str_dup(&mp->payload); + packet->ts = ntohl(mp->rtp->timestamp); + + mutex_lock(&ch->lock); + + if (packet_sequencer_insert(&ch->sequencer, &packet->p)) { + // dupe + mutex_unlock(&ch->lock); + __transcode_packet_free(packet); + ilog(LOG_DEBUG, "Ignoring duplicate RTP packet"); + return 0; + } + + // got a new packet, run decoder + + while (1) { + packet = packet_sequencer_next_packet(&ch->sequencer); + if (G_UNLIKELY(!packet)) + break; + + ilog(LOG_DEBUG, "Decoding RTP packet: seq %u, TS %lu", + packet->p.seq, packet->ts); + + if (decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, NULL)) + ilog(LOG_WARN, "Decoder error while processing RTP packet"); + __transcode_packet_free(packet); + } + + mutex_unlock(&ch->lock); + return 0; } diff --git a/daemon/codec.h b/daemon/codec.h index d7f18e899..0d52bd048 100644 --- a/daemon/codec.h +++ b/daemon/codec.h @@ -5,19 +5,26 @@ #include #include "str.h" #include "codeclib.h" +#include "aux.h" +#include "rtplib.h" struct call_media; struct codec_handler; +struct media_packet; +struct ssrc_hash; -typedef int codec_handler_func(struct codec_handler *, struct call_media *, const str *, GQueue *); +typedef int codec_handler_func(struct codec_handler *, struct call_media *, const struct media_packet *, + GQueue *); struct codec_handler { - int rtp_payload_type; + struct rtp_payload_type source_pt; // source_pt.payload_type = hashtable index + struct rtp_payload_type dest_pt; codec_handler_func *func; - decoder_t *decoder; + + struct ssrc_hash *ssrc_hash; }; struct codec_packet { diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 3303b4d42..4c23abb76 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -79,6 +79,7 @@ struct packet_handler_ctx { int kernelize; // true if stream can be kernelized // output: + struct media_packet mp; // passed to handlers GQueue packets_out; }; @@ -1283,9 +1284,6 @@ static void media_packet_rtcp_demux(struct packet_handler_ctx *phc) static void media_packet_rtp(struct packet_handler_ctx *phc) { - struct rtp_header *rtp_h; - struct rtcp_packet *rtcp_h; - phc->payload_type = -1; if (G_UNLIKELY(!phc->media->protocol)) @@ -1293,18 +1291,21 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) if (G_UNLIKELY(!phc->media->protocol->rtp)) return; - if (G_LIKELY(!phc->rtcp && !rtp_payload(&rtp_h, NULL, &phc->s))) { + if (G_LIKELY(!phc->rtcp && !rtp_payload(&phc->mp.rtp, &phc->mp.payload, &phc->s))) { + rtp_padding(phc->mp.rtp, &phc->mp.payload); + if (G_LIKELY(phc->out_srtp != NULL)) - __stream_ssrc(phc->in_srtp, phc->out_srtp, rtp_h->ssrc, &phc->ssrc_in, + __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtp->ssrc, &phc->ssrc_in, &phc->ssrc_out, phc->call->ssrc_hash); // check the payload type // XXX redundant between SSRC handling and codec_handler stuff -> combine - phc->payload_type = (rtp_h->m_pt & 0x7f); + phc->payload_type = (phc->mp.rtp->m_pt & 0x7f); if (G_LIKELY(phc->ssrc_in)) phc->ssrc_in->parent->payload_type = phc->payload_type; // XXX convert to array? or keep last pointer? + // XXX yet another hash table per payload type -> combine struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, @@ -1318,9 +1319,9 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) atomic64_add(&rtp_s->bytes, phc->s.len); } } - else if (phc->rtcp && !rtcp_payload(&rtcp_h, NULL, &phc->s)) { + else if (phc->rtcp && !rtcp_payload(&phc->mp.rtcp, NULL, &phc->s)) { if (G_LIKELY(phc->out_srtp != NULL)) - __stream_ssrc(phc->in_srtp, phc->out_srtp, rtcp_h->ssrc, &phc->ssrc_in, + __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtcp->ssrc, &phc->ssrc_in, &phc->ssrc_out, phc->call->ssrc_hash); } } @@ -1570,7 +1571,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { // this sets rtcp, in_srtp, out_srtp, and sink media_packet_rtcp_demux(phc); - // this set payload_type, ssrc_in and ssrc_out + // this set payload_type, ssrc_in, ssrc_out and mp media_packet_rtp(phc); @@ -1595,7 +1596,9 @@ static int stream_packet(struct packet_handler_ctx *phc) { // XXX use a handler for RTCP struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type); // this transfers the packet from 's' to 'packets_out' - transcoder->func(transcoder, phc->media, &phc->s, &phc->packets_out); + phc->mp.raw = phc->s; + if (transcoder->func(transcoder, phc->media, &phc->mp, &phc->packets_out)) + goto drop; if (G_LIKELY(handler_ret >= 0)) handler_ret = media_packet_encrypt(phc); diff --git a/daemon/media_socket.h b/daemon/media_socket.h index 173f8e9f6..3d25eeba8 100644 --- a/daemon/media_socket.h +++ b/daemon/media_socket.h @@ -68,6 +68,12 @@ struct stream_fd { struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ }; +struct media_packet { + str raw; + struct rtp_header *rtp; + struct rtcp_packet *rtcp; + str payload; +}; diff --git a/perl/NGCP/Rtpclient/RTP.pm b/perl/NGCP/Rtpclient/RTP.pm index 7d667724b..7889b8194 100644 --- a/perl/NGCP/Rtpclient/RTP.pm +++ b/perl/NGCP/Rtpclient/RTP.pm @@ -25,6 +25,7 @@ sub new { $self->{octet_count} = 0; $self->{other_ssrcs} = {}; $self->{args} = \%args; + $self->{payload_type} = $args{payload_type} // 0; return $self; } @@ -34,7 +35,8 @@ sub timer { time() < $self->{next_send} and return; - my $hdr = pack("CCnNN", 0x80, 0x00, $self->{seq}, $self->{timestamp}->bstr(), $self->{ssrc}); + my $hdr = pack("CCnNN", 0x80, $self->{payload_type}, $self->{seq}, $self->{timestamp}->bstr(), + $self->{ssrc}); my $payload = chr(rand(256)) x $self->{payload}; # XXX adapt to codec my $lost = 0; diff --git a/perl/NGCP/Rtpclient/SDP.pm b/perl/NGCP/Rtpclient/SDP.pm index e9b19253f..fa50e8384 100644 --- a/perl/NGCP/Rtpclient/SDP.pm +++ b/perl/NGCP/Rtpclient/SDP.pm @@ -123,17 +123,29 @@ use Socket; use Socket6; use IO::Socket; +my %codec_map = ( + PCMA => { payload_type => 8 }, + PCMU => { payload_type => 0 }, +); +my %payload_type_map = map {$codec_map{$_}{payload_type} => $_} keys(%codec_map); + +sub _codec_list_to_hash { + my ($list) = @_; + return { map { $_ => { %{$codec_map{$_}} } } @{$list} }; +} + sub new { - my ($class, $rtp, $rtcp, $protocol, $type) = @_; + my ($class, $rtp, $rtcp, %args) = @_; my $self = {}; bless $self, $class; $self->{rtp} = $rtp; # main transport $self->{rtcp} = $rtcp; # optional - $self->{protocol} = $protocol // 'RTP/AVP'; - $self->{type} = $type // 'audio'; - $self->{payload_types} = [0]; + $self->{protocol} = $args{protocol} // 'RTP/AVP'; + $self->{type} = $args{type} // 'audio'; + $self->{codec_list} = $args{codecs}; + $self->{codecs} = _codec_list_to_hash(@{$self->{codecs}}); $self->{additional_attributes} = []; @@ -149,7 +161,9 @@ sub new_remote { $self->{protocol} = $protocol; $self->{port} = $port; $self->{type} = $type; - $self->{payload_types} = [split(/ /, $payload_types)]; + my @payload_types = [split(/ /, $payload_types)]; + $self->{codec_list} = [ map {$payload_type_map{$_}} @payload_types ]; + $self->{codecs} = _codec_list_to_hash(@{$self->{codecs}}); return $self; }; @@ -165,8 +179,10 @@ sub encode { my $pconn = $parent_connection ? NGCP::Rtpclient::SDP::encode_address($parent_connection) : ''; my @out; + my @payload_types = map {$codec_map{$_}{payload_type}} @{$self->{codec_list}}; + push(@out, "m=$self->{type} " . $self->{rtp}->sockport() . ' ' . $self->{protocol} . ' ' - . join(' ', @{$self->{payload_types}})); + . join(' ', @payload_types)); my $rtpconn = NGCP::Rtpclient::SDP::encode_address($self->{rtp}); $rtpconn eq $pconn or push(@out, "c=$rtpconn"); diff --git a/perl/NGCP/Rtpengine/Test.pm b/perl/NGCP/Rtpengine/Test.pm index 4a572abef..a059baf50 100644 --- a/perl/NGCP/Rtpengine/Test.pm +++ b/perl/NGCP/Rtpengine/Test.pm @@ -138,6 +138,7 @@ sub _new { $self->{parent} = $parent; $self->{tag} = rand(); + $self->{codecs} = $args{codecs} // [qw(PCMU)]; # create media sockets my @addresses = @{$parent->{all_addresses}}; @@ -181,7 +182,10 @@ sub _new { $args{protocol} and $proto = $args{protocol}; $self->{local_media} = $self->{local_sdp}->add_media(NGCP::Rtpclient::SDP::Media->new( - $self->{main_sockets}->[0], $self->{main_sockets}->[1], $proto)); # main rtp and rtcp + $self->{main_sockets}->[0], $self->{main_sockets}->[1], # main rtp and rtcp + protocol => $proto, + codecs => $self->{codecs}, + )); # XXX support multiple medias if ($args{dtls}) { @@ -273,7 +277,7 @@ sub _default_req_args { my $req = { command => $cmd, 'call-id' => $self->{parent}->{callid} }; - for my $cp (qw(sdp from-tag to-tag ICE transport-protocol address-family label direction)) { + for my $cp (qw(sdp from-tag to-tag ICE transport-protocol address-family label direction codec)) { $args{$cp} and $req->{$cp} = $args{$cp}; } for my $cp (@{$args{flags}}) { diff --git a/t/test-transcode.pl b/t/test-transcode.pl new file mode 100755 index 000000000..2f413903f --- /dev/null +++ b/t/test-transcode.pl @@ -0,0 +1,27 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use NGCP::Rtpengine::Test; +use IO::Socket; + +my $r = NGCP::Rtpengine::Test->new(); +my ($a, $b) = $r->client_pair( + {sockdomain => &Socket::AF_INET, codecs => [qw(PCMA)]}, + {sockdomain => &Socket::AF_INET, codecs => [qw(PCMU)]} +); + +$r->timer_once(3, sub { + $b->answer($a, ICE => 'remove', label => "callee"); + $a->start_rtp(); + $a->start_rtcp(); + }); +$r->timer_once(10, sub { $r->stop(); }); + +$a->offer($b, ICE => 'remove', label => "caller", codec => { transcode => ['PCMU']}); +$b->start_rtp(); +$b->start_rtcp(); + +$r->run(); + +$a->teardown(dump => 1);