diff --git a/daemon/call.c b/daemon/call.c index 9e53dee99..8b6e3d914 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -259,6 +259,11 @@ next: } } + if (!(c->redis_call_responsible)) { + ilog(LOG_INFO, "Timeout did not lead to close the call since I am not responisble"); + goto out; + } + ilog(LOG_INFO, "Closing call due to timeout"); drop: @@ -531,6 +536,10 @@ static void callmaster_timer(void *ptr) { ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); + if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) { + ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets."); + ps->call->redis_call_responsible = 1; + } } update = 0; @@ -1832,12 +1841,13 @@ void call_destroy(struct call *c) { obj_put(c); - if (m->conf.redis_write) { - redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); - } else if (m->conf.redis) { - redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); + if (c->redis_call_responsible) { + if (m->conf.redis_write) { + redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); + } } - rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ diff --git a/daemon/call.h b/daemon/call.h index bc647dfa3..29324d25c 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -415,6 +415,9 @@ struct call { char *created_from; sockaddr_t created_from_addr; int redis_hosted_db; + // following flag servers as a mark that this + // call is either created by me or where I took over the responsibility (a packet has seen) + int redis_call_responsible; }; struct callmaster_config { diff --git a/daemon/redis.c b/daemon/redis.c index 5ca4774d7..db96bca0f 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -292,6 +292,11 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { c = g_hash_table_lookup(cm->callhash, &callid); + if (c && c->redis_call_responsible) { + rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications."); + return; + } + if (strncmp(rr->element[3]->str,"sadd",4)==0) { if (c) { rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str); @@ -1586,10 +1591,11 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid)); redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); - if (opmode==OP_ANSWER) { +// if (opmode==OP_ANSWER) { redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); - } +// } c->redis_hosted_db = r->db; + c->redis_call_responsible = 1; redis_consume(r); mutex_unlock(&r->lock);