diff --git a/daemon/.ycm_extra_conf.py b/daemon/.ycm_extra_conf.py index cf64ff7da..aefe9b1ba 100644 --- a/daemon/.ycm_extra_conf.py +++ b/daemon/.ycm_extra_conf.py @@ -20,6 +20,7 @@ flags = [ '-I/usr/lib/x86_64-linux-gnu/glib-2.0/include', '-pthread', '-I../kernel-module/', + '-I../lib/', '-D_GNU_SOURCE', '-D__DEBUG=1', '-D__YCM=1', diff --git a/daemon/Makefile b/daemon/Makefile index 21c5870c0..f81d1a2a0 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -55,7 +55,8 @@ include ../lib/lib.Makefile SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c + media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ + codec.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/daemon/aux.h b/daemon/aux.h index 4feb1250b..ea82a4b9b 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -158,6 +158,19 @@ INLINE void g_tree_add_all(GTree *t, GQueue *q) { +/* GHASHTABLE */ + +INLINE GQueue *g_hash_table_lookup_queue_new(GHashTable *ht, void *key) { + GQueue *ret = g_hash_table_lookup(ht, key); + if (ret) + return ret; + ret = g_queue_new(); + g_hash_table_insert(ht, key, ret); + return ret; +} + + + /*** STRING HELPERS ***/ INLINE void strmove(char **d, char **s) { diff --git a/daemon/call.c b/daemon/call.c index 553970cf7..486b16606 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -43,6 +43,7 @@ #include "ssrc.h" #include "main.h" #include "graphite.h" +#include "codec.h" /* also serves as array index for callstream->peers[] */ @@ -683,6 +684,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con med->index = sp->index; call_str_cpy(ml->call, &med->type, &sp->type); med->codecs = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free); + med->codec_names = g_hash_table_new_full(str_hash, str_equal, NULL, (void (*)(void*)) g_queue_free); g_queue_push_tail(&ml->medias, med); @@ -1448,27 +1450,57 @@ static void __dtls_logic(const struct sdp_ng_flags *flags, MEDIA_SET(other_media, DTLS); } -static void __rtp_payload_type_add(struct call_media *media, struct rtp_payload_type *pt) { +static void __rtp_payload_type_add(struct call_media *media, struct call_media *other_media, + struct rtp_payload_type *pt) +{ struct call *call = media->call; + + if (g_hash_table_lookup(media->codecs, &pt->payload_type)) { + // collision/duplicate - ignore + __payload_type_free(pt); + return; + } /* we must duplicate the contents */ call_str_cpy(call, &pt->encoding_with_params, &pt->encoding_with_params); call_str_cpy(call, &pt->encoding, &pt->encoding); call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters); call_str_cpy(call, &pt->format_parameters, &pt->format_parameters); g_hash_table_replace(media->codecs, &pt->payload_type, pt); - g_queue_push_tail(&media->codecs_prefs, pt); + + GQueue *q = g_hash_table_lookup_queue_new(media->codec_names, &pt->encoding); + g_queue_push_tail(q, GUINT_TO_POINTER(pt->payload_type)); + + g_queue_push_tail(&media->codecs_prefs_recv, pt); + + // for the other side, we need a new 'pt' struct + struct rtp_payload_type *pt_copy = g_slice_alloc(sizeof(*pt)); + *pt_copy = *pt; // contents are allocated from the 'call' + g_queue_push_tail(&other_media->codecs_prefs_send, pt_copy); + + // make sure we have at least an empty queue here to indicate support for this code. + // don't add anything to the queue as we don't know the reverse RTP payload type. + g_hash_table_lookup_queue_new(other_media->codec_names, &pt->encoding); } -static void __rtp_payload_types(struct call_media *media, GQueue *types, GHashTable *strip, +static void __payload_queue_free(void *qq) { + GQueue *q = qq; + g_queue_free_full(q, __payload_type_free); +} +static void __rtp_payload_types(struct call_media *media, struct call_media *other_media, + GQueue *types, GHashTable *strip, const GQueue *offer) { + // 'media' = receiver of this offer/answer; 'other_media' = sender of this offer/answer struct rtp_payload_type *pt; static const str str_all = STR_CONST_INIT("all"); - GHashTable *removed = g_hash_table_new_full(str_hash, str_equal, NULL, __payload_type_free); + GHashTable *removed = g_hash_table_new_full(str_hash, str_equal, NULL, __payload_queue_free); int remove_all = 0; // start fresh - g_queue_clear(&media->codecs_prefs); + g_queue_clear(&media->codecs_prefs_recv); + g_queue_clear_full(&other_media->codecs_prefs_send, __payload_type_free); + g_hash_table_remove_all(media->codecs); + g_hash_table_remove_all(media->codec_names); if (strip && g_hash_table_lookup(strip, &str_all)) remove_all = 1; @@ -1478,22 +1510,27 @@ static void __rtp_payload_types(struct call_media *media, GQueue *types, GHashTa // codec stripping if (strip) { if (remove_all || g_hash_table_lookup(strip, &pt->encoding)) { - g_hash_table_replace(removed, &pt->encoding, pt); + GQueue *q = g_hash_table_lookup_queue_new(removed, &pt->encoding); + g_queue_push_tail(q, pt); continue; } } - __rtp_payload_type_add(media, pt); + __rtp_payload_type_add(media, other_media, pt); } if (offer) { // now restore codecs that have been removed, but should be offered for (GList *l = offer->head; l; l = l->next) { str *codec = l->data; - pt = g_hash_table_lookup(removed, codec); - if (!pt) + GQueue *q = g_hash_table_lookup(removed, codec); + if (!q) continue; g_hash_table_steal(removed, codec); - __rtp_payload_type_add(media, pt); + for (GList *l = q->head; l; l = l->next) { + pt = l->data; + __rtp_payload_type_add(media, other_media, pt); + } + g_queue_free(q); } } @@ -1620,7 +1657,10 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, MEDIA_SET(other_media, SDES); } - __rtp_payload_types(media, &sp->rtp_payload_types, flags->codec_strip, &flags->codec_offer); + // codec and RTP payload types handling + __rtp_payload_types(media, other_media, &sp->rtp_payload_types, + flags->codec_strip, &flags->codec_offer); + codec_handlers_update(media, other_media); /* send and recv are from our POV */ bf_copy_same(&media->media_flags, &sp->sp_flags, @@ -2030,7 +2070,10 @@ static void __call_free(void *p) { g_queue_clear(&md->streams); g_queue_clear(&md->endpoint_maps); g_hash_table_destroy(md->codecs); - g_queue_clear(&md->codecs_prefs); + g_hash_table_destroy(md->codec_names); + g_queue_clear(&md->codecs_prefs_recv); + g_queue_clear_full(&md->codecs_prefs_send, __payload_type_free); + codec_handlers_free(md); g_slice_free1(sizeof(*md), md); } diff --git a/daemon/call.h b/daemon/call.h index f6d88fd86..4bddf6c2c 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -199,6 +199,7 @@ struct local_interface; struct call_monologue; struct ice_agent; struct ssrc_hash; +struct codec_handler; typedef bencode_buffer_t call_buffer_t; @@ -323,8 +324,13 @@ struct call_media { GQueue streams; /* normally RTP + RTCP */ GQueue endpoint_maps; - GHashTable *codecs; - GQueue codecs_prefs; + GHashTable *codecs; // int payload type -> struct rtp_payload_type; storage container + GHashTable *codec_names; // codec name -> GQueue of int payload types; storage container + GQueue codecs_prefs_recv, // preference by order in SDP; values shared with 'codecs' + codecs_prefs_send; // ditto for outgoing media; storage container + GHashTable *codec_handlers; // int payload type -> struct codec_handler + // XXX combine this with 'codecs' hash table? + volatile struct codec_handler *codec_handler_cache; volatile unsigned int media_flags; }; diff --git a/daemon/codec.c b/daemon/codec.c new file mode 100644 index 000000000..9c2ebc7f1 --- /dev/null +++ b/daemon/codec.c @@ -0,0 +1,130 @@ +#include "codec.h" +#include +#include "call.h" +#include "log.h" +#include "rtplib.h" + + + + +static codec_handler_func handler_func_stub; + + +static struct codec_handler codec_handler_stub = { + .rtp_payload_type = -1, + .func = handler_func_stub, +}; + + + +static void __make_stub(struct codec_handler *handler) { + handler->func = handler_func_stub; +} + +static void __codec_handler_free(void *pp) { + struct codec_handler *h = pp; + g_slice_free1(sizeof(*h), h); +} + +// call must be locked in W +void codec_handlers_update(struct call_media *receiver, struct call_media *sink) { + if (!receiver->codec_handlers) + receiver->codec_handlers = g_hash_table_new_full(g_int_hash, g_int_equal, + NULL, __codec_handler_free); + + // we go through the list of codecs that the receiver supports and compare it + // with the list of codecs supported by the sink. if the receiver supports + // a codec that the sink doesn't support, we must transcode. + // + // if we transcode, we transcode to the highest-preference supported codec + // that the sink specified. determine this first. + struct rtp_payload_type *pref_dest_codec = NULL; + for (GList *l = sink->codecs_prefs_send.head; l; l = l->next) { + struct rtp_payload_type *pt = l->data; + // XXX if supported ... + ilog(LOG_DEBUG, "Default sink codec is " STR_FORMAT, STR_FMT(&pt->encoding)); + pref_dest_codec = pt; + break; + } + + for (GList *l = receiver->codecs_prefs_recv.head; l; l = l->next) { + struct rtp_payload_type *pt = l->data; + + // first, make sure we have a codec_handler struct for this + struct codec_handler *handler; + handler = g_hash_table_lookup(receiver->codec_handlers, &pt->payload_type); + if (!handler) { + ilog(LOG_DEBUG, "Creating codec handler for " STR_FORMAT, STR_FMT(&pt->encoding)); + handler = g_slice_alloc0(sizeof(*handler)); + handler->rtp_payload_type = pt->payload_type; + g_hash_table_insert(receiver->codec_handlers, &handler->rtp_payload_type, + handler); + } + + // if the sink's codec preferences are unknown (empty), or there are + // no supported codecs to transcode to, then we have nothing + // to do. most likely this is an initial offer without a received answer. + // we default to forwarding without transcoding. + if (!pref_dest_codec) { + ilog(LOG_DEBUG, "No known/supported sink codec for " STR_FORMAT, STR_FMT(&pt->encoding)); + __make_stub(handler); + continue; + } + + if (g_hash_table_lookup(sink->codec_names, &pt->encoding)) { + // the sink supports this codec. forward without transcoding. + ilog(LOG_DEBUG, "Sink supports codec " STR_FORMAT, STR_FMT(&pt->encoding)); + __make_stub(handler); + continue; + } + + // the sink does not support this codec XXX do something + ilog(LOG_DEBUG, "Sink does not support codec " STR_FORMAT, STR_FMT(&pt->encoding)); + __make_stub(handler); + } +} + +// call must be locked in R +struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) { + struct codec_handler *h; + + if (payload_type < 0) + goto out; + + h = g_atomic_pointer_get(&m->codec_handler_cache); + if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->rtp_payload_type == payload_type))) + return h; + + h = g_hash_table_lookup(m->codec_handlers, &payload_type); + if (!h) + goto out; + + g_atomic_pointer_set(&m->codec_handler_cache, h); + + return h; + +out: + return &codec_handler_stub; +} + +void codec_handlers_free(struct call_media *m) { + g_hash_table_destroy(m->codec_handlers); + m->codec_handlers = NULL; + m->codec_handler_cache = NULL; +} + + +static int handler_func_stub(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) { + struct codec_packet *p = g_slice_alloc(sizeof(*p)); + p->s = *s; + p->free_func = NULL; + g_queue_push_tail(out, p); + return 0; +} + +void codec_packet_free(void *pp) { + struct codec_packet *p = pp; + if (p->free_func) + p->free_func(p->s.s); + g_slice_free1(sizeof(*p), p); +} diff --git a/daemon/codec.h b/daemon/codec.h new file mode 100644 index 000000000..83beeb66c --- /dev/null +++ b/daemon/codec.h @@ -0,0 +1,38 @@ +#ifndef __CODEC_H__ +#define __CODEC_H__ + + +#include +#include "str.h" + + +struct call_media; +struct codec_handler; + + +typedef int codec_handler_func(struct codec_handler *, struct call_media *, const str *, GQueue *); + + +struct codec_handler { + int rtp_payload_type; + codec_handler_func *func; +}; + +struct codec_packet { + str s; + void (*free_func)(void *); +}; + + +void codec_handlers_update(struct call_media *receiver, struct call_media *sink); + +struct codec_handler *codec_handler_get(struct call_media *, int payload_type); + +void codec_handlers_free(struct call_media *); + +void codec_packet_free(void *); + + + + +#endif diff --git a/daemon/media_socket.c b/daemon/media_socket.c index ef57a2820..02b7580dd 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -24,6 +24,7 @@ #include "ssrc.h" #include "iptables.h" #include "main.h" +#include "codec.h" #ifndef PORT_RANDOM_MIN @@ -68,6 +69,7 @@ struct packet_handler_ctx { struct packet_stream *sink; // where to send output packets to (forward destination) rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP) + int payload_type; // -1 if unknown or not RTP struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp int rtcp; // true if this is an RTCP packet @@ -75,6 +77,9 @@ struct packet_handler_ctx { int update; // true if Redis info needs to be updated int unkernelize; // true if stream ought to be removed from kernel int kernelize; // true if stream can be kernelized + + // output: + GQueue packets_out; }; @@ -1281,6 +1286,8 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) struct rtp_header *rtp_h; struct rtcp_packet *rtcp_h; + phc->payload_type = -1; + if (G_UNLIKELY(!phc->media->protocol)) return; if (G_UNLIKELY(!phc->media->protocol->rtp)) @@ -1292,15 +1299,16 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) &phc->ssrc_out, phc->call->ssrc_hash); // check the payload type - int i = (rtp_h->m_pt & 0x7f); + // XXX redundant between SSRC handling and codec_handler stuff -> combine + phc->payload_type = (rtp_h->m_pt & 0x7f); if (G_LIKELY(phc->ssrc_in)) - phc->ssrc_in->parent->payload_type = i; + phc->ssrc_in->parent->payload_type = phc->payload_type; // XXX convert to array? or keep last pointer? - struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &i); + struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, - "RTP packet with unknown payload type %u received", i); + "RTP packet with unknown payload type %u received", phc->payload_type); atomic64_inc(&phc->stream->stats.errors); atomic64_inc(&rtpe_statsps.errors); } @@ -1348,21 +1356,25 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) return ret; } -static int media_packet_encrypt(struct packet_handler_ctx *phc) -{ +static int media_packet_encrypt(struct packet_handler_ctx *phc) { + int ret = 0; + if (!phc->encrypt_func) return 0; mutex_lock(&phc->out_srtp->out_lock); - int ret = phc->encrypt_func(&phc->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out); + for (GList *l = phc->packets_out.head; l; l = l->next) { + struct codec_packet *p = l->data; + int encret = phc->encrypt_func(&p->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out); + if (encret == 1) + phc->update = 1; + else if (encret != 0) + ret = -1; + } mutex_unlock(&phc->out_srtp->out_lock); - if (ret == 1) { - phc->update = 1; - ret = 0; - } return ret; } @@ -1558,7 +1570,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { // this sets rtcp, in_srtp, out_srtp, and sink media_packet_rtcp_demux(phc); - // this set ssrc_in and ssrc_out + // this set payload_type, ssrc_in and ssrc_out media_packet_rtp(phc); @@ -1580,6 +1592,11 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (phc->call->recording) dump_packet(phc->call->recording, phc->stream, &phc->s); + // XXX use a handler for RTCP + struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type); + // this transfers the packet from 's' to 'packets_out' + transcoder->func(transcoder, phc->media, &phc->s, &phc->packets_out); + if (G_LIKELY(handler_ret >= 0)) handler_ret = media_packet_encrypt(phc); @@ -1606,9 +1623,19 @@ static int stream_packet(struct packet_handler_ctx *phc) { goto drop; } - ret = socket_sendto(&phc->sink->selected_sfd->socket, phc->s.s, phc->s.len, &phc->sink->endpoint); - __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address), - phc->sink->endpoint.port); + struct codec_packet *p; + ret = 0; + while ((p = g_queue_pop_head(&phc->packets_out))) { + __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address), + phc->sink->endpoint.port); + + ret = socket_sendto(&phc->sink->selected_sfd->socket, p->s.s, p->s.len, &phc->sink->endpoint); + + codec_packet_free(p); + + if (ret == -1) + break; + } mutex_unlock(&phc->sink->out_lock); @@ -1622,6 +1649,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { drop: ret = 0; + // XXX separate stats for received/sent atomic64_inc(&phc->stream->stats.packets); atomic64_add(&phc->stream->stats.bytes, phc->s.len); atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec); @@ -1637,6 +1665,8 @@ out: rwlock_unlock_r(&phc->call->master_lock); + g_queue_clear_full(&phc->packets_out, codec_packet_free); + return ret; } @@ -1662,7 +1692,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { } #endif - struct packet_handler_ctx phc = { 0, }; + struct packet_handler_ctx phc; + ZERO(phc); phc.sfd = sfd; ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, diff --git a/daemon/rtp.c b/daemon/rtp.c index 4ae11db16..4d90547df 100644 --- a/daemon/rtp.c +++ b/daemon/rtp.c @@ -91,7 +91,6 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { struct rtp_header *rtp; str payload, to_auth; u_int64_t index; - int ret = 0; if (G_UNLIKELY(!ssrc_ctx)) return -1; @@ -115,7 +114,7 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { s->len += c->params.crypto_suite->srtp_auth_tag; } - return ret; + return 0; } /* rfc 3711, section 3.3 */ @@ -124,7 +123,6 @@ int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { u_int64_t index; str payload, to_auth, to_decrypt, auth_tag; char hmac[20]; - int ret = 0; if (G_UNLIKELY(!ssrc_ctx)) return -1; @@ -176,7 +174,7 @@ decrypt: *s = to_auth; - return ret; + return 0; error: ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Discarded invalid SRTP packet: authentication failed"); diff --git a/daemon/sdp.c b/daemon/sdp.c index 540e2ae46..9cde6bccb 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -1003,15 +1003,9 @@ int sdp_parse(str *body, GQueue *sessions) { /* if (attr->key.s) g_hash_table_insert(attrs->name_hash, &attr->key, attr); */ - /* attr_queue = g_hash_table_lookup(attrs->name_lists_hash, &attr->name); - if (!attr_queue) - g_hash_table_insert(attrs->name_lists_hash, &attr->name, - (attr_queue = g_queue_new())); + /* attr_queue = g_hash_table_lookup_queue_new(attrs->name_lists_hash, &attr->name); g_queue_push_tail(attr_queue, attr); */ - attr_queue = g_hash_table_lookup(attrs->id_lists_hash, &attr->attr); - if (!attr_queue) - g_hash_table_insert(attrs->id_lists_hash, &attr->attr, - (attr_queue = g_queue_new())); + attr_queue = g_hash_table_lookup_queue_new(attrs->id_lists_hash, &attr->attr); g_queue_push_tail(attr_queue, attr); break; @@ -1470,10 +1464,10 @@ static int replace_transport_protocol(struct sdp_chopper *chop, static int replace_codec_list(struct sdp_chopper *chop, struct sdp_media *media, struct call_media *cm) { - if (cm->codecs_prefs.length == 0) + if (cm->codecs_prefs_recv.length == 0) return 0; // legacy protocol or usage error - for (GList *l = cm->codecs_prefs.head; l; l = l->next) { + for (GList *l = cm->codecs_prefs_recv.head; l; l = l->next) { struct rtp_payload_type *pt = l->data; chopper_append_printf(chop, " %u", pt->payload_type); } @@ -1718,7 +1712,7 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media * break; case ATTR_RTPMAP: - if (media->codecs_prefs.length == 0) + if (media->codecs_prefs_recv.length == 0) break; // legacy protocol or usage error if (!g_hash_table_lookup(media->codecs, &attr->u.rtpmap.rtp_pt.payload_type)) @@ -1726,7 +1720,7 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media * break; case ATTR_FMTP: - if (media->codecs_prefs.length == 0) + if (media->codecs_prefs_recv.length == 0) break; // legacy protocol or usage error if (!g_hash_table_lookup(media->codecs, &attr->u.fmtp.payload_type))