From 6a792f2bdf77d1db59f5d2191fa37c0f5c4faea4 Mon Sep 17 00:00:00 2001 From: Donat Zenichev Date: Sat, 11 Nov 2023 22:45:01 +0100 Subject: [PATCH] MT#57550 Demount `call_subscription` concept From now on the `call_subscription` concept gets deprecated, and instead of it the `media_subscriptions` concept gets applied. Benefits of this change is: - ability to subscribe one-to-multiple medias (different monologues) - media level manipulations, without affecting whole SDP session - no need to use medias offset, to detect proper subscription's media - there is no need of particular medias order, they can be subscribed to each other in any possible way (even though RFC still requires to always have proper ordering) Deprecated objects: - `struct call_subscription` - `GQueue subscriptions` - `GQueue subscribers` - `GHashTable * subscriptions_ht` - `GHashTable * subscribers_ht` Deprecated functionality: - `__unsubscribe_one()` - `__unsubscribe_all_offer_answer_subscribers()` - `__unsubscribe_from_all()` - `__subscribe_offer_answer_both_ways()` - `__add_subscription()` - `__unsubscribe_one_link()` - `call_get_call_subscription()` - `call_subscriptions_clear()` - `call_subscriptions_free()` Offtopic: additionally this commit adds helper func: - `call_media_subscribed_to_monologue()` Change-Id: Ifb44f7a1ba5b483b1472882b1b8d06444dba1727 --- daemon/call.c | 200 +++++++++------------------------------ daemon/call_interfaces.c | 72 ++++++++------ include/call.h | 29 +----- include/codec.h | 1 - 4 files changed, 90 insertions(+), 212 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index bbb73f85b..1c5d99cfc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1172,7 +1172,7 @@ void free_sink_handler(void *p) { } /** - * A transfer of flags from the subscription (call_subscription) to the sink handlers (sink_handler) is done + * A transfer of flags from the subscription to the sink handlers (sink_handler) is done * using the __init_streams() through __add_sink_handler(). */ void __add_sink_handler(GQueue *q, struct packet_stream *sink, const struct sink_attrs *attrs) { @@ -2984,11 +2984,6 @@ error_intf: return ERROR_NO_FREE_LOGS; } - -void call_subscriptions_clear(GQueue *q) { - g_queue_clear_full(q, call_subscription_free); -} - void media_subscriptions_clear(GQueue *q) { g_queue_clear_full(q, media_subscription_free); } @@ -3013,20 +3008,6 @@ static void __unsubscribe_media_link(struct call_media * which, GList * which_cm g_slice_free1(sizeof(*ms), ms); g_slice_free1(sizeof(*rev_ms), rev_ms); } -static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) { - struct call_subscription *cs = which_cs_link->data; - struct call_subscription *rev_cs = cs->link->data; - struct call_monologue *from = cs->monologue; - ilog(LOG_DEBUG, "Unsubscribing '" STR_FORMAT_M "' from '" STR_FORMAT_M "'", - STR_FMT_M(&which->tag), - STR_FMT_M(&from->tag)); - g_queue_delete_link(&from->subscribers, cs->link); - g_queue_delete_link(&which->subscriptions, which_cs_link); - g_hash_table_remove(which->subscriptions_ht, cs->monologue); - g_hash_table_remove(from->subscribers_ht, rev_cs->monologue); - g_slice_free1(sizeof(*cs), cs); - g_slice_free1(sizeof(*rev_cs), rev_cs); -} /** * Unsubscribe one particular media subscriber from this call media. */ @@ -3047,31 +3028,6 @@ static bool __unsubscribe_media(struct call_media * which, struct call_media * f __unsubscribe_media_link(which, l); return true; } -static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) { - GList *l = g_hash_table_lookup(which->subscriptions_ht, from); - if (!l) { - ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is not subscribed to '" STR_FORMAT_M "'", - STR_FMT_M(&which->tag), - STR_FMT_M(&from->tag)); - return false; - } - __unsubscribe_one_link(which, l); - return true; -} -static void __unsubscribe_all_offer_answer_subscribers(struct call_monologue *ml) { - for (GList *l = ml->subscribers.head; l; ) { - struct call_subscription *cs = l->data; - if (!cs->attrs.offer_answer) { - l = l->next; - continue; - } - GList *next = l->next; - struct call_monologue *other_ml = cs->monologue; - __unsubscribe_one(other_ml, ml); - __unsubscribe_one(ml, other_ml); - l = next; - } -} /** * Deletes all offer/answer media subscriptions. */ @@ -3093,13 +3049,6 @@ static void __unsubscribe_all_offer_answer_medias(struct call_media * cm) { l = next; } } -static void __unsubscribe_from_all(struct call_monologue *ml) { - for (GList *l = ml->subscriptions.head; l; ) { - GList *next = l->next; - __unsubscribe_one_link(ml, l); - l = next; - } -} static void __unsubscribe_medias_from_all(struct call_monologue *ml) { for (int i = 0; i < ml->medias->len; i++) { @@ -3190,45 +3139,6 @@ void __add_media_subscription(struct call_media * which, struct call_media * to, g_hash_table_insert(which->media_subscriptions_ht, to, to_rev_ms->link); g_hash_table_insert(to->media_subscribers_ht, which, which_ms->link); } -void __add_subscription(struct call_monologue *which, struct call_monologue *to, - unsigned int offset, const struct sink_attrs *attrs) -{ - if (g_hash_table_lookup(which->subscriptions_ht, to)) { - ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is already subscribed to '" STR_FORMAT_M "'", - STR_FMT_M(&which->tag), - STR_FMT_M(&to->tag)); - return; - } - ilog(LOG_DEBUG, "Subscribing '" STR_FORMAT_M "' to '" STR_FORMAT_M "'", - STR_FMT_M(&which->tag), - STR_FMT_M(&to->tag)); - struct call_subscription *which_cs = g_slice_alloc0(sizeof(*which_cs)); - struct call_subscription *to_rev_cs = g_slice_alloc0(sizeof(*to_rev_cs)); - which_cs->monologue = to; - to_rev_cs->monologue = which; - which_cs->media_offset = offset; - to_rev_cs->media_offset = offset; - // preserve attributes if they were present previously - if (attrs) { - which_cs->attrs = *attrs; - to_rev_cs->attrs = *attrs; - } - // keep offer-answer subscriptions first in the list - if (!attrs || !attrs->offer_answer) { - g_queue_push_tail(&which->subscriptions, which_cs); - g_queue_push_tail(&to->subscribers, to_rev_cs); - which_cs->link = to->subscribers.tail; - to_rev_cs->link = which->subscriptions.tail; - } - else { - g_queue_push_head(&which->subscriptions, which_cs); - g_queue_push_head(&to->subscribers, to_rev_cs); - which_cs->link = to->subscribers.head; - to_rev_cs->link = which->subscriptions.head; - } - g_hash_table_insert(which->subscriptions_ht, to, to_rev_cs->link); - g_hash_table_insert(to->subscribers_ht, which, which_cs->link); -} /** * Subscribe medias to each other. */ @@ -3263,28 +3173,6 @@ static void __subscribe_medias_both_ways(struct call_media * a, struct call_medi __add_media_subscription(a, b, &a_attrs); __add_media_subscription(b, a, &b_attrs); } -static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) { - // retrieve previous subscriptions to retain attributes - struct call_subscription *a_cs = call_get_call_subscription(a->subscriptions_ht, b); - struct call_subscription *b_cs = call_get_call_subscription(b->subscriptions_ht, a); - // copy out attributes - struct sink_attrs a_attrs = {0,}; - struct sink_attrs b_attrs = {0,}; - if (a_cs) - a_attrs = a_cs->attrs; - if (b_cs) - b_attrs = b_cs->attrs; - // override/reset some attributes - a_attrs.offer_answer = b_attrs.offer_answer = true; - a_attrs.egress = b_attrs.egress = false; - a_attrs.rtcp_only = b_attrs.rtcp_only = false; - // delete existing subscriptions - __unsubscribe_all_offer_answer_subscribers(a); - __unsubscribe_all_offer_answer_subscribers(b); - // (re)create, preserving existing attributes if there were any - __add_subscription(a, b, 0, &a_attrs); - __add_subscription(b, a, 0, &b_attrs); -} /** * Subscribe media lines to each other respecting the given order in the SDP offer/answer. @@ -3341,12 +3229,6 @@ struct media_subscription *call_get_media_subscription(GHashTable *ht, struct ca return NULL; return l->data; } -struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml) { - GList *l = g_hash_table_lookup(ht, ml); - if (!l) - return NULL; - return l->data; -} /* called with call->master_lock held in W */ __attribute__((nonnull(1, 2, 3))) @@ -3617,32 +3499,31 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, struct sdp_ng_flag /* called with call->master_lock held in W */ __attribute__((nonnull(1, 2))) int monologue_unsubscribe(struct call_monologue *dst_ml, struct sdp_ng_flags *flags) { - for (GList *l = dst_ml->subscriptions.head; l; ) { - GList *next = l->next; - struct call_subscription *cs = l->data; - struct call_monologue *src_ml = cs->monologue; + for (unsigned int i = 0; i < dst_ml->medias->len; i++) + { + struct call_media *media = dst_ml->medias->pdata[i]; + if (!media) + continue; - __unsubscribe_one_link(dst_ml, l); + __media_unconfirm(media, "media unsubscribe"); + __update_init_subscribers(media, NULL, NULL, flags->opmode); - for (unsigned int i = 0; i < dst_ml->medias->len; i++) + /* TODO: should we care about subscribers as well? */ + for (GList *l = media->media_subscriptions.head; l; ) { - struct call_media *media = dst_ml->medias->pdata[i]; - if (!media) - continue; - __update_init_subscribers(media, NULL, NULL, flags->opmode); - } - for (unsigned int i = 0; i < src_ml->medias->len; i++) - { - struct call_media *media = src_ml->medias->pdata[i]; - if (!media) + GList *next = l->next; + struct media_subscription * ms = l->data; + struct call_media * src_media = ms->media; + + if (!src_media) continue; - __update_init_subscribers(media, NULL, NULL, flags->opmode); - } - dialogue_unconfirm(src_ml, "monologue unsubscribe"); - dialogue_unconfirm(dst_ml, "monologue unsubscribe"); + __media_unconfirm(src_media, "media unsubscribe"); + __update_init_subscribers(src_media, NULL, NULL, flags->opmode); + __unsubscribe_media_link(media, l); - l = next; + l = next; + } } return 0; @@ -4038,10 +3919,6 @@ void call_media_free(struct call_media **mdp) { *mdp = NULL; } -void call_subscription_free(void *p) { - g_slice_free1(sizeof(struct call_subscription), p); -} - void __monologue_free(struct call_monologue *m) { g_ptr_array_free(m->medias, true); g_hash_table_destroy(m->associated_tags); @@ -4053,10 +3930,6 @@ void __monologue_free(struct call_monologue *m) { sdp_free(&m->last_in_sdp_parsed); g_queue_clear_full(&m->sdp_attributes, free); sdp_streams_free(&m->last_in_sdp_streams); - g_hash_table_destroy(m->subscribers_ht); - g_hash_table_destroy(m->subscriptions_ht); - g_queue_clear_full(&m->subscribers, call_subscription_free); - g_queue_clear_full(&m->subscriptions, call_subscription_free); g_slice_free1(sizeof(*m), m); } @@ -4264,8 +4137,6 @@ struct call_monologue *__monologue_create(struct call *call) { ret->medias = g_ptr_array_new(); ret->media_ids = g_hash_table_new(str_hash, str_equal); ret->ssrc_hash = create_ssrc_hash_call(); - ret->subscribers_ht = g_hash_table_new(g_direct_hash, g_direct_equal); - ret->subscriptions_ht = g_hash_table_new(g_direct_hash, g_direct_equal); gettimeofday(&ret->started, NULL); @@ -4540,6 +4411,29 @@ static bool call_monologues_associations_left(struct call * c) { return false; } +/** + * Check whether given media is subscribed to any of given monologue medias. + * Returns: media subscription or NULL. + */ +struct media_subscription * call_media_subscribed_to_monologue(const struct call_media * media, + const struct call_monologue * monologue) +{ + if (!media || !monologue) + return false; + + for (int i = 0; i < monologue->medias->len; i++) + { + struct call_media * sub_media = monologue->medias->pdata[i]; + if (!sub_media) + continue; + + GList * l = g_hash_table_lookup(sub_media->media_subscribers_ht, media); + if (l) + return l->data; /* found */ + } + return NULL; +} + /** * Check whether given totag is already subscribed to the given monologue medias. * Returns: true - subscribed, false - not subscribed. @@ -4668,9 +4562,6 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c /* if monologue doesn't exist, then nothing to subscribe yet */ if (!monologue) goto new_branch; - - __subscribe_offer_answer_both_ways(ret, monologue); /* TODO: deprecate */ - /* susbcribe existing medias */ __subscribe_matched_medias(ret, monologue); } @@ -4688,8 +4579,6 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c if (os) { /* previously seen branch. use it */ __monologue_unconfirm(os, "dialogue/branch association changed"); - __subscribe_offer_answer_both_ways(ret, os); /* TODO: deprecate */ - /* susbcribe medias to medias */ __subscribe_matched_medias(ret, os); goto monologues_intact; @@ -4701,9 +4590,7 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c new_branch: __C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch)); os = __monologue_create(call); - __subscribe_offer_answer_both_ways(ret, os); /* TODO: deprecate */ __monologue_viabranch(os, viabranch); - /* susbcribe medias to medias */ __subscribe_matched_medias(ret, os); @@ -4821,7 +4708,6 @@ tag_setup: dialogue_unconfirm(ft, "dialogue signalling event"); dialogue_unconfirm(tt, "dialogue signalling event"); - __subscribe_offer_answer_both_ways(ft, tt); /* susbcribe medias to medias */ __subscribe_matched_medias(ft, tt); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index c841400b6..9d73bbb6f 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -3101,22 +3101,37 @@ static const char *call_block_silence_media(bencode_item_t *input, bool on_off, } if (sinks.length) { for (GList *l = sinks.head; l; l = l->next) { - struct call_monologue *sink = l->data; - GList *link = g_hash_table_lookup(monologue->subscribers_ht, sink); - if (!link) { - ilog(LOG_WARN, "Media flow '" STR_FORMAT_M "' -> '" STR_FORMAT_M "' doesn't " - "exist for media %s (to-tag not subscribed)", - STR_FMT_M(&monologue->tag), STR_FMT_M(&flags.to_tag), - lcase_verb); - return "Media flow not found (to-tag not subscribed)"; - } - struct call_subscription *cs = link->data; + struct call_monologue *sink_ml = l->data; - ilog(LOG_INFO, "%s directional media flow " - "(tag '" STR_FORMAT_M "' -> '" STR_FORMAT_M "')", - ucase_verb, - STR_FMT_M(&monologue->tag), STR_FMT_M(&sink->tag)); - G_STRUCT_MEMBER(bool, &cs->attrs, attr_offset) = on_off; + /* check if at least one sink_ml's media is subscribed + * to any of monologue medias. */ + for (unsigned int i = 0; i < sink_ml->medias->len; i++) + { + struct call_media *media = sink_ml->medias->pdata[i]; + if (!media) + continue; + + struct media_subscription * ms = call_media_subscribed_to_monologue(media, monologue); + if (!ms) { + if (l->next) + continue; /* check other medias */ + + /* no one of sink ml medias is subscribed to monologue medias */ + ilog(LOG_WARN, "Media flow '" STR_FORMAT_M "' -> '" STR_FORMAT_M "' doesn't " + "exist for media %s (to-tag not subscribed)", + STR_FMT_M(&monologue->tag), STR_FMT_M(&flags.to_tag), + lcase_verb); + return "Media flow not found (to-tag not subscribed)"; + + } else { + ilog(LOG_INFO, "%s directional media flow " + "(tag '" STR_FORMAT_M "' -> '" STR_FORMAT_M "')", + ucase_verb, + STR_FMT_M(&monologue->tag), STR_FMT_M(&sink_ml->tag)); + G_STRUCT_MEMBER(bool, &ms->attrs, attr_offset) = on_off; + break; /* now check other sink mls */ + } + } } update_init_subscribers(monologue, OP_OTHER); } @@ -3347,23 +3362,26 @@ found: ML_SET(monologue, DTMF_INJECTION_ACTIVE); dialogue_unconfirm(monologue, "DTMF playback"); - for (GList *k = monologue->subscribers.head; k; k = k->next) { - struct call_subscription *cs = k->data; - struct call_monologue *dialogue = cs->monologue; - struct call_media *sink = NULL; - for (unsigned int i = 0; i < dialogue->medias->len; i++) { - sink = dialogue->medias->pdata[i]; - if (!sink) - continue; - if (sink->type_id != MT_AUDIO) + for (unsigned int i = 0; i < monologue->medias->len; i++) + { + struct call_media *ml_media = monologue->medias->pdata[i]; + if (!ml_media) + continue; + + struct call_media * ms_media_sink = NULL; + + for (GList *ll = ml_media->media_subscribers.head; ll; ll = ll->next) + { + struct media_subscription * ms = ll->data; + ms_media_sink = ms->media; + if (!ms_media_sink || ms_media_sink->type_id != MT_AUDIO) continue; goto found_sink; } - return "Sink monologue has no media capable of DTMF playback"; - + return "There is no sink media capable of DTMF playback"; found_sink: - err = dtmf_inject(media, code, flags.volume, flags.duration, flags.pause, sink); + err = dtmf_inject(media, code, flags.volume, flags.duration, flags.pause, ms_media_sink); if (err) return err; } diff --git a/include/call.h b/include/call.h index cd56758de..66e451e2e 100644 --- a/include/call.h +++ b/include/call.h @@ -497,25 +497,6 @@ struct call_media { volatile unsigned int media_flags; }; -/** - * Link between subscribers and subscriptions. - * - * Contain flags and attributes, which can be used - * to mark a subscription (for example, as an egress subscription). - * - * During signalling events, the list of subscriptions for each call_monologue - * is used to create the list of rtp_sink and rtcp_sink given in each packet_stream. - * - * Each entry in these lists is a sink_handler object, which again contains flags and attributes. - * Flags from a call_subscription are copied into the sink_handler. - */ -struct call_subscription { - struct call_monologue *monologue; - GList *link; // link into the corresponding opposite list - unsigned int media_offset; // 0 if media indexes match up - struct sink_attrs attrs; -}; - struct media_subscription { struct call_media * media; /* media itself */ struct call_monologue * monologue; /* whom media belongs to */ @@ -549,9 +530,6 @@ struct call_monologue { sockfamily_t *desired_family; const struct logical_intf *logical_intf; GHashTable *associated_tags; - GQueue subscriptions; /* who am I subscribed to (sources) */ - GHashTable *subscriptions_ht; /* for quick lookup */ - GQueue subscribers; /* who is subscribed to me (sinks) */ GHashTable *subscribers_ht; /* for quick lookup */ GPtrArray *medias; GHashTable *media_ids; @@ -738,17 +716,14 @@ void __monologue_free(struct call_monologue *m); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); -void __add_subscription(struct call_monologue *ml, struct call_monologue *other, - unsigned int media_offset, const struct sink_attrs *); void __add_media_subscription(struct call_media * which, struct call_media * to, const struct sink_attrs *attrs); struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm); -struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml); +struct media_subscription * call_media_subscribed_to_monologue(const struct call_media * media, + const struct call_monologue * monologue); void free_sink_handler(void *); void __add_sink_handler(GQueue *, struct packet_stream *, const struct sink_attrs *); -void call_subscription_free(void *); -void call_subscriptions_clear(GQueue *q); void media_subscription_free(void *); void media_subscriptions_clear(GQueue *q); diff --git a/include/codec.h b/include/codec.h index 04770f87f..996a745cf 100644 --- a/include/codec.h +++ b/include/codec.h @@ -28,7 +28,6 @@ struct codec_store; struct call_monologue; struct delay_buffer; struct sink_handler; -struct call_subscription; typedef int codec_handler_func(struct codec_handler *, struct media_packet *);