diff --git a/daemon/call.h b/daemon/call.h index 95c9328ac..22c38594e 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -427,6 +427,7 @@ struct callmaster_config { struct redis *redis_read; struct redis *redis_write; struct event_base *redis_notify_event_base; + struct redisAsyncContext *redis_notify_async_context; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/main.c b/daemon/main.c index 39c1cd817..ac23aab5d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -620,7 +620,7 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(poller_timer_loop, ctx.p); - if (!is_addr_unspecified(&redis_read_ep.address)) + if (!is_addr_unspecified(&redis_read_ep.address) || !is_addr_unspecified(&redis_ep.address)) thread_create_detach(redis_notify, ctx.m); if (!is_addr_unspecified(&graphite_ep.address)) diff --git a/daemon/redis.c b/daemon/redis.c index 3caa31408..9103533c0 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -218,10 +218,19 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { struct callmaster *cm = privdata; - struct redis *r = cm->conf.redis; + struct redis *r = 0; struct call* c; str callid; + if (cm->conf.redis_read) { + r = cm->conf.redis_read; + } else if (cm->conf.redis) { + r = cm->conf.redis; + } else { + rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'"); + return; + } + redisReply *rr = (redisReply*)reply; if (reply == NULL || rr->type != REDIS_REPLY_ARRAY) @@ -264,11 +273,8 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { } void redis_notify_event_base_loopbreak(struct callmaster *cm) { - if (cm->conf.redis_notify_event_base) { - event_base_loopbreak(cm->conf.redis_notify_event_base); - free(cm->conf.redis_notify_event_base); - cm->conf.redis_notify_event_base = 0; - } + event_base_loopbreak(cm->conf.redis_notify_event_base); + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); } void redis_notify(void *d) { @@ -279,21 +285,21 @@ void redis_notify(void *d) { } else if (cm->conf.redis) { r = cm->conf.redis; } else { - rlog(LOG_INFO,"Neither redis master nor redis_read configured. redis notification aborted."); + rlog(LOG_INFO, "I do not subscribe to redis notifications since redis role is not 'master' or 'read'"); return; } - cm->conf.redis_notify_event_base = event_base_new(); + cm->conf.redis_notify_event_base = event_base_new(); - redisAsyncContext *c = redisAsyncConnect(r->host, r->endpoint.port); - if (c->err) { - rlog(LOG_ERROR, "error: %s\n", c->errstr); + cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); + if (cm->conf.redis_notify_async_context->err) { + rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); return; } - redisLibeventAttach(c, cm->conf.redis_notify_event_base); + redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); - redisAsyncCommand(c, onRedisNotification, d, "psubscribe __key*__:notifier-*"); + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, d, "psubscribe __key*__:notifier-*"); event_base_dispatch(cm->conf.redis_notify_event_base); }