|
|
|
@ -5,6 +5,7 @@ |
|
|
|
#include <unistd.h> |
|
|
|
#include <glib.h> |
|
|
|
#include <stdarg.h> |
|
|
|
#include <ctype.h> |
|
|
|
|
|
|
|
#include <glib.h> |
|
|
|
#include "redis.h" |
|
|
|
@ -75,15 +76,22 @@ static redisReply *redis_get(struct redis *r, int type, const char *fmt, ...) { |
|
|
|
static int redisCommandNR(redisContext *r, const char *fmt, ...) { |
|
|
|
va_list ap; |
|
|
|
redisReply *ret; |
|
|
|
int i = 0; |
|
|
|
|
|
|
|
va_start(ap, fmt); |
|
|
|
ret = redisvCommand(r, fmt, ap); |
|
|
|
va_end(ap); |
|
|
|
|
|
|
|
if (ret) |
|
|
|
freeReplyObject(ret); |
|
|
|
if (!ret) |
|
|
|
return -1; |
|
|
|
|
|
|
|
if (ret->type == REDIS_REPLY_ERROR) { |
|
|
|
i = -1; |
|
|
|
ilog(LOG_WARNING, "Redis returned error to command '%s': %s", fmt, ret->str); |
|
|
|
} |
|
|
|
|
|
|
|
return ret ? 0 : -1; |
|
|
|
freeReplyObject(ret); |
|
|
|
return i; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -123,7 +131,7 @@ static void redis_consume(struct redis *r) { |
|
|
|
|
|
|
|
|
|
|
|
/* called with r->lock held if necessary */ |
|
|
|
static int redis_connect(struct redis *r, int wait, int role) { |
|
|
|
static int redis_connect(struct redis *r, int wait) { |
|
|
|
struct timeval tv; |
|
|
|
redisReply *rp; |
|
|
|
char *s; |
|
|
|
@ -141,8 +149,14 @@ static int redis_connect(struct redis *r, int wait, int role) { |
|
|
|
if (r->ctx->err) |
|
|
|
goto err2; |
|
|
|
|
|
|
|
if (redisCommandNR(r->ctx, "PING")) |
|
|
|
goto err2; |
|
|
|
if (r->auth) { |
|
|
|
if (redisCommandNR(r->ctx, "AUTH %s", r->auth)) |
|
|
|
goto err2; |
|
|
|
} |
|
|
|
else { |
|
|
|
if (redisCommandNR(r->ctx, "PING")) |
|
|
|
goto err2; |
|
|
|
} |
|
|
|
|
|
|
|
if (redisCommandNR(r->ctx, "SELECT %i", r->db)) |
|
|
|
goto err2; |
|
|
|
@ -160,19 +174,23 @@ static int redis_connect(struct redis *r, int wait, int role) { |
|
|
|
} |
|
|
|
|
|
|
|
if (!memcmp(s, "role:master", 9)) { |
|
|
|
if (role == MASTER_REDIS_ROLE || role == ANY_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis in master mode"); |
|
|
|
if (r->role == MASTER_REDIS_ROLE || r->role == ANY_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis %s in master mode", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
goto done; |
|
|
|
} else if (role == SLAVE_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis in master mode, but wanted mode is slave; retrying..."); |
|
|
|
} else if (r->role == SLAVE_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis %s in master mode, but wanted mode is slave; retrying...", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
goto next; |
|
|
|
} |
|
|
|
} else if (!memcmp(s, "role:slave", 8)) { |
|
|
|
if (role == SLAVE_REDIS_ROLE || role == ANY_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis in slave mode"); |
|
|
|
if (r->role == SLAVE_REDIS_ROLE || r->role == ANY_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis %s in slave mode", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
goto done; |
|
|
|
} else if (role == MASTER_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis in slave mode, but wanted mode is master; retrying..."); |
|
|
|
} else if (r->role == MASTER_REDIS_ROLE) { |
|
|
|
ilog(LOG_INFO, "Connected to Redis %s in slave mode, but wanted mode is master; retrying...", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
goto next; |
|
|
|
} |
|
|
|
} else { |
|
|
|
@ -194,23 +212,25 @@ done: |
|
|
|
err3: |
|
|
|
freeReplyObject(rp); |
|
|
|
err2: |
|
|
|
if (r->ctx->err) |
|
|
|
rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); |
|
|
|
redisFree(r->ctx); |
|
|
|
r->ctx = NULL; |
|
|
|
if (r->ctx->err) { |
|
|
|
rlog(LOG_ERR, "Failed to connect to Redis %s, error: %s", |
|
|
|
endpoint_print_buf(&r->endpoint), r->ctx->errstr); |
|
|
|
return -1; |
|
|
|
} |
|
|
|
err: |
|
|
|
rlog(LOG_ERR, "Failed to connect to master Redis database"); |
|
|
|
rlog(LOG_ERR, "Failed to connect to Redis %s", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
return -1; |
|
|
|
} |
|
|
|
|
|
|
|
int str_cut(char *str, int begin, int len) { |
|
|
|
int l = strlen(str); |
|
|
|
int l = strlen(str); |
|
|
|
|
|
|
|
if (len < 0) len = l - begin; |
|
|
|
if (begin + len > l) len = l - begin; |
|
|
|
memmove(str + begin, str + begin + len, l - len + 1); |
|
|
|
if (len < 0) len = l - begin; |
|
|
|
if (begin + len > l) len = l - begin; |
|
|
|
memmove(str + begin, str + begin + len, l - len + 1); |
|
|
|
|
|
|
|
return len; |
|
|
|
return len; |
|
|
|
} |
|
|
|
|
|
|
|
static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id); |
|
|
|
@ -222,11 +242,11 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { |
|
|
|
struct call* c; |
|
|
|
str callid; |
|
|
|
char db_str[16]; memset(&db_str, 0, 8); |
|
|
|
char* pdbstr = db_str; |
|
|
|
unsigned char* p = 0; |
|
|
|
char *pdbstr = db_str; |
|
|
|
char *p = 0; |
|
|
|
int dbno; |
|
|
|
|
|
|
|
if (!(cm->conf.redis_read) && !(cm->conf.redis)) { |
|
|
|
if (!(cm->conf.redis)) { |
|
|
|
rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'"); |
|
|
|
return; |
|
|
|
} |
|
|
|
@ -317,51 +337,53 @@ err: |
|
|
|
|
|
|
|
void redis_notify_event_base_loopbreak(struct callmaster *cm) { |
|
|
|
event_base_loopbreak(cm->conf.redis_notify_event_base); |
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); |
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); |
|
|
|
} |
|
|
|
|
|
|
|
void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) { |
|
|
|
char* main_db_str[256]; memset(&main_db_str,0,256); |
|
|
|
sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); |
|
|
|
char main_db_str[256]; |
|
|
|
|
|
|
|
memset(&main_db_str, 0, 256); |
|
|
|
sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); |
|
|
|
|
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); |
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); |
|
|
|
} |
|
|
|
|
|
|
|
void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) { |
|
|
|
char* main_db_str[256]; memset(&main_db_str,0,256); |
|
|
|
sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); |
|
|
|
char main_db_str[256]; |
|
|
|
|
|
|
|
memset(&main_db_str, 0, 256); |
|
|
|
sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); |
|
|
|
|
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); |
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); |
|
|
|
} |
|
|
|
|
|
|
|
void redis_notify(void *d) { |
|
|
|
struct callmaster *cm = d; |
|
|
|
struct redis *r = 0; |
|
|
|
if (cm->conf.redis_read) { |
|
|
|
r = cm->conf.redis_read; |
|
|
|
} else if (cm->conf.redis) { |
|
|
|
if (cm->conf.redis) { |
|
|
|
r = cm->conf.redis; |
|
|
|
} else { |
|
|
|
rlog(LOG_INFO, "I do not subscribe to redis notifications since redis role is not 'master' or 'read'"); |
|
|
|
rlog(LOG_INFO, "I do not subscribe to redis notifications since no REDIS is configured"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
cm->conf.redis_notify_event_base = event_base_new(); |
|
|
|
cm->conf.redis_notify_event_base = event_base_new(); |
|
|
|
|
|
|
|
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); |
|
|
|
if (cm->conf.redis_notify_async_context->err) { |
|
|
|
rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); |
|
|
|
return; |
|
|
|
} |
|
|
|
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); |
|
|
|
if (cm->conf.redis_notify_async_context->err) { |
|
|
|
rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); |
|
|
|
redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); |
|
|
|
|
|
|
|
redis_notify_subscribe_keyspace(cm,r->db); |
|
|
|
event_base_dispatch(cm->conf.redis_notify_event_base); |
|
|
|
redis_notify_subscribe_keyspace(cm,r->db); |
|
|
|
event_base_dispatch(cm->conf.redis_notify_event_base); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
struct redis *redis_new(const endpoint_t *ep, int db, int role) { |
|
|
|
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { |
|
|
|
struct redis *r; |
|
|
|
|
|
|
|
r = g_slice_alloc0(sizeof(*r)); |
|
|
|
@ -369,11 +391,25 @@ struct redis *redis_new(const endpoint_t *ep, int db, int role) { |
|
|
|
r->endpoint = *ep; |
|
|
|
sockaddr_print(&ep->address, r->host, sizeof(r->host)); |
|
|
|
r->db = db; |
|
|
|
r->auth = auth; |
|
|
|
r->role = role; |
|
|
|
r->state = REDIS_STATE_DISCONNECTED; |
|
|
|
r->no_redis_required = no_redis_required; |
|
|
|
mutex_init(&r->lock); |
|
|
|
|
|
|
|
if (redis_connect(r, 10, role)) |
|
|
|
if (redis_connect(r, 10)) { |
|
|
|
if (r->no_redis_required) { |
|
|
|
rlog(LOG_WARN, "Starting with no initial connection to Redis %s !", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
return r; |
|
|
|
} |
|
|
|
goto err; |
|
|
|
} |
|
|
|
|
|
|
|
// redis is connected |
|
|
|
rlog(LOG_INFO, "Established initial connection to Redis %s", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
r->state = REDIS_STATE_CONNECTED; |
|
|
|
return r; |
|
|
|
|
|
|
|
err: |
|
|
|
@ -393,13 +429,38 @@ static void redis_close(struct redis *r) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* called with r->lock held if necessary */ |
|
|
|
static void redis_check_conn(struct redis *r, int role) { |
|
|
|
if (redisCommandNR(r->ctx, "PING") == 0) |
|
|
|
return; |
|
|
|
rlog(LOG_INFO, "Lost connection to Redis"); |
|
|
|
if (redis_connect(r, 1, role)) |
|
|
|
abort(); |
|
|
|
/* must be called with r->lock held */ |
|
|
|
static int redis_check_conn(struct redis *r) { |
|
|
|
// try redis connection |
|
|
|
if (redisCommandNR(r->ctx, "PING") == 0) { |
|
|
|
// redis is connected |
|
|
|
// redis_check_conn() executed well |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
// redis is disconnected |
|
|
|
if (r->state == REDIS_STATE_CONNECTED) { |
|
|
|
rlog(LOG_ERR, "Lost connection to Redis %s", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
r->state = REDIS_STATE_DISCONNECTED; |
|
|
|
} |
|
|
|
|
|
|
|
// try redis reconnect -> will free current r->ctx |
|
|
|
if (redis_connect(r, 1)) { |
|
|
|
// redis is disconnected |
|
|
|
// redis_check_conn() executed well |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
// redis is connected |
|
|
|
if (r->state == REDIS_STATE_DISCONNECTED) { |
|
|
|
rlog(LOG_INFO, "RE-Established connection to Redis %s", |
|
|
|
endpoint_print_buf(&r->endpoint)); |
|
|
|
r->state = REDIS_STATE_CONNECTED; |
|
|
|
} |
|
|
|
|
|
|
|
// redis_check_conn() executed well |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -1182,7 +1243,6 @@ struct thread_ctx { |
|
|
|
GQueue r_q; |
|
|
|
mutex_t r_m; |
|
|
|
}; |
|
|
|
#define RESTORE_NUM_THREADS 4 |
|
|
|
|
|
|
|
static void restore_thread(void *call_p, void *ctx_p) { |
|
|
|
struct thread_ctx *ctx = ctx_p; |
|
|
|
@ -1202,7 +1262,7 @@ static void restore_thread(void *call_p, void *ctx_p) { |
|
|
|
mutex_unlock(&ctx->r_m); |
|
|
|
} |
|
|
|
|
|
|
|
int redis_restore(struct callmaster *m, struct redis *r, int role) { |
|
|
|
int redis_restore(struct callmaster *m, struct redis *r) { |
|
|
|
redisReply *calls, *call; |
|
|
|
int i, ret = -1; |
|
|
|
GThreadPool *gtp; |
|
|
|
@ -1214,7 +1274,15 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) { |
|
|
|
log_level |= LOG_FLAG_RESTORE; |
|
|
|
|
|
|
|
rlog(LOG_DEBUG, "Restoring calls from Redis..."); |
|
|
|
redis_check_conn(r, role); |
|
|
|
|
|
|
|
mutex_lock(&r->lock); |
|
|
|
redis_check_conn(r); |
|
|
|
if (r->state == REDIS_STATE_DISCONNECTED) { |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
ret = 0; |
|
|
|
goto err; |
|
|
|
} |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); |
|
|
|
|
|
|
|
@ -1226,9 +1294,9 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) { |
|
|
|
ctx.m = m; |
|
|
|
mutex_init(&ctx.r_m); |
|
|
|
g_queue_init(&ctx.r_q); |
|
|
|
for (i = 0; i < RESTORE_NUM_THREADS; i++) |
|
|
|
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, role)); |
|
|
|
gtp = g_thread_pool_new(restore_thread, &ctx, RESTORE_NUM_THREADS, TRUE, NULL); |
|
|
|
for (i = 0; i < m->conf.redis_num_threads; i++) |
|
|
|
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required)); |
|
|
|
gtp = g_thread_pool_new(restore_thread, &ctx, m->conf.redis_num_threads, TRUE, NULL); |
|
|
|
|
|
|
|
for (i = 0; i < calls->elements; i++) { |
|
|
|
call = calls->element[i]; |
|
|
|
@ -1339,8 +1407,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con |
|
|
|
*/ |
|
|
|
|
|
|
|
/* must be called lock-free */ |
|
|
|
|
|
|
|
void redis_update(struct call *c, struct redis *r, int role, enum call_opmode opmode) { |
|
|
|
void redis_update(struct call *c, struct redis *r) { |
|
|
|
GList *l, *n, *k, *m; |
|
|
|
struct call_monologue *ml, *ml2; |
|
|
|
|
|
|
|
@ -1355,7 +1422,11 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op |
|
|
|
return; |
|
|
|
|
|
|
|
mutex_lock(&r->lock); |
|
|
|
redis_check_conn(r, role); |
|
|
|
redis_check_conn(r); |
|
|
|
if (r->state == REDIS_STATE_DISCONNECTED) { |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
return ; |
|
|
|
} |
|
|
|
|
|
|
|
rwlock_lock_r(&c->master_lock); |
|
|
|
|
|
|
|
@ -1617,12 +1688,16 @@ err: |
|
|
|
} |
|
|
|
|
|
|
|
/* must be called lock-free */ |
|
|
|
void redis_delete(struct call *c, struct redis *r, int role) { |
|
|
|
void redis_delete(struct call *c, struct redis *r) { |
|
|
|
if (!r) |
|
|
|
return; |
|
|
|
|
|
|
|
mutex_lock(&r->lock); |
|
|
|
redis_check_conn(r, role); |
|
|
|
redis_check_conn(r); |
|
|
|
if (r->state == REDIS_STATE_DISCONNECTED) { |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
return ; |
|
|
|
} |
|
|
|
rwlock_lock_r(&c->master_lock); |
|
|
|
|
|
|
|
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) |
|
|
|
@ -1648,12 +1723,16 @@ err: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void redis_wipe(struct redis *r, int role) { |
|
|
|
void redis_wipe(struct redis *r) { |
|
|
|
if (!r) |
|
|
|
return; |
|
|
|
|
|
|
|
mutex_lock(&r->lock); |
|
|
|
redis_check_conn(r, role); |
|
|
|
redis_check_conn(r); |
|
|
|
if (r->state == REDIS_STATE_DISCONNECTED) { |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
return ; |
|
|
|
} |
|
|
|
redisCommandNR(r->ctx, "DEL calls"); |
|
|
|
mutex_unlock(&r->lock); |
|
|
|
} |