diff --git a/daemon/call.c b/daemon/call.c index fec234f33..9ec0b3f91 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -70,14 +70,16 @@ static struct mqtt_timer *global_mqtt_timer; unsigned int call_socket_cpu_affinity = 0; -/* ********** */ - +/** + * locally needed static declarations + */ static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start, struct timeval *interval_duration); static void __call_free(void *p); static void __call_cleanup(struct call *c); static void __monologue_stop(struct call_monologue *ml); static void media_stop(struct call_media *m); +static void __subscribe_medias_both_ways(struct call_media * a, struct call_media * b); /* called with call->master_lock held in R */ static int call_timer_delete_monologues(struct call *c) { @@ -2932,6 +2934,26 @@ void call_subscriptions_clear(GQueue *q) { g_queue_clear_full(q, call_subscription_free); } +static void __unsubscribe_media_link(struct call_media * which, GList * which_cm_link) +{ + struct media_subscription * ms = which_cm_link->data; + struct media_subscription * rev_ms = ms->link->data; + struct call_media * from = ms->media; + + ilog(LOG_DEBUG, "Unsubscribing media with monologue tag '" STR_FORMAT_M "' (index: %d)" + "from media with monologue tag '" STR_FORMAT_M "' (index: %d)", + STR_FMT_M(&which->monologue->tag), which->index, + STR_FMT_M(&from->monologue->tag), from->index); + + g_queue_delete_link(&from->media_subscribers, ms->link); + g_queue_delete_link(&which->media_subscriptions, which_cm_link); + + g_hash_table_remove(which->media_subscriptions_ht, ms->media); + g_hash_table_remove(from->media_subscribers_ht, rev_ms->media); + + g_slice_free1(sizeof(*ms), ms); + g_slice_free1(sizeof(*rev_ms), rev_ms); +} static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) { struct call_subscription *cs = which_cs_link->data; struct call_subscription *rev_cs = cs->link->data; @@ -2946,6 +2968,26 @@ static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs g_slice_free1(sizeof(*cs), cs); g_slice_free1(sizeof(*rev_cs), rev_cs); } +/** + * Unsubscribe one particular media subscriber from this call media. + */ +static bool __unsubscribe_media(struct call_media * which, struct call_media * from) +{ + GList * l = g_hash_table_lookup(which->media_subscriptions_ht, from); + + if (!l) { + ilog(LOG_DEBUG, "Media with monologue tag '" STR_FORMAT_M "' (index: %d) " + "is not subscribed to media with monologue tag '" STR_FORMAT_M "' " + "(index: %d). Cannot remove this media subscriber.", + STR_FMT_M(&which->monologue->tag), which->index, + STR_FMT_M(&from->monologue->tag), from->index); + + return false; + } + + __unsubscribe_media_link(which, l); + return true; +} static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) { GList *l = g_hash_table_lookup(which->subscriptions_ht, from); if (!l) { @@ -2971,6 +3013,27 @@ static void __unsubscribe_all_offer_answer_subscribers(struct call_monologue *ml l = next; } } +/** + * Deletes all offer/answer media subscriptions. + */ +static void __unsubscribe_all_offer_answer_medias(struct call_media * cm) { + for (GList *l = cm->media_subscribers.head; l; ) + { + struct media_subscription * ms = l->data; + + if (!ms->attrs.offer_answer) { + l = l->next; + continue; + } + + GList * next = l->next; + struct call_media * other_cm = ms->media; + + __unsubscribe_media(other_cm, cm); + __unsubscribe_media(cm, other_cm); + l = next; + } +} static void __unsubscribe_from_all(struct call_monologue *ml) { for (GList *l = ml->subscriptions.head; l; ) { GList *next = l->next; @@ -2978,6 +3041,68 @@ static void __unsubscribe_from_all(struct call_monologue *ml) { l = next; } } +static void __unsubscribe_medias_from_all(struct call_monologue *ml) { + for (int i = 0; i < ml->medias->len; i++) + { + struct call_media * media = ml->medias->pdata[i]; + if (!media) + continue; + + for (GList * subcription = media->media_subscriptions.head; subcription; ) + { + GList *next = subcription->next; + __unsubscribe_media_link(media, subcription); + subcription = next; + } + } +} +static void __add_media_subscription(struct call_media * which, struct call_media * to, + const struct sink_attrs *attrs) +{ + if (g_hash_table_lookup(which->media_subscriptions_ht, to)) { + ilog(LOG_DEBUG, "Media with monologue tag '" STR_FORMAT_M "' (index: %d) is already subscribed" + " to media with monologue tag '" STR_FORMAT_M "' (index: %d)", + STR_FMT_M(&which->monologue->tag), which->index, + STR_FMT_M(&to->monologue->tag), to->index); + return; + } + + ilog(LOG_DEBUG, "Subscribing media with monologue tag '" STR_FORMAT_M "' (index: %d) " + "to media with monologue tag '" STR_FORMAT_M "' (index: %d)", + STR_FMT_M(&which->monologue->tag), which->index, + STR_FMT_M(&to->monologue->tag), to->index); + + struct media_subscription *which_ms = g_slice_alloc0(sizeof(*which_ms)); + struct media_subscription *to_rev_ms = g_slice_alloc0(sizeof(*to_rev_ms)); + + which_ms->media = to; + to_rev_ms->media = which; + + which_ms->monologue = to->monologue; + to_rev_ms->monologue = which->monologue; + + /* preserve attributes if they were present previously */ + if (attrs) { + which_ms->attrs = * attrs; + to_rev_ms->attrs = * attrs; + } + + /* keep offer-answer subscriptions first in the list */ + if (!attrs || !attrs->offer_answer) { + g_queue_push_tail(&which->media_subscriptions, which_ms); + g_queue_push_tail(&to->media_subscribers, to_rev_ms); + which_ms->link = to->media_subscribers.tail; + to_rev_ms->link = which->media_subscriptions.tail; + } else { + g_queue_push_head(&which->media_subscriptions, which_ms); + g_queue_push_head(&to->media_subscribers, to_rev_ms); + which_ms->link = to->media_subscribers.head; + to_rev_ms->link = which->media_subscriptions.head; + } + + g_hash_table_insert(which->media_subscriptions_ht, to, to_rev_ms->link); + g_hash_table_insert(to->media_subscribers_ht, which, which_ms->link); +} void __add_subscription(struct call_monologue *which, struct call_monologue *to, unsigned int offset, const struct sink_attrs *attrs) { @@ -3017,6 +3142,40 @@ void __add_subscription(struct call_monologue *which, struct call_monologue *to, g_hash_table_insert(which->subscriptions_ht, to, to_rev_cs->link); g_hash_table_insert(to->subscribers_ht, which, which_cs->link); } +/** + * Subscribe medias to each other. + */ +static void __subscribe_medias_both_ways(struct call_media * a, struct call_media * b) +{ + if (!a || !b) + return; + + /* retrieve previous subscriptions to retain attributes */ + struct media_subscription *a_ms = call_get_media_subscription(a->media_subscriptions_ht, b); + struct media_subscription *b_ms = call_get_media_subscription(b->media_subscriptions_ht, a); + + /* copy out attributes */ + struct sink_attrs a_attrs = {0,}; + struct sink_attrs b_attrs = {0,}; + + if (a_ms) + a_attrs = a_ms->attrs; + if (b_ms) + b_attrs = b_ms->attrs; + + /* override/reset some attributes */ + a_attrs.offer_answer = b_attrs.offer_answer = true; + a_attrs.egress = b_attrs.egress = false; + a_attrs.rtcp_only = b_attrs.rtcp_only = false; + + /* release existing subscriptions both ways */ + __unsubscribe_all_offer_answer_medias(a); + __unsubscribe_all_offer_answer_medias(b); + + /* (re)create, preserving existing attributes if there have been any */ + __add_media_subscription(a, b, &a_attrs); + __add_media_subscription(b, a, &b_attrs); +} static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) { // retrieve previous subscriptions to retain attributes struct call_subscription *a_cs = call_get_call_subscription(a->subscriptions_ht, b); @@ -3040,6 +3199,52 @@ static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct __add_subscription(b, a, 0, &b_attrs); } +/** + * Subscribe media lines to each other respecting the given order in the SDP offer/answer. + * If there are `media_id` (mid) presented, then use a mid ordering instead. + */ +static void __subscribe_matched_medias(struct call_monologue * a_ml, struct call_monologue * b_ml) +{ + GPtrArray * a_medias = a_ml->medias; + GPtrArray * b_medias = b_ml->medias; + + /* A properly formed answer SDP has the same number of m= lines as the offer SDP, + * and in the same order. Media types must match up. */ + if (a_medias->len != b_medias->len) { + ilog(LOG_WARN, "Non-matching amount of media sections in monologues, cannot subscribe them!"); + return; + } + + for (int i = 0; i < a_medias->len; i++) + { + struct call_media * a_media = a_medias->pdata[i]; + struct call_media * b_media; + + if (!a_media) + continue; + + /* first try matching based on media_id */ + if (a_media->media_id.s) { + b_media = g_hash_table_lookup(b_ml->media_ids, &a_media->media_id); + if (b_media) { + __subscribe_medias_both_ways(a_media, b_media); + continue; /* we found a matched one, go ahead to another one */ + } + } + + /* then a matching based on usual ordering */ + b_media = b_medias->pdata[i]; + if (!b_media) + continue; + + if (a_media->type_id != b_media->type_id) { + ilog(LOG_WARN, "Wrong ordering of media sections in monologues, skip the '%d' media section.", i); + continue; + } + __subscribe_medias_both_ways(a_media, b_media); + } +} + // return subscription objects, valid only immediately after __subscribe_offer_answer_both_ways static void __offer_answer_get_subscriptions(struct call_monologue *a, struct call_monologue *b, struct call_subscription *rets[2]) @@ -3048,8 +3253,15 @@ static void __offer_answer_get_subscriptions(struct call_monologue *a, struct ca rets[1] = a->subscribers.head->data; } - - +/** + * Retrieve exsisting media subscriptions for a call monologue. + */ +struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm) { + GList * l = g_hash_table_lookup(ht, cm); + if (!l) + return NULL; + return l->data; +} struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml) { GList *l = g_hash_table_lookup(ht, ml); if (!l) @@ -3057,8 +3269,6 @@ struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call return l->data; } - - /* called with call->master_lock held in W */ __attribute__((nonnull(1, 2, 3))) int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags) { @@ -4179,6 +4389,75 @@ static bool call_monologues_associations_left(struct call * c) { return false; } +/** + * Check whether given totag is already subscribed to the given monologue medias. + * Returns: true - subscribed, false - not subscribed. + */ +static bool call_totag_subscribed_to_monologue(const str * totag, const struct call_monologue * monologue) +{ + if (!totag && !totag->s) + return false; + + for (int i = 0; i < monologue->medias->len; i++) + { + struct call_media * media = monologue->medias->pdata[i]; + if (!media) + continue; + + for (GList * subscriber = media->media_subscribers.head; + subscriber; + subscriber = subscriber->next) + { + struct media_subscription * ms = subscriber->data; + if (!ms->attrs.offer_answer) /* is this really needed? */ + continue; + + struct call_monologue * subscriber_monologue = ms->monologue; + if (!str_cmp_str(&subscriber_monologue->tag, totag)) /* subscriber found */ + return true; + } + } + return false; +} +/** + * Check whether given viabranch is intact with a monologue, which owns + * existing media subscribriptions to it. + * + * It tags a monologue, media of which is subscribed to given monologue, using given viabranch, + * in case previous other side hasn't been tagged with the via-branch + * + * Returns: true - intact, false - not intact. + */ +static bool call_viabranch_intact_monologue(const str * viabranch, struct call_monologue * monologue) +{ + for (int i = 0; i < monologue->medias->len; i++) + { + struct call_media * media = monologue->medias->pdata[i]; + if (!media) + continue; + + for (GList * subscriber = media->media_subscribers.head; + subscriber; + subscriber = subscriber->next) + { + struct media_subscription * ms = subscriber->data; + struct call_monologue * subscriber_monologue = ms->monologue; + + /* check the viabranch. if it's not known, then this is a branched offer and we need + * to create a new "other side" for this branch. */ + if (!subscriber_monologue->viabranch.s) { + /* previous "other side" hasn't been tagged with the via-branch, so we'll just + * use this one and tag it */ + __monologue_viabranch(subscriber_monologue, viabranch); + return true; + } + if (!str_cmp_str(&subscriber_monologue->viabranch, viabranch)) + return true; /* dialogue still intact */ + } + } + return false; +} + /** * Based on given From-tag create a new monologue for this dialog, * if given tag wasn't present in 'tags' of this call. diff --git a/include/call.h b/include/call.h index 25c47aed1..ff6c72039 100644 --- a/include/call.h +++ b/include/call.h @@ -722,6 +722,7 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); void __add_subscription(struct call_monologue *ml, struct call_monologue *other, unsigned int media_offset, const struct sink_attrs *); +struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm); struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml); void free_sink_handler(void *); void __add_sink_handler(GQueue *, struct packet_stream *, const struct sink_attrs *);