Browse Source

MT#61368 add call_merge()

This function merges two distinct call objects into one. All contained
objects are moved, and the "source" call is then destroyed. Both call
IDs can then be used to refer to the same internal call objects. Call ID
aliases are kept in a list in the call object.

Change-Id: I8a37775fe0dc3e7ccfeb83e2a3b7d751601450fc
pull/1897/head
Richard Fuchs 1 year ago
parent
commit
ece5baa110
2 changed files with 124 additions and 0 deletions
  1. +121
    -0
      daemon/call.c
  2. +3
    -0
      include/call.h

+ 121
- 0
daemon/call.c View File

@ -3887,6 +3887,11 @@ void call_destroy(call_t *c) {
rwlock_lock_w(&rtpe_callhash_lock);
bool removed = __remove_call_id_from_hash(&c->callid, c);
for (auto_iter(l, c->callid_aliases.head); l; l = l->next) {
__auto_type alias = l->data;
if (__remove_call_id_from_hash(alias, c))
obj_put(c);
}
rwlock_unlock_w(&rtpe_callhash_lock);
// if call not found in callhash => previously deleted
@ -4192,6 +4197,7 @@ static void __call_free(void *p) {
t_hash_table_destroy(c->tags);
t_hash_table_destroy(c->viabranches);
t_hash_table_destroy(c->labels);
t_queue_clear(&c->callid_aliases);
while (c->streams.head) {
ps = t_queue_pop_head(&c->streams);
@ -4341,6 +4347,121 @@ call_t *call_get(const str *callid) {
return ret;
}
static gboolean fragment_move(str *key, fragment_q *q, void *c) {
call_t *call = c;
t_hash_table_insert(call->sdp_fragments, key, q);
return TRUE;
}
// both calls must be locked and a reference held. call2 will be released and set to NULL upon return
bool call_merge(call_t *call, call_t **call2p) {
call_t *call2 = *call2p;
// chcek for tag collisions: duplicate tags are a failure
for (auto_iter(l, call2->monologues.head); l; l = l->next) {
if (t_hash_table_lookup(call->tags, &l->data->tag))
return false;
}
ilog(LOG_DEBUG, "Merging call " STR_FORMAT_M " into " STR_FORMAT_M,
STR_FMT_M(&call2->callid), STR_FMT_M(&call->callid));
// move buffers
bencode_buffer_merge(&call->buffer, &call2->buffer);
// move all contained objects: we have to renumber all unique IDs, and redirect any
// `call` pointers
unsigned int last_id = call->monologues.head->data->unique_id;
while (call2->monologues.head) {
__auto_type ml = t_queue_pop_head(&call2->monologues);
ml->unique_id = ++last_id;
ml->call = call;
t_queue_push_tail(&call->monologues, ml);
t_hash_table_insert(call->tags, &ml->tag, ml);
for (auto_iter(l, ml->tag_aliases.head); l; l = l->next)
t_hash_table_insert(call->tags, l->data, ml);
if (ml->viabranch.len)
t_hash_table_insert(call->viabranches, &ml->viabranch, ml);
if (ml->label.len)
t_hash_table_insert(call->labels, &ml->label, ml);
}
last_id = call->medias.head->data->unique_id;
while (call2->medias.head) {
__auto_type media = t_queue_pop_head(&call2->medias);
media->unique_id = ++last_id;
media->call = call;
t_queue_push_tail(&call->medias, media);
}
t_hash_table_foreach_remove(call2->sdp_fragments, fragment_move, call);
last_id = call->streams.head->data->unique_id;
while (call2->streams.head) {
__auto_type stream = t_queue_pop_head(&call2->streams);
stream->unique_id = ++last_id;
stream->call = call;
t_queue_push_tail(&call->streams, stream);
}
last_id = call->stream_fds.head->data->unique_id;
while (call2->stream_fds.head) {
__auto_type sfd = t_queue_pop_head(&call2->stream_fds);
sfd->unique_id = ++last_id;
// call objects are held by reference here
if (sfd->call) {
obj_put(sfd->call);
sfd->call = obj_get(call);
}
t_queue_push_tail(&call->stream_fds, sfd);
}
last_id = call->endpoint_maps.head->data->unique_id;
while (call2->endpoint_maps.head) {
__auto_type endpoint_map = t_queue_pop_head(&call2->endpoint_maps);
endpoint_map->unique_id = ++last_id;
t_queue_push_tail(&call->endpoint_maps, endpoint_map);
}
// redirect hash table entry for old ID. store old ID in new call
str *old_id = call_str_dup(&call2->callid);
t_queue_push_tail(&call->callid_aliases, old_id);
rwlock_lock_w(&rtpe_callhash_lock);
call_t *call_ht = NULL;
t_hash_table_steal_extended(rtpe_callhash, &call2->callid, NULL, &call_ht);
if (call_ht) {
if (call_ht != call2) {
// already deleted and replace by a different call
t_hash_table_insert(rtpe_callhash, &call_ht->callid, call_ht);
call_ht = NULL;
}
else {
// insert a new reference under the old call ID
t_hash_table_insert(rtpe_callhash, old_id, obj_get(call));
RTPE_GAUGE_DEC(total_sessions);
}
} // else: already deleted
rwlock_unlock_w(&rtpe_callhash_lock);
if (call_ht)
obj_put(call_ht);
__call_iterator_remove(call2);
mqtt_timer_stop(&call2->mqtt_timer);
__call_cleanup(call2);
rwlock_unlock_w(&call2->master_lock);
obj_put(call2);
*call2p = NULL;
return true;
}
/* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */
call_t *call_get_opmode(const str *callid, enum ng_opmode opmode) {
if (opmode == OP_OFFER)


+ 3
- 0
include/call.h View File

@ -752,6 +752,7 @@ struct call {
struct mqtt_timer *mqtt_timer;
str callid;
str_q callid_aliases;
struct timeval created;
struct timeval destroyed;
time_t last_signal;
@ -832,6 +833,8 @@ struct call_monologue *call_get_monologue(call_t *call, const str *fromtag);
struct call_monologue *call_get_or_create_monologue(call_t *call, const str *fromtag);
__attribute__((nonnull(1)))
call_t *call_get(const str *callid);
__attribute__((nonnull(1, 2)))
bool call_merge(call_t *, call_t **);
__attribute__((nonnull(2, 3)))
int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags);
__attribute__((nonnull(1, 2, 3, 4)))


Loading…
Cancel
Save