From 95a30674ac25a6f01dc29fbbb44d0d50e555fc2c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 2 Jun 2021 14:28:22 -0400 Subject: [PATCH] TT#14008 implement lock-light global call iterators Change-Id: Ie01eb06262267a1aa0b69f632a515d5271bafb52 --- daemon/call.c | 120 ++++++++++++++++++++++++++------------- daemon/call_interfaces.c | 15 ++--- include/aux.h | 49 ++++++++++++++++ include/call.h | 43 +++++++++++++- 4 files changed, 176 insertions(+), 51 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index fb6cbadf8..004d5e43d 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -67,6 +67,7 @@ struct stats rtpe_stats; rwlock_t rtpe_callhash_lock; GHashTable *rtpe_callhash; +struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS]; /* ********** */ @@ -255,7 +256,6 @@ out: rwlock_unlock_r(&rtpe_config.config_lock); rwlock_unlock_r(&c->master_lock); log_info_clear(); - obj_put(c); } void xmlrpc_kill_calls(void *p) { @@ -515,16 +515,9 @@ static void update_requests_per_second_stats(struct requests_ps *request, uint64 mutex_unlock(&request->lock); } -static void calls_build_list(void *k, void *v, void *d) { - GSList **list = d; - struct call *c = v; - *list = g_slist_prepend(*list, obj_get(c)); -} - static void call_timer(void *ptr) { struct iterator_helper hlp; GList *i, *l; - GSList *calls = NULL; struct rtpengine_list_entry *ke; struct packet_stream *ps, *sink; struct stats tmpstats; @@ -539,7 +532,7 @@ static void call_timer(void *ptr) { // timers are run in a single thread, so no locking required here static struct timeval last_run; - static long long interval = 1000000; // usec + static long long interval = 900000; // usec gettimeofday(&tv_start, NULL); @@ -559,16 +552,9 @@ static void call_timer(void *ptr) { ZERO(hlp); hlp.addr_sfd = g_hash_table_new(g_endpoint_hash, g_endpoint_eq); - /* obtain the call list and make a copy from it so not to hold the lock */ - rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_foreach(rtpe_callhash, calls_build_list, &calls); - rwlock_unlock_r(&rtpe_callhash_lock); - - while (calls) { - struct call *c = calls->data; + ITERATE_CALL_LIST_START(CALL_ITERATOR_TIMER, c); call_timer_iterator(c, &hlp); - calls = g_slist_delete_link(calls, calls); - } + ITERATE_CALL_LIST_NEXT_END(c); atomic64_local_copy_zero_struct(&tmpstats, &rtpe_statsps, bytes); atomic64_local_copy_zero_struct(&tmpstats, &rtpe_statsps, packets); @@ -732,6 +718,9 @@ int call_init() { return -1; rwlock_init(&rtpe_callhash_lock); + for (int i = 0; i < NUM_CALL_ITERATORS; i++) + mutex_init(&rtpe_call_iterators[i].lock); + poller_add_timer(rtpe_poller, call_timer, NULL); return 0; @@ -2491,17 +2480,11 @@ void add_total_calls_duration_in_interval(struct timeval *interval_tv) { static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start, struct timeval *interval_duration) { - GHashTableIter iter; - gpointer key, value; struct timeval call_duration, res = {0}; - struct call *call; struct call_monologue *ml; - rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_iter_init(&iter, rtpe_callhash); + ITERATE_CALL_LIST_START(CALL_ITERATOR_GRAPHITE, call); - while (g_hash_table_iter_next(&iter, &key, &value)) { - call = (struct call*) value; if (!call->monologues.head || IS_FOREIGN_CALL(call)) continue; ml = call->monologues.head->data; @@ -2511,8 +2494,9 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval timeval_subtract(&call_duration, &rtpe_now, &ml->started); timeval_add(&res, &res, &call_duration); } - } - rwlock_unlock_r(&rtpe_callhash_lock); + + ITERATE_CALL_LIST_NEXT_END(call); + return res; } @@ -2586,6 +2570,46 @@ void call_destroy(struct call *c) { redis_delete(c, rtpe_redis_write); + for (int i = 0; i < NUM_CALL_ITERATORS; i++) { + struct call *prev_call, *next_call; + while (1) { + mutex_lock(&rtpe_call_iterators[i].lock); + // lock this entry + mutex_lock(&c->iterator[i].lock); + // try lock adjacent entries + prev_call = c->iterator[i].link.prev ? c->iterator[i].link.prev->data : NULL; + next_call = c->iterator[i].link.next ? c->iterator[i].link.next->data : NULL; + if (prev_call) { + if (mutex_trylock(&prev_call->iterator[i].lock)) { + mutex_unlock(&c->iterator[i].lock); + mutex_unlock(&rtpe_call_iterators[i].lock); + continue; // try again + } + } + if (next_call) { + if (mutex_trylock(&next_call->iterator[i].lock)) { + mutex_unlock(&prev_call->iterator[i].lock); + mutex_unlock(&c->iterator[i].lock); + mutex_unlock(&rtpe_call_iterators[i].lock); + continue; // try again + } + } + break; // we can remove now + } + if (c->iterator[i].link.data) + obj_put_o(c->iterator[i].link.data); + rtpe_call_iterators[i].first = g_list_remove_link(rtpe_call_iterators[i].first, + &c->iterator[i].link); + ZERO(c->iterator[i].link); + if (prev_call) + mutex_unlock(&prev_call->iterator[i].lock); + if (next_call) + mutex_unlock(&next_call->iterator[i].lock); + mutex_unlock(&c->iterator[i].lock); + mutex_unlock(&rtpe_call_iterators[i].lock); + } + + rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ @@ -2830,6 +2854,9 @@ static struct call *call_create(const str *callid) { c->tos = rtpe_config.default_tos; c->ssrc_hash = create_ssrc_hash_call(); + for (int i = 0; i < NUM_CALL_ITERATORS; i++) + mutex_init(&c->iterator[i].lock); + return c; } @@ -2859,6 +2886,32 @@ restart: rwlock_lock_w(&c->master_lock); rwlock_unlock_w(&rtpe_callhash_lock); + + for (int i = 0; i < NUM_CALL_ITERATORS; i++) { + c->iterator[i].link.data = obj_get(c); + struct call *first_call; + while (1) { + // lock the list + mutex_lock(&rtpe_call_iterators[i].lock); + // if there is a first entry, lock that + first_call = NULL; + if (rtpe_call_iterators[i].first) { + first_call = rtpe_call_iterators[i].first->data; + if (mutex_trylock(&first_call->iterator[i].lock)) { + mutex_unlock(&rtpe_call_iterators[i].lock); + continue; // retry + } + } + // we can insert now + break; + } + rtpe_call_iterators[i].first + = g_list_insert_before_link(rtpe_call_iterators[i].first, + rtpe_call_iterators[i].first, &c->iterator[i].link); + if (first_call) + mutex_unlock(&first_call->iterator[i].lock); + mutex_unlock(&rtpe_call_iterators[i].lock); + } } else { obj_hold(c); @@ -3324,16 +3377,3 @@ out: obj_put(c); return ret; } - - -static void call_get_all_calls_interator(void *key, void *val, void *ptr) { - GQueue *q = ptr; - g_queue_push_tail(q, obj_get_o(val)); -} - -void call_get_all_calls(GQueue *q) { - rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_foreach(rtpe_callhash, call_get_all_calls_interator, q); - rwlock_unlock_r(&rtpe_callhash_lock); - -} diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 3f6226571..910c7c948 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -466,20 +466,15 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) { } void calls_status_tcp(struct streambuf_stream *s) { - GQueue q = G_QUEUE_INIT; - struct call *c; - - call_get_all_calls(&q); - + rwlock_lock_r(&rtpe_callhash_lock); streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n", - g_queue_get_length(&q), + g_hash_table_size(rtpe_callhash), atomic64_get(&rtpe_stats.bytes), 0, 0); + rwlock_unlock_r(&rtpe_callhash_lock); - while (q.head) { - c = g_queue_pop_head(&q); + ITERATE_CALL_LIST_START(CALL_ITERATOR_MAIN, c); call_status_iterator(c, s); - obj_put(c); - } + ITERATE_CALL_LIST_NEXT_END(c); } diff --git a/include/aux.h b/include/aux.h index 104c429f7..c06160376 100644 --- a/include/aux.h +++ b/include/aux.h @@ -81,6 +81,55 @@ INLINE void g_queue_free_full(GQueue *q, GDestroyNotify free_func) { g_queue_free(q); } #endif +#if !GLIB_CHECK_VERSION(2,62,0) + +// from https://github.com/GNOME/glib/blob/master/glib/glist.c + +INLINE GList * +g_list_insert_before_link (GList *list, + GList *sibling, + GList *link_) +{ + g_return_val_if_fail (link_ != NULL, list); + g_return_val_if_fail (link_->prev == NULL, list); + g_return_val_if_fail (link_->next == NULL, list); + + if (list == NULL) + { + g_return_val_if_fail (sibling == NULL, list); + return link_; + } + else if (sibling != NULL) + { + link_->prev = sibling->prev; + link_->next = sibling; + sibling->prev = link_; + if (link_->prev != NULL) + { + link_->prev->next = link_; + return list; + } + else + { + g_return_val_if_fail (sibling == list, link_); + return link_; + } + } + else + { + GList *last; + + for (last = list; last->next != NULL; last = last->next) {} + + last->next = link_; + last->next->prev = last; + last->next->next = NULL; + + return list; + } +} + +#endif diff --git a/include/call.h b/include/call.h index ccd523c11..8afc54c91 100644 --- a/include/call.h +++ b/include/call.h @@ -65,6 +65,13 @@ enum call_stream_state { CSS_PIERCE_NAT, CSS_RUNNING, }; +enum { + CALL_ITERATOR_MAIN = 0, + CALL_ITERATOR_TIMER, + CALL_ITERATOR_GRAPHITE, + + NUM_CALL_ITERATORS +}; #define ERROR_NO_FREE_PORTS -100 #define ERROR_NO_FREE_LOGS -101 @@ -393,6 +400,38 @@ struct call_monologue { unsigned int rec_forwarding:1; }; +struct call_iterator_list { + GList *first; + mutex_t lock; // protects .first and every entry's .data +}; +struct call_iterator_entry { + GList link; // .data is protected by the list's main lock + mutex_t lock; // held while the link is in use, protects link.prev and link.next +}; + +#define ITERATE_CALL_LIST_START(which, varname) \ + do { \ + int __which = (which); \ + mutex_lock(&rtpe_call_iterators[__which].lock); \ + \ + GList *__l = rtpe_call_iterators[__which].first; \ + while (__l) { \ + struct call *varname = __l->data; \ + obj_hold(varname); \ + mutex_lock(&varname->iterator[__which].lock); \ + mutex_unlock(&rtpe_call_iterators[__which].lock) + +#define ITERATE_CALL_LIST_NEXT_END(varname) \ + GList *__next = varname->iterator[__which].link.next; \ + mutex_unlock(&varname->iterator[__which].lock); \ + __l = __next; \ + obj_put(varname); \ + mutex_lock(&rtpe_call_iterators[__which].lock); \ + } \ + \ + mutex_unlock(&rtpe_call_iterators[__which].lock); \ + } while (0) + struct call { struct obj obj; @@ -427,6 +466,8 @@ struct call { struct recording *recording; str metadata; + struct call_iterator_entry iterator[NUM_CALL_ITERATORS]; + // ipv4/ipv6 media flags unsigned int is_ipv4_media_offer:1; unsigned int is_ipv6_media_offer:1; @@ -449,6 +490,7 @@ struct call { extern rwlock_t rtpe_callhash_lock; extern GHashTable *rtpe_callhash; +extern struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS]; extern struct stats rtpe_statsps; /* per second stats, running timer */ extern struct stats rtpe_stats; /* copied from statsps once a second */ @@ -456,7 +498,6 @@ extern struct stats rtpe_stats; /* copied from statsps once a second */ int call_init(void); void call_free(void); -void call_get_all_calls(GQueue *q); struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag);