diff --git a/daemon/call.c b/daemon/call.c index c36e6a181..3dd2ae758 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include @@ -378,7 +378,7 @@ fault: void kill_calls_timer(GSList *list, const char *url) { struct call *ca; GList *csl; - struct call_monologue *cm, *cd; + struct call_monologue *cm; char *url_prefix = NULL, *url_suffix = NULL; struct xmlrpc_helper *xh = NULL; char url_buf[128]; @@ -445,25 +445,32 @@ void kill_calls_timer(GSList *list, const char *url) { case XF_KAMAILIO: for (csl = ca->monologues.head; csl; csl = csl->next) { cm = csl->data; - cd = cm->active_dialogue; - if (!cm->tag.s || !cm->tag.len || !cd || !cd->tag.s || !cd->tag.len) + if (!cm->tag.s || !cm->tag.len) continue; - str *from_tag = g_hash_table_lookup(dup_tags, &cd->tag); - if (from_tag && !str_cmp_str(from_tag, &cm->tag)) - continue; + for (GList *sub = cm->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *cd = cs->monologue; + + if (!cd->tag.s || !cd->tag.len) + continue; - from_tag = str_dup(&cm->tag); - str *to_tag = str_dup(&cd->tag); + str *from_tag = g_hash_table_lookup(dup_tags, &cd->tag); + if (from_tag && !str_cmp_str(from_tag, &cm->tag)) + continue; - g_queue_push_tail(&xh->strings, - strdup(url_buf)); - g_queue_push_tail(&xh->strings, - str_dup(&ca->callid)); - g_queue_push_tail(&xh->strings, from_tag); - g_queue_push_tail(&xh->strings, to_tag); + from_tag = str_dup(&cm->tag); + str *to_tag = str_dup(&cd->tag); - g_hash_table_insert(dup_tags, from_tag, to_tag); + g_queue_push_tail(&xh->strings, + strdup(url_buf)); + g_queue_push_tail(&xh->strings, + str_dup(&ca->callid)); + g_queue_push_tail(&xh->strings, from_tag); + g_queue_push_tail(&xh->strings, to_tag); + + g_hash_table_insert(dup_tags, from_tag, to_tag); + } } break; } @@ -521,7 +528,7 @@ static void call_timer(void *ptr) { struct iterator_helper hlp; GList *i, *l; struct rtpengine_list_entry *ke; - struct packet_stream *ps, *sink; + struct packet_stream *ps; struct stats tmpstats; int j, update; struct stream_fd *sfd; @@ -639,12 +646,17 @@ static void call_timer(void *ptr) { if (diff_packets) sfd->call->foreign_media = 0; - sink = packet_stream_sink(ps); - if (!ke->target.non_forwarding && diff_packets) { - // only check the first - struct rtpengine_output_info *o = &ke->outputs[0]; - if (sink && o->src_addr.family) { + for (GList *l = ps->rtp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + struct packet_stream *sink = sh->sink; + + if (sh->kernel_output_idx < 0 + || sh->kernel_output_idx >= ke->target.num_destinations) + continue; + + struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx]; + mutex_lock(&sink->out_lock); if (sink->crypto.params.crypto_suite && sink->ssrc_out && ntohl(ke->target.ssrc) == sink->ssrc_out->parent->h.ssrc @@ -1179,6 +1191,28 @@ void __rtp_stats_update(GHashTable *dst, struct codec_store *cs) { /* we leave previously added but now removed payload types in place */ } +void free_sink_handler(void *p) { + struct sink_handler *sh = p; + g_slice_free1(sizeof(*sh), sh); +} +void __add_sink_handler(GQueue *q, struct packet_stream *sink) { + struct sink_handler *sh = g_slice_alloc0(sizeof(*sh)); + sh->sink = sink; + sh->kernel_output_idx = -1; + g_queue_push_tail(q, sh); +} + +// called once before calling __init_streams once for each sink +static void __reset_streams(struct call_media *media) { + for (GList *l = media->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + g_queue_clear_full(&ps->rtp_sinks, free_sink_handler); + g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler); + } +} +// called once on media A for each sink media B +// B can be NULL +// XXX this function seems to do two things - stream init (with B NULL) and sink init - split up? static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp, const struct sdp_ng_flags *flags) { GList *la, *lb; @@ -1194,11 +1228,13 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru b = lb->data; /* RTP */ - a->rtp_sink = b; - // reflect media - pretent reflection also for blackhole, as otherwise + // reflect media - pretend reflection also for blackhole, as otherwise // we get SSRC flip-flops on the opposite side + // XXX still necessary for blackhole? if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) - a->rtp_sink = a; + __add_sink_handler(&a->rtp_sinks, a); + else + __add_sink_handler(&a->rtp_sinks, b); PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */ __rtp_stats_update(a->rtp_stats, &A->codecs); @@ -1227,14 +1263,13 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru b = lb->data; } - if (!MEDIA_ISSET(A, RTCP_MUX)) { - a->rtcp_sink = NULL; + if (!MEDIA_ISSET(A, RTCP_MUX)) PS_CLEAR(a, RTCP); - } else { - a->rtcp_sink = b; if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) - a->rtcp_sink = a->rtcp_sibling; + __add_sink_handler(&a->rtcp_sinks, a->rtcp_sibling); + else + __add_sink_handler(&a->rtcp_sinks, b); PS_SET(a, RTCP); PS_CLEAR(a, IMPLICIT_RTCP); } @@ -1247,10 +1282,10 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru assert(la != NULL); a = la->data; - a->rtp_sink = NULL; - a->rtcp_sink = b; if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) - a->rtcp_sink = a; + __add_sink_handler(&a->rtcp_sinks, a); + else + __add_sink_handler(&a->rtcp_sinks, b); PS_CLEAR(a, RTP); PS_SET(a, RTCP); a->rtcp_sibling = NULL; @@ -2211,16 +2246,68 @@ void codecs_offer_answer(struct call_media *media, struct call_media *other_medi } } + /* called with call->master_lock held in W */ -int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, +static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags) { + GList *sl = streams ? streams->head : NULL; + + // create media iterators for all subscribers + GList *sub_medias[ml->subscribers.length]; + unsigned int num_subs = 0; + for (GList *l = ml->subscribers.head; l; l = l->next) { + struct call_subscription *cs = l->data; + struct call_monologue *sub_ml = cs->monologue; + sub_medias[num_subs++] = sub_ml->medias.head; + } + // keep num_subs as shortcut to ml->subscribers.length + + for (GList *l = ml->medias.head; l; l = l->next) { + struct call_media *media = l->data; + + struct stream_params *sp = NULL; + if (sl) { + sp = sl->data; + sl = sl->next; + } + + __ice_start(media); + + // update all subscribers + __reset_streams(media); + for (unsigned int i = 0; i < num_subs; i++) { + if (!sub_medias[i]) + continue; + + struct call_media *sub_media = sub_medias[i]->data; + sub_medias[i] = sub_medias[i]->next; + + if (__init_streams(media, sub_media, sp, flags)) + ilog(LOG_WARN, "Error initialising streams"); + } + + // we are now ready to fire up ICE if so desired and requested + ice_update(media->ice_agent, sp); // sp == NULL: update in case rtcp-mux changed + + recording_setup_media(media); + t38_gateway_start(media->t38_gateway); + + if (mqtt_publish_scope() == MPS_MEDIA) + mqtt_timer_start(&media->mqtt_timer, media->call, media); + } +} + + +/* called with call->master_lock held in W */ +int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, struct sdp_ng_flags *flags) { struct stream_params *sp; GList *media_iter, *ml_media, *other_ml_media; struct call_media *media, *other_media; - struct call_monologue *monologue; struct endpoint_map *em; struct call *call; + struct call_monologue *other_ml = dialogue[0]; + struct call_monologue *monologue = dialogue[1]; /* we must have a complete dialogue, even though the to-tag (monologue->tag) * may not be known yet */ @@ -2229,7 +2316,6 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, return -1; } - monologue = other_ml->active_dialogue; call = monologue->call; call->last_signal = MAX(call->last_signal, rtpe_now.tv_sec); @@ -2432,9 +2518,6 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, /* ICE stuff - must come after interface and address family selection */ __ice_offer(flags, media, other_media); - __ice_start(other_media); - __ice_start(media); - /* we now know what's being advertised by the other side */ @@ -2448,7 +2531,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, * generate media (or RTCP packets) for that stream. */ __disable_streams(media, sp->num_ports); __disable_streams(other_media, sp->num_ports); - goto init; + continue; } if (is_addr_unspecified(&sp->rtp_endpoint.address) && !MEDIA_ISSET(other_media, TRICKLE_ICE)) { /* Zero endpoint address, equivalent to setting the media stream @@ -2478,26 +2561,11 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, if (__wildcard_endpoint_map(other_media, sp->num_ports)) goto error_ports; } - -init: - if (__init_streams(media, other_media, NULL, NULL)) - return -1; - if (__init_streams(other_media, media, sp, flags)) - return -1; - - /* we are now ready to fire up ICE if so desired and requested */ - ice_update(other_media->ice_agent, sp); - ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */ - - recording_setup_media(media); - t38_gateway_start(media->t38_gateway); - - if (mqtt_publish_scope() == MPS_MEDIA) { - mqtt_timer_start(&media->mqtt_timer, call, media); - mqtt_timer_start(&other_media->mqtt_timer, call, other_media); - } } + __update_init_subscribers(other_ml, streams, flags); + __update_init_subscribers(monologue, NULL, NULL); + // set ipv4/ipv6/mixed media stats if (flags && (flags->opmode == OP_OFFER || flags->opmode == OP_ANSWER)) { statistics_update_ip46_inc_dec(call, CMC_INCREMENT); @@ -2515,6 +2583,73 @@ error_intf: } +static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) { + struct call_subscription *cs = which_cs_link->data; + struct call_subscription *rev_cs = cs->link->data; + struct call_monologue *from = cs->monologue; + ilog(LOG_DEBUG, "Unsubscribing '" STR_FORMAT_M "' from '" STR_FORMAT_M "'", + STR_FMT_M(&which->tag), + STR_FMT_M(&from->tag)); + g_queue_delete_link(&from->subscribers, cs->link); + g_queue_delete_link(&which->subscriptions, which_cs_link); + g_slice_free1(sizeof(*cs), cs); + g_slice_free1(sizeof(*rev_cs), rev_cs); +} +//static void __unsubscribe_all(struct call_monologue *which) { +// while (which->subscriptions.head) +// __unsubscribe_one_link(which, which->subscriptions.head); +//} +static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) { + for (GList *l = which->subscriptions.head; l; l = l->next) { + struct call_subscription *cs = l->data; + if (cs->monologue != from) + continue; + __unsubscribe_one_link(which, l); + return true; + } + return false; +} +static void __unsubscribe_all_offer_answer(struct call_monologue *ml) { + for (GList *l = ml->subscriptions.head; l; ) { + struct call_subscription *cs = l->data; + if (!cs->offer_answer) { + l = l->next; + continue; + } + GList *next = l->next; + __unsubscribe_one_link(ml, l); + l = next; + } +} +void __add_subscription(struct call_monologue *which, struct call_monologue *to, bool offer_answer) { + ilog(LOG_DEBUG, "Subscribing '" STR_FORMAT_M "' to '" STR_FORMAT_M "'", + STR_FMT_M(&which->tag), + STR_FMT_M(&to->tag)); + struct call_subscription *which_cs = g_slice_alloc0(sizeof(*which_cs)); + struct call_subscription *to_rev_cs = g_slice_alloc0(sizeof(*to_rev_cs)); + which_cs->monologue = to; + to_rev_cs->monologue = which; + // keep offer-answer subscriptions first in the list + if (!offer_answer) { + g_queue_push_tail(&which->subscriptions, which_cs); + g_queue_push_tail(&to->subscribers, to_rev_cs); + which_cs->link = to->subscribers.tail; + to_rev_cs->link = which->subscriptions.tail; + } + else { + g_queue_push_head(&which->subscriptions, which_cs); + g_queue_push_head(&to->subscribers, to_rev_cs); + which_cs->link = to->subscribers.head; + to_rev_cs->link = which->subscriptions.head; + } + which_cs->offer_answer = offer_answer ? 1 : 0; + to_rev_cs->offer_answer = which_cs->offer_answer; +} +static void __subscribe_only_one_offer_answer(struct call_monologue *which, struct call_monologue *to) { + __unsubscribe_all_offer_answer(which); + __add_subscription(which, to, true); +} + static int __rtp_stats_sort(const void *ap, const void *bp) { const struct rtp_stats *a = ap, *b = bp; @@ -2602,8 +2737,8 @@ static void __call_cleanup(struct call *c) { g_queue_clear(&ps->sfds); crypto_cleanup(&ps->crypto); - ps->rtp_sink = NULL; - ps->rtcp_sink = NULL; + g_queue_clear_full(&ps->rtp_sinks, free_sink_handler); + g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler); } for (GList *l = c->medias.head; l; l = l->next) { @@ -2718,18 +2853,21 @@ void call_destroy(struct call *c) { // stats output only - no cleanups ilog(LOG_INFO, "--- Tag '" STR_FORMAT_M "'%s"STR_FORMAT"%s, created " - "%u:%02u ago for branch '" STR_FORMAT_M "', in dialogue with '" STR_FORMAT_M "'", + "%u:%02u ago for branch '" STR_FORMAT_M "'", STR_FMT_M(&ml->tag), ml->label.s ? " (label '" : "", STR_FMT(ml->label.s ? &ml->label : &STR_EMPTY), ml->label.s ? "')" : "", (unsigned int) (rtpe_now.tv_sec - ml->created) / 60, (unsigned int) (rtpe_now.tv_sec - ml->created) % 60, - STR_FMT_M(&ml->viabranch), - ml->active_dialogue ? rtpe_common_config_ptr->log_mark_prefix : "", - ml->active_dialogue ? (int) ml->active_dialogue->tag.len : 6, - ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)", - ml->active_dialogue ? rtpe_common_config_ptr->log_mark_suffix : ""); + STR_FMT_M(&ml->viabranch)); + + for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *csm = cs->monologue; + ilog(LOG_INFO, "--- subscribed to '" STR_FORMAT_M "'", + STR_FMT_M(&csm->tag)); + } for (k = ml->medias.head; k; k = k->next) { md = k->data; @@ -3073,7 +3211,6 @@ void __monologue_tag(struct call_monologue *ml, const str *tag) { } void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) { struct call *call = ml->call; - struct call_monologue *other = ml->active_dialogue; if (!viabranch || !viabranch->len) return; @@ -3081,15 +3218,25 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) { __C_DBG("tagging monologue with viabranch '"STR_FORMAT"'", STR_FMT(viabranch)); if (ml->viabranch.s) { g_hash_table_remove(call->viabranches, &ml->viabranch); - if (other) - g_hash_table_remove(other->branches, &ml->viabranch); + for (GList *sub = ml->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + g_hash_table_remove(cs->monologue->branches, &ml->viabranch); + } } call_str_cpy(call, &ml->viabranch, viabranch); g_hash_table_insert(call->viabranches, &ml->viabranch, ml); - if (other) - g_hash_table_insert(other->branches, &ml->viabranch, ml); + for (GList *sub = ml->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + g_hash_table_insert(cs->monologue->branches, &ml->viabranch, ml); + } } +static void __unconfirm_sinks(GQueue *q) { + for (GList *l = q->head; l; l = l->next) { + struct sink_handler *sh = l->data; + __stream_unconfirm(sh->sink); + } +} /* must be called with call->master_lock held in W */ void __monologue_unkernelize(struct call_monologue *monologue) { GList *l, *m; @@ -3108,14 +3255,30 @@ void __monologue_unkernelize(struct call_monologue *monologue) { for (m = media->streams.head; m; m = m->next) { stream = m->data; __stream_unconfirm(stream); - if (stream->rtp_sink) - __stream_unconfirm(stream->rtp_sink); - if (stream->rtcp_sink) - __stream_unconfirm(stream->rtcp_sink); + __unconfirm_sinks(&stream->rtp_sinks); + __unconfirm_sinks(&stream->rtcp_sinks); } } } +static void __dialogue_unkernelize(struct call_monologue *ml) { + __monologue_unkernelize(ml); + for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + __monologue_unkernelize(cs->monologue); + } + for (GList *sub = ml->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + __monologue_unkernelize(cs->monologue); + } +} + +static void __unkernelize_sinks(GQueue *q) { + for (GList *l = q->head; l; l = l->next) { + struct sink_handler *sh = l->data; + unkernelize(sh->sink); + } +} /* call locked in R */ void call_media_unkernelize(struct call_media *media) { GList *m; @@ -3124,8 +3287,8 @@ void call_media_unkernelize(struct call_media *media) { for (m = media->streams.head; m; m = m->next) { stream = m->data; unkernelize(stream); - unkernelize(stream->rtp_sink); - unkernelize(stream->rtcp_sink); + __unkernelize_sinks(&stream->rtp_sinks); + __unkernelize_sinks(&stream->rtcp_sinks); } } @@ -3199,23 +3362,28 @@ static int monologue_destroy(struct call_monologue *ml) { /* must be called with call->master_lock held in W */ static void __fix_other_tags(struct call_monologue *one) { - struct call_monologue *two; - if (!one || !one->tag.len) return; - two = one->active_dialogue; - if (!two || !two->tag.len) - return; - g_hash_table_insert(one->other_tags, &two->tag, two); - g_hash_table_insert(two->other_tags, &one->tag, one); + for (GList *sub = one->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *two = cs->monologue; + g_hash_table_insert(one->other_tags, &two->tag, two); + g_hash_table_insert(two->other_tags, &one->tag, one); + } } /* must be called with call->master_lock held in W */ -static struct call_monologue *call_get_monologue(struct call *call, const str *fromtag, const str *totag, +struct call_monologue *call_get_monologue(struct call *call, const str *fromtag) { + return g_hash_table_lookup(call->tags, fromtag); +} + +/* must be called with call->master_lock held in W */ +static int call_get_monologue_new(struct call_monologue *dialogue[2], struct call *call, + const str *fromtag, const str *totag, const str *viabranch) { - struct call_monologue *ret, *os; + struct call_monologue *ret, *os = NULL; __C_DBG("getting monologue for tag '"STR_FORMAT"' in call '"STR_FORMAT"'", STR_FMT(fromtag), STR_FMT(&call->callid)); @@ -3228,27 +3396,34 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f __C_DBG("found existing monologue"); __monologue_unkernelize(ret); - __monologue_unkernelize(ret->active_dialogue); + for (GList *sub = ret->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + __monologue_unkernelize(cs->monologue); + } if (!viabranch) goto ok_check_tag; - /* check the viabranch. if it's not known, then this is a branched offer and we need - * to create a new "other side" for this branch. */ - if (!ret->active_dialogue->viabranch.s) { - /* previous "other side" hasn't been tagged with the via-branch, so we'll just - * use this one and tag it */ - __monologue_viabranch(ret->active_dialogue, viabranch); - goto ok_check_tag; + for (GList *sub = ret->subscribers.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *csm = cs->monologue; + /* check the viabranch. if it's not known, then this is a branched offer and we need + * to create a new "other side" for this branch. */ + if (!csm->viabranch.s) { + /* previous "other side" hasn't been tagged with the via-branch, so we'll just + * use this one and tag it */ + __monologue_viabranch(csm, viabranch); + goto ok_check_tag; + } + if (!str_cmp_str(&csm->viabranch, viabranch)) + goto ok_check_tag; /* dialogue still intact */ } - if (!str_cmp_str(&ret->active_dialogue->viabranch, viabranch)) - goto ok_check_tag; /* dialogue still intact */ os = g_hash_table_lookup(call->viabranches, viabranch); if (os) { /* previously seen branch. use it */ __monologue_unkernelize(os); - os->active_dialogue = ret; - ret->active_dialogue = os; + __subscribe_only_one_offer_answer(ret, os); + __subscribe_only_one_offer_answer(os, ret); goto ok_check_tag; } @@ -3257,21 +3432,31 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f new_branch: __C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch)); os = __monologue_create(call); - ret->active_dialogue = os; - os->active_dialogue = ret; + __subscribe_only_one_offer_answer(ret, os); + __subscribe_only_one_offer_answer(os, ret); __monologue_viabranch(os, viabranch); ok_check_tag: - os = ret->active_dialogue; - if (totag && totag->s && !os->tag.s) { - __monologue_tag(os, totag); - __fix_other_tags(ret); - } - return ret; + for (GList *sub = ret->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *csm = cs->monologue; + if (!os) + os = csm; + if (totag && totag->s && !csm->tag.s) { + __monologue_tag(csm, totag); + __fix_other_tags(ret); + } + break; // there should only be one + // XXX check if there's more than a one-to-one mapping here? + } + dialogue[0] = ret; + dialogue[1] = os; + return 0; } /* must be called with call->master_lock held in W */ -static struct call_monologue *call_get_dialogue(struct call *call, const str *fromtag, const str *totag, +static int call_get_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag, + const str *totag, const str *viabranch) { struct call_monologue *ft, *tt; @@ -3282,7 +3467,7 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr /* we start with the to-tag. if it's not known, we treat it as a branched offer */ tt = g_hash_table_lookup(call->tags, totag); if (!tt) - return call_get_monologue(call, fromtag, totag, viabranch); + return call_get_monologue_new(dialogue, call, fromtag, totag, viabranch); /* if the from-tag is known already, return that */ ft = g_hash_table_lookup(call->tags, fromtag); @@ -3290,9 +3475,26 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr __C_DBG("found existing dialogue"); /* make sure that the dialogue is actually intact */ - /* fastpath for a common case */ - if (!str_cmp_str(totag, &ft->active_dialogue->tag)) - goto done; + if (ft->subscriptions.length != 1 || ft->subscribers.length != 1) + goto tag_setup; + if (tt->subscriptions.length != 1 || tt->subscribers.length != 1) + goto tag_setup; + + struct call_subscription *cs = ft->subscriptions.head->data; + if (cs->monologue != tt) + goto tag_setup; + cs = ft->subscribers.head->data; + if (cs->monologue != tt) + goto tag_setup; + + cs = tt->subscriptions.head->data; + if (cs->monologue != ft) + goto tag_setup; + cs = tt->subscribers.head->data; + if (cs->monologue != ft) + goto tag_setup; + + goto done; } else { /* perhaps we can determine the monologue from the viabranch */ @@ -3303,37 +3505,44 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr if (!ft) { /* if we don't have a fromtag monologue yet, we can use a half-complete dialogue * from the totag if there is one. otherwise we have to create a new one. */ - ft = tt->active_dialogue; - if (ft->tag.s) + if (tt->subscriptions.head) { + struct call_subscription *cs = tt->subscriptions.head->data; + ft = cs->monologue; + } + if (!ft || ft->tag.s) ft = __monologue_create(call); } +tag_setup: /* the fromtag monologue may be newly created, or half-complete from the totag, or * derived from the viabranch. */ if (!ft->tag.s || str_cmp_str(&ft->tag, fromtag)) __monologue_tag(ft, fromtag); - __monologue_unkernelize(ft->active_dialogue); - __monologue_unkernelize(tt->active_dialogue); - ft->active_dialogue = tt; - tt->active_dialogue = ft; + __dialogue_unkernelize(ft); + __dialogue_unkernelize(tt); + __subscribe_only_one_offer_answer(ft, tt); + __subscribe_only_one_offer_answer(tt, ft); __fix_other_tags(ft); done: __monologue_unkernelize(ft); - __monologue_unkernelize(ft->active_dialogue); - return ft; + __dialogue_unkernelize(ft); + dialogue[0] = ft; + dialogue[1] = tt; + return 0; } /* fromtag and totag strictly correspond to the directionality of the message, not to the actual * SIP headers. IOW, the fromtag corresponds to the monologue sending this message, even if the * tag is actually from the TO header of the SIP message (as it would be in a 200 OK) */ -struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag, +int call_get_mono_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag, + const str *totag, const str *viabranch) { if (!totag || !totag->s) /* initial offer */ - return call_get_monologue(call, fromtag, NULL, viabranch); - return call_get_dialogue(call, fromtag, totag, viabranch); + return call_get_monologue_new(dialogue, call, fromtag, NULL, viabranch); + return call_get_dialogue(dialogue, call, fromtag, totag, viabranch); } @@ -3403,8 +3612,11 @@ int call_delete_branch(const str *callid, const str *branch, // if the associated dialogue has an empty tag (unknown) if (match_tag == totag) { ml = g_hash_table_lookup(c->tags, fromtag); - if (ml && ml->active_dialogue && ml->active_dialogue->tag.len == 0) - goto do_delete; + if (ml && ml->subscriptions.length == 1) { + struct call_subscription *cs = ml->subscriptions.head->data; + if (cs->monologue->tag.len == 0) + goto do_delete; + } } ilog(LOG_INFO, "Tag '"STR_FORMAT"' in delete message not found, ignoring", @@ -3417,8 +3629,10 @@ do_delete: ng_call_stats(c, fromtag, totag, output, NULL); monologue_stop(ml); - if (ml->active_dialogue && ml->active_dialogue->active_dialogue == ml) - monologue_stop(ml->active_dialogue); + for (GList *l = ml->subscribers.head; l; l = l->next) { + struct call_subscription *cs = l->data; + monologue_stop(cs->monologue); + } if (delete_delay > 0) { ilog(LOG_INFO, "Scheduling deletion of call branch '" STR_FORMAT_M "' " diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 7e4575532..7383ff148 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -170,7 +170,7 @@ static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const ch const endpoint_t *sin) { struct call *c; - struct call_monologue *monologue; + struct call_monologue *dialogue[2]; GQueue q = G_QUEUE_INIT; struct stream_params sp; str *ret, callid, viabranch, fromtag, totag = STR_NULL; @@ -195,33 +195,32 @@ static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const ch c->created_from_addr = sin->address; } - monologue = call_get_mono_dialogue(c, &fromtag, &totag, NULL); - if (!monologue) + if (call_get_mono_dialogue(dialogue, c, &fromtag, &totag, NULL)) goto ml_fail; if (opmode == OP_OFFER) { - monologue->tagtype = FROM_TAG; + dialogue[0]->tagtype = FROM_TAG; } else { - monologue->tagtype = TO_TAG; + dialogue[0]->tagtype = TO_TAG; } if (addr_parse_udp(&sp, out)) goto addr_fail; g_queue_push_tail(&q, &sp); - i = monologue_offer_answer(monologue, &q, NULL); + i = monologue_offer_answer(dialogue, &q, NULL); g_queue_clear(&q); if (i) goto unlock_fail; - ret = streams_print(&monologue->active_dialogue->medias, + ret = streams_print(&dialogue[1]->medias, sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); redis_update_onekey(c, rtpe_redis_write); - gettimeofday(&(monologue->started), NULL); + gettimeofday(&(dialogue[0]->started), NULL); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT(ret)); goto out; @@ -307,7 +306,7 @@ static void streams_parse(const char *s, GQueue *q) { static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) { struct call *c; - struct call_monologue *monologue; + struct call_monologue *dialogue[2]; GQueue s = G_QUEUE_INIT; str *ret = NULL, callid, fromtag, totag = STR_NULL; GHashTable *infohash; @@ -336,15 +335,14 @@ static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) { str_swap(&fromtag, &totag); } - monologue = call_get_mono_dialogue(c, &fromtag, &totag, NULL); - if (!monologue) { + if (call_get_mono_dialogue(dialogue, c, &fromtag, &totag, NULL)) { ilog(LOG_WARNING, "Invalid dialogue association"); goto out2; } - if (monologue_offer_answer(monologue, &s, NULL)) + if (monologue_offer_answer(dialogue, &s, NULL)) goto out2; - ret = streams_print(&monologue->active_dialogue->medias, 1, s.length, NULL, SAF_TCP); + ret = streams_print(&dialogue[1]->medias, 1, s.length, NULL, SAF_TCP); out2: rwlock_unlock_w(&c->master_lock); @@ -1265,11 +1263,11 @@ static void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct mutex_unlock(&sdp_fragments_lock); } #define MAX_FRAG_AGE 3000000 -static void dequeue_sdp_fragments(struct call_monologue *monologue) { +static void dequeue_sdp_fragments(struct call_monologue *dialogue[2]) { struct fragment_key k; ZERO(k); - k.call_id = monologue->call->callid; - k.from_tag = monologue->tag; + k.call_id = dialogue[0]->call->callid; + k.from_tag = dialogue[0]->tag; mutex_lock(&sdp_fragments_lock); GQueue *frags = g_hash_table_lookup(sdp_fragments, &k); @@ -1290,7 +1288,7 @@ static void dequeue_sdp_fragments(struct call_monologue *monologue) { ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); - monologue_offer_answer(monologue, &frag->streams, &frag->flags); + monologue_offer_answer(dialogue, &frag->streams, &frag->flags); next: fragment_free(frag); @@ -1331,7 +1329,7 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t GQueue parsed = G_QUEUE_INIT; GQueue streams = G_QUEUE_INIT; struct call *call; - struct call_monologue *monologue; + struct call_monologue *dialogue[2]; int ret; struct sdp_ng_flags flags; struct sdp_chopper *chopper; @@ -1409,19 +1407,18 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t * need to hold a ref until we're done sending the reply */ call_bencode_hold_ref(call, output); - monologue = call_get_mono_dialogue(call, &flags.from_tag, &flags.to_tag, - flags.via_branch.s ? &flags.via_branch : NULL); errstr = "Invalid dialogue association"; - if (!monologue) { + if (call_get_mono_dialogue(dialogue, call, &flags.from_tag, &flags.to_tag, + flags.via_branch.s ? &flags.via_branch : NULL)) { rwlock_unlock_w(&call->master_lock); obj_put(call); goto out; } if (opmode == OP_OFFER) { - monologue->tagtype = FROM_TAG; + dialogue[0]->tagtype = FROM_TAG; } else { - monologue->tagtype = TO_TAG; + dialogue[0]->tagtype = TO_TAG; } chopper = sdp_chopper_new(&flags.sdp); @@ -1443,11 +1440,11 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t int do_dequeue = 1; - ret = monologue_offer_answer(monologue, &streams, &flags); + ret = monologue_offer_answer(dialogue, &streams, &flags); if (!ret) { // SDP fragments for trickle ICE are consumed with no replacement returned if (!flags.fragment) - ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags); + ret = sdp_replace(chopper, &parsed, dialogue[1], &flags); } else if (ret == ERROR_NO_ICE_AGENT && flags.fragment) { queue_sdp_fragment(ngbuf, &streams, &flags); @@ -1459,15 +1456,15 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t struct recording *recording = call->recording; if (recording != NULL) { - meta_write_sdp_before(recording, &flags.sdp, monologue, opmode); + meta_write_sdp_before(recording, &flags.sdp, dialogue[0], opmode); meta_write_sdp_after(recording, chopper->output, - monologue, opmode); + dialogue[0], opmode); recording_response(recording, output); } if (do_dequeue) - dequeue_sdp_fragments(monologue); + dequeue_sdp_fragments(dialogue); rwlock_unlock_w(&call->master_lock); @@ -1478,7 +1475,7 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t } obj_put(call); - gettimeofday(&(monologue->started), NULL); + gettimeofday(&(dialogue[0]->started), NULL); errstr = "Error rewriting SDP"; @@ -1697,8 +1694,20 @@ static void ng_stats_monologue(bencode_item_t *dict, const struct call_monologue if (ml->label.s) bencode_dictionary_add_str(sub, "label", &ml->label); bencode_dictionary_add_integer(sub, "created", ml->created); - if (ml->active_dialogue) - bencode_dictionary_add_str(sub, "in dialogue with", &ml->active_dialogue->tag); + bencode_item_t *subs = bencode_dictionary_add_list(sub, "subscriptions"); + for (GList *l = ml->subscriptions.head; l; l = l->next) { + struct call_subscription *cs = l->data; + bencode_item_t *sub1 = bencode_list_add_dictionary(subs); + bencode_dictionary_add_str(sub1, "tag", &cs->monologue->tag); + bencode_dictionary_add_string(sub1, "type", cs->offer_answer ? "offer/answer" : "pub/sub"); + } + subs = bencode_dictionary_add_list(sub, "subscribers"); + for (GList *l = ml->subscribers.head; l; l = l->next) { + struct call_subscription *cs = l->data; + bencode_item_t *sub1 = bencode_list_add_dictionary(subs); + bencode_dictionary_add_str(sub1, "tag", &cs->monologue->tag); + bencode_dictionary_add_string(sub1, "type", cs->offer_answer ? "offer/answer" : "pub/sub"); + } ng_stats_ssrc(bencode_dictionary_add_dictionary(sub, "SSRC"), ml->ssrc_hash); medias = bencode_dictionary_add_list(sub, "medias"); @@ -1818,7 +1827,10 @@ stats: ml = g_hash_table_lookup(call->tags, match_tag); if (ml) { ng_stats_monologue(tags, ml, totals); - ng_stats_monologue(tags, ml->active_dialogue, totals); + for (GList *l = ml->subscriptions.head; l; l = l->next) { + struct call_subscription *cs = l->data; + ng_stats_monologue(tags, cs->monologue, totals); + } } } @@ -1977,9 +1989,10 @@ found: ; } else if (flags->from_tag.s) { - *monologue = call_get_mono_dialogue(*call, &flags->from_tag, NULL, NULL); + *monologue = call_get_monologue(*call, &flags->from_tag); if (!*monologue) return "From-tag given, but no such tag exists"; + __monologue_unkernelize(*monologue); } return NULL; @@ -2382,23 +2395,26 @@ const char *call_play_dtmf_ng(bencode_item_t *input, bencode_item_t *output) { // XXX fall back to generating a secondary stream goto out; -found:; - struct call_monologue *dialogue = monologue->active_dialogue; - struct call_media *sink = NULL; - for (GList *l = dialogue->medias.head; l; l = l->next) { - sink = l->data; - if (media->type_id != MT_AUDIO) - continue; - goto found_sink; - } +found: + for (GList *k = monologue->subscribers.head; k; k = k->next) { + struct call_subscription *cs = k->data; + struct call_monologue *dialogue = cs->monologue; + struct call_media *sink = NULL; + for (GList *m = dialogue->medias.head; m; m = m->next) { + sink = m->data; + if (media->type_id != MT_AUDIO) + continue; + goto found_sink; + } - err = "Sink monologue has no media capable of DTMF playback"; - goto out; + err = "Sink monologue has no media capable of DTMF playback"; + goto out; found_sink: - err = dtmf_inject(media, code, volume, duration, pause, sink); - if (err) - break; + err = dtmf_inject(media, code, volume, duration, pause, sink); + if (err) + break; + } } out: diff --git a/daemon/cdr.c b/daemon/cdr.c index 71c9c7e4b..44d89bc4b 100644 --- a/daemon/cdr.c +++ b/daemon/cdr.c @@ -74,15 +74,20 @@ void cdr_update_entry(struct call* c) { "ml%i_duration=%ld.%06ld, " "ml%i_termination=%s, " "ml%i_local_tag=%s, " - "ml%i_local_tag_type=%s, " - "ml%i_remote_tag=%s, ", + "ml%i_local_tag_type=%s, ", cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec, cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec, cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec, cdrlinecnt, get_term_reason_text(ml->term_reason), cdrlinecnt, ml->tag.s, - cdrlinecnt, get_tag_type_text(ml->tagtype), - cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); + cdrlinecnt, get_tag_type_text(ml->tagtype)); + + for (k = ml->subscriptions.head; k; k = k->next) { + struct call_subscription *cs = k->data; + g_string_append_printf(cdr, + "ml%i_remote_tag=%s, ", + cdrlinecnt, cs->monologue->tag.s); + } } for (k = ml->medias.head; k; k = k->next) { diff --git a/daemon/cli.c b/daemon/cli.c index 386c08a61..fc5cd7805 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -571,14 +571,19 @@ static void cli_incoming_list_callid(str *instr, struct cli_writer *cw) { cw->cw_printf(cw, "--- Tag '" STR_FORMAT "', type: %s, label '" STR_FORMAT "', " "branch '" STR_FORMAT "', " "callduration " - "%ld.%06ld, in dialogue with '" STR_FORMAT "'\n", + "%ld.%06ld\n", STR_FMT(&ml->tag), get_tag_type_text(ml->tagtype), STR_FMT(ml->label.s ? &ml->label : &STR_EMPTY), STR_FMT(&ml->viabranch), tim_result_duration.tv_sec, - tim_result_duration.tv_usec, - ml->active_dialogue ? (int) ml->active_dialogue->tag.len : 6, - ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); + tim_result_duration.tv_usec); + + for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *csm = cs->monologue; + cw->cw_printf(cw, "--- subscribed to '" STR_FORMAT_M "'\n", + STR_FMT_M(&csm->tag)); + } for (k = ml->medias.head; k; k = k->next) { md = k->data; diff --git a/daemon/codec.c b/daemon/codec.c index 5990c4694..80a9bfc98 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -2265,12 +2265,13 @@ static void __dtx_send_later(struct codec_timer *ct) { __ssrc_unlock_both(&mp_copy); if (mp_copy.packets_out.length && ret == 0) { - struct packet_stream *sink = ps->rtp_sink; + struct sink_handler *sh = &mp_copy.sink; + struct packet_stream *sink = sh->sink; if (!sink) media_socket_dequeue(&mp_copy, NULL); // just free else { - if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, &mp_copy)) + if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &mp_copy)) ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); mutex_lock(&sink->out_lock); diff --git a/daemon/dtmf.c b/daemon/dtmf.c index 511f998bf..6cad1edcf 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -236,69 +236,73 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * { struct call *call = monologue->call; - if (!media->monologue || !media->monologue->active_dialogue) - return "No dialogue association"; - - struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, - media->monologue->active_dialogue->ssrc_hash, SSRC_DIR_OUTPUT, - monologue); - if (!ssrc_out) - return "No output SSRC context present"; // XXX generate stream - - int duration_samples = duration * ch->dest_pt.clock_rate / 1000; - int pause_samples = pause * ch->dest_pt.clock_rate / 1000; - - // we generate PCM DTMF by simulating a detected RFC event packet - // XXX this shouldn't require faking an actual RTP packet - struct telephone_event_payload tep = { - .event = code, - .volume = -1 * volume, - .end = 1, - .duration = htons(duration_samples), - }; - struct rtp_header rtp = { - .m_pt = 0xff, - .timestamp = 0, - .seq_num = htons(ssrc_in->parent->sequencer.seq), - .ssrc = htonl(ssrc_in->parent->h.ssrc), - }; - struct media_packet packet = { - .tv = rtpe_now, - .call = call, - .media = media, - .media_out = sink, - .rtp = &rtp, - .ssrc_in = ssrc_in, - .ssrc_out = ssrc_out, - .raw = { (void *) &tep, sizeof(tep) }, - .payload = { (void *) &tep, sizeof(tep) }, - }; - - // keep track of how much PCM we've generated - uint64_t encoder_pts = codec_encoder_pts(csh); - uint64_t skip_pts = codec_decoder_unskip_pts(csh); // reset to zero to take up our new samples - - ch->dtmf_injector->func(ch->dtmf_injector, &packet); - - // insert pause - tep.event = 0xff; - tep.duration = htons(pause_samples); - rtp.seq_num = htons(ssrc_in->parent->sequencer.seq); - - ch->dtmf_injector->func(ch->dtmf_injector, &packet); - - // skip generated samples - uint64_t pts_offset = codec_encoder_pts(csh) - encoder_pts; - skip_pts += av_rescale(pts_offset, ch->dest_pt.clock_rate, ch->source_pt.clock_rate); - codec_decoder_skip_pts(csh, skip_pts); - - // ready packets for send - // XXX handle encryption? - - media_socket_dequeue(&packet, packet_stream_sink(ps)); + for (GList *l = ps->rtp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + struct packet_stream *sink_ps = sh->sink; + struct call_monologue *sink_ml = sink_ps->media->monologue; + + struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, + sink_ml->ssrc_hash, SSRC_DIR_OUTPUT, + monologue); + if (!ssrc_out) + return "No output SSRC context present"; // XXX generate stream + + int duration_samples = duration * ch->dest_pt.clock_rate / 1000; + int pause_samples = pause * ch->dest_pt.clock_rate / 1000; + + // we generate PCM DTMF by simulating a detected RFC event packet + // XXX this shouldn't require faking an actual RTP packet + struct telephone_event_payload tep = { + .event = code, + .volume = -1 * volume, + .end = 1, + .duration = htons(duration_samples), + }; + struct rtp_header rtp = { + .m_pt = 0xff, + .timestamp = 0, + .seq_num = htons(ssrc_in->parent->sequencer.seq), + .ssrc = htonl(ssrc_in->parent->h.ssrc), + }; + struct media_packet packet = { + .tv = rtpe_now, + .call = call, + .media = media, + .media_out = sink, + .rtp = &rtp, + .ssrc_in = ssrc_in, + .ssrc_out = ssrc_out, + .raw = { (void *) &tep, sizeof(tep) }, + .payload = { (void *) &tep, sizeof(tep) }, + }; + + // keep track of how much PCM we've generated + uint64_t encoder_pts = codec_encoder_pts(csh); + uint64_t skip_pts = codec_decoder_unskip_pts(csh); // reset to zero to take up our new samples + + ch->dtmf_injector->func(ch->dtmf_injector, &packet); + + // insert pause + tep.event = 0xff; + tep.duration = htons(pause_samples); + rtp.seq_num = htons(ssrc_in->parent->sequencer.seq); + + ch->dtmf_injector->func(ch->dtmf_injector, &packet); + + // skip generated samples + uint64_t pts_offset = codec_encoder_pts(csh) - encoder_pts; + skip_pts += av_rescale(pts_offset, ch->dest_pt.clock_rate, ch->source_pt.clock_rate); + codec_decoder_skip_pts(csh, skip_pts); + + // ready packets for send + // XXX handle encryption? + + media_socket_dequeue(&packet, sink_ps); + + obj_put_o((struct obj *) csh); + ssrc_ctx_put(&ssrc_out); + } - obj_put_o((struct obj *) csh); - ssrc_ctx_put(&ssrc_out); return 0; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 8f246b74b..a150e7dd6 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -52,24 +52,26 @@ struct packet_handler_ctx { // inputs: str s; // raw input packet - struct packet_stream *sink; // where to send output packets to (forward destination) + GQueue *sinks; // where to send output packets to (forward destination) rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt rtcp_filter_func *rtcp_filter; struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP) int payload_type; // -1 if unknown or not RTP int rtcp; // true if this is an RTCP packet + GQueue rtcp_list; // verdicts: int update; // true if Redis info needs to be updated int unkernelize; // true if stream ought to be removed from kernel int kernelize; // true if stream can be kernelized + int rtcp_discard; // do not forward RTCP // output: struct media_packet mp; // passed to handlers }; -static void __determine_handler(struct packet_stream *in, const struct packet_stream *out); +static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *); static int __k_null(struct rtpengine_srtp *s, struct packet_stream *); static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *); @@ -1111,123 +1113,96 @@ static int __rtp_stats_pt_sort(const void *ap, const void *bp) { /* called with in_lock held */ -void kernelize(struct packet_stream *stream) { - struct rtpengine_target_info reti; - struct rtpengine_destination_info redi; +// sink_handler can be NULL +static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs, + struct packet_stream *stream, struct sink_handler *sink_handler) +{ + struct rtpengine_destination_info *redi = NULL; struct call *call = stream->call; - struct packet_stream *sink = NULL; - const char *nk_warn_msg; - int non_forwarding = 0; struct call_media *media = stream->media; + struct packet_stream *sink = sink_handler ? sink_handler->sink : NULL; + bool non_forwarding = false; + bool blackhole = false; + + if (sink_handler) + sink_handler->kernel_output_idx = -1; - if (PS_ISSET(stream, KERNELIZED)) - return; - if (call->recording != NULL && !selected_recording_method->kernel_support) - goto no_kernel; - if (!kernel.is_wanted) - goto no_kernel; - nk_warn_msg = "interface to kernel module not open"; - if (!kernel.is_open) - goto no_kernel_warn; if (!PS_ISSET(stream, RTP)) { if (PS_ISSET(stream, RTCP) && PS_ISSET(stream, STRICT_SOURCE)) - non_forwarding = 1; // use the kernel's source checking capability + non_forwarding = true; // use the kernel's source checking capability else - goto no_kernel; + return NULL; } - if (MEDIA_ISSET(media, GENERATOR)) - goto no_kernel; - if (!stream->selected_sfd) - goto no_kernel; - if (media->monologue->block_media || call->block_media) - goto no_kernel; - if (!stream->endpoint.address.family) - goto no_kernel; if (MEDIA_ISSET(media, BLACKHOLE)) - non_forwarding = 1; + blackhole = true; + else if (!sink_handler) + blackhole = true; - ilog(LOG_INFO, "Kernelizing media stream: %s%s:%d%s", - FMT_M(sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port)); + if (blackhole) + non_forwarding = true; - sink = packet_stream_sink(stream); - if (!sink) { - ilog(LOG_WARNING, "Attempt to kernelize stream without sink"); - goto no_kernel; - } - if (!sink->endpoint.address.family) - goto no_kernel; + if (sink && !sink->endpoint.address.family) + return NULL; - __determine_handler(stream, sink); + if (sink) + ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> %s%s%s", + FMT_M(endpoint_print_buf(&stream->endpoint)), + endpoint_print_buf(&stream->selected_sfd->socket.local), + FMT_M(endpoint_print_buf(&sink->endpoint))); + else + ilog(LOG_INFO, "Kernelizing media stream: %s%s%s -> %s -> void", + FMT_M(endpoint_print_buf(&stream->endpoint)), + endpoint_print_buf(&stream->selected_sfd->socket.local)); - if (is_addr_unspecified(&sink->advertised_endpoint.address) - || !sink->advertised_endpoint.port) - goto no_kernel; - nk_warn_msg = "protocol not supported by kernel module"; - if (!stream->handler->in->kernel - || !stream->handler->out->kernel) - goto no_kernel_warn; + const struct streamhandler *handler = __determine_handler(stream, sink_handler); + + if (!handler->in->kernel || !handler->out->kernel) + return "protocol not supported by kernel module"; - ZERO(reti); - ZERO(redi); + // fill input if needed + + if (reti->local.family) + goto output; if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) { mutex_lock(&stream->out_lock); - __re_address_translate_ep(&reti.expected_src, &stream->endpoint); + __re_address_translate_ep(&reti->expected_src, &stream->endpoint); mutex_unlock(&stream->out_lock); if (PS_ISSET(stream, STRICT_SOURCE)) - reti.src_mismatch = MSM_DROP; + reti->src_mismatch = MSM_DROP; else if (PS_ISSET(stream, MEDIA_HANDOVER)) - reti.src_mismatch = MSM_PROPAGATE; + reti->src_mismatch = MSM_PROPAGATE; } - mutex_lock(&sink->out_lock); + __re_address_translate_ep(&reti->local, &stream->selected_sfd->socket.local); + reti->rtcp_mux = MEDIA_ISSET(media, RTCP_MUX); + reti->dtls = MEDIA_ISSET(media, DTLS); + reti->stun = media->ice_agent ? 1 : 0; + reti->non_forwarding = non_forwarding ? 1 : 0; + reti->blackhole = blackhole ? 1 : 0; + reti->rtp_stats = (MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0; + + handler->in->kernel(&reti->decrypt, stream); + if (!reti->decrypt.cipher || !reti->decrypt.hmac) + return "decryption cipher or HMAC not supported by kernel module"; - __re_address_translate_ep(&reti.local, &stream->selected_sfd->socket.local); - redi.local = reti.local; - redi.output.tos = call->tos; - reti.rtcp_mux = MEDIA_ISSET(media, RTCP_MUX); - reti.dtls = MEDIA_ISSET(media, DTLS); - reti.stun = media->ice_agent ? 1 : 0; - reti.non_forwarding = non_forwarding; - reti.blackhole = MEDIA_ISSET(media, BLACKHOLE) ? 1 : 0; - reti.rtp_stats = (MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0; - - reti.num_destinations = 1; - redi.num = 0; - - __re_address_translate_ep(&redi.output.dst_addr, &sink->endpoint); - __re_address_translate_ep(&redi.output.src_addr, &sink->selected_sfd->socket.local); if (stream->ssrc_in) { - reti.ssrc = htonl(stream->ssrc_in->parent->h.ssrc); - if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO)) { - redi.output.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out); - reti.transcoding = 1; - } + reti->ssrc = htonl(stream->ssrc_in->parent->h.ssrc); + if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO)) + reti->transcoding = 1; } - stream->handler->in->kernel(&reti.decrypt, stream); - stream->handler->out->kernel(&redi.output.encrypt, sink); - - mutex_unlock(&sink->out_lock); - - nk_warn_msg = "encryption cipher or HMAC not supported by kernel module"; - if (!redi.output.encrypt.cipher || !redi.output.encrypt.hmac) - goto no_kernel_warn; - nk_warn_msg = "decryption cipher or HMAC not supported by kernel module"; - if (!reti.decrypt.cipher || !reti.decrypt.hmac) - goto no_kernel_warn; - ZERO(stream->kernel_stats); - if (proto_is_rtp(media->protocol)) { + if (proto_is_rtp(media->protocol) && sink) { GList *values, *l; struct rtp_stats *rs; - reti.rtp = 1; + reti->rtp = 1; values = g_hash_table_get_values(stream->rtp_stats); values = g_list_sort(values, __rtp_stats_pt_sort); for (l = values; l; l = l->next) { - if (reti.num_payload_types >= G_N_ELEMENTS(reti.payload_types)) { + if (reti->num_payload_types >= G_N_ELEMENTS(reti->payload_types)) { ilog(LOG_WARNING, "Too many RTP payload types for kernel module"); break; } @@ -1236,23 +1211,110 @@ void kernelize(struct packet_stream *stream) { struct codec_handler *ch = codec_handler_get(media, rs->payload_type, sink->media); if (!ch->kernelize) continue; - reti.payload_types[reti.num_payload_types] = rs->payload_type; - reti.clock_rates[reti.num_payload_types] = ch->source_pt.clock_rate; - reti.num_payload_types++; + reti->payload_types[reti->num_payload_types] = rs->payload_type; + reti->clock_rates[reti->num_payload_types] = ch->source_pt.clock_rate; + reti->num_payload_types++; } g_list_free(values); } else { if (MEDIA_ISSET(media, TRANSCODE)) - goto no_kernel; + return NULL; } - recording_stream_kernel_info(stream, &reti); + recording_stream_kernel_info(stream, reti); - kernel_add_stream(&reti); - kernel_add_destination(&redi); - PS_SET(stream, KERNELIZED); +output: + // output section + if (non_forwarding) // also applies to sink == NULL + return NULL; // no output + if (is_addr_unspecified(&sink->advertised_endpoint.address) + || !sink->advertised_endpoint.port) + return NULL; + + redi = g_slice_alloc0(sizeof(*redi)); + redi->local = reti->local; + redi->output.tos = call->tos; + mutex_lock(&sink->out_lock); + + __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); + __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); + if (stream->ssrc_in && reti->transcoding) + redi->output.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out); + + handler->out->kernel(&redi->output.encrypt, sink); + + mutex_unlock(&sink->out_lock); + + if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) { + g_slice_free1(sizeof(*redi), redi); + return "encryption cipher or HMAC not supported by kernel module"; + } + + // got a new output + redi->num = reti->num_destinations; + reti->num_destinations++; + sink_handler->kernel_output_idx = redi->num; + g_queue_push_tail(outputs, redi); + assert(outputs->length == reti->num_destinations); + + return NULL; +} +/* called with in_lock held */ +void kernelize(struct packet_stream *stream) { + struct call *call = stream->call; + const char *nk_warn_msg; + struct call_media *media = stream->media; + + if (PS_ISSET(stream, KERNELIZED)) + return; + if (call->recording != NULL && !selected_recording_method->kernel_support) + goto no_kernel; + if (!kernel.is_wanted) + goto no_kernel; + nk_warn_msg = "interface to kernel module not open"; + if (!kernel.is_open) + goto no_kernel_warn; + if (MEDIA_ISSET(media, GENERATOR)) + goto no_kernel; + if (!stream->selected_sfd) + goto no_kernel; + if (media->monologue->block_media || call->block_media) + goto no_kernel; + if (!stream->endpoint.address.family) + goto no_kernel; + + GQueue *sinks = stream->rtp_sinks.length ? &stream->rtp_sinks : &stream->rtcp_sinks; + struct rtpengine_target_info reti; + ZERO(reti); // reti.local.family determines if anything can be done + GQueue outputs = G_QUEUE_INIT; + + if (!sinks->length) { + // add blackhole kernel rule + const char *err = kernelize_one(&reti, &outputs, stream, NULL); + if (err) + ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); + } + else { + for (GList *l = sinks->head; l; l = l->next) { + struct sink_handler *sh = l->data; + const char *err = kernelize_one(&reti, &outputs, stream, sh); + if (err) + ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err); + } + } + + if (reti.local.family) { + kernel_add_stream(&reti); + struct rtpengine_destination_info *redi; + while ((redi = g_queue_pop_head(&outputs))) { + kernel_add_destination(redi); + g_slice_free1(sizeof(*redi), redi); + } + } + + PS_SET(stream, KERNELIZED); return; no_kernel_warn: @@ -1342,11 +1404,21 @@ void __unkernelize(struct packet_stream *p) { } +void __reset_sink_handlers(struct packet_stream *ps) { + for (GList *l = ps->rtp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + sh->handler = NULL; + } + for (GList *l = ps->rtcp_sinks.head; l; l = l->next) { + struct sink_handler *sh = l->data; + sh->handler = NULL; + } +} void __stream_unconfirm(struct packet_stream *ps) { __unkernelize(ps); if (!MEDIA_ISSET(ps->media, ASYMMETRIC)) PS_CLEAR(ps, CONFIRMED); - ps->handler = NULL; + __reset_sink_handlers(ps); } static void stream_unconfirm(struct packet_stream *ps) { if (!ps) @@ -1355,6 +1427,12 @@ static void stream_unconfirm(struct packet_stream *ps) { __stream_unconfirm(ps); mutex_unlock(&ps->in_lock); } +static void unconfirm_sinks(GQueue *q) { + for (GList *l = q->head; l; l = l->next) { + struct sink_handler *sh = l->data; + stream_unconfirm(sh->sink); + } +} void unkernelize(struct packet_stream *ps) { if (!ps) return; @@ -1383,10 +1461,11 @@ void media_update_stats(struct call_media *m) { +// `out_media` can be NULL const struct streamhandler *determine_handler(const struct transport_protocol *in_proto, struct call_media *out_media, int must_recrypt) { - const struct transport_protocol *out_proto = out_media->protocol; + const struct transport_protocol *out_proto = out_media ? out_media->protocol : NULL; const struct streamhandler * const *sh_pp, *sh; const struct streamhandler * const * const *matrix; @@ -1399,7 +1478,7 @@ const struct streamhandler *determine_handler(const struct transport_protocol *i goto err; // special handling for RTP/AVP with advertised a=rtcp-fb - int out_proto_idx = out_proto->index; + int out_proto_idx = out_proto ? out_proto->index : in_proto->index; if (out_media && MEDIA_ISSET(out_media, RTCP_FB)) { if (!out_proto->avpf && out_proto->avpf_proto) out_proto_idx = out_proto->avpf_proto; @@ -1416,24 +1495,27 @@ err: } /* must be called with call->master_lock held in R, and in->in_lock held */ -static void __determine_handler(struct packet_stream *in, const struct packet_stream *out) { +// `sh` can be null +static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) { const struct transport_protocol *in_proto, *out_proto; int must_recrypt = 0; + struct packet_stream *out = sh ? sh->sink : NULL; + const struct streamhandler *ret = NULL; - if (in->handler) - return; + if (sh && sh->handler) + return sh->handler; if (MEDIA_ISSET(in->media, PASSTHRU)) goto noop; in_proto = in->media->protocol; - out_proto = out->media->protocol; + out_proto = out ? out->media->protocol : NULL; if (!in_proto) goto err; - if (!out_proto) - goto err; - if (dtmf_do_logging()) + if (!sh) + must_recrypt = 1; + else if (dtmf_do_logging()) must_recrypt = 1; else if (MEDIA_ISSET(in->media, DTLS) || MEDIA_ISSET(out->media, DTLS)) must_recrypt = 1; @@ -1441,39 +1523,36 @@ static void __determine_handler(struct packet_stream *in, const struct packet_st must_recrypt = 1; else if (in->call->recording) must_recrypt = 1; + else if (in->rtp_sinks.length > 1 || in->rtcp_sinks.length > 1) // need a proper decrypter? + must_recrypt = 1; else if (in_proto->srtp && out_proto->srtp && in->selected_sfd && out->selected_sfd && (crypto_params_cmp(&in->crypto.params, &out->selected_sfd->crypto.params) || crypto_params_cmp(&out->crypto.params, &in->selected_sfd->crypto.params))) must_recrypt = 1; - in->handler = determine_handler(in_proto, out->media, must_recrypt); - return; + ret = determine_handler(in_proto, out ? out->media : NULL, must_recrypt); + if (sh) + sh->handler = ret; + return ret; err: ilog(LOG_WARNING, "Unknown transport protocol encountered"); noop: - in->handler = &__sh_noop; - return; + ret = &__sh_noop; + if (sh) + sh->handler = ret; + return ret; } -// check and update SSRC pointers -static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *out_srtp, uint32_t ssrc_bs, - struct packet_handler_ctx *phc) +// check and update input SSRC pointers +static int __stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, + struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) { uint32_t in_ssrc = ntohl(ssrc_bs); - uint32_t out_ssrc; - struct ssrc_ctx **ssrc_in_p = &phc->mp.ssrc_in; - struct ssrc_ctx **ssrc_out_p = &phc->mp.ssrc_out; - - if (!phc->mp.media || !phc->mp.media_out) - return; - - struct ssrc_hash *ssrc_hash_in = phc->mp.media->monologue->ssrc_hash; - struct ssrc_hash *ssrc_hash_out = phc->mp.media_out->monologue->ssrc_hash; + int ret = 0; - // input direction mutex_lock(&in_srtp->in_lock); (*ssrc_in_p) = in_srtp->ssrc_in; @@ -1483,10 +1562,10 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o ssrc_ctx_put(ssrc_in_p); ssrc_ctx_put(&in_srtp->ssrc_in); (*ssrc_in_p) = in_srtp->ssrc_in = - get_ssrc_ctx(in_ssrc, ssrc_hash_in, SSRC_DIR_INPUT, in_srtp->media->monologue); + get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT, in_srtp->media->monologue); ssrc_ctx_hold(in_srtp->ssrc_in); - phc->unkernelize = 1; + ret = 1; ilog(LOG_DEBUG, ">>> in_ssrc changed for: %s%s:%d new: %x %s", FMT_M(sockaddr_print_buf(&in_srtp->endpoint.address), in_srtp->endpoint.port, in_ssrc)); } @@ -1495,10 +1574,18 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o if (!MEDIA_ISSET(in_srtp->media, TRANSCODE) && !MEDIA_ISSET(in_srtp->media, ECHO)) (*ssrc_in_p)->ssrc_map_out = in_ssrc; - out_ssrc = (*ssrc_in_p)->ssrc_map_out; mutex_unlock(&in_srtp->in_lock); + return ret; +} +// check and update output SSRC pointers +static int __stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ssrc_bs, + struct ssrc_ctx *ssrc_in, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash) +{ + uint32_t in_ssrc = ntohl(ssrc_bs); + uint32_t out_ssrc; + int ret = 0; - // out direction + out_ssrc = ssrc_in->ssrc_map_out; mutex_lock(&out_srtp->out_lock); (*ssrc_out_p) = out_srtp->ssrc_out; @@ -1508,11 +1595,10 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o ssrc_ctx_put(ssrc_out_p); ssrc_ctx_put(&out_srtp->ssrc_out); (*ssrc_out_p) = out_srtp->ssrc_out = - get_ssrc_ctx(out_ssrc, ssrc_hash_out, SSRC_DIR_OUTPUT, out_srtp->media->monologue); + get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT, out_srtp->media->monologue); ssrc_ctx_hold(out_srtp->ssrc_out); - // coverity[missing_lock : FALSE] - phc->unkernelize = 1; + ret = 1; ilog(LOG_DEBUG, ">>> out_ssrc changed for %s%s:%d new: %x %s", FMT_M(sockaddr_print_buf(&out_srtp->endpoint.address), out_srtp->endpoint.port, out_ssrc)); } @@ -1521,6 +1607,7 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o (*ssrc_out_p)->ssrc_map_out = in_ssrc; mutex_unlock(&out_srtp->out_lock); + return ret; } @@ -1592,35 +1679,43 @@ loop_ok: -// in_srtp and out_srtp are set to point to the SRTP contexts to use -// sink is set to where to forward the packet to +// in_srtp is set to point to the SRTP context to use +// sinks is set to where to forward the packet to static void media_packet_rtcp_demux(struct packet_handler_ctx *phc) { phc->in_srtp = phc->mp.stream; - phc->sink = phc->mp.stream->rtp_sink; - if (!phc->sink && PS_ISSET(phc->mp.stream, RTCP)) { - phc->sink = phc->mp.stream->rtcp_sink; - phc->rtcp = 1; - } - else if (phc->mp.stream->rtcp_sink) { - int muxed_rtcp = rtcp_demux(&phc->s, phc->mp.media); - if (muxed_rtcp == 2) { - phc->sink = phc->mp.stream->rtcp_sink; + phc->sinks = &phc->mp.stream->rtp_sinks; + // is this RTCP? + if (PS_ISSET(phc->mp.stream, RTCP)) { + int is_rtcp = 1; + // plain RTCP or are we muxing? + if (MEDIA_ISSET(phc->mp.media, RTCP_MUX)) { + is_rtcp = 0; + int muxed_rtcp = rtcp_demux(&phc->s, phc->mp.media); + if (muxed_rtcp == 2) { + is_rtcp = 1; + phc->in_srtp = phc->mp.stream->rtcp_sibling; // use RTCP SRTP context + } + } + if (is_rtcp) { + phc->sinks = &phc->mp.stream->rtcp_sinks; phc->rtcp = 1; - phc->in_srtp = phc->mp.stream->rtcp_sibling; // use RTCP SRTP context } } - phc->out_srtp = phc->sink; - if (!phc->sink) - return; - if (phc->rtcp && phc->sink->rtcp_sibling) - phc->out_srtp = phc->sink->rtcp_sibling; // use RTCP SRTP context +} +// out_srtp is set to point to the SRTP context to use +static void media_packet_rtcp_mux(struct packet_handler_ctx *phc, struct sink_handler *sh) +{ + phc->out_srtp = sh->sink; + if (phc->rtcp && sh->sink->rtcp_sibling) + phc->out_srtp = sh->sink->rtcp_sibling; // use RTCP SRTP context - phc->mp.media_out = phc->sink->media; + phc->mp.media_out = sh->sink->media; + phc->mp.sink = *sh; } -static void media_packet_rtp(struct packet_handler_ctx *phc) +static void media_packet_rtp_in(struct packet_handler_ctx *phc) { phc->payload_type = -1; @@ -1629,11 +1724,13 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) if (G_UNLIKELY(!proto_is_rtp(phc->mp.media->protocol))) return; + int unkern = 0; + if (G_LIKELY(!phc->rtcp && !rtp_payload(&phc->mp.rtp, &phc->mp.payload, &phc->s))) { rtp_padding(phc->mp.rtp, &phc->mp.payload); - if (G_LIKELY(phc->out_srtp != NULL)) - __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtp->ssrc, phc); + unkern = __stream_ssrc_in(phc->in_srtp, phc->mp.rtp->ssrc, &phc->mp.ssrc_in, + phc->mp.media->monologue->ssrc_hash); // check the payload type // XXX redundant between SSRC handling and codec_handler stuff -> combine @@ -1644,7 +1741,8 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) // XXX yet another hash table per payload type -> combine struct rtp_stats *rtp_s = g_atomic_pointer_get(&phc->mp.stream->rtp_stats_cache); if (G_UNLIKELY(!rtp_s) || G_UNLIKELY(rtp_s->payload_type != phc->payload_type)) - rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats, GINT_TO_POINTER(phc->payload_type)); + rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats, + GUINT_TO_POINTER(phc->payload_type)); if (!rtp_s) { ilog(LOG_WARNING | LOG_FLAG_LIMIT, "RTP packet with unknown payload type %u received from %s%s%s", @@ -1660,27 +1758,45 @@ static void media_packet_rtp(struct packet_handler_ctx *phc) } } else if (phc->rtcp && !rtcp_payload(&phc->mp.rtcp, NULL, &phc->s)) { - if (G_LIKELY(phc->out_srtp != NULL)) - __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtcp->ssrc, phc); + unkern = __stream_ssrc_in(phc->in_srtp, phc->mp.rtcp->ssrc, &phc->mp.ssrc_in, + phc->mp.media->monologue->ssrc_hash); } + + if (unkern) + phc->unkernelize = 1; +} +static void media_packet_rtp_out(struct packet_handler_ctx *phc) +{ + if (G_UNLIKELY(!proto_is_rtp(phc->mp.media->protocol))) + return; + + int unkern = 0; + + if (G_LIKELY(!phc->rtcp && phc->mp.rtp)) { + unkern = __stream_ssrc_out(phc->out_srtp, phc->mp.rtp->ssrc, phc->mp.ssrc_in, + &phc->mp.ssrc_out, phc->mp.media_out->monologue->ssrc_hash); + } + else if (phc->rtcp && phc->mp.rtcp) { + unkern = __stream_ssrc_out(phc->out_srtp, phc->mp.rtcp->ssrc, phc->mp.ssrc_in, + &phc->mp.ssrc_out, phc->mp.media_out->monologue->ssrc_hash); + } + + if (unkern) + phc->unkernelize = 1; } static int media_packet_decrypt(struct packet_handler_ctx *phc) { mutex_lock(&phc->in_srtp->in_lock); - __determine_handler(phc->in_srtp, phc->sink); + struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL; + const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh); // XXX use an array with index instead of if/else - if (G_LIKELY(!phc->rtcp)) { - phc->decrypt_func = phc->in_srtp->handler->in->rtp_crypt; - phc->encrypt_func = phc->in_srtp->handler->out->rtp_crypt; - } - else { - phc->decrypt_func = phc->in_srtp->handler->in->rtcp_crypt; - phc->encrypt_func = phc->in_srtp->handler->out->rtcp_crypt; - phc->rtcp_filter = phc->in_srtp->handler->in->rtcp_filter; - } + if (G_LIKELY(!phc->rtcp)) + phc->decrypt_func = sh->in->rtp_crypt; + else + phc->decrypt_func = sh->in->rtcp_crypt; /* return values are: 0 = forward packet, -1 = error/don't forward, * 1 = forward and push update to redis */ @@ -1701,6 +1817,20 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc) } return ret; } +static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh) +{ + mutex_lock(&phc->in_srtp->in_lock); + __determine_handler(phc->in_srtp, sh); + + // XXX use an array with index instead of if/else + if (G_LIKELY(!phc->rtcp)) + phc->encrypt_func = sh->handler->out->rtp_crypt; + else { + phc->encrypt_func = sh->handler->out->rtcp_crypt; + phc->rtcp_filter = sh->handler->in->rtcp_filter; + } + mutex_unlock(&phc->in_srtp->in_lock); +} int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) { int ret = 0x00; // 0x01 = error, 0x02 = update @@ -1767,9 +1897,13 @@ static int media_packet_address_check(struct packet_handler_ctx *phc) if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || rtpe_config.endpoint_learning == EL_OFF) PS_SET(phc->mp.stream, CONFIRMED); - /* confirm sink for unidirectional streams in order to kernelize */ - if (MEDIA_ISSET(phc->mp.media, UNIDIRECTIONAL)) - PS_SET(phc->sink, CONFIRMED); + /* confirm sinks for unidirectional streams in order to kernelize */ + if (MEDIA_ISSET(phc->mp.media, UNIDIRECTIONAL)) { + for (GList *l = phc->sinks->head; l; l = l->next) { + struct sink_handler *sh = l->data; + PS_SET(sh->sink, CONFIRMED); + } + } /* if we have already updated the endpoint in the past ... */ if (PS_ISSET(phc->mp.stream, CONFIRMED)) { @@ -1896,26 +2030,30 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) { return; } - if (!phc->sink) { - __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), - phc->mp.stream->endpoint.port); - return; - } +// if (!phc->sinks->length) { +// __C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), +// phc->mp.stream->endpoint.port); +// return; +// } - if (MEDIA_ISSET(phc->sink->media, ASYMMETRIC)) - PS_SET(phc->sink, CONFIRMED); + for (GList *l = phc->sinks->head; l; l = l->next) { + struct sink_handler *sh = l->data; - if (!PS_ISSET(phc->sink, CONFIRMED)) { - __C_DBG("sink not CONFIRMED for stream %s:%d", - sockaddr_print_buf(&phc->mp.stream->endpoint.address), - phc->mp.stream->endpoint.port); - return; - } + if (MEDIA_ISSET(sh->sink->media, ASYMMETRIC)) + PS_SET(sh->sink, CONFIRMED); - if (!PS_ISSET(phc->sink, FILLED)) { - __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), - phc->mp.stream->endpoint.port); - return; + if (!PS_ISSET(sh->sink, CONFIRMED)) { + __C_DBG("sink not CONFIRMED for stream %s:%d", + sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); + return; + } + + if (!PS_ISSET(sh->sink, FILLED)) { + __C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), + phc->mp.stream->endpoint.port); + return; + } } mutex_lock(&phc->mp.stream->in_lock); @@ -1924,30 +2062,30 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) { } -static int do_rtcp(struct packet_handler_ctx *phc) { - int ret = -1; - - GQueue rtcp_list = G_QUEUE_INIT; - int rtcp_ret = rtcp_parse(&rtcp_list, &phc->mp); +static int do_rtcp_parse(struct packet_handler_ctx *phc) { + int rtcp_ret = rtcp_parse(&phc->rtcp_list, &phc->mp); if (rtcp_ret < 0) - goto out; + return -1; if (rtcp_ret == 1) - goto ok; + phc->rtcp_discard = 1; + return 0; +} +static int do_rtcp_output(struct packet_handler_ctx *phc) { + if (phc->rtcp_discard) + return 0; + if (phc->rtcp_filter) - if (phc->rtcp_filter(&phc->mp, &rtcp_list)) - goto out; + if (phc->rtcp_filter(&phc->mp, &phc->rtcp_list)) + return -1; // queue for output codec_add_raw_packet(&phc->mp, 0); -ok: - ret = 0; -out: - rtcp_list_free(&rtcp_list); - return ret; + return 0; } // appropriate locks must be held +// only frees the output queue if no `sink` is given int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink) { struct codec_packet *p; while ((p = g_queue_pop_head(&mp->packets_out))) { @@ -1980,12 +2118,28 @@ void media_packet_release(struct media_packet *mp) { obj_put(&mp->ssrc_in->parent->h); if (mp->ssrc_out) obj_put(&mp->ssrc_out->parent->h); - g_queue_clear_full(&mp->packets_out, codec_packet_free); + media_socket_dequeue(mp, NULL); g_free(mp->rtp); g_free(mp->rtcp); } +static int media_packet_queue_dup(GQueue *q) { + for (GList *l = q->head; l; l = l->next) { + struct codec_packet *p = l->data; + if (p->free_func) // nothing to do, already private + continue; + char *buf = malloc(p->s.len + RTP_BUFFER_TAIL_ROOM); + if (!buf) + return -1; + memcpy(buf, p->s.s, p->s.len); + p->s.s = buf; + p->free_func = free; + } + return 0; +} + + /* called lock-free */ static int stream_packet(struct packet_handler_ctx *phc) { /** @@ -2017,12 +2171,13 @@ static int stream_packet(struct packet_handler_ctx *phc) { phc->mp.stream = phc->mp.sfd->stream; if (G_UNLIKELY(!phc->mp.stream)) goto out; - __C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address), - phc->mp.stream->endpoint.port); + __C_DBG("Handling packet on: %s", endpoint_print_buf(&phc->mp.stream->endpoint)); phc->mp.media = phc->mp.stream->media; + ///////////////// INGRESS HANDLING + if (!phc->mp.stream->selected_sfd) goto out; @@ -2051,11 +2206,11 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (rtpe_config.active_switchover && IS_FOREIGN_CALL(phc->mp.call)) call_make_own_foreign(phc->mp.call, 0); - // this sets rtcp, in_srtp, out_srtp, and sink + // this sets rtcp, in_srtp, and sinks media_packet_rtcp_demux(phc); - // this set payload_type, ssrc_in, ssrc_out and mp - media_packet_rtp(phc); + // this set payload_type, ssrc_in, and mp payloads + media_packet_rtp_in(phc); // SSRC receive stats if (phc->mp.ssrc_in && phc->mp.rtp) { @@ -2076,103 +2231,146 @@ static int stream_packet(struct packet_handler_ctx *phc) { } } - - /* do we have somewhere to forward it to? */ - - if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp - || !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd)) - { - ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Media packet from %s%s%s discarded due to lack of sink", - FMT_M(endpoint_print_buf(&phc->mp.fsin))); - atomic64_inc(&phc->mp.stream->stats.errors); - RTPE_STATS_INC(errors, 1); - goto out; - } - - + // decrypt in place + // XXX check handler_ret along the paths handler_ret = media_packet_decrypt(phc); + if (handler_ret < 0) + goto out; // receive error // If recording pcap dumper is set, then we record the call. if (phc->mp.call->recording) dump_packet(&phc->mp, &phc->s); - // ready to process - phc->mp.raw = phc->s; + // XXX separate stats for received/sent + atomic64_inc(&phc->mp.stream->stats.packets); + atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len); + atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec); + RTPE_STATS_INC(packets, 1); + RTPE_STATS_INC(bytes, phc->s.len); + if (phc->rtcp) { - if (do_rtcp(phc)) - goto drop; - } - else { - struct codec_handler *transcoder = codec_handler_get(phc->mp.media, phc->payload_type, - phc->mp.media_out); - // this transfers the packet from 's' to 'packets_out' - if (transcoder->func(transcoder, &phc->mp)) + handler_ret = -1; + if (do_rtcp_parse(phc)) + goto out; + if (phc->rtcp_discard) goto drop; } - if (G_LIKELY(handler_ret >= 0)) - handler_ret = __media_packet_encrypt(phc); - - if (phc->unkernelize) // for RTCP packet index updates - unkernelize(phc->mp.stream); - - int address_check = media_packet_address_check(phc); - if (phc->kernelize) - media_packet_kernel_check(phc); if (address_check) goto drop; - mutex_lock(&phc->sink->out_lock); + ///////////////// EGRESS HANDLING - if (!phc->sink->advertised_endpoint.port - || (is_addr_unspecified(&phc->sink->advertised_endpoint.address) - && !is_trickle_ice_address(&phc->sink->advertised_endpoint)) - || handler_ret < 0) - { - mutex_unlock(&phc->sink->out_lock); - goto drop; - } + for (GList *sink = phc->sinks->head; sink; sink = sink->next) { + struct sink_handler *sh = sink->data; - if (!MEDIA_ISSET(phc->mp.media, BLACKHOLE)) - ret = media_socket_dequeue(&phc->mp, phc->sink); - else - ret = media_socket_dequeue(&phc->mp, NULL); + // this sets rtcp, in_srtp, out_srtp, media_out, and sink + media_packet_rtcp_mux(phc, sh); + + // this set ssrc_out + media_packet_rtp_out(phc); + + if (G_UNLIKELY(!sh->sink->selected_sfd || !phc->out_srtp + || !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd)) + { + errno = ENOENT; + ilog(LOG_WARNING | LOG_FLAG_LIMIT, + "Media packet from %s%s%s discarded due to lack of sink", + FMT_M(endpoint_print_buf(&phc->mp.fsin))); + goto err_next; + } + + media_packet_set_encrypt(phc, sh); - mutex_unlock(&phc->sink->out_lock); + if (phc->rtcp) { + if (do_rtcp_output(phc)) + goto err_next; + } + else { + struct codec_handler *transcoder = codec_handler_get(phc->mp.media, phc->payload_type, + phc->mp.media_out); + // this transfers the packet from 's' to 'packets_out' + if (transcoder->func(transcoder, &phc->mp)) + goto err_next; + } + + // if this is not the last sink, duplicate the output queue packets if necessary + if (sink->next) { + ret = media_packet_queue_dup(&phc->mp.packets_out); + errno = ENOMEM; + if (ret) + goto err_next; + } + + ret = __media_packet_encrypt(phc); + errno = ENOTTY; + if (ret) + goto err_next; + + mutex_lock(&sh->sink->out_lock); + + if (!sh->sink->advertised_endpoint.port + || (is_addr_unspecified(&sh->sink->advertised_endpoint.address) + && !is_trickle_ice_address(&sh->sink->advertised_endpoint))) + { + mutex_unlock(&sh->sink->out_lock); + goto next; + } + + if (!MEDIA_ISSET(phc->mp.media, BLACKHOLE)) + ret = media_socket_dequeue(&phc->mp, sh->sink); + else + ret = media_socket_dequeue(&phc->mp, NULL); + + mutex_unlock(&sh->sink->out_lock); - if (ret == -1) { + if (ret == 0) + goto next; + +err_next: ret = -errno; - ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno)); - atomic64_inc(&phc->mp.stream->stats.errors); + ilog(LOG_DEBUG,"Error when sending message. Error: %s", strerror(errno)); + atomic64_inc(&sh->sink->stats.errors); RTPE_STATS_INC(errors, 1); - goto out; + goto next; + +next: + media_socket_dequeue(&phc->mp, NULL); // just free if anything left + ssrc_ctx_put(&phc->mp.ssrc_out); } + ///////////////// INGRESS POST-PROCESSING HANDLING + + if (phc->unkernelize) // for RTCP packet index updates + unkernelize(phc->mp.stream); + if (phc->kernelize) + media_packet_kernel_check(phc); + drop: ret = 0; - // XXX separate stats for received/sent - atomic64_inc(&phc->mp.stream->stats.packets); - atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len); - atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec); - RTPE_STATS_INC(packets, 1); - RTPE_STATS_INC(bytes, phc->s.len); + handler_ret = 0; out: if (phc->unkernelize) { stream_unconfirm(phc->mp.stream); - stream_unconfirm(phc->mp.stream->rtp_sink); - stream_unconfirm(phc->mp.stream->rtcp_sink); + unconfirm_sinks(&phc->mp.stream->rtp_sinks); + unconfirm_sinks(&phc->mp.stream->rtcp_sinks); + } + + if (handler_ret < 0) { + atomic64_inc(&phc->mp.stream->stats.errors); + RTPE_STATS_INC(errors, 1); } rwlock_unlock_r(&phc->mp.call->master_lock); - g_queue_clear_full(&phc->mp.packets_out, codec_packet_free); + media_socket_dequeue(&phc->mp, NULL); // just free ssrc_ctx_put(&phc->mp.ssrc_in); - ssrc_ctx_put(&phc->mp.ssrc_out); + rtcp_list_free(&phc->rtcp_list); return ret; } diff --git a/daemon/recording.c b/daemon/recording.c index dee608888..bb178f790 100644 --- a/daemon/recording.c +++ b/daemon/recording.c @@ -302,7 +302,7 @@ void recording_start(struct call *call, const char *prefix, str *metadata) { struct packet_stream *ps = l->data; recording_setup_stream(ps); __unkernelize(ps); - ps->handler = NULL; + __reset_sink_handlers(ps); } recording_update_flags(call); diff --git a/daemon/redis.c b/daemon/redis.c index 3cb540e77..23a054155 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1568,7 +1568,32 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ for (i = 0; i < tags->len; i++) { ml = tags->ptrs[i]; - ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); + if (json_build_list(&q, c, "subscriptions-oa", &c->callid, i, tags, root_reader)) + return -1; + for (l = q.head; l; l = l->next) { + other_ml = l->data; + if (!other_ml) + return -1; + __add_subscription(ml, other_ml, true); + } + g_queue_clear(&q); + + if (json_build_list(&q, c, "subscriptions-noa", &c->callid, i, tags, root_reader)) + return -1; + for (l = q.head; l; l = l->next) { + other_ml = l->data; + if (!other_ml) + return -1; + __add_subscription(ml, other_ml, false); + } + g_queue_clear(&q); + + // backwards compatibility + if (!ml->subscriptions.length) { + other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active"); + if (other_ml) + __add_subscription(ml, other_ml, true); + } if (json_build_list(&q, c, "other_tags", &c->callid, i, tags, root_reader)) return -1; @@ -1602,19 +1627,53 @@ static int json_link_streams(struct call *c, struct redis_list *streams, { unsigned int i; struct packet_stream *ps; + GQueue q = G_QUEUE_INIT; + GList *l; for (i = 0; i < streams->len; i++) { ps = streams->ptrs[i]; ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); - ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); - ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds, root_reader)) return -1; + if (json_build_list(&q, c, "rtp_sinks", &c->callid, i, streams, root_reader)) + return -1; + for (l = q.head; l; l = l->next) { + struct packet_stream *sink = l->data; + if (!sink) + return -1; + __add_sink_handler(&ps->rtp_sinks, sink); + } + g_queue_clear(&q); + + // backwards compatibility + if (!ps->rtp_sinks.length) { + struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); + if (sink) + __add_sink_handler(&ps->rtp_sinks, sink); + } + + if (json_build_list(&q, c, "rtcp_sinks", &c->callid, i, streams, root_reader)) + return -1; + for (l = q.head; l; l = l->next) { + struct packet_stream *sink = l->data; + if (!sink) + return -1; + __add_sink_handler(&ps->rtcp_sinks, sink); + } + g_queue_clear(&q); + + // backwards compatibility + if (!ps->rtcp_sinks.length) { + struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); + if (sink) + __add_sink_handler(&ps->rtcp_sinks, sink); + } + if (ps->media) __rtp_stats_update(ps->rtp_stats, &ps->media->codecs); @@ -1646,12 +1705,15 @@ static int json_link_medias(struct call *c, struct redis_list *medias, // find the pair media struct call_monologue *ml = med->monologue; - struct call_monologue *other_ml = ml->active_dialogue; - for (GList *l = other_ml->medias.head; l; l = l->next) { - struct call_media *other_m = l->data; - if (other_m->index == med->index) { - codec_handlers_update(med, other_m, NULL, NULL); - break; + for (GList *sub = ml->subscriptions.head; sub; sub = sub->next) { + struct call_subscription *cs = sub->data; + struct call_monologue *other_ml = cs->monologue; + for (GList *l = other_ml->medias.head; l; l = l->next) { + struct call_media *other_m = l->data; + if (other_m->index == med->index) { + codec_handlers_update(med, other_m, NULL, NULL); + break; + } } } } @@ -2170,8 +2232,6 @@ char* redis_encode_json(struct call *c) { { JSON_SET_SIMPLE("media","%u",ps->media->unique_id); JSON_SET_SIMPLE("sfd","%u",ps->selected_sfd ? ps->selected_sfd->unique_id : -1); - JSON_SET_SIMPLE("rtp_sink","%u",ps->rtp_sink ? ps->rtp_sink->unique_id : -1); - JSON_SET_SIMPLE("rtcp_sink","%u",ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1); JSON_SET_SIMPLE("rtcp_sibling","%u",ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1); JSON_SET_SIMPLE("last_packet",UINT64F,atomic64_get(&ps->last_packet)); JSON_SET_SIMPLE("ps_flags","%u",ps->ps_flags); @@ -2196,6 +2256,7 @@ char* redis_encode_json(struct call *c) { for (l = c->streams.head; l; l = l->next) { ps = l->data; + // XXX these should all go into the above loop mutex_lock(&ps->in_lock); mutex_lock(&ps->out_lock); @@ -2209,6 +2270,26 @@ char* redis_encode_json(struct call *c) { } json_builder_end_array (builder); + snprintf(tmp, sizeof(tmp), "rtp_sinks-%u", ps->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array(builder); + for (k = ps->rtp_sinks.head; k; k = k->next) { + struct sink_handler *sh = k->data; + struct packet_stream *sink = sh->sink; + JSON_ADD_STRING("%u", sink->unique_id); + } + json_builder_end_array (builder); + + snprintf(tmp, sizeof(tmp), "rtcp_sinks-%u", ps->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array(builder); + for (k = ps->rtcp_sinks.head; k; k = k->next) { + struct sink_handler *sh = k->data; + struct packet_stream *sink = sh->sink; + JSON_ADD_STRING("%u", sink->unique_id); + } + json_builder_end_array (builder); + mutex_unlock(&ps->in_lock); mutex_unlock(&ps->out_lock); } @@ -2224,7 +2305,6 @@ char* redis_encode_json(struct call *c) { { JSON_SET_SIMPLE("created","%llu",(long long unsigned) ml->created); - JSON_SET_SIMPLE("active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1); JSON_SET_SIMPLE("deleted","%llu",(long long unsigned) ml->deleted); JSON_SET_SIMPLE("block_dtmf","%i",ml->block_dtmf ? 1 : 0); JSON_SET_SIMPLE("block_media","%i",ml->block_media ? 1 : 0); @@ -2245,6 +2325,7 @@ char* redis_encode_json(struct call *c) { for (l = c->monologues.head; l; l = l->next) { ml = l->data; // -- we do it again here since the jsonbuilder is linear straight forward + // XXX these should all go into the above loop k = g_hash_table_get_values(ml->other_tags); snprintf(tmp, sizeof(tmp), "other_tags-%u", ml->unique_id); json_builder_set_member_name(builder, tmp); @@ -2304,6 +2385,28 @@ char* redis_encode_json(struct call *c) { g_list_free(k); rwlock_unlock_r(&ml->ssrc_hash->lock); + + snprintf(tmp, sizeof(tmp), "subscriptions-oa-%u", ml->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array(builder); + for (k = ml->subscriptions.head; k; k = k->next) { + struct call_subscription *cs = k->data; + if (!cs->offer_answer) + continue; + JSON_ADD_STRING("%u", cs->monologue->unique_id); + } + json_builder_end_array(builder); + + snprintf(tmp, sizeof(tmp), "subscriptions-noa-%u", ml->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array(builder); + for (k = ml->subscriptions.head; k; k = k->next) { + struct call_subscription *cs = k->data; + if (cs->offer_answer) + continue; + JSON_ADD_STRING("%u", cs->monologue->unique_id); + } + json_builder_end_array(builder); } diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 2b2725fda..fda0616b9 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1567,15 +1567,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { ps = next_ps; } - struct call_media *other_media = NULL; - if (ps->rtp_sink) - other_media = ps->rtp_sink->media; - else if (ps->rtcp_sink) - other_media = ps->rtcp_sink->media; - media_update_stats(media); - if (other_media) - media_update_stats(other_media); log_info_stream_fd(ps->selected_sfd); @@ -1598,12 +1590,22 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { socket_sendto(&ps->selected_sfd->socket, sr->str, sr->len, &ps->endpoint); g_string_free(sr, TRUE); - if (other_media) + GQueue *sinks = ps->rtp_sinks.length ? &ps->rtp_sinks : &ps->rtcp_sinks; + for (GList *l = sinks->head; l; l = l->next) { + struct sink_handler *sh = l->data; + struct packet_stream *sink = sh->sink; + struct call_media *other_media = sink->media; + + media_update_stats(other_media); + ssrc_sender_report(other_media, &ssr, &rtpe_now); - struct ssrc_receiver_report *srr; - while ((srr = g_queue_pop_head(&srrs))) { - if (other_media) + for (GList *k = srrs.head; k; k = k->next) { + struct ssrc_receiver_report *srr = k->data; ssrc_receiver_report(other_media, srr, &rtpe_now); + } + } + while (srrs.length) { + struct ssrc_receiver_report *srr = g_queue_pop_head(&srrs); g_slice_free1(sizeof(*srr), srr); } } diff --git a/daemon/ssrc.c b/daemon/ssrc.c index bc81dab5b..32f3a2df2 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -88,7 +88,7 @@ static void mos_calc(struct ssrc_stats_block *ssb) { ssb->mos = intmos; } -static struct ssrc_entry *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) { +static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) { rwlock_lock_r(&ht->lock); struct ssrc_entry *ret = g_atomic_pointer_get(&ht->cache); if (!ret || ret->ssrc != ssrc) { @@ -253,6 +253,18 @@ static void *__do_time_report_item(struct call_media *m, size_t struct_size, siz return sti; } +// call must be locked in R +static struct ssrc_entry_call *hunt_ssrc(struct call_monologue *ml, uint32_t ssrc) { + for (GList *l = ml->subscriptions.head; l; l = l->next) { + struct call_subscription *cs = l->data; + struct call_monologue *other = cs->monologue; + struct ssrc_entry_call *e = find_ssrc(ssrc, other->ssrc_hash); + if (e) + return e; + } + return NULL; +} + static long long __calc_rtt(struct call_monologue *ml, uint32_t ssrc, uint32_t ntp_middle_bits, uint32_t delay, size_t reports_queue_offset, const struct timeval *tv, int *pt_p) { @@ -262,7 +274,7 @@ static long long __calc_rtt(struct call_monologue *ml, uint32_t ssrc, uint32_t n if (!ntp_middle_bits || !delay) return 0; - struct ssrc_entry_call *e = get_ssrc(ssrc, ml->ssrc_hash); + struct ssrc_entry_call *e = hunt_ssrc(ml, ssrc); if (G_UNLIKELY(!e)) return 0; @@ -334,7 +346,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor int pt; - long long rtt = __calc_rtt(m->monologue->active_dialogue, rr->ssrc, rr->lsr, rr->dlsr, + long long rtt = __calc_rtt(m->monologue, rr->ssrc, rr->lsr, rr->dlsr, G_STRUCT_OFFSET(struct ssrc_entry_call, sender_reports), tv, &pt); struct ssrc_entry_call *other_e = get_ssrc(rr->from, m->monologue->ssrc_hash); @@ -440,7 +452,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, FMT_M(dlrr->from), FMT_M(dlrr->ssrc), dlrr->lrr, dlrr->dlrr); - __calc_rtt(m->monologue->active_dialogue, dlrr->ssrc, dlrr->lrr, dlrr->dlrr, + __calc_rtt(m->monologue, dlrr->ssrc, dlrr->lrr, dlrr->dlrr, G_STRUCT_OFFSET(struct ssrc_entry_call, rr_time_reports), tv, NULL); } diff --git a/daemon/statistics.c b/daemon/statistics.c index db439ed0a..78af5b1fb 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -236,26 +236,8 @@ void statistics_update_oneway(struct call* c) { } if (!found) ps = NULL; - found = 0; - - if (ml->active_dialogue) { - // --- go through partner ml and search the RTP - for (k = ml->active_dialogue->medias.head; k; k = k->next) { - md = k->data; - - for (o = md->streams.head; o; o = o->next) { - ps2 = o->data; - if (PS_ISSET(ps2, RTP)) { - // --- only RTP is interesting - found = 1; - break; - } - } - if (found) { break; } - } - } - if (!found) - ps2 = NULL; + struct sink_handler *sh = g_queue_peek_head(&ps->rtp_sinks); + ps2 = sh ? sh->sink : NULL; if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { if (atomic64_get(&ps->stats.packets)!=0 && IS_OWN_CALL(c)){ diff --git a/include/call.h b/include/call.h index 0fdc5f0a0..9f6866312 100644 --- a/include/call.h +++ b/include/call.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "compat.h" #include "socket.h" #include "media_socket.h" @@ -200,7 +201,6 @@ struct call; struct redis; struct crypto_suite; struct rtpengine_srtp; -struct streamhandler; struct sdp_ng_flags; struct local_interface; struct call_monologue; @@ -290,18 +290,17 @@ struct packet_stream { struct recording_stream recording; /* LOCK: call->master_lock */ GQueue sfds; /* LOCK: call->master_lock */ - struct stream_fd * volatile selected_sfd; + struct stream_fd * selected_sfd; struct dtls_connection ice_dtls; /* LOCK: in_lock */ - struct packet_stream *rtp_sink; /* LOCK: call->master_lock */ - struct packet_stream *rtcp_sink; /* LOCK: call->master_lock */ + GQueue rtp_sinks; // LOCK: call->master_lock, in_lock for streamhandler + GQueue rtcp_sinks; // LOCK: call->master_lock, in_lock for streamhandler struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ - const struct streamhandler *handler; /* LOCK: in_lock */ struct endpoint endpoint; /* LOCK: out_lock */ struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ struct timeval ep_detect_signal; /* LOCK: out_lock */ struct endpoint advertised_endpoint; /* RO */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these + struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ *ssrc_out; /* LOCK: out_lock */ struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ @@ -374,6 +373,13 @@ struct call_media { volatile unsigned int media_flags; }; +// link between subscribers and subscriptions +struct call_subscription { + struct call_monologue *monologue; + GList *link; // link into the corresponding opposite list + unsigned int offer_answer:1; // bidirectional, exclusive +}; + /* half a dialogue */ /* protected by call->master_lock, except the RO elements */ struct call_monologue { @@ -386,12 +392,13 @@ struct call_monologue { str label; time_t created; /* RO */ time_t deleted; - struct timeval started; /* for CDR */ - struct timeval terminated; /* for CDR */ - enum termination_reason term_reason; + struct timeval started; /* for CDR */ + struct timeval terminated; /* for CDR */ + enum termination_reason term_reason; GHashTable *other_tags; GHashTable *branches; - struct call_monologue *active_dialogue; + GQueue subscriptions; // who am I subscribed to (sources) + GQueue subscribers; // who is subscribed to me (sinks) GQueue medias; GHashTable *media_ids; struct media_player *player; @@ -517,15 +524,20 @@ struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); +void __add_subscription(struct call_monologue *ml, struct call_monologue *other, bool offer_answer); +void free_sink_handler(void *); +void __add_sink_handler(GQueue *, struct packet_stream *); struct call *call_get_or_create(const str *callid, int foreign); struct call *call_get_opmode(const str *callid, enum call_opmode opmode); void call_make_own_foreign(struct call *c, int foreign); -struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag, +int call_get_mono_dialogue(struct call_monologue *dialogue[2], struct call *call, const str *fromtag, + const str *totag, const str *viabranch); +struct call_monologue *call_get_monologue(struct call *call, const str *fromtag); struct call *call_get(const str *callid); -int monologue_offer_answer(struct call_monologue *monologue, GQueue *streams, struct sdp_ng_flags *flags); +int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, struct sdp_ng_flags *flags); void codecs_offer_answer(struct call_media *media, struct call_media *other_media, struct stream_params *sp, struct sdp_ng_flags *flags); int call_delete_branch(const str *callid, const str *branch, @@ -601,13 +613,6 @@ INLINE str *call_str_init_dup(struct call *c, char *s) { str_init(&t, s); return call_str_dup(c, &t); } -INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) { - struct packet_stream *ret; - ret = ps->rtp_sink; - if (!ret) - ret = ps->rtcp_sink; - return ret; -} INLINE void __call_unkernelize(struct call *call) { for (GList *l = call->monologues.head; l; l = l->next) { struct call_monologue *ml = l->data; diff --git a/include/media_socket.h b/include/media_socket.h index de2933769..9a0723969 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -122,6 +122,11 @@ struct stream_fd { struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ }; +struct sink_handler { + struct packet_stream *sink; + const struct streamhandler *handler; + int kernel_output_idx; +}; struct media_packet { str raw; @@ -132,6 +137,7 @@ struct media_packet { struct packet_stream *stream; // sfd->stream struct call_media *media; // stream->media struct call_media *media_out; // output media + struct sink_handler sink; struct rtp_header *rtp; struct rtcp_packet *rtcp; @@ -176,6 +182,7 @@ void kernelize(struct packet_stream *); void __unkernelize(struct packet_stream *); void unkernelize(struct packet_stream *); void __stream_unconfirm(struct packet_stream *); +void __reset_sink_handlers(struct packet_stream *); void media_update_stats(struct call_media *m); diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 65df1b503..1eaf725e0 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -4385,13 +4385,13 @@ not_stun: goto skip_error; src_check_ok: + if (g->target.dtls && is_dtls(skb)) + goto skip1; if (g->target.non_forwarding) { if (g->target.blackhole) error_nf_action = NF_DROP; goto skip1; } - if (g->target.dtls && is_dtls(skb)) - goto skip1; rtp.ok = 0; if (!g->target.rtp) diff --git a/t/test-transcode.c b/t/test-transcode.c index 3f4454f42..25b91d0d4 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -87,8 +87,6 @@ static void __start(const char *file, int line) { str_init(&ml_B.tag, "tag_B"); media_B->monologue = &ml_B; media_B->protocol = &transport_protocols[PROTO_RTP_AVP]; - ml_A.active_dialogue = &ml_B; - ml_B.active_dialogue = &ml_A; __init(); } @@ -233,7 +231,7 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media // from __stream_ssrc() if (!MEDIA_ISSET(media, TRANSCODE)) mp.ssrc_in->ssrc_map_out = ntohl(ssrc); - mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, media->monologue->active_dialogue->ssrc_hash, SSRC_DIR_OUTPUT, NULL); + mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, other_media->monologue->ssrc_hash, SSRC_DIR_OUTPUT, NULL); payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f); int packet_len = sizeof(struct rtp_header) + pl.len;