Browse Source

MT#55283 change scope of ICE fragment storage

Instead of keeping one global hash/list of all received ICE fragments,
indexed by string call ID and from-tag, move the hash/list of ICE
fragments into the call object, now indxed only by the from-tag.

This requires a call object to exist before an ICE fragment can be
stored. Change the order of operations to always create a call object
first, then parse the SDP, then check for ICE fragment processing. The
determining factor now is only whether the monologue exists.

Duplicate the raw SDP into the call's memory arena instead of onto the
heap.

ICE fragments now don't need to store the pre-parsed SDP any more, and
the trickle ICE handling code is guaranteed to always have the fully
parsed SDP information.

Strings for fragment indexes can now be allocated from the call's memory
arena.

Obsolete the extra thread used to delete old ICE fragments. This is now
done in-line with regular call timeout processing.

Change-Id: I7acbc4c52666c4cdf1c02324bf33cf0bfcb4f0d0
pull/1870/head
Richard Fuchs 1 year ago
parent
commit
7167237f50
8 changed files with 96 additions and 158 deletions
  1. +5
    -1
      daemon/call.c
  2. +54
    -44
      daemon/call_interfaces.c
  3. +17
    -92
      daemon/ice.c
  4. +11
    -11
      daemon/janus.c
  5. +0
    -4
      daemon/main.c
  6. +6
    -0
      include/call.h
  7. +3
    -3
      include/ice.h
  8. +0
    -3
      t/test-stats.c

+ 5
- 1
daemon/call.c View File

@ -176,6 +176,8 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
&& rtpe_now.tv_sec - c->last_signal <= atomic_get_na(&rtpe_config.timeout))
goto out;
ice_fragments_cleanup(c->sdp_fragments, false);
for (__auto_type it = c->streams.head; it; it = it->next) {
ps = it->data;
@ -4040,7 +4042,6 @@ void __monologue_free(struct call_monologue *m) {
free_ssrc_hash(&m->ssrc_hash);
if (m->last_out_sdp)
g_string_free(m->last_out_sdp, TRUE);
str_free_dup(&m->last_in_sdp);
if (m->session_sdp_orig)
sdp_orig_free(m->session_sdp_orig);
if (m->session_last_sdp_orig)
@ -4101,6 +4102,8 @@ static void __call_free(void *p) {
}
call_buffer_free(&c->buffer);
ice_fragments_cleanup(c->sdp_fragments, true);
t_hash_table_destroy(c->sdp_fragments);
rwlock_destroy(&c->master_lock);
assert(c->stream_fds.head == NULL);
@ -4122,6 +4125,7 @@ static call_t *call_create(const str *callid) {
c->dtls_cert = dtls_cert();
c->tos = rtpe_config.default_tos;
c->poller = rtpe_get_poller();
c->sdp_fragments = fragments_ht_new();
if (rtpe_config.cpu_affinity)
c->cpu_affinity = call_socket_cpu_affinity++ % rtpe_config.cpu_affinity;
else


+ 54
- 44
daemon/call_interfaces.c View File

@ -2104,9 +2104,7 @@ static enum load_limit_reasons call_offer_session_limit(void) {
void save_last_sdp(struct call_monologue *ml, str *sdp, sdp_sessions_q *parsed, sdp_streams_q *streams) {
str_free_dup(&ml->last_in_sdp);
ml->last_in_sdp = *sdp;
*sdp = STR_NULL;
sdp_sessions_clear(&ml->last_in_sdp_parsed);
ml->last_in_sdp_parsed = *parsed;
@ -2131,11 +2129,34 @@ static enum basic_errors call_ng_basic_checks(sdp_ng_flags *flags, enum call_opm
return 0;
}
static const char *call_offer_get_call(call_t **callp, sdp_ng_flags *flags) {
// are we allowed to create a call? use `errstr` to determine
const char *errstr = NULL; // creation is allowed
enum load_limit_reasons limit = call_offer_session_limit();
if (limit != LOAD_LIMIT_NONE) {
if (!flags->supports_load_limit)
errstr = "Parallel session limit reached"; // legacy protocol
else
errstr = magic_load_limit_strings[limit];
// errstr is set, creation not allowed
}
if (!errstr)
*callp = call_get_or_create(&flags->call_id, false);
else {
*callp = call_get(&flags->call_id);
if (!*callp)
return errstr;
}
return NULL;
}
static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode opmode, const char* addr,
const endpoint_t *sin)
{
const char *errstr;
g_auto(str) sdp = STR_NULL;
str sdp = STR_NULL;
g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT;
g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT;
g_autoptr(call_t) call = NULL;
@ -2151,11 +2172,23 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode
if ((ret = call_ng_basic_checks(&flags, opmode)) > 0)
return _ng_basic_errors[ret];
/* for answer: swap To against From tag */
if (opmode == OP_ANSWER)
if (opmode == OP_OFFER) {
errstr = call_offer_get_call(&call, &flags);
if (errstr)
goto out;
}
else if (opmode == OP_ANSWER) {
call = call_get(&flags.call_id);
errstr = "Unknown call-id";
if (!call)
goto out;
/* for answer: swap To against From tag */
str_swap(&flags.to_tag, &flags.from_tag);
}
sdp = str_dup_str(&flags.sdp);
sdp = call_str_cpy(&flags.sdp);
errstr = "Failed to parse SDP";
if (sdp_parse(&sdp, &parsed, &flags))
@ -2168,37 +2201,17 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode
goto out;
}
/* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */
call = call_get(&flags.call_id);
errstr = "Incomplete SDP specification";
if (sdp_streams(&parsed, &streams, &flags))
goto out;
// SDP fragments for trickle ICE must always operate on an existing call
if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) {
if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) {
errstr = NULL;
// SDP fragments for trickle ICE are consumed with no replacement returned
goto out;
}
if (opmode == OP_OFFER && !call) {
enum load_limit_reasons limit = call_offer_session_limit();
if (limit != LOAD_LIMIT_NONE) {
if (!flags.supports_load_limit)
errstr = "Parallel session limit reached"; // legacy protocol
else
errstr = magic_load_limit_strings[limit];
goto out;
}
call = call_get_or_create(&flags.call_id, false);
}
errstr = "Unknown call-id";
if (!call)
goto out;
errstr = "Incomplete SDP specification";
if (sdp_streams(&parsed, &streams, &flags))
goto out;
if (flags.debug)
CALL_SET(call, DEBUG);
@ -3677,7 +3690,7 @@ const char *call_publish_ng(ng_command_ctx_t *ctx,
g_auto(sdp_ng_flags) flags;
g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT;
g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT;
g_auto(str) sdp_in = STR_NULL;
str sdp_in = STR_NULL;
g_auto(str) sdp_out = STR_NULL;
g_autoptr(call_t) call = NULL;
int ret;
@ -3688,22 +3701,19 @@ const char *call_publish_ng(ng_command_ctx_t *ctx,
if ((ret = call_ng_basic_checks(&flags, OP_PUBLISH)) > 0)
return _ng_basic_errors[ret];
sdp_in = str_dup_str(&flags.sdp);
call = call_get_or_create(&flags.call_id, false);
sdp_in = call_str_cpy(&flags.sdp);
if (sdp_parse(&sdp_in, &parsed, &flags))
return "Failed to parse SDP";
call = call_get(&flags.call_id);
if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed))
return NULL;
if (!call)
call = call_get_or_create(&flags.call_id, false);
if (sdp_streams(&parsed, &streams, &flags))
return "Incomplete SDP specification";
if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams))
return NULL;
updated_created_from(call, addr, sin);
struct call_monologue *ml = call_get_or_create_monologue(call, &flags.from_tag);
@ -3858,7 +3868,10 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) {
if (sdp_parse(&flags.sdp, &parsed, &flags))
return "Failed to parse SDP";
if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed))
if (sdp_streams(&parsed, &streams, &flags))
return "Incomplete SDP specification";
if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams))
return NULL;
if (!flags.to_tag.s)
@ -3871,9 +3884,6 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) {
if (!dest_ml)
return "To-tag not found";
if (sdp_streams(&parsed, &streams, &flags))
return "Incomplete SDP specification";
int ret = monologue_subscribe_answer(dest_ml, &flags, &streams);
if (ret)
return "Failed to process subscription answer";


+ 17
- 92
daemon/ice.c View File

@ -33,15 +33,10 @@
STR_FMT_M(&(p)->remote_candidate->foundation), \
(p)->remote_candidate->component_id
struct fragment_key {
str call_id;
str from_tag;
};
struct sdp_fragment {
ng_buffer *ngbuf;
struct timeval received;
sdp_streams_q streams;
sdp_sessions_q sdp;
sdp_ng_flags flags;
};
@ -88,35 +83,13 @@ const char * const ice_type_strings[] = {
static unsigned int frag_key_hash(const struct fragment_key *A);
static int frag_key_eq(const struct fragment_key *A, const struct fragment_key *B);
static void fragment_key_free(struct fragment_key *);
TYPED_GQUEUE(fragment, struct sdp_fragment)
TYPED_GHASHTABLE(fragments_ht, struct fragment_key, fragment_q, frag_key_hash, frag_key_eq,
fragment_key_free, NULL)
TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, fragment_key_free, fragment_q_new)
static mutex_t sdp_fragments_lock;
static fragments_ht sdp_fragments;
TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, NULL, fragment_q_new)
static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams, sdp_sessions_q *sdp,
static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams,
sdp_ng_flags *flags)
{
g_auto(sdp_streams_q) streams_local = TYPED_GQUEUE_INIT;
if (!streams)
streams = &streams_local;
if (!streams->head) {
if (sdp_streams(sdp, streams, flags)) {
ilogs(ice, LOG_WARN, "Incomplete SDP specification for tricle ICE");
return;
}
}
for (__auto_type l = streams->head; l; l = l->next) {
struct stream_params *sp = l->data;
struct call_media *media = NULL;
@ -148,41 +121,19 @@ static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *s
}
static unsigned int frag_key_hash(const struct fragment_key *a) {
return str_hash(&a->call_id) ^ str_hash(&a->from_tag);
}
static int frag_key_eq(const struct fragment_key *a, const struct fragment_key *b) {
return str_equal(&a->call_id, &b->call_id)
&& str_equal(&a->from_tag, &b->from_tag);
}
static void fragment_free(struct sdp_fragment *frag) {
sdp_streams_clear(&frag->streams);
sdp_sessions_clear(&frag->sdp);
call_ng_free_flags(&frag->flags);
obj_put(frag->ngbuf);
g_slice_free1(sizeof(*frag), frag);
}
static void fragment_key_free(struct fragment_key *k) {
g_free(k->call_id.s);
g_free(k->from_tag.s);
g_slice_free1(sizeof(*k), k);
}
static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_sessions_q *sdp, sdp_ng_flags *flags) {
static void queue_sdp_fragment(ng_buffer *ngbuf, call_t *call, str *key, sdp_streams_q *streams, sdp_ng_flags *flags) {
ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M,
STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag));
struct fragment_key *k = g_slice_alloc0(sizeof(*k));
k->call_id = str_dup_str(&flags->call_id);
k->from_tag = str_dup_str(&flags->from_tag);
struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag));
frag->received = rtpe_now;
frag->ngbuf = obj_get(ngbuf);
if (sdp) {
frag->sdp = *sdp;
t_queue_init(sdp);
}
if (streams) {
frag->streams = *streams;
t_queue_init(streams);
@ -190,48 +141,36 @@ static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_ses
frag->flags = *flags;
ZERO(*flags);
mutex_lock(&sdp_fragments_lock);
fragment_q *frags = fragments_ht_lookup_insert(sdp_fragments, k);
fragment_q *frags = fragments_ht_lookup_insert(call->sdp_fragments, call_str_dup(key));
t_queue_push_tail(frags, frag);
mutex_unlock(&sdp_fragments_lock);
}
bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags,
sdp_streams_q *streams, sdp_sessions_q *sdp)
sdp_streams_q *streams)
{
if (!flags->fragment)
return false;
if (!call) {
queue_sdp_fragment(ngbuf, streams, sdp, flags);
return true;
}
struct call_monologue *ml = call_get_monologue(call, &flags->from_tag);
if (!ml) {
queue_sdp_fragment(ngbuf, streams, sdp, flags);
queue_sdp_fragment(ngbuf, call, &flags->from_tag, streams, flags);
return true;
}
ice_update_media_streams(ml, streams, sdp, flags);
ice_update_media_streams(ml, streams, flags);
return true;
}
#define MAX_FRAG_AGE 3000000
void dequeue_sdp_fragments(struct call_monologue *monologue) {
struct fragment_key k;
ZERO(k);
k.call_id = monologue->call->callid;
k.from_tag = monologue->tag;
call_t *call = monologue->call;
fragment_q *frags = NULL;
{
LOCK(&sdp_fragments_lock);
t_hash_table_steal_extended(sdp_fragments, &k, NULL, &frags);
if (!frags)
return;
t_hash_table_steal_extended(call->sdp_fragments, &monologue->tag, NULL, &frags);
if (!frags)
return;
// we own the queue now
}
// we own the queue now
struct sdp_fragment *frag;
while ((frag = t_queue_pop_head(frags))) {
@ -239,9 +178,9 @@ void dequeue_sdp_fragments(struct call_monologue *monologue) {
goto next;
ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M,
STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag));
STR_FMT_M(&call->callid), STR_FMT_M(&monologue->tag));
ice_update_media_streams(monologue, &frag->streams, &frag->sdp, &frag->flags);
ice_update_media_streams(monologue, &frag->streams, &frag->flags);
next:
fragment_free(frag);
@ -249,7 +188,7 @@ next:
t_queue_free(frags);
}
static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *frags, void *p) {
static gboolean fragment_check_cleanup(str *key, fragment_q *frags, void *p) {
bool all = GPOINTER_TO_INT(p);
if (!key || !frags)
return TRUE;
@ -266,10 +205,8 @@ static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *fra
}
return FALSE;
}
static void fragments_cleanup(bool all) {
mutex_lock(&sdp_fragments_lock);
t_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all));
mutex_unlock(&sdp_fragments_lock);
void ice_fragments_cleanup(fragments_ht ht, bool all) {
t_hash_table_foreach_remove(ht, fragment_check_cleanup, GINT_TO_POINTER(all));
}
@ -764,22 +701,10 @@ static void __agent_deschedule(struct ice_agent *ag) {
void ice_init(void) {
random_string((void *) &tie_breaker, sizeof(tie_breaker));
timerthread_init(&ice_agents_timer_thread, 1, ice_agents_timer_run);
sdp_fragments = fragments_ht_new();
mutex_init(&sdp_fragments_lock);
}
void ice_free(void) {
timerthread_free(&ice_agents_timer_thread);
fragments_cleanup(true);
t_hash_table_destroy_ptr(&sdp_fragments);
mutex_destroy(&sdp_fragments_lock);
}
enum thread_looper_action ice_slow_timer(void) {
fragments_cleanup(false);
return TLA_CONTINUE;
}
static void __fail_pair(struct ice_candidate_pair *pair) {


+ 11
- 11
daemon/janus.c View File

@ -851,7 +851,7 @@ static const char *janus_videoroom_configure(struct websocket_message *wm, struc
if (strcmp(jsep_type, "offer"))
return "Not an offer";
g_auto(str) sdp_in = STR_DUP(jsep_sdp);
str sdp_in = call_str_cpy_c(jsep_sdp);
g_auto(sdp_ng_flags) flags;
g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT;
@ -956,7 +956,15 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja
if (strcmp(jsep_type, "answer"))
return "Not an answer";
g_auto(str) sdp_in = STR_DUP(jsep_sdp);
struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id);
*retcode = 426;
if (!room)
return "No such room";
g_autoptr(call_t) call = call_get(&room->call_id);
if (!call)
return "No such room";
str sdp_in = call_str_cpy_c(jsep_sdp);
g_auto(sdp_ng_flags) flags;
g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT;
@ -966,14 +974,6 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja
if (sdp_parse(&sdp_in, &parsed, &flags))
return "Failed to parse SDP";
struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id);
*retcode = 426;
if (!room)
return "No such room";
g_autoptr(call_t) call = call_get(&room->call_id);
if (!call)
return "No such room";
*retcode = 512;
if (sdp_streams(&parsed, &streams, &flags))
return "Incomplete SDP specification";
@ -1677,7 +1677,7 @@ static const char *janus_trickle(JsonReader *reader, struct janus_session *sessi
bencode_strdup_str(&ngbuf->buffer, &sp->ice_ufrag, ufrag);
// finally do the update
trickle_ice_update(ngbuf, call, &flags, &streams, NULL);
trickle_ice_update(ngbuf, call, &flags, &streams);
return NULL;
}


+ 0
- 4
daemon/main.c View File

@ -1521,10 +1521,6 @@ int main(int argc, char **argv) {
thread_create_looper(call_rate_stats_updater, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "call stats", 1000000);
/* separate thread for ice slow timer functionality */
thread_create_looper(ice_slow_timer, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "ICE slow", 1000000);
/* thread to close expired call */
thread_create_looper(call_timer, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "kill calls", 1000000);


+ 6
- 0
include/call.h View File

@ -634,6 +634,11 @@ TYPED_GQUEUE(monologues, struct call_monologue)
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(monologues_q, monologues_q_clear)
TYPED_GHASHTABLE(tags_ht, str, struct call_monologue, str_hash, str_equal, NULL, NULL)
struct sdp_fragment;
TYPED_GQUEUE(fragment, struct sdp_fragment)
TYPED_GHASHTABLE(fragments_ht, str, fragment_q, str_hash, str_equal, NULL, NULL)
struct call_iterator_list {
call_list *first;
mutex_t lock; // protects .first and every entry's .data
@ -738,6 +743,7 @@ struct call {
tags_ht tags;
tags_ht viabranches;
labels_ht labels;
fragments_ht sdp_fragments;
packet_stream_q streams;
stream_fd_q stream_fds; /* stream_fd */
endpoint_map_q endpoint_maps;


+ 3
- 3
include/ice.h View File

@ -14,6 +14,7 @@
#include "socket.h"
#include "timerthread.h"
#include "types.h"
#include "call.h"
#define MAX_COMPONENTS 2
#define TIMER_RUN_INTERVAL 20 /* ms */
@ -167,9 +168,8 @@ int ice_response(stream_fd *, const endpoint_t *src,
void dequeue_sdp_fragments(struct call_monologue *);
bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags,
sdp_streams_q *streams, sdp_sessions_q *sdp);
enum thread_looper_action ice_slow_timer(void);
sdp_streams_q *streams);
void ice_fragments_cleanup(fragments_ht ht, bool all);
#include "call.h"


+ 0
- 3
t/test-stats.c View File

@ -3240,7 +3240,6 @@ int main(void) {
call_timer();
stats_counters_calc_rate(rtpe_stats, 150000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
RTPE_STATS_ADD(ng_commands[NGC_OFFER], 100);
rtpe_now.tv_sec += 2;
@ -3249,7 +3248,6 @@ int main(void) {
call_timer();
stats_counters_calc_rate(rtpe_stats, 2000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
// timer run time interval increased
rtpe_now.tv_sec += 5;
@ -3258,7 +3256,6 @@ int main(void) {
call_timer();
stats_counters_calc_rate(rtpe_stats, 5000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
graph_str = print_graphite_data();
assert_g_string_eq(graph_str,


Loading…
Cancel
Save