From ece5baa110fa4ad37e521b2abde0cbde095ef8a5 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 19 Dec 2024 15:10:02 -0400 Subject: [PATCH] MT#61368 add call_merge() This function merges two distinct call objects into one. All contained objects are moved, and the "source" call is then destroyed. Both call IDs can then be used to refer to the same internal call objects. Call ID aliases are kept in a list in the call object. Change-Id: I8a37775fe0dc3e7ccfeb83e2a3b7d751601450fc --- daemon/call.c | 121 +++++++++++++++++++++++++++++++++++++++++++++++++ include/call.h | 3 ++ 2 files changed, 124 insertions(+) diff --git a/daemon/call.c b/daemon/call.c index 05bee2942..6410f555d 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3887,6 +3887,11 @@ void call_destroy(call_t *c) { rwlock_lock_w(&rtpe_callhash_lock); bool removed = __remove_call_id_from_hash(&c->callid, c); + for (auto_iter(l, c->callid_aliases.head); l; l = l->next) { + __auto_type alias = l->data; + if (__remove_call_id_from_hash(alias, c)) + obj_put(c); + } rwlock_unlock_w(&rtpe_callhash_lock); // if call not found in callhash => previously deleted @@ -4192,6 +4197,7 @@ static void __call_free(void *p) { t_hash_table_destroy(c->tags); t_hash_table_destroy(c->viabranches); t_hash_table_destroy(c->labels); + t_queue_clear(&c->callid_aliases); while (c->streams.head) { ps = t_queue_pop_head(&c->streams); @@ -4341,6 +4347,121 @@ call_t *call_get(const str *callid) { return ret; } +static gboolean fragment_move(str *key, fragment_q *q, void *c) { + call_t *call = c; + t_hash_table_insert(call->sdp_fragments, key, q); + return TRUE; +} + +// both calls must be locked and a reference held. call2 will be released and set to NULL upon return +bool call_merge(call_t *call, call_t **call2p) { + call_t *call2 = *call2p; + + // chcek for tag collisions: duplicate tags are a failure + for (auto_iter(l, call2->monologues.head); l; l = l->next) { + if (t_hash_table_lookup(call->tags, &l->data->tag)) + return false; + } + + ilog(LOG_DEBUG, "Merging call " STR_FORMAT_M " into " STR_FORMAT_M, + STR_FMT_M(&call2->callid), STR_FMT_M(&call->callid)); + + // move buffers + bencode_buffer_merge(&call->buffer, &call2->buffer); + + // move all contained objects: we have to renumber all unique IDs, and redirect any + // `call` pointers + + unsigned int last_id = call->monologues.head->data->unique_id; + while (call2->monologues.head) { + __auto_type ml = t_queue_pop_head(&call2->monologues); + ml->unique_id = ++last_id; + ml->call = call; + t_queue_push_tail(&call->monologues, ml); + t_hash_table_insert(call->tags, &ml->tag, ml); + for (auto_iter(l, ml->tag_aliases.head); l; l = l->next) + t_hash_table_insert(call->tags, l->data, ml); + if (ml->viabranch.len) + t_hash_table_insert(call->viabranches, &ml->viabranch, ml); + if (ml->label.len) + t_hash_table_insert(call->labels, &ml->label, ml); + } + + last_id = call->medias.head->data->unique_id; + while (call2->medias.head) { + __auto_type media = t_queue_pop_head(&call2->medias); + media->unique_id = ++last_id; + media->call = call; + t_queue_push_tail(&call->medias, media); + } + + t_hash_table_foreach_remove(call2->sdp_fragments, fragment_move, call); + + last_id = call->streams.head->data->unique_id; + while (call2->streams.head) { + __auto_type stream = t_queue_pop_head(&call2->streams); + stream->unique_id = ++last_id; + stream->call = call; + t_queue_push_tail(&call->streams, stream); + } + + last_id = call->stream_fds.head->data->unique_id; + while (call2->stream_fds.head) { + __auto_type sfd = t_queue_pop_head(&call2->stream_fds); + sfd->unique_id = ++last_id; + // call objects are held by reference here + if (sfd->call) { + obj_put(sfd->call); + sfd->call = obj_get(call); + } + t_queue_push_tail(&call->stream_fds, sfd); + } + + last_id = call->endpoint_maps.head->data->unique_id; + while (call2->endpoint_maps.head) { + __auto_type endpoint_map = t_queue_pop_head(&call2->endpoint_maps); + endpoint_map->unique_id = ++last_id; + t_queue_push_tail(&call->endpoint_maps, endpoint_map); + } + + // redirect hash table entry for old ID. store old ID in new call + + str *old_id = call_str_dup(&call2->callid); + t_queue_push_tail(&call->callid_aliases, old_id); + + rwlock_lock_w(&rtpe_callhash_lock); + + call_t *call_ht = NULL; + t_hash_table_steal_extended(rtpe_callhash, &call2->callid, NULL, &call_ht); + if (call_ht) { + if (call_ht != call2) { + // already deleted and replace by a different call + t_hash_table_insert(rtpe_callhash, &call_ht->callid, call_ht); + call_ht = NULL; + } + else { + // insert a new reference under the old call ID + t_hash_table_insert(rtpe_callhash, old_id, obj_get(call)); + RTPE_GAUGE_DEC(total_sessions); + } + } // else: already deleted + + rwlock_unlock_w(&rtpe_callhash_lock); + + if (call_ht) + obj_put(call_ht); + + __call_iterator_remove(call2); + mqtt_timer_stop(&call2->mqtt_timer); + __call_cleanup(call2); + + rwlock_unlock_w(&call2->master_lock); + obj_put(call2); + *call2p = NULL; + + return true; +} + /* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */ call_t *call_get_opmode(const str *callid, enum ng_opmode opmode) { if (opmode == OP_OFFER) diff --git a/include/call.h b/include/call.h index 2726be840..a3821b134 100644 --- a/include/call.h +++ b/include/call.h @@ -752,6 +752,7 @@ struct call { struct mqtt_timer *mqtt_timer; str callid; + str_q callid_aliases; struct timeval created; struct timeval destroyed; time_t last_signal; @@ -832,6 +833,8 @@ struct call_monologue *call_get_monologue(call_t *call, const str *fromtag); struct call_monologue *call_get_or_create_monologue(call_t *call, const str *fromtag); __attribute__((nonnull(1))) call_t *call_get(const str *callid); +__attribute__((nonnull(1, 2))) +bool call_merge(call_t *, call_t **); __attribute__((nonnull(2, 3))) int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags); __attribute__((nonnull(1, 2, 3, 4)))