From 7167237f5066cd520057408a72e9697cfcddfc62 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 30 Sep 2024 09:33:43 -0400 Subject: [PATCH] MT#55283 change scope of ICE fragment storage Instead of keeping one global hash/list of all received ICE fragments, indexed by string call ID and from-tag, move the hash/list of ICE fragments into the call object, now indxed only by the from-tag. This requires a call object to exist before an ICE fragment can be stored. Change the order of operations to always create a call object first, then parse the SDP, then check for ICE fragment processing. The determining factor now is only whether the monologue exists. Duplicate the raw SDP into the call's memory arena instead of onto the heap. ICE fragments now don't need to store the pre-parsed SDP any more, and the trickle ICE handling code is guaranteed to always have the fully parsed SDP information. Strings for fragment indexes can now be allocated from the call's memory arena. Obsolete the extra thread used to delete old ICE fragments. This is now done in-line with regular call timeout processing. Change-Id: I7acbc4c52666c4cdf1c02324bf33cf0bfcb4f0d0 --- daemon/call.c | 6 ++- daemon/call_interfaces.c | 98 +++++++++++++++++++---------------- daemon/ice.c | 109 ++++++--------------------------------- daemon/janus.c | 22 ++++---- daemon/main.c | 4 -- include/call.h | 6 +++ include/ice.h | 6 +-- t/test-stats.c | 3 -- 8 files changed, 96 insertions(+), 158 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 000afbba3..e117adfa2 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -176,6 +176,8 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { && rtpe_now.tv_sec - c->last_signal <= atomic_get_na(&rtpe_config.timeout)) goto out; + ice_fragments_cleanup(c->sdp_fragments, false); + for (__auto_type it = c->streams.head; it; it = it->next) { ps = it->data; @@ -4040,7 +4042,6 @@ void __monologue_free(struct call_monologue *m) { free_ssrc_hash(&m->ssrc_hash); if (m->last_out_sdp) g_string_free(m->last_out_sdp, TRUE); - str_free_dup(&m->last_in_sdp); if (m->session_sdp_orig) sdp_orig_free(m->session_sdp_orig); if (m->session_last_sdp_orig) @@ -4101,6 +4102,8 @@ static void __call_free(void *p) { } call_buffer_free(&c->buffer); + ice_fragments_cleanup(c->sdp_fragments, true); + t_hash_table_destroy(c->sdp_fragments); rwlock_destroy(&c->master_lock); assert(c->stream_fds.head == NULL); @@ -4122,6 +4125,7 @@ static call_t *call_create(const str *callid) { c->dtls_cert = dtls_cert(); c->tos = rtpe_config.default_tos; c->poller = rtpe_get_poller(); + c->sdp_fragments = fragments_ht_new(); if (rtpe_config.cpu_affinity) c->cpu_affinity = call_socket_cpu_affinity++ % rtpe_config.cpu_affinity; else diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 547016e9c..6a294a0ac 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -2104,9 +2104,7 @@ static enum load_limit_reasons call_offer_session_limit(void) { void save_last_sdp(struct call_monologue *ml, str *sdp, sdp_sessions_q *parsed, sdp_streams_q *streams) { - str_free_dup(&ml->last_in_sdp); ml->last_in_sdp = *sdp; - *sdp = STR_NULL; sdp_sessions_clear(&ml->last_in_sdp_parsed); ml->last_in_sdp_parsed = *parsed; @@ -2131,11 +2129,34 @@ static enum basic_errors call_ng_basic_checks(sdp_ng_flags *flags, enum call_opm return 0; } +static const char *call_offer_get_call(call_t **callp, sdp_ng_flags *flags) { + // are we allowed to create a call? use `errstr` to determine + const char *errstr = NULL; // creation is allowed + enum load_limit_reasons limit = call_offer_session_limit(); + if (limit != LOAD_LIMIT_NONE) { + if (!flags->supports_load_limit) + errstr = "Parallel session limit reached"; // legacy protocol + else + errstr = magic_load_limit_strings[limit]; + // errstr is set, creation not allowed + } + + if (!errstr) + *callp = call_get_or_create(&flags->call_id, false); + else { + *callp = call_get(&flags->call_id); + if (!*callp) + return errstr; + } + + return NULL; +} + static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { const char *errstr; - g_auto(str) sdp = STR_NULL; + str sdp = STR_NULL; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT; g_autoptr(call_t) call = NULL; @@ -2151,11 +2172,23 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode if ((ret = call_ng_basic_checks(&flags, opmode)) > 0) return _ng_basic_errors[ret]; - /* for answer: swap To against From tag */ - if (opmode == OP_ANSWER) + if (opmode == OP_OFFER) { + errstr = call_offer_get_call(&call, &flags); + if (errstr) + goto out; + } + else if (opmode == OP_ANSWER) { + call = call_get(&flags.call_id); + + errstr = "Unknown call-id"; + if (!call) + goto out; + + /* for answer: swap To against From tag */ str_swap(&flags.to_tag, &flags.from_tag); + } - sdp = str_dup_str(&flags.sdp); + sdp = call_str_cpy(&flags.sdp); errstr = "Failed to parse SDP"; if (sdp_parse(&sdp, &parsed, &flags)) @@ -2168,37 +2201,17 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode goto out; } - /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ - call = call_get(&flags.call_id); + errstr = "Incomplete SDP specification"; + if (sdp_streams(&parsed, &streams, &flags)) + goto out; // SDP fragments for trickle ICE must always operate on an existing call - if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) { + if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) { errstr = NULL; // SDP fragments for trickle ICE are consumed with no replacement returned goto out; } - if (opmode == OP_OFFER && !call) { - enum load_limit_reasons limit = call_offer_session_limit(); - if (limit != LOAD_LIMIT_NONE) { - if (!flags.supports_load_limit) - errstr = "Parallel session limit reached"; // legacy protocol - else - errstr = magic_load_limit_strings[limit]; - goto out; - } - - call = call_get_or_create(&flags.call_id, false); - } - - errstr = "Unknown call-id"; - if (!call) - goto out; - - errstr = "Incomplete SDP specification"; - if (sdp_streams(&parsed, &streams, &flags)) - goto out; - if (flags.debug) CALL_SET(call, DEBUG); @@ -3677,7 +3690,7 @@ const char *call_publish_ng(ng_command_ctx_t *ctx, g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT; - g_auto(str) sdp_in = STR_NULL; + str sdp_in = STR_NULL; g_auto(str) sdp_out = STR_NULL; g_autoptr(call_t) call = NULL; int ret; @@ -3688,22 +3701,19 @@ const char *call_publish_ng(ng_command_ctx_t *ctx, if ((ret = call_ng_basic_checks(&flags, OP_PUBLISH)) > 0) return _ng_basic_errors[ret]; - sdp_in = str_dup_str(&flags.sdp); + call = call_get_or_create(&flags.call_id, false); + + sdp_in = call_str_cpy(&flags.sdp); if (sdp_parse(&sdp_in, &parsed, &flags)) return "Failed to parse SDP"; - call = call_get(&flags.call_id); - - if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) - return NULL; - - if (!call) - call = call_get_or_create(&flags.call_id, false); - if (sdp_streams(&parsed, &streams, &flags)) return "Incomplete SDP specification"; + if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) + return NULL; + updated_created_from(call, addr, sin); struct call_monologue *ml = call_get_or_create_monologue(call, &flags.from_tag); @@ -3858,7 +3868,10 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) { if (sdp_parse(&flags.sdp, &parsed, &flags)) return "Failed to parse SDP"; - if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specification"; + + if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) return NULL; if (!flags.to_tag.s) @@ -3871,9 +3884,6 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) { if (!dest_ml) return "To-tag not found"; - if (sdp_streams(&parsed, &streams, &flags)) - return "Incomplete SDP specification"; - int ret = monologue_subscribe_answer(dest_ml, &flags, &streams); if (ret) return "Failed to process subscription answer"; diff --git a/daemon/ice.c b/daemon/ice.c index 34739c98b..a546318a1 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -33,15 +33,10 @@ STR_FMT_M(&(p)->remote_candidate->foundation), \ (p)->remote_candidate->component_id -struct fragment_key { - str call_id; - str from_tag; -}; struct sdp_fragment { ng_buffer *ngbuf; struct timeval received; sdp_streams_q streams; - sdp_sessions_q sdp; sdp_ng_flags flags; }; @@ -88,35 +83,13 @@ const char * const ice_type_strings[] = { -static unsigned int frag_key_hash(const struct fragment_key *A); -static int frag_key_eq(const struct fragment_key *A, const struct fragment_key *B); -static void fragment_key_free(struct fragment_key *); - -TYPED_GQUEUE(fragment, struct sdp_fragment) -TYPED_GHASHTABLE(fragments_ht, struct fragment_key, fragment_q, frag_key_hash, frag_key_eq, - fragment_key_free, NULL) -TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, fragment_key_free, fragment_q_new) - -static mutex_t sdp_fragments_lock; -static fragments_ht sdp_fragments; +TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, NULL, fragment_q_new) -static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams, sdp_sessions_q *sdp, +static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_flags *flags) { - g_auto(sdp_streams_q) streams_local = TYPED_GQUEUE_INIT; - - if (!streams) - streams = &streams_local; - - if (!streams->head) { - if (sdp_streams(sdp, streams, flags)) { - ilogs(ice, LOG_WARN, "Incomplete SDP specification for tricle ICE"); - return; - } - } - for (__auto_type l = streams->head; l; l = l->next) { struct stream_params *sp = l->data; struct call_media *media = NULL; @@ -148,41 +121,19 @@ static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *s } -static unsigned int frag_key_hash(const struct fragment_key *a) { - return str_hash(&a->call_id) ^ str_hash(&a->from_tag); -} -static int frag_key_eq(const struct fragment_key *a, const struct fragment_key *b) { - return str_equal(&a->call_id, &b->call_id) - && str_equal(&a->from_tag, &b->from_tag); -} - static void fragment_free(struct sdp_fragment *frag) { sdp_streams_clear(&frag->streams); - sdp_sessions_clear(&frag->sdp); call_ng_free_flags(&frag->flags); obj_put(frag->ngbuf); g_slice_free1(sizeof(*frag), frag); } -static void fragment_key_free(struct fragment_key *k) { - g_free(k->call_id.s); - g_free(k->from_tag.s); - g_slice_free1(sizeof(*k), k); -} -static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_sessions_q *sdp, sdp_ng_flags *flags) { +static void queue_sdp_fragment(ng_buffer *ngbuf, call_t *call, str *key, sdp_streams_q *streams, sdp_ng_flags *flags) { ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag)); - struct fragment_key *k = g_slice_alloc0(sizeof(*k)); - k->call_id = str_dup_str(&flags->call_id); - k->from_tag = str_dup_str(&flags->from_tag); - struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag)); frag->received = rtpe_now; frag->ngbuf = obj_get(ngbuf); - if (sdp) { - frag->sdp = *sdp; - t_queue_init(sdp); - } if (streams) { frag->streams = *streams; t_queue_init(streams); @@ -190,48 +141,36 @@ static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_ses frag->flags = *flags; ZERO(*flags); - mutex_lock(&sdp_fragments_lock); - fragment_q *frags = fragments_ht_lookup_insert(sdp_fragments, k); + fragment_q *frags = fragments_ht_lookup_insert(call->sdp_fragments, call_str_dup(key)); t_queue_push_tail(frags, frag); - mutex_unlock(&sdp_fragments_lock); } bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags, - sdp_streams_q *streams, sdp_sessions_q *sdp) + sdp_streams_q *streams) { if (!flags->fragment) return false; - if (!call) { - queue_sdp_fragment(ngbuf, streams, sdp, flags); - return true; - } struct call_monologue *ml = call_get_monologue(call, &flags->from_tag); if (!ml) { - queue_sdp_fragment(ngbuf, streams, sdp, flags); + queue_sdp_fragment(ngbuf, call, &flags->from_tag, streams, flags); return true; } - ice_update_media_streams(ml, streams, sdp, flags); + ice_update_media_streams(ml, streams, flags); return true; } #define MAX_FRAG_AGE 3000000 void dequeue_sdp_fragments(struct call_monologue *monologue) { - struct fragment_key k; - ZERO(k); - k.call_id = monologue->call->callid; - k.from_tag = monologue->tag; + call_t *call = monologue->call; fragment_q *frags = NULL; - { - LOCK(&sdp_fragments_lock); - t_hash_table_steal_extended(sdp_fragments, &k, NULL, &frags); - if (!frags) - return; + t_hash_table_steal_extended(call->sdp_fragments, &monologue->tag, NULL, &frags); + if (!frags) + return; - // we own the queue now - } + // we own the queue now struct sdp_fragment *frag; while ((frag = t_queue_pop_head(frags))) { @@ -239,9 +178,9 @@ void dequeue_sdp_fragments(struct call_monologue *monologue) { goto next; ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, - STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); + STR_FMT_M(&call->callid), STR_FMT_M(&monologue->tag)); - ice_update_media_streams(monologue, &frag->streams, &frag->sdp, &frag->flags); + ice_update_media_streams(monologue, &frag->streams, &frag->flags); next: fragment_free(frag); @@ -249,7 +188,7 @@ next: t_queue_free(frags); } -static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *frags, void *p) { +static gboolean fragment_check_cleanup(str *key, fragment_q *frags, void *p) { bool all = GPOINTER_TO_INT(p); if (!key || !frags) return TRUE; @@ -266,10 +205,8 @@ static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *fra } return FALSE; } -static void fragments_cleanup(bool all) { - mutex_lock(&sdp_fragments_lock); - t_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all)); - mutex_unlock(&sdp_fragments_lock); +void ice_fragments_cleanup(fragments_ht ht, bool all) { + t_hash_table_foreach_remove(ht, fragment_check_cleanup, GINT_TO_POINTER(all)); } @@ -764,22 +701,10 @@ static void __agent_deschedule(struct ice_agent *ag) { void ice_init(void) { random_string((void *) &tie_breaker, sizeof(tie_breaker)); timerthread_init(&ice_agents_timer_thread, 1, ice_agents_timer_run); - - sdp_fragments = fragments_ht_new(); - mutex_init(&sdp_fragments_lock); } void ice_free(void) { timerthread_free(&ice_agents_timer_thread); - - fragments_cleanup(true); - t_hash_table_destroy_ptr(&sdp_fragments); - mutex_destroy(&sdp_fragments_lock); -} - -enum thread_looper_action ice_slow_timer(void) { - fragments_cleanup(false); - return TLA_CONTINUE; } static void __fail_pair(struct ice_candidate_pair *pair) { diff --git a/daemon/janus.c b/daemon/janus.c index 1d78d8d02..0d838839f 100644 --- a/daemon/janus.c +++ b/daemon/janus.c @@ -851,7 +851,7 @@ static const char *janus_videoroom_configure(struct websocket_message *wm, struc if (strcmp(jsep_type, "offer")) return "Not an offer"; - g_auto(str) sdp_in = STR_DUP(jsep_sdp); + str sdp_in = call_str_cpy_c(jsep_sdp); g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; @@ -956,7 +956,15 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja if (strcmp(jsep_type, "answer")) return "Not an answer"; - g_auto(str) sdp_in = STR_DUP(jsep_sdp); + struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id); + *retcode = 426; + if (!room) + return "No such room"; + g_autoptr(call_t) call = call_get(&room->call_id); + if (!call) + return "No such room"; + + str sdp_in = call_str_cpy_c(jsep_sdp); g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; @@ -966,14 +974,6 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja if (sdp_parse(&sdp_in, &parsed, &flags)) return "Failed to parse SDP"; - struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id); - *retcode = 426; - if (!room) - return "No such room"; - g_autoptr(call_t) call = call_get(&room->call_id); - if (!call) - return "No such room"; - *retcode = 512; if (sdp_streams(&parsed, &streams, &flags)) return "Incomplete SDP specification"; @@ -1677,7 +1677,7 @@ static const char *janus_trickle(JsonReader *reader, struct janus_session *sessi bencode_strdup_str(&ngbuf->buffer, &sp->ice_ufrag, ufrag); // finally do the update - trickle_ice_update(ngbuf, call, &flags, &streams, NULL); + trickle_ice_update(ngbuf, call, &flags, &streams); return NULL; } diff --git a/daemon/main.c b/daemon/main.c index bc1f2de7a..b616790c0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1521,10 +1521,6 @@ int main(int argc, char **argv) { thread_create_looper(call_rate_stats_updater, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "call stats", 1000000); - /* separate thread for ice slow timer functionality */ - thread_create_looper(ice_slow_timer, rtpe_config.idle_scheduling, - rtpe_config.idle_priority, "ICE slow", 1000000); - /* thread to close expired call */ thread_create_looper(call_timer, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "kill calls", 1000000); diff --git a/include/call.h b/include/call.h index 65c40e0b1..0e5b6e789 100644 --- a/include/call.h +++ b/include/call.h @@ -634,6 +634,11 @@ TYPED_GQUEUE(monologues, struct call_monologue) G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(monologues_q, monologues_q_clear) TYPED_GHASHTABLE(tags_ht, str, struct call_monologue, str_hash, str_equal, NULL, NULL) +struct sdp_fragment; +TYPED_GQUEUE(fragment, struct sdp_fragment) +TYPED_GHASHTABLE(fragments_ht, str, fragment_q, str_hash, str_equal, NULL, NULL) + + struct call_iterator_list { call_list *first; mutex_t lock; // protects .first and every entry's .data @@ -738,6 +743,7 @@ struct call { tags_ht tags; tags_ht viabranches; labels_ht labels; + fragments_ht sdp_fragments; packet_stream_q streams; stream_fd_q stream_fds; /* stream_fd */ endpoint_map_q endpoint_maps; diff --git a/include/ice.h b/include/ice.h index c2e3a4fda..4903ca8d6 100644 --- a/include/ice.h +++ b/include/ice.h @@ -14,6 +14,7 @@ #include "socket.h" #include "timerthread.h" #include "types.h" +#include "call.h" #define MAX_COMPONENTS 2 #define TIMER_RUN_INTERVAL 20 /* ms */ @@ -167,9 +168,8 @@ int ice_response(stream_fd *, const endpoint_t *src, void dequeue_sdp_fragments(struct call_monologue *); bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags, - sdp_streams_q *streams, sdp_sessions_q *sdp); - -enum thread_looper_action ice_slow_timer(void); + sdp_streams_q *streams); +void ice_fragments_cleanup(fragments_ht ht, bool all); #include "call.h" diff --git a/t/test-stats.c b/t/test-stats.c index 1f3cb6121..e9b05f2a8 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -3240,7 +3240,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 150000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); RTPE_STATS_ADD(ng_commands[NGC_OFFER], 100); rtpe_now.tv_sec += 2; @@ -3249,7 +3248,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 2000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); // timer run time interval increased rtpe_now.tv_sec += 5; @@ -3258,7 +3256,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 5000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); graph_str = print_graphite_data(); assert_g_string_eq(graph_str,