
TT#14008 implement lock-light global call iterators
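
Instead of copying the call hash table under rtpe_callhash_lock whenever all
calls have to be walked (timer, graphite stats, TCP status output), keep one
linked list per consumer (main, timer, graphite). Each call embeds a GList
link and a mutex per list, and a walk proceeds hand-over-hand: only the list
head lock and the lock of the entry currently being visited are held, never
a copy of the whole table. Insertion and removal take the list lock and the
affected entry's lock, trylock the adjacent entries, and restart on
contention.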

Change-Id: Ie01eb06262267a1aa0b69f632a515d5271bafb52
pull/1287/head
Richard Fuchs, 5 years ago
commit 95a30674ac
4 changed files with 176 additions and 51 deletions:
  1. daemon/call.c  +80 -40
  2. daemon/call_interfaces.c  +5 -10
  3. include/aux.h  +49 -0
  4. include/call.h  +42 -1

daemon/call.c  +80 -40

@@ -67,6 +67,7 @@ struct stats rtpe_stats;
rwlock_t rtpe_callhash_lock;
GHashTable *rtpe_callhash;
struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS];
/* ********** */
@@ -255,7 +256,6 @@ out:
rwlock_unlock_r(&rtpe_config.config_lock);
rwlock_unlock_r(&c->master_lock);
log_info_clear();
obj_put(c);
}
void xmlrpc_kill_calls(void *p) {
@@ -515,16 +515,9 @@ static void update_requests_per_second_stats(struct requests_ps *request, uint64
mutex_unlock(&request->lock);
}
static void calls_build_list(void *k, void *v, void *d) {
GSList **list = d;
struct call *c = v;
*list = g_slist_prepend(*list, obj_get(c));
}
static void call_timer(void *ptr) {
struct iterator_helper hlp;
GList *i, *l;
GSList *calls = NULL;
struct rtpengine_list_entry *ke;
struct packet_stream *ps, *sink;
struct stats tmpstats;
@@ -539,7 +532,7 @@ static void call_timer(void *ptr) {
// timers are run in a single thread, so no locking required here
static struct timeval last_run;
static long long interval = 1000000; // usec
static long long interval = 900000; // usec
gettimeofday(&tv_start, NULL);
@@ -559,16 +552,9 @@ static void call_timer(void *ptr) {
ZERO(hlp);
hlp.addr_sfd = g_hash_table_new(g_endpoint_hash, g_endpoint_eq);
/* obtain the call list and make a copy from it so not to hold the lock */
rwlock_lock_r(&rtpe_callhash_lock);
g_hash_table_foreach(rtpe_callhash, calls_build_list, &calls);
rwlock_unlock_r(&rtpe_callhash_lock);
while (calls) {
struct call *c = calls->data;
ITERATE_CALL_LIST_START(CALL_ITERATOR_TIMER, c);
call_timer_iterator(c, &hlp);
calls = g_slist_delete_link(calls, calls);
}
ITERATE_CALL_LIST_NEXT_END(c);
atomic64_local_copy_zero_struct(&tmpstats, &rtpe_statsps, bytes);
atomic64_local_copy_zero_struct(&tmpstats, &rtpe_statsps, packets);
@@ -732,6 +718,9 @@ int call_init() {
return -1;
rwlock_init(&rtpe_callhash_lock);
for (int i = 0; i < NUM_CALL_ITERATORS; i++)
mutex_init(&rtpe_call_iterators[i].lock);
poller_add_timer(rtpe_poller, call_timer, NULL);
return 0;
@@ -2491,17 +2480,11 @@ void add_total_calls_duration_in_interval(struct timeval *interval_tv) {
static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_duration)
{
GHashTableIter iter;
gpointer key, value;
struct timeval call_duration, res = {0};
struct call *call;
struct call_monologue *ml;
rwlock_lock_r(&rtpe_callhash_lock);
g_hash_table_iter_init(&iter, rtpe_callhash);
ITERATE_CALL_LIST_START(CALL_ITERATOR_GRAPHITE, call);
while (g_hash_table_iter_next(&iter, &key, &value)) {
call = (struct call*) value;
if (!call->monologues.head || IS_FOREIGN_CALL(call))
continue;
ml = call->monologues.head->data;
@@ -2511,8 +2494,9 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval
timeval_subtract(&call_duration, &rtpe_now, &ml->started);
timeval_add(&res, &res, &call_duration);
}
}
rwlock_unlock_r(&rtpe_callhash_lock);
ITERATE_CALL_LIST_NEXT_END(call);
return res;
}
@@ -2586,6 +2570,46 @@ void call_destroy(struct call *c) {
redis_delete(c, rtpe_redis_write);
for (int i = 0; i < NUM_CALL_ITERATORS; i++) {
struct call *prev_call, *next_call;
while (1) {
mutex_lock(&rtpe_call_iterators[i].lock);
// lock this entry
mutex_lock(&c->iterator[i].lock);
// try lock adjacent entries
prev_call = c->iterator[i].link.prev ? c->iterator[i].link.prev->data : NULL;
next_call = c->iterator[i].link.next ? c->iterator[i].link.next->data : NULL;
if (prev_call) {
if (mutex_trylock(&prev_call->iterator[i].lock)) {
mutex_unlock(&c->iterator[i].lock);
mutex_unlock(&rtpe_call_iterators[i].lock);
continue; // try again
}
}
if (next_call) {
if (mutex_trylock(&next_call->iterator[i].lock)) {
if (prev_call)
mutex_unlock(&prev_call->iterator[i].lock);
mutex_unlock(&c->iterator[i].lock);
mutex_unlock(&rtpe_call_iterators[i].lock);
continue; // try again
}
}
break; // we can remove now
}
if (c->iterator[i].link.data)
obj_put_o(c->iterator[i].link.data);
rtpe_call_iterators[i].first = g_list_remove_link(rtpe_call_iterators[i].first,
&c->iterator[i].link);
ZERO(c->iterator[i].link);
if (prev_call)
mutex_unlock(&prev_call->iterator[i].lock);
if (next_call)
mutex_unlock(&next_call->iterator[i].lock);
mutex_unlock(&c->iterator[i].lock);
mutex_unlock(&rtpe_call_iterators[i].lock);
}
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */
@@ -2830,6 +2854,9 @@ static struct call *call_create(const str *callid) {
c->tos = rtpe_config.default_tos;
c->ssrc_hash = create_ssrc_hash_call();
for (int i = 0; i < NUM_CALL_ITERATORS; i++)
mutex_init(&c->iterator[i].lock);
return c;
}
@@ -2859,6 +2886,32 @@ restart:
rwlock_lock_w(&c->master_lock);
rwlock_unlock_w(&rtpe_callhash_lock);
for (int i = 0; i < NUM_CALL_ITERATORS; i++) {
c->iterator[i].link.data = obj_get(c);
struct call *first_call;
while (1) {
// lock the list
mutex_lock(&rtpe_call_iterators[i].lock);
// if there is a first entry, lock that
first_call = NULL;
if (rtpe_call_iterators[i].first) {
first_call = rtpe_call_iterators[i].first->data;
if (mutex_trylock(&first_call->iterator[i].lock)) {
mutex_unlock(&rtpe_call_iterators[i].lock);
continue; // retry
}
}
// we can insert now
break;
}
rtpe_call_iterators[i].first
= g_list_insert_before_link(rtpe_call_iterators[i].first,
rtpe_call_iterators[i].first, &c->iterator[i].link);
if (first_call)
mutex_unlock(&first_call->iterator[i].lock);
mutex_unlock(&rtpe_call_iterators[i].lock);
}
}
else {
obj_hold(c);
@@ -3324,16 +3377,3 @@ out:
obj_put(c);
return ret;
}
static void call_get_all_calls_interator(void *key, void *val, void *ptr) {
GQueue *q = ptr;
g_queue_push_tail(q, obj_get_o(val));
}
void call_get_all_calls(GQueue *q) {
rwlock_lock_r(&rtpe_callhash_lock);
g_hash_table_foreach(rtpe_callhash, call_get_all_calls_interator, q);
rwlock_unlock_r(&rtpe_callhash_lock);
}
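
The unlink loop in call_destroy() above ends up holding four locks — the list head, the entry being removed and both of its neighbours — before it touches link.prev/link.next. The neighbour locks are only trylock'ed: on contention everything is dropped and the sequence restarts, so neither the list head nor the call's own entry stays locked while a neighbour is busy (for example, while an iterator is visiting it). A condensed sketch of that pattern as a hypothetical helper (lock_for_unlink() is not part of this commit; mutex_trylock() is assumed to return non-zero on failure, as in the code above):

// Hypothetical helper: acquire the list lock, c's entry lock and both
// neighbour locks for iterator list i, retrying whenever a neighbour is busy.
static void lock_for_unlink(struct call *c, int i,
		struct call **prev_out, struct call **next_out)
{
	struct call *prev, *next;
	while (1) {
		mutex_lock(&rtpe_call_iterators[i].lock);
		mutex_lock(&c->iterator[i].lock);
		prev = c->iterator[i].link.prev ? c->iterator[i].link.prev->data : NULL;
		next = c->iterator[i].link.next ? c->iterator[i].link.next->data : NULL;
		if (prev && mutex_trylock(&prev->iterator[i].lock))
			goto retry;
		if (next && mutex_trylock(&next->iterator[i].lock)) {
			if (prev)
				mutex_unlock(&prev->iterator[i].lock);
			goto retry;
		}
		break;	// all four locks held, safe to unlink
retry:
		mutex_unlock(&c->iterator[i].lock);
		mutex_unlock(&rtpe_call_iterators[i].lock);
	}
	*prev_out = prev;
	*next_out = next;
}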

daemon/call_interfaces.c  +5 -10

@@ -466,20 +466,15 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) {
}
void calls_status_tcp(struct streambuf_stream *s) {
GQueue q = G_QUEUE_INIT;
struct call *c;
call_get_all_calls(&q);
rwlock_lock_r(&rtpe_callhash_lock);
streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n",
g_queue_get_length(&q),
g_hash_table_size(rtpe_callhash),
atomic64_get(&rtpe_stats.bytes), 0, 0);
rwlock_unlock_r(&rtpe_callhash_lock);
while (q.head) {
c = g_queue_pop_head(&q);
ITERATE_CALL_LIST_START(CALL_ITERATOR_MAIN, c);
call_status_iterator(c, s);
obj_put(c);
}
ITERATE_CALL_LIST_NEXT_END(c);
}


include/aux.h  +49 -0

@@ -81,6 +81,55 @@ INLINE void g_queue_free_full(GQueue *q, GDestroyNotify free_func) {
g_queue_free(q);
}
#endif
#if !GLIB_CHECK_VERSION(2,62,0)
// from https://github.com/GNOME/glib/blob/master/glib/glist.c
INLINE GList *
g_list_insert_before_link (GList *list,
GList *sibling,
GList *link_)
{
g_return_val_if_fail (link_ != NULL, list);
g_return_val_if_fail (link_->prev == NULL, list);
g_return_val_if_fail (link_->next == NULL, list);
if (list == NULL)
{
g_return_val_if_fail (sibling == NULL, list);
return link_;
}
else if (sibling != NULL)
{
link_->prev = sibling->prev;
link_->next = sibling;
sibling->prev = link_;
if (link_->prev != NULL)
{
link_->prev->next = link_;
return list;
}
else
{
g_return_val_if_fail (sibling == list, link_);
return link_;
}
}
else
{
GList *last;
for (last = list; last->next != NULL; last = last->next) {}
last->next = link_;
last->next->prev = last;
last->next->next = NULL;
return list;
}
}
#endif
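
g_list_insert_before_link() only exists from GLib 2.62 onwards, hence the backported copy above. A minimal standalone sketch of its semantics as used by this commit (nodes are pre-allocated and never freed by the list — in the daemon they are embedded in struct call; passing the current head as the sibling prepends the link). Builds against GLib >= 2.62 or with the fallback above:

#include <glib.h>

int main(void) {
	// pre-allocated links, analogous to the embedded per-call iterator links
	static GList a = { .data = "a" };
	static GList b = { .data = "b" };
	GList *list = NULL;

	list = g_list_insert_before_link(list, NULL, &a);	// empty list: the link becomes the head
	list = g_list_insert_before_link(list, list, &b);	// sibling == head: prepend

	g_assert(list == &b && list->next == &a && a.prev == &b);
	return 0;
}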


include/call.h  +42 -1

@@ -65,6 +65,13 @@ enum call_stream_state {
CSS_PIERCE_NAT,
CSS_RUNNING,
};
enum {
CALL_ITERATOR_MAIN = 0,
CALL_ITERATOR_TIMER,
CALL_ITERATOR_GRAPHITE,
NUM_CALL_ITERATORS
};
#define ERROR_NO_FREE_PORTS -100
#define ERROR_NO_FREE_LOGS -101
@@ -393,6 +400,38 @@ struct call_monologue {
unsigned int rec_forwarding:1;
};
struct call_iterator_list {
GList *first;
mutex_t lock; // protects .first and every entry's .data
};
struct call_iterator_entry {
GList link; // .data is protected by the list's main lock
mutex_t lock; // held while the link is in use, protects link.prev and link.next
};
#define ITERATE_CALL_LIST_START(which, varname) \
do { \
int __which = (which); \
mutex_lock(&rtpe_call_iterators[__which].lock); \
\
GList *__l = rtpe_call_iterators[__which].first; \
while (__l) { \
struct call *varname = __l->data; \
obj_hold(varname); \
mutex_lock(&varname->iterator[__which].lock); \
mutex_unlock(&rtpe_call_iterators[__which].lock)
#define ITERATE_CALL_LIST_NEXT_END(varname) \
GList *__next = varname->iterator[__which].link.next; \
mutex_unlock(&varname->iterator[__which].lock); \
__l = __next; \
obj_put(varname); \
mutex_lock(&rtpe_call_iterators[__which].lock); \
} \
\
mutex_unlock(&rtpe_call_iterators[__which].lock); \
} while (0)
struct call {
struct obj obj;
@@ -427,6 +466,8 @@ struct call {
struct recording *recording;
str metadata;
struct call_iterator_entry iterator[NUM_CALL_ITERATORS];
// ipv4/ipv6 media flags
unsigned int is_ipv4_media_offer:1;
unsigned int is_ipv6_media_offer:1;
@@ -449,6 +490,7 @@ struct call {
extern rwlock_t rtpe_callhash_lock;
extern GHashTable *rtpe_callhash;
extern struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS];
extern struct stats rtpe_statsps; /* per second stats, running timer */
extern struct stats rtpe_stats; /* copied from statsps once a second */
@@ -456,7 +498,6 @@ extern struct stats rtpe_stats; /* copied from statsps once a second */
int call_init(void);
void call_free(void);
void call_get_all_calls(GQueue *q);
struct call_monologue *__monologue_create(struct call *call);
void __monologue_tag(struct call_monologue *ml, const str *tag);
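
For reference, the macro pair above wraps the loop body between two statements; the call site in call_timer() (daemon/call.c above) expands roughly to the following, which makes the hand-over-hand locking visible — during the body only the visited call's entry lock and an object reference are held, and the list lock is re-taken only to step to the next link:

// Approximate expansion of:
//   ITERATE_CALL_LIST_START(CALL_ITERATOR_TIMER, c);
//   call_timer_iterator(c, &hlp);
//   ITERATE_CALL_LIST_NEXT_END(c);
do {
	int __which = (CALL_ITERATOR_TIMER);
	mutex_lock(&rtpe_call_iterators[__which].lock);
	GList *__l = rtpe_call_iterators[__which].first;
	while (__l) {
		struct call *c = __l->data;
		obj_hold(c);
		mutex_lock(&c->iterator[__which].lock);
		mutex_unlock(&rtpe_call_iterators[__which].lock);	// ';' supplied by the call site
		call_timer_iterator(c, &hlp);				// loop body
		GList *__next = c->iterator[__which].link.next;
		mutex_unlock(&c->iterator[__which].lock);
		__l = __next;
		obj_put(c);
		mutex_lock(&rtpe_call_iterators[__which].lock);
	}
	mutex_unlock(&rtpe_call_iterators[__which].lock);
} while (0);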

