From fd3e2342c1a947757281fbddf88c1a2e118f1e99 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Sun, 31 Jan 2016 11:25:58 +0100 Subject: [PATCH] Implemented redis notification according to RTPENGINE-64 Thoughts on that topic so far: There's one thing to keep also in mind. What do we do if the call changes (streams) and the backup node is notified ? Currently we only know (by subscribing to the 'notifier-' prefix that in fact it has changed, but we don't know what has changed in detail. Subscribing to everything would lead to the problem that we have to take care about synchronising the the new streams with the old ones. Without having a look at the code that might be a lot of effort and ... I guess that's why richard likes to ... have clean states of the calls. Synchronizing is always a mess. Easier to delete and setup new. I thought about the following solution for makes things more abstract and easier to understand: 1. Whenever that call on a backup node (foreign call) has seen a packet (also timers are started then), we do not process any notification by a redis notification for this call (caus' the RTP-IP has already been switched). More precise and abstract that means: If a node has taken over the responsibility for that call (by having seen a packet), it assumes that notifications from the former node to be dropped. The call will be deleted when either the timer fires or (for other companies) the control channel address has also been switched and via that channel the call is deleted. 2. If a call has not seen a packet (inactive call on the backup node, seen as not responsible for that call but could become so), we accept all commands for that call on the backup node in the same way as on the original node. That means also deletions in-between and so on. I mean if the original node does it, why not accepting to do the same way on the backup node ? --- daemon/call.c | 20 +++++++++++++++----- daemon/call.h | 3 +++ daemon/redis.c | 10 ++++++++-- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 9e53dee99..8b6e3d914 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -259,6 +259,11 @@ next: } } + if (!(c->redis_call_responsible)) { + ilog(LOG_INFO, "Timeout did not lead to close the call since I am not responisble"); + goto out; + } + ilog(LOG_INFO, "Closing call due to timeout"); drop: @@ -531,6 +536,10 @@ static void callmaster_timer(void *ptr) { ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); + if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) { + ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets."); + ps->call->redis_call_responsible = 1; + } } update = 0; @@ -1832,12 +1841,13 @@ void call_destroy(struct call *c) { obj_put(c); - if (m->conf.redis_write) { - redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); - } else if (m->conf.redis) { - redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); + if (c->redis_call_responsible) { + if (m->conf.redis_write) { + redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); + } } - rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ diff --git a/daemon/call.h b/daemon/call.h index bc647dfa3..29324d25c 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -415,6 +415,9 @@ struct call { char *created_from; sockaddr_t created_from_addr; int redis_hosted_db; + // following flag servers as a mark that this + // call is either created by me or where I took over the responsibility (a packet has seen) + int redis_call_responsible; }; struct callmaster_config { diff --git a/daemon/redis.c b/daemon/redis.c index 5ca4774d7..db96bca0f 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -292,6 +292,11 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { c = g_hash_table_lookup(cm->callhash, &callid); + if (c && c->redis_call_responsible) { + rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications."); + return; + } + if (strncmp(rr->element[3]->str,"sadd",4)==0) { if (c) { rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str); @@ -1586,10 +1591,11 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid)); redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); - if (opmode==OP_ANSWER) { +// if (opmode==OP_ANSWER) { redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); - } +// } c->redis_hosted_db = r->db; + c->redis_call_responsible = 1; redis_consume(r); mutex_unlock(&r->lock);