diff --git a/README.md b/README.md index 55d3d7866..f417e30d4 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,7 @@ option and which are reproduced below: -M, --port-max=INT Highest port to use for RTP -r, --redis=[PW@]IP:PORT/INT Connect to Redis database -w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database + -k, --subscribe-keyspace Subscription keyspace list --redis-num-threads=INT Number of Redis restore threads -q, --no-redis-required Start even if can't connect to redis databases -b, --b2b-url=STRING XMLRPC URL of B2B UA @@ -387,7 +388,13 @@ The options are described in more detail below. When both options are given, *rtpengine* will start and use the Redis database regardless of the database's role (master or slave). -* --redis-num-threads +* -k, --subscribe-keyspace + + List of redis keyspaces to subscribe. If this is not present, no keyspaces are subscribed (default behaviour). + Further subscriptions could be added/removed via 'rtpengine-ctl ksadd/ksrm'. + This may lead to enabling/disabling of the redis keyspace notification feature. + +* --redis-num-threads How many redis restore threads to create. The default is four. diff --git a/daemon/aux.c b/daemon/aux.c index 459f781a6..286ee044a 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -220,3 +220,7 @@ int uint32_eq(const void *a, const void *b) { const u_int32_t *A = a, *B = b; return (*A == *B) ? TRUE : FALSE; } +int uint_cmp(const void *a, const void *b) { + const unsigned int *A = a, *B = b; + return (int) (*A - *B); +} diff --git a/daemon/aux.h b/daemon/aux.h index ea7576eb0..91667b7d3 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -82,8 +82,7 @@ unsigned int in6_addr_hash(const void *p); int in6_addr_eq(const void *a, const void *b); unsigned int uint32_hash(const void *p); int uint32_eq(const void *a, const void *b); - - +int uint_cmp(const void *a, const void *b); /*** GLIB HELPERS ***/ diff --git a/daemon/call.c b/daemon/call.c index 0da0acbf5..d7f960e1b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -189,7 +189,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { cm = c->callmaster; - if (!(c->redis_call_responsible)) { + if (c->redis_foreign_call) { ilog(LOG_DEBUG, "Redis-Notification: Timeout resets the deletion timers for a call where I am not responsible."); c->deleted = c->ml_deleted = poller_now + cm->conf.delete_delay; goto out; @@ -247,8 +247,9 @@ next: ; } - if (good) + if (good || c->redis_foreign_call) { goto out; + } if (c->ml_deleted) goto out; @@ -537,10 +538,10 @@ static void callmaster_timer(void *ptr) { ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); - if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) { + if (ps && ps->call->redis_foreign_call && ke->rtp_stats[j].packets > 0) { ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets."); - ps->call->redis_call_responsible = 1; - atomic64_dec(&m->stats.foreign_sessions); + ps->call->redis_foreign_call = 0; + //atomic64_dec(&m->stats.foreign_sessions); /* this doesn't decrease when call becomes active */ } } @@ -1795,7 +1796,7 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, while (g_hash_table_iter_next(&iter, &key, &value)) { call = (struct call*) value; - if (!call->monologues.head) + if (!call->monologues.head || IS_BACKUP_CALL(call)) continue; ml = call->monologues.head->data; if (timercmp(interval_start, &ml->started, >)) { @@ -1842,23 +1843,27 @@ void call_destroy(struct call *c) { rwlock_lock_w(&m->hashlock); ret = g_hash_table_remove(m->callhash, &c->callid); + if(!IS_BACKUP_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, + g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } rwlock_unlock_w(&m->hashlock); - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats.managed_sess_crt--; - m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, - m->totalstats.managed_sess_crt); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); - if (!ret) return; obj_put(c); - if (c->redis_call_responsible) { + if (!c->redis_foreign_call) { redis_delete(c, m->conf.redis_write); } + if (c->redis_foreign_call) { + atomic64_dec(&m->stats.foreign_sessions); + } + rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ @@ -2048,6 +2053,8 @@ void call_destroy(struct call *c) { atomic64_get(&ps->stats.errors), atomic64_get(&ps->last_packet)); + + atomic64_add(&m->totalstats.total_relayed_packets, atomic64_get(&ps->stats.packets)); atomic64_add(&m->totalstats_interval.total_relayed_packets, @@ -2104,9 +2111,11 @@ void call_destroy(struct call *c) { } if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { - if (atomic64_get(&ps->stats.packets)!=0) { - atomic64_inc(&m->totalstats.total_oneway_stream_sess); - atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); + if (atomic64_get(&ps->stats.packets)!=0 && !IS_BACKUP_CALL(c)){ + if (atomic64_get(&ps->stats.packets)!=0) { + atomic64_inc(&m->totalstats.total_oneway_stream_sess); + atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); + } } else { total_nopacket_relayed_sess++; @@ -2114,30 +2123,34 @@ void call_destroy(struct call *c) { } } - - atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); - atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + if (!IS_BACKUP_CALL(c)) { + atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + } if (c->monologues.head) { ml = c->monologues.head->data; - if (ml->term_reason==TIMEOUT) { - atomic64_inc(&m->totalstats.total_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_timeout_sess); - } else if (ml->term_reason==SILENT_TIMEOUT) { - atomic64_inc(&m->totalstats.total_silent_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); - } else if (ml->term_reason==REGULAR) { - atomic64_inc(&m->totalstats.total_regular_term_sess); - atomic64_inc(&m->totalstats_interval.total_regular_term_sess); - } else if (ml->term_reason==FORCED) { - atomic64_inc(&m->totalstats.total_forced_term_sess); - atomic64_inc(&m->totalstats_interval.total_forced_term_sess); - } - timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); - timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); - timeval_totalstats_interval_call_duration_add(&m->totalstats_interval, - &ml->started, &g_now, &m->latest_graphite_interval_start); + if (!IS_BACKUP_CALL(c)) { + if (ml->term_reason==TIMEOUT) { + atomic64_inc(&m->totalstats.total_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_timeout_sess); + } else if (ml->term_reason==SILENT_TIMEOUT) { + atomic64_inc(&m->totalstats.total_silent_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); + } else if (ml->term_reason==REGULAR) { + atomic64_inc(&m->totalstats.total_regular_term_sess); + atomic64_inc(&m->totalstats_interval.total_regular_term_sess); + } else if (ml->term_reason==FORCED) { + atomic64_inc(&m->totalstats.total_forced_term_sess); + atomic64_inc(&m->totalstats_interval.total_forced_term_sess); + } + + timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); + timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); + timeval_totalstats_interval_call_duration_add(&m->totalstats_interval, + &ml->started, &g_now, &m->latest_graphite_interval_start); + } } @@ -2291,11 +2304,12 @@ restart: goto restart; } g_hash_table_insert(m->callhash, &c->callid, obj_get(c)); - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats.managed_sess_crt++; - m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max, - m->totalstats.managed_sess_crt); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); + if(! IS_BACKUP_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max, + g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } rwlock_lock_w(&c->master_lock); rwlock_unlock_w(&m->hashlock); } diff --git a/daemon/call.h b/daemon/call.h index f3411b6fe..214ad8145 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -108,8 +108,7 @@ enum call_stream_state { #define __C_DBG(x...) ((void)0) #endif - - +#define IS_BACKUP_CALL(c) (c->is_backup_call) /* flags shared by several of the structs below */ #define SHARED_FLAG_IMPLICIT_RTCP 0x00000001 @@ -250,12 +249,11 @@ struct totalstats { struct timeval total_average_call_dur; mutex_t managed_sess_lock; /* for these below */ - u_int64_t managed_sess_crt; u_int64_t managed_sess_max; /* per graphite interval statistic */ u_int64_t managed_sess_min; /* per graphite interval statistic */ mutex_t total_calls_duration_lock; /* for these two below */ - struct timeval total_calls_duration_interval; + struct timeval total_calls_duration_interval; }; struct stream_params { @@ -416,10 +414,10 @@ struct call { unsigned char tos; char *created_from; sockaddr_t created_from_addr; - int redis_hosted_db; - // following flag servers as a mark that this - // call is either created by me or where I took over the responsibility (a packet has seen) - int redis_call_responsible; + + unsigned int redis_hosted_db; + unsigned int redis_foreign_call; + unsigned int is_backup_call; // created_via_redis_notify call }; struct callmaster_config { @@ -431,8 +429,9 @@ struct callmaster_config { unsigned int delete_delay; struct redis *redis; struct redis *redis_write; - struct redis *redis_read_notify; + struct redis *redis_notify; struct event_base *redis_notify_event_base; + GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; char *b2b_url; unsigned char default_tos; @@ -450,11 +449,11 @@ struct callmaster { GHashTable *callhash; /* XXX rework these */ - struct stats statsps; /* per second stats, running timer */ - struct stats stats; /* copied from statsps once a second */ + struct stats statsps; /* per second stats, running timer */ + struct stats stats; /* copied from statsps once a second */ struct totalstats totalstats; struct totalstats totalstats_interval; - mutex_t totalstats_lastinterval_lock; + mutex_t totalstats_lastinterval_lock; struct totalstats totalstats_lastinterval; /* control_ng_stats stuff */ diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 89debe5fe..8c0812f85 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -746,6 +746,8 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i if (g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) { rwlock_unlock_r(&m->hashlock); + /* foreign calls can't get rejected + * total_rejected_sess applies only to "own" sessions */ atomic64_inc(&m->totalstats.total_rejected_sess); atomic64_inc(&m->totalstats_interval.total_rejected_sess); ilog(LOG_ERROR, "Parallel session limit reached (%i)",m->conf.max_sessions); diff --git a/daemon/cli.c b/daemon/cli.c index 02836cee1..80f25f93f 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -34,8 +34,6 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total foreign sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_foreign_sessions)); - ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total rejected sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_rejected_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess)); @@ -168,7 +166,7 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m } printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n", - c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_call_responsible?"no":"yes"); + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_foreign_call?"yes":"no"); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues.head; l; l = l->next) { @@ -348,9 +346,11 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* if (len>=strlen(LIST_NUMSESSIONS) && strncmp(buffer,LIST_NUMSESSIONS,strlen(LIST_NUMSESSIONS)) == 0) { rwlock_lock_r(&m->hashlock); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash)); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions)); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions total: %i\n", g_hash_table_size(m->callhash)); ADJUSTLEN(printlen,outbufend,replybuffer); rwlock_unlock_r(&m->hashlock); } else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) { @@ -365,7 +365,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* while (g_hash_table_iter_next (&iter, &key, &value)) { ptrkey = (str*)key; call = (struct call*)value; - printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_call_responsible?"no":"yes"); + printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_foreign_call?"yes":"no"); ADJUSTLEN(printlen,outbufend,replybuffer); } rwlock_unlock_r(&m->hashlock); @@ -474,8 +474,8 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m, static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { int printlen=0; - - unsigned int keyspace_db; + int *pint; + int keyspace_db; str str_keyspace_db; if (len<=1) { @@ -489,19 +489,32 @@ static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char str_keyspace_db.len = len; keyspace_db = str_to_i(&str_keyspace_db, -1); - redis_notify_subscribe_keyspace(m,keyspace_db); - - printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db); + if (keyspace_db != -1) { + redis_notify_subscribe_keyspace(m,keyspace_db); + if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp)) { + pint = (int*)malloc(sizeof(int)); + *pint = keyspace_db; + g_queue_push_tail(m->conf.redis_subscribed_keyspaces, pint); + } + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db); + } + else { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Could not add keyspace %i to redis notifications.\n", keyspace_db); + } ADJUSTLEN(printlen,outbufend,replybuffer); } static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { - int printlen=0; - - unsigned int keyspace_db; + int printlen = 0; + struct call* c = NULL; + GHashTableIter iter; + gpointer key, value; + struct call_monologue *ml = NULL; + GList *l, *i; + int keyspace_db; str str_keyspace_db; - if (len<=1) { + if (len <= 1) { printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required."); ADJUSTLEN(printlen,outbufend,replybuffer); return; @@ -512,9 +525,49 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* str_keyspace_db.len = len; keyspace_db = str_to_i(&str_keyspace_db, -1); - redis_notify_unsubscribe_keyspace(m,keyspace_db); + if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp))) { + // remove this keyspace + redis_notify_unsubscribe_keyspace(m,keyspace_db); + g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully unsubscribed from keyspace %i.\n", keyspace_db); + + // remove all current foreign calls for this keyspace + g_hash_table_iter_init(&iter, m->callhash); + while (g_hash_table_iter_next(&iter, &key, &value)) { + c = (struct call*)value; + if (!c || !c->redis_foreign_call || !(c->redis_hosted_db == keyspace_db)) { + continue; + } + if (!c->ml_deleted) { + for (i = c->monologues.head; i; i = i->next) { + ml = i->data; + gettimeofday(&(ml->terminated), NULL); + ml->term_reason = FORCED; + } + } + call_destroy(c); + g_hash_table_iter_init(&iter, m->callhash); + } + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed all foreign calls for keyspace %i.\n", keyspace_db); + } else { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %i was not among redis notifications.\n", keyspace_db); + } + ADJUSTLEN(printlen,outbufend,replybuffer); +} + +static void cli_incoming_kslist(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { + int printlen=0; + GList *l; + + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nSubscribed-on keyspaces:\n"); + ADJUSTLEN(printlen,outbufend,replybuffer); + + for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) { + printlen = snprintf(replybuffer,(outbufend-replybuffer), "%d ", *((unsigned int *)(l->data))); + ADJUSTLEN(printlen,outbufend,replybuffer); + } - printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed keyspace %i to redis notifications.\n", keyspace_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "\n"); ADJUSTLEN(printlen,outbufend,replybuffer); } @@ -567,6 +620,7 @@ next: static const char* SET = "set"; static const char* KSADD = "ksadd"; static const char* KSRM = "ksrm"; + static const char* KSLIST = "kslist"; if (strncmp(inbuf,LIST,strlen(LIST)) == 0) { cli_incoming_list(inbuf+strlen(LIST), inlen-strlen(LIST), cli->callmaster, outbuf, outbufend); @@ -578,6 +632,8 @@ next: cli_incoming_ksadd(inbuf+strlen(KSADD), inlen-strlen(KSADD), cli->callmaster, outbuf, outbufend); } else if (strncmp(inbuf,KSRM,strlen(KSRM)) == 0) { cli_incoming_ksrm(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend); + } else if (strncmp(inbuf,KSLIST,strlen(KSLIST)) == 0) { + cli_incoming_kslist(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend); } else { sprintf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", inbuf); } diff --git a/daemon/graphite.c b/daemon/graphite.c index 66bf28add..3dd65d2ab 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -113,8 +113,9 @@ int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) { mutex_lock(&cm->totalstats_interval.managed_sess_lock); ts->managed_sess_max = cm->totalstats_interval.managed_sess_max; ts->managed_sess_min = cm->totalstats_interval.managed_sess_min; - cm->totalstats_interval.managed_sess_max = cm->totalstats.managed_sess_crt; - cm->totalstats_interval.managed_sess_min = cm->totalstats.managed_sess_crt; + + cm->totalstats_interval.managed_sess_max = g_hash_table_size(cm->callhash) - atomic64_get(&cm->stats.foreign_sessions); + cm->totalstats_interval.managed_sess_min = g_hash_table_size(cm->callhash) - atomic64_get(&cm->stats.foreign_sessions); mutex_unlock(&cm->totalstats_interval.managed_sess_lock); rwlock_unlock_r(&cm->hashlock); diff --git a/daemon/main.c b/daemon/main.c index 1b85df939..3fd86589f 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -54,6 +54,7 @@ static mutex_t *openssl_locks; static char *pidfile; static gboolean foreground; static GQueue interfaces = G_QUEUE_INIT; +static GQueue keyspaces = G_QUEUE_INIT; endpoint_t tcp_listen_ep; endpoint_t udp_listen_ep; endpoint_t ng_listen_ep; @@ -254,6 +255,9 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth static void options(int *argc, char ***argv) { char **if_a = NULL; + char **ks_a = NULL; + int *pint; + str str_keyspace_db; char **iter; struct intf_config *ifa; char *listenps = NULL; @@ -266,7 +270,7 @@ static void options(int *argc, char ***argv) { char *redisps_write = NULL; char *log_facility_s = NULL; char *log_facility_cdr_s = NULL; - char *log_facility_rtcp_s = NULL; + char *log_facility_rtcp_s = NULL; int version = 0; int sip_source = 0; @@ -275,6 +279,7 @@ static void options(int *argc, char ***argv) { { "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" }, { "no-fallback",'F', 0, G_OPTION_ARG_NONE, &no_fallback, "Only start when kernel module is available", NULL }, { "interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY,&if_a, "Local interface for RTP", "[NAME/]IP[!IP]"}, + { "subscribe-keyspace", 'k', 0, G_OPTION_ARG_STRING_ARRAY,&ks_a, "Subscription keyspace list", "INT INT ..."}, { "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, @@ -331,6 +336,18 @@ static void options(int *argc, char ***argv) { g_queue_push_tail(&interfaces, ifa); } + if (ks_a) { + for (iter = ks_a; *iter; iter++) { + pint = (int *)malloc(sizeof(int)); + str_keyspace_db.s = *iter; + str_keyspace_db.len = strlen(*iter); + *pint = str_to_i(&str_keyspace_db, -1); + + if (*pint != -1) + g_queue_push_tail(&keyspaces, pint); + } + } + if (listenps) { if (endpoint_parse_any(&tcp_listen_ep, listenps)) die("Invalid IP or port (--listen-tcp)"); @@ -542,6 +559,8 @@ no_kernel: mc.fmt = xmlrpc_fmt; mc.graphite_ep = graphite_ep; mc.graphite_interval = graphite_interval; + mc.redis_subscribed_keyspaces = g_queue_copy(&keyspaces); + if (redis_num_threads < 1) { #ifdef _SC_NPROCESSORS_ONLN redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN ); @@ -591,7 +610,8 @@ no_kernel: } if (!is_addr_unspecified(&redis_ep.address)) { - mc.redis = mc.redis_read_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + mc.redis = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); if (!mc.redis) die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED paramter.", endpoint_print_buf(&redis_ep)); @@ -642,7 +662,7 @@ int main(int argc, char **argv) { thread_create_detach(poller_timer_loop, ctx.p); if (!is_addr_unspecified(&redis_ep.address)) - thread_create_detach(redis_notify, ctx.m); + thread_create_detach(redis_notify_loop, ctx.m); if (!is_addr_unspecified(&graphite_ep.address)) thread_create_detach(graphite_loop, ctx.m); diff --git a/daemon/redis.c b/daemon/redis.c index d8f09ad63..99a1065b3 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -234,6 +234,7 @@ int str_cut(char *str, int begin, int len) { } static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id); +static int redis_check_conn(struct redis *r); void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { @@ -244,14 +245,13 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { char db_str[16]; memset(&db_str, 0, 8); char *pdbstr = db_str; char *p = 0; - int dbno; if (!(cm->conf.redis)) { rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'"); return; } - r = cm->conf.redis_read_notify; + r = cm->conf.redis_notify; mutex_lock(&r->lock); @@ -285,7 +285,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } } - dbno = r->db = atoi(db_str); + r->db = atoi(db_str); // select the right db for restoring the call if (redisCommandNR(r->ctx, "SELECT %i", r->db)) { @@ -305,7 +305,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { c = g_hash_table_lookup(cm->callhash, &callid); - if (c && c->redis_call_responsible) { + if (c && !c->redis_foreign_call) { rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications."); goto err; } @@ -322,13 +322,13 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { // we lookup again to retrieve the call to insert the kayspace db id c = g_hash_table_lookup(cm->callhash, &callid); if (c) { - c->redis_hosted_db = dbno; + c->redis_foreign_call = 1; + c->is_backup_call = 1; } } if (strncmp(rr->element[3]->str,"del",3)==0) { call_destroy(c); - atomic64_dec(&cm->stats.foreign_sessions); } err: @@ -358,13 +358,15 @@ void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) { redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); } -void redis_notify(void *d) { - struct callmaster *cm = d; +static void redis_notify(struct callmaster *cm) { struct redis *r = 0; - if (cm->conf.redis) { - r = cm->conf.redis; + GList *l; + + if (cm->conf.redis_notify) { + r = cm->conf.redis_notify; + rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); } else { - rlog(LOG_INFO, "I do not subscribe to redis notifications since no REDIS is configured"); + rlog(LOG_INFO, "Don't use Redis notifications. See --redis-notifications parameter."); return; } @@ -372,17 +374,59 @@ void redis_notify(void *d) { cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); + rlog(LOG_ERROR, "Redis notification error: %s\n", cm->conf.redis_notify_async_context->errstr); return; } redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); - redis_notify_subscribe_keyspace(cm,r->db); + /* Subscribing to the values in the configured keyspaces */ + for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) { + redis_notify_subscribe_keyspace(cm,*(int *)(l->data)); + } + event_base_dispatch(cm->conf.redis_notify_event_base); } +void redis_notify_loop(void *d) { + int seconds = 1; + time_t next_run = g_now.tv_sec; + struct callmaster *cm = (struct callmaster *)d; + struct redis *r; + + // sanity checks + if (!cm) { + ilog(LOG_ERROR, "NULL callmaster"); + return ; + } + + r = cm->conf.redis_notify; + if (!r) { + return ; + } + + // initial redis_notify + if (redis_check_conn(r) == REDIS_STATE_CONNECTED) { + redis_notify(cm); + } + + // loop redis_notify => in case of lost connection + while (!g_shutdown) { + gettimeofday(&g_now, NULL); + if (g_now.tv_sec < next_run) { + usleep(100000); + continue; + } + + next_run = g_now.tv_sec + seconds; + + if (redis_check_conn(r) == REDIS_STATE_RECONNECTED) { + redis_notify(cm); + } + } +} + struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { struct redis *r; @@ -434,8 +478,7 @@ static int redis_check_conn(struct redis *r) { // try redis connection if (redisCommandNR(r->ctx, "PING") == 0) { // redis is connected - // redis_check_conn() executed well - return 0; + return REDIS_STATE_CONNECTED; } // redis is disconnected @@ -445,11 +488,10 @@ static int redis_check_conn(struct redis *r) { r->state = REDIS_STATE_DISCONNECTED; } - // try redis reconnect -> will free current r->ctx + // try redis reconnect => will free current r->ctx if (redis_connect(r, 1)) { // redis is disconnected - // redis_check_conn() executed well - return 0; + return REDIS_STATE_DISCONNECTED; } // redis is connected @@ -459,8 +501,8 @@ static int redis_check_conn(struct redis *r) { r->state = REDIS_STATE_CONNECTED; } - // redis_check_conn() executed well - return 0; + // redis is re-connected + return REDIS_STATE_RECONNECTED; } @@ -1172,6 +1214,10 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi if (!redis_hash_get_str(&s, &call, "created_from_addr")) sockaddr_parse_any_str(&c->created_from_addr, &s); + err = "missing 'redis_hosted_db' value"; + if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) + goto err6; + err = "failed to create sfds"; if (redis_sfds(c, &sfds)) goto err6; @@ -1205,8 +1251,6 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi goto err6; err = NULL; - c->redis_hosted_db = r->db; - c->redis_call_responsible = 1; obj_put(c); err6: @@ -1276,8 +1320,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { rlog(LOG_DEBUG, "Restoring calls from Redis..."); mutex_lock(&r->lock); - redis_check_conn(r); - if (r->state == REDIS_STATE_DISCONNECTED) { + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { mutex_unlock(&r->lock); ret = 0; goto err; @@ -1422,14 +1465,14 @@ void redis_update(struct call *c, struct redis *r) { return; mutex_lock(&r->lock); - redis_check_conn(r); - if (r->state == REDIS_STATE_DISCONNECTED) { + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { mutex_unlock(&r->lock); return ; } rwlock_lock_r(&c->master_lock); + c->redis_hosted_db = r->db; if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); goto err; @@ -1441,14 +1484,15 @@ void redis_update(struct call *c, struct redis *r) { redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu " "num_sfds %u num_streams %u num_medias %u num_tags %u " "num_maps %u " - "ml_deleted %llu created_from %s created_from_addr %s", + "ml_deleted %llu created_from %s created_from_addr %s redis_hosted_db %u", STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal, (int) c->tos, (long long unsigned) c->deleted, g_queue_get_length(&c->stream_fds), g_queue_get_length(&c->streams), g_queue_get_length(&c->medias), g_queue_get_length(&c->monologues), g_queue_get_length(&c->endpoint_maps), (long long unsigned) c->ml_deleted, - c->created_from, sockaddr_print_buf(&c->created_from_addr)); + c->created_from, sockaddr_print_buf(&c->created_from_addr), + c->redis_hosted_db); /* XXX DTLS cert?? */ redis_pipe(r, "DEL sfd-"PB"-0", STR(&c->callid)); @@ -1667,9 +1711,6 @@ void redis_update(struct call *c, struct redis *r) { redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid)); redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); - if (!c->redis_hosted_db) - c->redis_hosted_db = r->db; - c->redis_call_responsible = 1; redis_consume(r); @@ -1693,8 +1734,7 @@ void redis_delete(struct call *c, struct redis *r) { return; mutex_lock(&r->lock); - redis_check_conn(r); - if (r->state == REDIS_STATE_DISCONNECTED) { + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { mutex_unlock(&r->lock); return ; } @@ -1728,8 +1768,7 @@ void redis_wipe(struct redis *r) { return; mutex_lock(&r->lock); - redis_check_conn(r); - if (r->state == REDIS_STATE_DISCONNECTED) { + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { mutex_unlock(&r->lock); return ; } diff --git a/daemon/redis.h b/daemon/redis.h index 3adce6197..9d3553c0d 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -25,8 +25,9 @@ enum redis_role { }; enum redis_state { - REDIS_STATE_DISCONNECTED = 0, - REDIS_STATE_CONNECTED = 1, + REDIS_STATE_DISCONNECTED = 0, // DISCONNECTED -> DISCONNECTED + REDIS_STATE_CONNECTED, // CONNECTED -> CONNECTED + REDIS_STATE_RECONNECTED, // DISCONNECTED -> CONNECTED }; struct callmaster; @@ -89,7 +90,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #define REDIS_FMT(x) (x)->len, (x)->str -void redis_notify(void *d); +void redis_notify_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 5bf1e891f..688dc4b36 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -48,6 +48,13 @@ if [ ! -z "$INTERFACES" ]; then OPTIONS="$OPTIONS --interface=$interface" done fi + + +if [ ! -z "$SUBSCRIBE_KEYSPACES" ]; then + for ks in $SUBSCRIBE_KEYSPACES; do + OPTIONS="$OPTIONS --subscribe-keyspace=$ks" + done +fi [ -z "$ADDRESS" ] || OPTIONS="$OPTIONS --interface=$ADDRESS" [ -z "$ADV_ADDRESS" ] || OPTIONS="$OPTIONS!$ADV_ADDRESS" [ -z "$ADDRESS_IPV6" ] || OPTIONS="$OPTIONS --interface=$ADDRESS_IPV6" diff --git a/utils/rtpengine-ctl b/utils/rtpengine-ctl index 3959bc3e7..a9a21a74b 100755 --- a/utils/rtpengine-ctl +++ b/utils/rtpengine-ctl @@ -78,6 +78,15 @@ sub showusage { print " maxsessions : set the max nr of allowed sessions\n"; print " maxopenfiles : set the max nr of allowed open files\n"; print "\n"; + print " ksadd [ keyspace ]\n"; + print " keyspace : subscribe to 'keyspace' database\n"; + print "\n"; + print " ksrm [ keyspace ]\n"; + print " keyspace : unsubscribe to 'keyspace' database\n"; + print " : remove all foreign calls for that 'keyspace'\n"; + print "\n"; + print " kslist : print all currently subscribed keyspaces\n"; + print "\n"; print "\n"; print " Return Value:\n"; print " 0 on success with ouput from server side, other values for failure.\n";