Browse Source

Fixes after second review

- read/write lock config_lock for keyspace operations
- read lock hashlock when iterating through the callhash
pull/225/head
Stefan Mititelu 10 years ago
parent
commit
ce3ae37a76
6 changed files with 136 additions and 98 deletions
  1. +0
    -5
      daemon/aux.c
  2. +9
    -6
      daemon/call.c
  3. +1
    -0
      daemon/call.h
  4. +124
    -86
      daemon/cli.c
  5. +2
    -0
      daemon/redis.c
  6. +0
    -1
      debian/rules

+ 0
- 5
daemon/aux.c View File

@ -220,8 +220,3 @@ int uint32_eq(const void *a, const void *b) {
const u_int32_t *A = a, *B = b;
return (*A == *B) ? TRUE : FALSE;
}
int guint_cmp(gconstpointer a, gconstpointer b) {
const guint A = GPOINTER_TO_UINT(a), B = GPOINTER_TO_UINT(b);
return (int) (A - B);
}

+ 9
- 6
daemon/call.c View File

@ -1864,21 +1864,24 @@ void call_destroy(struct call *c) {
rwlock_lock_w(&m->hashlock);
ret = g_hash_table_remove(m->callhash, &c->callid);
rwlock_unlock_w(&m->hashlock);
// if call not found in hashlock => previously deleted
if (!ret)
return;
obj_put(c);
if (IS_FOREIGN_CALL(c)) {
atomic64_dec(&m->stats.foreign_sessions);
}
if(!IS_FOREIGN_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
rwlock_unlock_w(&m->hashlock);
if (!ret)
return;
obj_put(c);
if (!IS_FOREIGN_CALL(c)) {
redis_delete(c, m->conf.redis_write);


+ 1
- 0
daemon/call.h View File

@ -20,6 +20,7 @@
#include "socket.h"
#include "media_socket.h"
#define UNDEFINED ((unsigned int) -1)
#define TRUNCATED " ... Output truncated. Increase Output Buffer ... \n"
#define truncate_output(x) strcpy(x - strlen(TRUNCATED) - 1, TRUNCATED)


+ 124
- 86
daemon/cli.c View File

@ -19,6 +19,72 @@
#include "rtpengine_config.h"
static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign_call, unsigned int uint_keyspace_db) {
struct call *c = NULL;
struct call_monologue *ml = NULL;
GQueue call_list = G_QUEUE_INIT;
GHashTableIter iter;
gpointer key, value;
GList *i;
// lock read
rwlock_lock_r(&m->hashlock);
g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value;
if (!c) {
continue;
}
// match foreign_call flag
if ((foreign_call != UNDEFINED) && !(foreign_call == IS_FOREIGN_CALL(c))) {
continue;
}
// match uint_keyspace_db, if some given
if ((uint_keyspace_db != UNDEFINED) && !(uint_keyspace_db == c->redis_hosted_db)) {
continue;
}
// save call reference
g_queue_push_tail(&call_list, obj_get(c));
// increase ref counter
obj_get(c);
}
// unlock read
rwlock_unlock_r(&m->hashlock);
// destroy calls
while ((c = g_queue_pop_head(&call_list))) {
if (!c->ml_deleted) {
for (i = c->monologues.head; i; i = i->next) {
ml = i->data;
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = FORCED;
}
}
call_destroy(c);
// decrease ref counter
obj_put(c);
}
}
static void destroy_all_foreign_calls(struct callmaster *m) {
destroy_own_foreign_calls(m, CT_FOREIGN_CALL, UNDEFINED);
}
static void destroy_all_own_calls(struct callmaster *m) {
destroy_own_foreign_calls(m, CT_OWN_CALL, UNDEFINED);
}
static void destroy_keyspace_foreign_calls(struct callmaster *m, unsigned int uint_keyspace_db) {
destroy_own_foreign_calls(m, CT_FOREIGN_CALL, uint_keyspace_db);
}
static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
struct timeval avg, calls_dur_iv;
@ -63,15 +129,15 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m
calls_dur_iv = m->totalstats_lastinterval.total_calls_duration_interval;
min_sess_iv = m->totalstats_lastinterval.managed_sess_min;
max_sess_iv = m->totalstats_lastinterval.managed_sess_max;
offer_iv = m->totalstats_lastinterval.offer;
answer_iv = m->totalstats_lastinterval.answer;
delete_iv = m->totalstats_lastinterval.delete;
mutex_unlock(&m->totalstats_lastinterval_lock);
offer_iv = m->totalstats_lastinterval.offer;
answer_iv = m->totalstats_lastinterval.answer;
delete_iv = m->totalstats_lastinterval.delete;
mutex_unlock(&m->totalstats_lastinterval_lock);
// compute average offer/answer/delete time
timeval_divide(&offer_iv.time_avg, &offer_iv.time_avg, offer_iv.count);
timeval_divide(&answer_iv.time_avg, &answer_iv.time_avg, answer_iv.count);
timeval_divide(&delete_iv.time_avg, &delete_iv.time_avg, delete_iv.count);
// compute average offer/answer/delete time
timeval_divide(&offer_iv.time_avg, &offer_iv.time_avg, offer_iv.count);
timeval_divide(&answer_iv.time_avg, &answer_iv.time_avg, answer_iv.count);
timeval_divide(&delete_iv.time_avg, &delete_iv.time_avg, delete_iv.count);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nGraphite interval statistics (last reported values to graphite):\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
@ -572,8 +638,6 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m,
str termparam;
struct call* c=0;
int printlen=0;
GHashTableIter iter;
gpointer key, value;
struct call_monologue *ml;
GList *i;
@ -585,59 +649,43 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m,
++buffer; --len; // one space
str_init_len(&termparam,buffer,len);
// --- terminate all calls
if (!str_memcmp(&termparam,"all")) {
while (g_hash_table_size(m->callhash)) {
g_hash_table_iter_init (&iter, m->callhash);
g_hash_table_iter_next (&iter, &key, &value);
c = (struct call*)value;
if (!c) continue;
if (!c->ml_deleted) {
for (i = c->monologues.head; i; i = i->next) {
ml = i->data;
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = FORCED;
}
}
call_destroy(c);
}
ilog(LOG_INFO,"All calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
}
// --- terminate all calls
if (!str_memcmp(&termparam,"all")) {
// destroy own calls
destroy_all_own_calls(m);
// destroy foreign calls
destroy_all_foreign_calls(m);
// update cli
ilog(LOG_INFO,"All calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
// --- terminate own calls
} else if (!str_memcmp(&termparam,"own")) {
// destroy own calls
destroy_all_own_calls(m);
// update cli
ilog(LOG_INFO,"All own calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All own calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
// --- terminate foreign calls
} else if (!str_memcmp(&termparam,"foreign")) {
// destroy foreign calls
destroy_all_foreign_calls(m);
// update cli
ilog(LOG_INFO,"All foreign calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All foreign calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
// --- terminate own/foreign calls
else if (!str_memcmp(&termparam,"own") || !str_memcmp(&termparam,"foreign")) {
// remove all current own calls for this keyspace
g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value;
if (!c) continue;
if (!str_memcmp(&termparam,"own") && IS_FOREIGN_CALL(c)) {
continue;
} else if (!str_memcmp(&termparam,"foreign") && !IS_FOREIGN_CALL(c)) {
continue;
}
if (!c->ml_deleted) {
for (i = c->monologues.head; i; i = i->next) {
ml = i->data;
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = FORCED;
}
}
call_destroy(c);
g_hash_table_iter_init(&iter, m->callhash);
}
if (!str_memcmp(&termparam,"own")) {
ilog(LOG_INFO,"All own calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All own calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
} else if (!str_memcmp(&termparam,"foreign")) {
ilog(LOG_INFO,"All foreign calls terminated by operator.");
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "All foreign calls terminated by operator.");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
return;
}
@ -690,24 +738,22 @@ static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char
} else if (endptr == str_keyspace_db.s) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail adding keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s);
} else {
if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db), guint_cmp)) {
rwlock_lock_w(&m->conf.config_lock);
if (!g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) {
g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Success adding keyspace %u to redis notifications.\n", uint_keyspace_db);
} else {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %u is already among redis notifications.\n", uint_keyspace_db);
}
rwlock_unlock_w(&m->conf.config_lock);
}
ADJUSTLEN(printlen,outbufend,replybuffer);
}
static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen = 0;
struct call* c = NULL;
GHashTableIter iter;
gpointer key, value;
struct call_monologue *ml = NULL;
GList *l, *i;
GList *l;
unsigned int uint_keyspace_db;
str str_keyspace_db;
char *endptr;
@ -723,37 +769,27 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char*
str_keyspace_db.len = len;
uint_keyspace_db = strtol(str_keyspace_db.s, &endptr, 10);
rwlock_lock_w(&m->conf.config_lock);
if ((errno == ERANGE && (uint_keyspace_db == LONG_MAX || uint_keyspace_db == LONG_MIN)) || (errno != 0 && uint_keyspace_db == 0)) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno);
} else if (endptr == str_keyspace_db.s) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s);
} else if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db), guint_cmp))) {
} else if ((l = g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) {
// remove this keyspace
redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully unsubscribed from keyspace %u.\n", uint_keyspace_db);
// remove all current foreign calls for this keyspace
g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value;
if (!c || !IS_FOREIGN_CALL(c)|| !(c->redis_hosted_db == uint_keyspace_db)) {
continue;
}
if (!c->ml_deleted) {
for (i = c->monologues.head; i; i = i->next) {
ml = i->data;
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = FORCED;
}
}
call_destroy(c);
g_hash_table_iter_init(&iter, m->callhash);
}
// destroy foreign calls for this keyspace
destroy_keyspace_foreign_calls(m, uint_keyspace_db);
// update cli
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed all foreign calls for keyspace %u.\n", uint_keyspace_db);
} else {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %u is not among redis notifications.\n", uint_keyspace_db);
}
rwlock_unlock_w(&m->conf.config_lock);
ADJUSTLEN(printlen,outbufend,replybuffer);
}
@ -764,10 +800,12 @@ static void cli_incoming_kslist(char* buffer, int len, struct callmaster* m, cha
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nSubscribed-on keyspaces:\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
rwlock_lock_r(&m->conf.config_lock);
for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "%u ", GPOINTER_TO_UINT(l->data));
ADJUSTLEN(printlen,outbufend,replybuffer);
}
rwlock_unlock_r(&m->conf.config_lock);
printlen = snprintf(replybuffer, outbufend-replybuffer, "\n");
ADJUSTLEN(printlen,outbufend,replybuffer);


+ 2
- 0
daemon/redis.c View File

@ -515,9 +515,11 @@ static int redis_notify(struct callmaster *cm) {
}
// subscribe to the values in the configured keyspaces
rwlock_lock_r(&cm->conf.config_lock);
for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data));
}
rwlock_unlock_r(&cm->conf.config_lock);
// dispatch event base => thread blocks here
if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) {


+ 0
- 1
debian/rules View File

@ -84,7 +84,6 @@ install: build
%:
@echo "--- Building: $@"
dh_shlibdeps --dpkg-shlibdeps-params=--ignore-missing-info
dh_installdirs -p$@ -P$(b)/$@
dh_link -p$@ -P$(b)/$@
dh_installdocs -p$@ -P$(b)/$@ debian/README.md.gz debian/README.html.gz


Loading…
Cancel
Save