Browse Source

Redis nofitications fixes

- add --subscribe_keyspace list config parameter.
- don't delete foreign calls by timers
- fix synchronization of foreign calls (use a separate redis_notify database)
- fix statistics for control channel calls.
- fix deletion of foreign calls upon del notifications
- update rtpengine-ctl tool
pull/225/head
smititelu 10 years ago
committed by Stefan Mititelu
parent
commit
b38f3da45c
13 changed files with 274 additions and 116 deletions
  1. +8
    -1
      README.md
  2. +4
    -0
      daemon/aux.c
  3. +1
    -2
      daemon/aux.h
  4. +55
    -41
      daemon/call.c
  5. +11
    -12
      daemon/call.h
  6. +2
    -0
      daemon/call_interfaces.c
  7. +73
    -17
      daemon/cli.c
  8. +3
    -2
      daemon/graphite.c
  9. +23
    -3
      daemon/main.c
  10. +74
    -35
      daemon/redis.c
  11. +4
    -3
      daemon/redis.h
  12. +7
    -0
      debian/ngcp-rtpengine-daemon.init
  13. +9
    -0
      utils/rtpengine-ctl

+ 8
- 1
README.md View File

@ -170,6 +170,7 @@ option and which are reproduced below:
-M, --port-max=INT Highest port to use for RTP
-r, --redis=[PW@]IP:PORT/INT Connect to Redis database
-w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database
-k, --subscribe-keyspace Subscription keyspace list
--redis-num-threads=INT Number of Redis restore threads
-q, --no-redis-required Start even if can't connect to redis databases
-b, --b2b-url=STRING XMLRPC URL of B2B UA
@ -387,7 +388,13 @@ The options are described in more detail below.
When both options are given, *rtpengine* will start and use the Redis database regardless of the
database's role (master or slave).
* --redis-num-threads
* -k, --subscribe-keyspace
List of redis keyspaces to subscribe. If this is not present, no keyspaces are subscribed (default behaviour).
Further subscriptions could be added/removed via 'rtpengine-ctl ksadd/ksrm'.
This may lead to enabling/disabling of the redis keyspace notification feature.
* --redis-num-threads
How many redis restore threads to create. The default is four.


+ 4
- 0
daemon/aux.c View File

@ -220,3 +220,7 @@ int uint32_eq(const void *a, const void *b) {
const u_int32_t *A = a, *B = b;
return (*A == *B) ? TRUE : FALSE;
}
int uint_cmp(const void *a, const void *b) {
const unsigned int *A = a, *B = b;
return (int) (*A - *B);
}

+ 1
- 2
daemon/aux.h View File

@ -82,8 +82,7 @@ unsigned int in6_addr_hash(const void *p);
int in6_addr_eq(const void *a, const void *b);
unsigned int uint32_hash(const void *p);
int uint32_eq(const void *a, const void *b);
int uint_cmp(const void *a, const void *b);
/*** GLIB HELPERS ***/


+ 55
- 41
daemon/call.c View File

@ -189,7 +189,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
cm = c->callmaster;
if (!(c->redis_call_responsible)) {
if (c->redis_foreign_call) {
ilog(LOG_DEBUG, "Redis-Notification: Timeout resets the deletion timers for a call where I am not responsible.");
c->deleted = c->ml_deleted = poller_now + cm->conf.delete_delay;
goto out;
@ -247,8 +247,9 @@ next:
;
}
if (good)
if (good || c->redis_foreign_call) {
goto out;
}
if (c->ml_deleted)
goto out;
@ -537,10 +538,10 @@ static void callmaster_timer(void *ptr) {
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) {
if (ps && ps->call->redis_foreign_call && ke->rtp_stats[j].packets > 0) {
ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets.");
ps->call->redis_call_responsible = 1;
atomic64_dec(&m->stats.foreign_sessions);
ps->call->redis_foreign_call = 0;
//atomic64_dec(&m->stats.foreign_sessions); /* this doesn't decrease when call becomes active */
}
}
@ -1795,7 +1796,7 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
while (g_hash_table_iter_next(&iter, &key, &value)) {
call = (struct call*) value;
if (!call->monologues.head)
if (!call->monologues.head || IS_BACKUP_CALL(call))
continue;
ml = call->monologues.head->data;
if (timercmp(interval_start, &ml->started, >)) {
@ -1842,23 +1843,27 @@ void call_destroy(struct call *c) {
rwlock_lock_w(&m->hashlock);
ret = g_hash_table_remove(m->callhash, &c->callid);
if(!IS_BACKUP_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
rwlock_unlock_w(&m->hashlock);
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats.managed_sess_crt--;
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
m->totalstats.managed_sess_crt);
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
if (!ret)
return;
obj_put(c);
if (c->redis_call_responsible) {
if (!c->redis_foreign_call) {
redis_delete(c, m->conf.redis_write);
}
if (c->redis_foreign_call) {
atomic64_dec(&m->stats.foreign_sessions);
}
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */
@ -2048,6 +2053,8 @@ void call_destroy(struct call *c) {
atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));
atomic64_add(&m->totalstats.total_relayed_packets,
atomic64_get(&ps->stats.packets));
atomic64_add(&m->totalstats_interval.total_relayed_packets,
@ -2104,9 +2111,11 @@ void call_destroy(struct call *c) {
}
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
if (atomic64_get(&ps->stats.packets)!=0 && !IS_BACKUP_CALL(c)){
if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
}
}
else {
total_nopacket_relayed_sess++;
@ -2114,30 +2123,34 @@ void call_destroy(struct call *c) {
}
}
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
if (!IS_BACKUP_CALL(c)) {
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
}
if (c->monologues.head) {
ml = c->monologues.head->data;
if (ml->term_reason==TIMEOUT) {
atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess);
} else if (ml->term_reason==SILENT_TIMEOUT) {
atomic64_inc(&m->totalstats.total_silent_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess);
} else if (ml->term_reason==REGULAR) {
atomic64_inc(&m->totalstats.total_regular_term_sess);
atomic64_inc(&m->totalstats_interval.total_regular_term_sess);
} else if (ml->term_reason==FORCED) {
atomic64_inc(&m->totalstats.total_forced_term_sess);
atomic64_inc(&m->totalstats_interval.total_forced_term_sess);
}
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(&m->totalstats_interval,
&ml->started, &g_now, &m->latest_graphite_interval_start);
if (!IS_BACKUP_CALL(c)) {
if (ml->term_reason==TIMEOUT) {
atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess);
} else if (ml->term_reason==SILENT_TIMEOUT) {
atomic64_inc(&m->totalstats.total_silent_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess);
} else if (ml->term_reason==REGULAR) {
atomic64_inc(&m->totalstats.total_regular_term_sess);
atomic64_inc(&m->totalstats_interval.total_regular_term_sess);
} else if (ml->term_reason==FORCED) {
atomic64_inc(&m->totalstats.total_forced_term_sess);
atomic64_inc(&m->totalstats_interval.total_forced_term_sess);
}
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(&m->totalstats_interval,
&ml->started, &g_now, &m->latest_graphite_interval_start);
}
}
@ -2291,11 +2304,12 @@ restart:
goto restart;
}
g_hash_table_insert(m->callhash, &c->callid, obj_get(c));
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats.managed_sess_crt++;
m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max,
m->totalstats.managed_sess_crt);
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
if(! IS_BACKUP_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
}
rwlock_lock_w(&c->master_lock);
rwlock_unlock_w(&m->hashlock);
}


+ 11
- 12
daemon/call.h View File

@ -108,8 +108,7 @@ enum call_stream_state {
#define __C_DBG(x...) ((void)0)
#endif
#define IS_BACKUP_CALL(c) (c->is_backup_call)
/* flags shared by several of the structs below */
#define SHARED_FLAG_IMPLICIT_RTCP 0x00000001
@ -250,12 +249,11 @@ struct totalstats {
struct timeval total_average_call_dur;
mutex_t managed_sess_lock; /* for these below */
u_int64_t managed_sess_crt;
u_int64_t managed_sess_max; /* per graphite interval statistic */
u_int64_t managed_sess_min; /* per graphite interval statistic */
mutex_t total_calls_duration_lock; /* for these two below */
struct timeval total_calls_duration_interval;
struct timeval total_calls_duration_interval;
};
struct stream_params {
@ -416,10 +414,10 @@ struct call {
unsigned char tos;
char *created_from;
sockaddr_t created_from_addr;
int redis_hosted_db;
// following flag servers as a mark that this
// call is either created by me or where I took over the responsibility (a packet has seen)
int redis_call_responsible;
unsigned int redis_hosted_db;
unsigned int redis_foreign_call;
unsigned int is_backup_call; // created_via_redis_notify call
};
struct callmaster_config {
@ -431,8 +429,9 @@ struct callmaster_config {
unsigned int delete_delay;
struct redis *redis;
struct redis *redis_write;
struct redis *redis_read_notify;
struct redis *redis_notify;
struct event_base *redis_notify_event_base;
GQueue *redis_subscribed_keyspaces;
struct redisAsyncContext *redis_notify_async_context;
char *b2b_url;
unsigned char default_tos;
@ -450,11 +449,11 @@ struct callmaster {
GHashTable *callhash;
/* XXX rework these */
struct stats statsps; /* per second stats, running timer */
struct stats stats; /* copied from statsps once a second */
struct stats statsps; /* per second stats, running timer */
struct stats stats; /* copied from statsps once a second */
struct totalstats totalstats;
struct totalstats totalstats_interval;
mutex_t totalstats_lastinterval_lock;
mutex_t totalstats_lastinterval_lock;
struct totalstats totalstats_lastinterval;
/* control_ng_stats stuff */


+ 2
- 0
daemon/call_interfaces.c View File

@ -746,6 +746,8 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i
if (g_hash_table_size(m->callhash) -
atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) {
rwlock_unlock_r(&m->hashlock);
/* foreign calls can't get rejected
* total_rejected_sess applies only to "own" sessions */
atomic64_inc(&m->totalstats.total_rejected_sess);
atomic64_inc(&m->totalstats_interval.total_rejected_sess);
ilog(LOG_ERROR, "Parallel session limit reached (%i)",m->conf.max_sessions);


+ 73
- 17
daemon/cli.c View File

@ -34,8 +34,6 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions);
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total foreign sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_foreign_sessions));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total rejected sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_rejected_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess));
@ -168,7 +166,7 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
}
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_call_responsible?"no":"yes");
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_foreign_call?"yes":"no");
ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues.head; l; l = l->next) {
@ -348,9 +346,11 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char*
if (len>=strlen(LIST_NUMSESSIONS) && strncmp(buffer,LIST_NUMSESSIONS,strlen(LIST_NUMSESSIONS)) == 0) {
rwlock_lock_r(&m->hashlock);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash));
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions));
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions total: %i\n", g_hash_table_size(m->callhash));
ADJUSTLEN(printlen,outbufend,replybuffer);
rwlock_unlock_r(&m->hashlock);
} else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) {
@ -365,7 +365,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char*
while (g_hash_table_iter_next (&iter, &key, &value)) {
ptrkey = (str*)key;
call = (struct call*)value;
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_call_responsible?"no":"yes");
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_foreign_call?"yes":"no");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
rwlock_unlock_r(&m->hashlock);
@ -474,8 +474,8 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m,
static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
unsigned int keyspace_db;
int *pint;
int keyspace_db;
str str_keyspace_db;
if (len<=1) {
@ -489,19 +489,32 @@ static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char
str_keyspace_db.len = len;
keyspace_db = str_to_i(&str_keyspace_db, -1);
redis_notify_subscribe_keyspace(m,keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db);
if (keyspace_db != -1) {
redis_notify_subscribe_keyspace(m,keyspace_db);
if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp)) {
pint = (int*)malloc(sizeof(int));
*pint = keyspace_db;
g_queue_push_tail(m->conf.redis_subscribed_keyspaces, pint);
}
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db);
}
else {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Could not add keyspace %i to redis notifications.\n", keyspace_db);
}
ADJUSTLEN(printlen,outbufend,replybuffer);
}
static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
unsigned int keyspace_db;
int printlen = 0;
struct call* c = NULL;
GHashTableIter iter;
gpointer key, value;
struct call_monologue *ml = NULL;
GList *l, *i;
int keyspace_db;
str str_keyspace_db;
if (len<=1) {
if (len <= 1) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
@ -512,9 +525,49 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char*
str_keyspace_db.len = len;
keyspace_db = str_to_i(&str_keyspace_db, -1);
redis_notify_unsubscribe_keyspace(m,keyspace_db);
if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp))) {
// remove this keyspace
redis_notify_unsubscribe_keyspace(m,keyspace_db);
g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully unsubscribed from keyspace %i.\n", keyspace_db);
// remove all current foreign calls for this keyspace
g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value;
if (!c || !c->redis_foreign_call || !(c->redis_hosted_db == keyspace_db)) {
continue;
}
if (!c->ml_deleted) {
for (i = c->monologues.head; i; i = i->next) {
ml = i->data;
gettimeofday(&(ml->terminated), NULL);
ml->term_reason = FORCED;
}
}
call_destroy(c);
g_hash_table_iter_init(&iter, m->callhash);
}
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed all foreign calls for keyspace %i.\n", keyspace_db);
} else {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %i was not among redis notifications.\n", keyspace_db);
}
ADJUSTLEN(printlen,outbufend,replybuffer);
}
static void cli_incoming_kslist(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
GList *l;
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nSubscribed-on keyspaces:\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "%d ", *((unsigned int *)(l->data)));
ADJUSTLEN(printlen,outbufend,replybuffer);
}
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed keyspace %i to redis notifications.\n", keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
@ -567,6 +620,7 @@ next:
static const char* SET = "set";
static const char* KSADD = "ksadd";
static const char* KSRM = "ksrm";
static const char* KSLIST = "kslist";
if (strncmp(inbuf,LIST,strlen(LIST)) == 0) {
cli_incoming_list(inbuf+strlen(LIST), inlen-strlen(LIST), cli->callmaster, outbuf, outbufend);
@ -578,6 +632,8 @@ next:
cli_incoming_ksadd(inbuf+strlen(KSADD), inlen-strlen(KSADD), cli->callmaster, outbuf, outbufend);
} else if (strncmp(inbuf,KSRM,strlen(KSRM)) == 0) {
cli_incoming_ksrm(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend);
} else if (strncmp(inbuf,KSLIST,strlen(KSLIST)) == 0) {
cli_incoming_kslist(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend);
} else {
sprintf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", inbuf);
}


+ 3
- 2
daemon/graphite.c View File

@ -113,8 +113,9 @@ int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) {
mutex_lock(&cm->totalstats_interval.managed_sess_lock);
ts->managed_sess_max = cm->totalstats_interval.managed_sess_max;
ts->managed_sess_min = cm->totalstats_interval.managed_sess_min;
cm->totalstats_interval.managed_sess_max = cm->totalstats.managed_sess_crt;
cm->totalstats_interval.managed_sess_min = cm->totalstats.managed_sess_crt;
cm->totalstats_interval.managed_sess_max = g_hash_table_size(cm->callhash) - atomic64_get(&cm->stats.foreign_sessions);
cm->totalstats_interval.managed_sess_min = g_hash_table_size(cm->callhash) - atomic64_get(&cm->stats.foreign_sessions);
mutex_unlock(&cm->totalstats_interval.managed_sess_lock);
rwlock_unlock_r(&cm->hashlock);


+ 23
- 3
daemon/main.c View File

@ -54,6 +54,7 @@ static mutex_t *openssl_locks;
static char *pidfile;
static gboolean foreground;
static GQueue interfaces = G_QUEUE_INIT;
static GQueue keyspaces = G_QUEUE_INIT;
endpoint_t tcp_listen_ep;
endpoint_t udp_listen_ep;
endpoint_t ng_listen_ep;
@ -254,6 +255,9 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth
static void options(int *argc, char ***argv) {
char **if_a = NULL;
char **ks_a = NULL;
int *pint;
str str_keyspace_db;
char **iter;
struct intf_config *ifa;
char *listenps = NULL;
@ -266,7 +270,7 @@ static void options(int *argc, char ***argv) {
char *redisps_write = NULL;
char *log_facility_s = NULL;
char *log_facility_cdr_s = NULL;
char *log_facility_rtcp_s = NULL;
char *log_facility_rtcp_s = NULL;
int version = 0;
int sip_source = 0;
@ -275,6 +279,7 @@ static void options(int *argc, char ***argv) {
{ "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" },
{ "no-fallback",'F', 0, G_OPTION_ARG_NONE, &no_fallback, "Only start when kernel module is available", NULL },
{ "interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY,&if_a, "Local interface for RTP", "[NAME/]IP[!IP]"},
{ "subscribe-keyspace", 'k', 0, G_OPTION_ARG_STRING_ARRAY,&ks_a, "Subscription keyspace list", "INT INT ..."},
{ "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" },
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" },
@ -331,6 +336,18 @@ static void options(int *argc, char ***argv) {
g_queue_push_tail(&interfaces, ifa);
}
if (ks_a) {
for (iter = ks_a; *iter; iter++) {
pint = (int *)malloc(sizeof(int));
str_keyspace_db.s = *iter;
str_keyspace_db.len = strlen(*iter);
*pint = str_to_i(&str_keyspace_db, -1);
if (*pint != -1)
g_queue_push_tail(&keyspaces, pint);
}
}
if (listenps) {
if (endpoint_parse_any(&tcp_listen_ep, listenps))
die("Invalid IP or port (--listen-tcp)");
@ -542,6 +559,8 @@ no_kernel:
mc.fmt = xmlrpc_fmt;
mc.graphite_ep = graphite_ep;
mc.graphite_interval = graphite_interval;
mc.redis_subscribed_keyspaces = g_queue_copy(&keyspaces);
if (redis_num_threads < 1) {
#ifdef _SC_NPROCESSORS_ONLN
redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN );
@ -591,7 +610,8 @@ no_kernel:
}
if (!is_addr_unspecified(&redis_ep.address)) {
mc.redis = mc.redis_read_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
mc.redis = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
if (!mc.redis)
die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED paramter.",
endpoint_print_buf(&redis_ep));
@ -642,7 +662,7 @@ int main(int argc, char **argv) {
thread_create_detach(poller_timer_loop, ctx.p);
if (!is_addr_unspecified(&redis_ep.address))
thread_create_detach(redis_notify, ctx.m);
thread_create_detach(redis_notify_loop, ctx.m);
if (!is_addr_unspecified(&graphite_ep.address))
thread_create_detach(graphite_loop, ctx.m);


+ 74
- 35
daemon/redis.c View File

@ -234,6 +234,7 @@ int str_cut(char *str, int begin, int len) {
}
static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id);
static int redis_check_conn(struct redis *r);
void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
@ -244,14 +245,13 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
char db_str[16]; memset(&db_str, 0, 8);
char *pdbstr = db_str;
char *p = 0;
int dbno;
if (!(cm->conf.redis)) {
rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'");
return;
}
r = cm->conf.redis_read_notify;
r = cm->conf.redis_notify;
mutex_lock(&r->lock);
@ -285,7 +285,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
goto err;
}
}
dbno = r->db = atoi(db_str);
r->db = atoi(db_str);
// select the right db for restoring the call
if (redisCommandNR(r->ctx, "SELECT %i", r->db)) {
@ -305,7 +305,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
c = g_hash_table_lookup(cm->callhash, &callid);
if (c && c->redis_call_responsible) {
if (c && !c->redis_foreign_call) {
rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications.");
goto err;
}
@ -322,13 +322,13 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
// we lookup again to retrieve the call to insert the kayspace db id
c = g_hash_table_lookup(cm->callhash, &callid);
if (c) {
c->redis_hosted_db = dbno;
c->redis_foreign_call = 1;
c->is_backup_call = 1;
}
}
if (strncmp(rr->element[3]->str,"del",3)==0) {
call_destroy(c);
atomic64_dec(&cm->stats.foreign_sessions);
}
err:
@ -358,13 +358,15 @@ void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) {
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str);
}
void redis_notify(void *d) {
struct callmaster *cm = d;
static void redis_notify(struct callmaster *cm) {
struct redis *r = 0;
if (cm->conf.redis) {
r = cm->conf.redis;
GList *l;
if (cm->conf.redis_notify) {
r = cm->conf.redis_notify;
rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint));
} else {
rlog(LOG_INFO, "I do not subscribe to redis notifications since no REDIS is configured");
rlog(LOG_INFO, "Don't use Redis notifications. See --redis-notifications parameter.");
return;
}
@ -372,17 +374,59 @@ void redis_notify(void *d) {
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port);
if (cm->conf.redis_notify_async_context->err) {
rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr);
rlog(LOG_ERROR, "Redis notification error: %s\n", cm->conf.redis_notify_async_context->errstr);
return;
}
redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base);
redis_notify_subscribe_keyspace(cm,r->db);
/* Subscribing to the values in the configured keyspaces */
for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
redis_notify_subscribe_keyspace(cm,*(int *)(l->data));
}
event_base_dispatch(cm->conf.redis_notify_event_base);
}
void redis_notify_loop(void *d) {
int seconds = 1;
time_t next_run = g_now.tv_sec;
struct callmaster *cm = (struct callmaster *)d;
struct redis *r;
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
r = cm->conf.redis_notify;
if (!r) {
return ;
}
// initial redis_notify
if (redis_check_conn(r) == REDIS_STATE_CONNECTED) {
redis_notify(cm);
}
// loop redis_notify => in case of lost connection
while (!g_shutdown) {
gettimeofday(&g_now, NULL);
if (g_now.tv_sec < next_run) {
usleep(100000);
continue;
}
next_run = g_now.tv_sec + seconds;
if (redis_check_conn(r) == REDIS_STATE_RECONNECTED) {
redis_notify(cm);
}
}
}
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) {
struct redis *r;
@ -434,8 +478,7 @@ static int redis_check_conn(struct redis *r) {
// try redis connection
if (redisCommandNR(r->ctx, "PING") == 0) {
// redis is connected
// redis_check_conn() executed well
return 0;
return REDIS_STATE_CONNECTED;
}
// redis is disconnected
@ -445,11 +488,10 @@ static int redis_check_conn(struct redis *r) {
r->state = REDIS_STATE_DISCONNECTED;
}
// try redis reconnect -> will free current r->ctx
// try redis reconnect => will free current r->ctx
if (redis_connect(r, 1)) {
// redis is disconnected
// redis_check_conn() executed well
return 0;
return REDIS_STATE_DISCONNECTED;
}
// redis is connected
@ -459,8 +501,8 @@ static int redis_check_conn(struct redis *r) {
r->state = REDIS_STATE_CONNECTED;
}
// redis_check_conn() executed well
return 0;
// redis is re-connected
return REDIS_STATE_RECONNECTED;
}
@ -1172,6 +1214,10 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi
if (!redis_hash_get_str(&s, &call, "created_from_addr"))
sockaddr_parse_any_str(&c->created_from_addr, &s);
err = "missing 'redis_hosted_db' value";
if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db"))
goto err6;
err = "failed to create sfds";
if (redis_sfds(c, &sfds))
goto err6;
@ -1205,8 +1251,6 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi
goto err6;
err = NULL;
c->redis_hosted_db = r->db;
c->redis_call_responsible = 1;
obj_put(c);
err6:
@ -1276,8 +1320,7 @@ int redis_restore(struct callmaster *m, struct redis *r) {
rlog(LOG_DEBUG, "Restoring calls from Redis...");
mutex_lock(&r->lock);
redis_check_conn(r);
if (r->state == REDIS_STATE_DISCONNECTED) {
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {
mutex_unlock(&r->lock);
ret = 0;
goto err;
@ -1422,14 +1465,14 @@ void redis_update(struct call *c, struct redis *r) {
return;
mutex_lock(&r->lock);
redis_check_conn(r);
if (r->state == REDIS_STATE_DISCONNECTED) {
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {
mutex_unlock(&r->lock);
return ;
}
rwlock_lock_r(&c->master_lock);
c->redis_hosted_db = r->db;
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) {
rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error.");
goto err;
@ -1441,14 +1484,15 @@ void redis_update(struct call *c, struct redis *r) {
redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu "
"num_sfds %u num_streams %u num_medias %u num_tags %u "
"num_maps %u "
"ml_deleted %llu created_from %s created_from_addr %s",
"ml_deleted %llu created_from %s created_from_addr %s redis_hosted_db %u",
STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal,
(int) c->tos, (long long unsigned) c->deleted,
g_queue_get_length(&c->stream_fds), g_queue_get_length(&c->streams),
g_queue_get_length(&c->medias), g_queue_get_length(&c->monologues),
g_queue_get_length(&c->endpoint_maps),
(long long unsigned) c->ml_deleted,
c->created_from, sockaddr_print_buf(&c->created_from_addr));
c->created_from, sockaddr_print_buf(&c->created_from_addr),
c->redis_hosted_db);
/* XXX DTLS cert?? */
redis_pipe(r, "DEL sfd-"PB"-0", STR(&c->callid));
@ -1667,9 +1711,6 @@ void redis_update(struct call *c, struct redis *r) {
redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid));
redis_pipe(r, "SADD calls "PB"", STR(&c->callid));
redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid));
if (!c->redis_hosted_db)
c->redis_hosted_db = r->db;
c->redis_call_responsible = 1;
redis_consume(r);
@ -1693,8 +1734,7 @@ void redis_delete(struct call *c, struct redis *r) {
return;
mutex_lock(&r->lock);
redis_check_conn(r);
if (r->state == REDIS_STATE_DISCONNECTED) {
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {
mutex_unlock(&r->lock);
return ;
}
@ -1728,8 +1768,7 @@ void redis_wipe(struct redis *r) {
return;
mutex_lock(&r->lock);
redis_check_conn(r);
if (r->state == REDIS_STATE_DISCONNECTED) {
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {
mutex_unlock(&r->lock);
return ;
}


+ 4
- 3
daemon/redis.h View File

@ -25,8 +25,9 @@ enum redis_role {
};
enum redis_state {
REDIS_STATE_DISCONNECTED = 0,
REDIS_STATE_CONNECTED = 1,
REDIS_STATE_DISCONNECTED = 0, // DISCONNECTED -> DISCONNECTED
REDIS_STATE_CONNECTED, // CONNECTED -> CONNECTED
REDIS_STATE_RECONNECTED, // DISCONNECTED -> CONNECTED
};
struct callmaster;
@ -89,7 +90,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
#define REDIS_FMT(x) (x)->len, (x)->str
void redis_notify(void *d);
void redis_notify_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required);


+ 7
- 0
debian/ngcp-rtpengine-daemon.init View File

@ -48,6 +48,13 @@ if [ ! -z "$INTERFACES" ]; then
OPTIONS="$OPTIONS --interface=$interface"
done
fi
if [ ! -z "$SUBSCRIBE_KEYSPACES" ]; then
for ks in $SUBSCRIBE_KEYSPACES; do
OPTIONS="$OPTIONS --subscribe-keyspace=$ks"
done
fi
[ -z "$ADDRESS" ] || OPTIONS="$OPTIONS --interface=$ADDRESS"
[ -z "$ADV_ADDRESS" ] || OPTIONS="$OPTIONS!$ADV_ADDRESS"
[ -z "$ADDRESS_IPV6" ] || OPTIONS="$OPTIONS --interface=$ADDRESS_IPV6"


+ 9
- 0
utils/rtpengine-ctl View File

@ -78,6 +78,15 @@ sub showusage {
print " maxsessions <int> : set the max nr of allowed sessions\n";
print " maxopenfiles <uint> : set the max nr of allowed open files\n";
print "\n";
print " ksadd [ keyspace <uint>]\n";
print " keyspace <uint> : subscribe to 'keyspace' database\n";
print "\n";
print " ksrm [ keyspace <uint>]\n";
print " keyspace <uint> : unsubscribe to 'keyspace' database\n";
print " : remove all foreign calls for that 'keyspace'\n";
print "\n";
print " kslist : print all currently subscribed keyspaces\n";
print "\n";
print "\n";
print " Return Value:\n";
print " 0 on success with ouput from server side, other values for failure.\n";


Loading…
Cancel
Save