diff --git a/README.md b/README.md index 0c67fa84e..ae819b1fc 100644 --- a/README.md +++ b/README.md @@ -169,6 +169,10 @@ option and which are reproduced below: -M, --port-max=INT Highest port to use for RTP -r, --redis=IP:PORT Connect to Redis database -R, --redis-db=INT Which Redis DB to use + -z, --redis-read=IP:PORT Connect to Redis read database + -Z, --redis-read-db=INT Which Redis read DB to use + -w, --redis-write=IP:PORT Connect to Redis write database + -W, --redis-write-db=INT Which Redis write DB to use -b, --b2b-url=STRING XMLRPC URL of B2B UA -L, --log-level=INT Mask log priorities above this level --log-facility=daemon|local0|... Syslog facility to use for logging @@ -181,7 +185,7 @@ option and which are reproduced below: --sip-source Use SIP source address by default --dtls-passive Always prefer DTLS passive role -g, --graphite=[IP46:]PORT TCP address of graphite statistics server - -w, --graphite-interval=INT Graphite data statistics send interval + -G, --graphite-interval=INT Graphite data statistics send interval --graphite-prefix=STRING Graphite prefix for every line --max-sessions=INT Limit the number of maximum concurrent sessions @@ -356,7 +360,7 @@ The options are described in more detail below. Delete the call from memory after the specified delay from memory. Can be set to zero for immediate call deletion. -* -r, --redis, -R, --redis-db, -b, --b2b-url +* -r, --redis, -R, --redis-db, -z, --redis-read, -Z, --redis-read-db, -w, --redis-write, -W, --redis-write-db, -b, --b2b-url NGCP-specific options diff --git a/daemon/call.c b/daemon/call.c index d81892f84..071c8b376 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1033,8 +1033,13 @@ got_dst: out: ca = sfd->call ? : NULL; - if (ca && update) - redis_update(ca, sfd->call->callmaster->conf.redis); + if (ca && update) { + if (sfd->call->callmaster->conf.redis_write) { + redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE); + } else if (sfd->call->callmaster->conf.redis) { + redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE); + } + } done: log_info_clear(); } @@ -1474,8 +1479,13 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); - if (update) - redis_update(ps->call, m->conf.redis); + if (update) { + if (m->conf.redis_write) { + redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE); + } + } next: hlp.ports[ke->target.target_port] = NULL; @@ -2819,7 +2829,11 @@ void call_destroy(struct call *c) { obj_put(c); - redis_delete(c, m->conf.redis); + if (m->conf.redis_write) { + redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); + } rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ @@ -3714,7 +3728,11 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) { struct call *c = val; struct callmaster *m = c->callmaster; - redis_update(c, m->conf.redis); + if (m->conf.redis_write) { + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + } } void calls_dump_redis(struct callmaster *m) { @@ -3722,11 +3740,31 @@ void calls_dump_redis(struct callmaster *m) { return; ilog(LOG_DEBUG, "Start dumping all call data to Redis...\n"); - redis_wipe(m->conf.redis); + redis_wipe(m->conf.redis, MASTER_REDIS_ROLE); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); ilog(LOG_DEBUG, "Finished dumping all call data to Redis\n"); } +void calls_dump_redis_read(struct callmaster *m) { + if (!m->conf.redis_read) + return; + + ilog(LOG_DEBUG, "Start dumping all call data to read Redis...\n"); + redis_wipe(m->conf.redis_read, ANY_REDIS_ROLE); + g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); + ilog(LOG_DEBUG, "Finished dumping all call data to read Redis\n"); +} + +void calls_dump_redis_write(struct callmaster *m) { + if (!m->conf.redis_write) + return; + + ilog(LOG_DEBUG, "Start dumping all call data to write Redis...\n"); + redis_wipe(m->conf.redis_write, ANY_REDIS_ROLE); + g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); + ilog(LOG_DEBUG, "Finished dumping all call data to write Redis\n"); +} + const struct transport_protocol *transport_protocol(const str *s) { int i; diff --git a/daemon/call.h b/daemon/call.h index c098228db..5e9aca677 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -439,6 +439,8 @@ struct callmaster_config { unsigned int silent_timeout; unsigned int delete_delay; struct redis *redis; + struct redis *redis_read; + struct redis *redis_write; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; @@ -497,6 +499,8 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, struct timeval *iv_start, struct timeval *iv_duration); void calls_dump_redis(struct callmaster *); +void calls_dump_redis_read(struct callmaster *); +void calls_dump_redis_write(struct callmaster *); struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index b0a9eecfc..897aeb3bc 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -189,7 +189,11 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - redis_update(c, m->conf.redis); + if (m->conf.redis_write) { + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + } gettimeofday(&(monologue->started), NULL); @@ -341,7 +345,11 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - redis_update(c, m->conf.redis); + if (m->conf.redis_write) { + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + } ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -714,7 +722,11 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags); rwlock_unlock_w(&call->master_lock); - redis_update(call, m->conf.redis); + if (m->conf.redis_write) { + redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE); + } else if (m->conf.redis) { + redis_update(call, m->conf.redis, MASTER_REDIS_ROLE); + } obj_put(call); gettimeofday(&(monologue->started), NULL); diff --git a/daemon/main.c b/daemon/main.c index 1a171b910..3b6241567 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -76,6 +76,12 @@ static int max_sessions = 0; static u_int32_t redis_ip; static u_int16_t redis_port; static int redis_db = -1; +static u_int32_t redis_read_ip; +static u_int32_t redis_write_ip; +static u_int16_t redis_read_port; +static u_int16_t redis_write_port; +static int redis_read_db = -1; +static int redis_write_db = -1; static char *b2b_url; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; @@ -229,6 +235,7 @@ static void options(int *argc, char ***argv) { char *graphitep = NULL; char *graphite_prefix_s = NULL; char *redisps = NULL; + char *redisps_read = NULL, *redisps_write = NULL; char *log_facility_s = NULL; char *log_facility_cdr_s = NULL; char *log_facility_rtcp_s = NULL; @@ -243,9 +250,9 @@ static void options(int *argc, char ***argv) { { "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, - { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, - { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, - { "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, + { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, + { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, + { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, { "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"}, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, @@ -256,6 +263,10 @@ static void options(int *argc, char ***argv) { { "port-max", 'M', 0, G_OPTION_ARG_INT, &port_max, "Highest port to use for RTP", "INT" }, { "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "IP:PORT" }, { "redis-db", 'R', 0, G_OPTION_ARG_INT, &redis_db, "Which Redis DB to use", "INT" }, + { "redis-read", 'z', 0, G_OPTION_ARG_STRING, &redisps_read, "Connect to Redis read database", "IP:PORT" }, + { "redis-read-db", 'Z', 0, G_OPTION_ARG_INT, &redis_read_db, "Which Redis read DB to use", "INT" }, + { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "IP:PORT" }, + { "redis-write-db", 'W', 0, G_OPTION_ARG_INT, &redis_write_db,"Which Redis write DB to use", "INT" }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -332,7 +343,21 @@ static void options(int *argc, char ***argv) { if (redis_db < 0) die("Must specify Redis DB number (--redis-db) when using Redis"); } - + + if (redisps_read) { + if (parse_ip_port(&redis_read_ip, &redis_read_port, redisps_read) || !redis_read_ip) + die("Invalid Redis read IP or port (--redis-read)"); + if (redis_read_db < 0) + die("Must specify Redis read DB number (--redis-read-db) when using Redis"); + } + + if (redisps_write) { + if (parse_ip_port(&redis_write_ip, &redis_write_port, redisps_write) || !redis_write_ip) + die("Invalid Redis write IP or port (--redis-write)"); + if (redis_write_db < 0) + die("Must specify Redis write DB number (--redis-write-db) when using Redis"); + } + if (xmlrpc_fmt > 1) die("Invalid XMLRPC format"); @@ -548,11 +573,23 @@ no_kernel: } if (redis_ip) { - mc.redis = redis_new(redis_ip, redis_port, redis_db); + mc.redis = redis_new(redis_ip, redis_port, redis_db, MASTER_REDIS_ROLE); if (!mc.redis) die("Cannot start up without Redis database"); } + if (redis_read_ip) { + mc.redis_read = redis_new(redis_read_ip, redis_read_port, redis_read_db, ANY_REDIS_ROLE); + if (!mc.redis_read) + die("Cannot start up without Redis read database"); + } + + if (redis_write_ip) { + mc.redis_write = redis_new(redis_write_ip, redis_write_port, redis_write_db, ANY_REDIS_ROLE); + if (!mc.redis_write) + die("Cannot start up without Redis write database"); + } + ctx->m->conf = mc; callmaster_config_init(ctx->m); @@ -563,9 +600,13 @@ no_kernel: // start redis restore timer gettimeofday(&redis_start, NULL); - // restore - if (redis_restore(ctx->m, mc.redis)) - die("Refusing to continue without working Redis database"); + if (mc.redis_read) { + if (redis_restore(ctx->m, mc.redis_read, ANY_REDIS_ROLE)) + die("Refusing to continue without working Redis read database"); + } else if (mc.redis) { + if (redis_restore(ctx->m, mc.redis, MASTER_REDIS_ROLE)) + die("Refusing to continue without working Redis database"); + } // stop redis restore timer gettimeofday(&redis_stop, NULL); diff --git a/daemon/redis.c b/daemon/redis.c index 95f8f7eda..bca61fea7 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -123,7 +123,7 @@ static void redis_consume(struct redis *r) { /* called with r->lock held if necessary */ -static int redis_connect(struct redis *r, int wait) { +static int redis_connect(struct redis *r, int wait, int role) { struct timeval tv; redisReply *rp; char *s; @@ -150,22 +150,37 @@ static int redis_connect(struct redis *r, int wait) { while (wait-- >= 0) { ilog(LOG_INFO, "Asking Redis whether it's master or slave..."); rp = redisCommand(r->ctx, "INFO"); - if (!rp) + if (!rp) { goto err2; + } s = strstr(rp->str, "role:"); - if (!s) + if (!s) { goto err3; - if (!memcmp(s, "role:master", 9)) - goto done; - else if (!memcmp(s, "role:slave", 8)) - goto next; - else + } + + if (!memcmp(s, "role:master", 9)) { + if (role == MASTER_REDIS_ROLE || role == ANY_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis in master mode"); + goto done; + } else if (role == SLAVE_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis in master mode, but wanted mode is slave; retrying..."); + goto next; + } + } else if (!memcmp(s, "role:slave", 8)) { + if (role == SLAVE_REDIS_ROLE || role == ANY_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis in slave mode"); + goto done; + } else if (role == MASTER_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis in slave mode, but wanted mode is master; retrying..."); + goto next; + } + } else { goto err3; + } next: freeReplyObject(rp); - rlog(LOG_INFO, "Connected to Redis, but it's in slave mode"); usleep(1000000); } @@ -174,7 +189,6 @@ next: done: freeReplyObject(rp); redis_check_type(r, "calls", NULL, "set"); - ilog(LOG_INFO, "Connected to Redis"); return 0; err3: @@ -191,7 +205,7 @@ err: -struct redis *redis_new(u_int32_t ip, u_int16_t port, int db) { +struct redis *redis_new(u_int32_t ip, u_int16_t port, int db, int role) { struct redis *r; r = g_slice_alloc0(sizeof(*r)); @@ -202,7 +216,7 @@ struct redis *redis_new(u_int32_t ip, u_int16_t port, int db) { r->db = db; mutex_init(&r->lock); - if (redis_connect(r, 10)) + if (redis_connect(r, 10, role)) goto err; return r; @@ -225,11 +239,11 @@ static void redis_close(struct redis *r) { /* called with r->lock held if necessary */ -static void redis_check_conn(struct redis *r) { +static void redis_check_conn(struct redis *r, int role) { if (redisCommandNR(r->ctx, "PING") == 0) return; rlog(LOG_INFO, "Lost connection to Redis"); - if (redis_connect(r, 1)) + if (redis_connect(r, 1, role)) abort(); } @@ -985,7 +999,7 @@ static void restore_thread(void *call_p, void *ctx_p) { mutex_unlock(&ctx->r_m); } -int redis_restore(struct callmaster *m, struct redis *r) { +int redis_restore(struct callmaster *m, struct redis *r, int role) { redisReply *calls, *call; int i, ret = -1; GThreadPool *gtp; @@ -997,7 +1011,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { log_level |= LOG_FLAG_RESTORE; rlog(LOG_DEBUG, "Restoring calls from Redis..."); - redis_check_conn(r); + redis_check_conn(r, role); calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); @@ -1010,7 +1024,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { mutex_init(&ctx.r_m); g_queue_init(&ctx.r_q); for (i = 0; i < RESTORE_NUM_THREADS; i++) - g_queue_push_tail(&ctx.r_q, redis_new(r->ip, r->port, r->db)); + g_queue_push_tail(&ctx.r_q, redis_new(r->ip, r->port, r->db, role)); gtp = g_thread_pool_new(restore_thread, &ctx, RESTORE_NUM_THREADS, TRUE, NULL); for (i = 0; i < calls->elements; i++) { @@ -1099,7 +1113,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi /* must be called lock-free */ -void redis_update(struct call *c, struct redis *r) { +void redis_update(struct call *c, struct redis *r, int role) { GSList *l, *n; GList *pt_list, *pt_iter; GList *k, *m; @@ -1116,7 +1130,7 @@ void redis_update(struct call *c, struct redis *r) { return; mutex_lock(&r->lock); - redis_check_conn(r); + redis_check_conn(r, role); rwlock_lock_r(&c->master_lock); @@ -1295,12 +1309,12 @@ void redis_update(struct call *c, struct redis *r) { /* must be called lock-free */ -void redis_delete(struct call *c, struct redis *r) { +void redis_delete(struct call *c, struct redis *r, int role) { if (!r) return; mutex_lock(&r->lock); - redis_check_conn(r); + redis_check_conn(r, role); rwlock_lock_r(&c->master_lock); redis_delete_call(c, r); @@ -1313,12 +1327,12 @@ void redis_delete(struct call *c, struct redis *r) { -void redis_wipe(struct redis *r) { +void redis_wipe(struct redis *r, int role) { if (!r) return; mutex_lock(&r->lock); - redis_check_conn(r); + redis_check_conn(r, role); redisCommandNR(r->ctx, "DEL calls"); mutex_unlock(&r->lock); } diff --git a/daemon/redis.h b/daemon/redis.h index c37f7df0c..0ab1f710b 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -11,6 +11,9 @@ #include +#define MASTER_REDIS_ROLE 0 +#define SLAVE_REDIS_ROLE 1 +#define ANY_REDIS_ROLE 2 struct callmaster; struct call; @@ -74,11 +77,11 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) -struct redis *redis_new(u_int32_t, u_int16_t, int); -int redis_restore(struct callmaster *, struct redis *); -void redis_update(struct call *, struct redis *); -void redis_delete(struct call *, struct redis *); -void redis_wipe(struct redis *); +struct redis *redis_new(u_int32_t, u_int16_t, int, int); +int redis_restore(struct callmaster *, struct redis *, int); +void redis_update(struct call *, struct redis *, int); +void redis_delete(struct call *, struct redis *, int); +void redis_wipe(struct redis *, int); diff --git a/debian/ngcp-rtpengine-daemon.default b/debian/ngcp-rtpengine-daemon.default index afa883027..f204ab19b 100644 --- a/debian/ngcp-rtpengine-daemon.default +++ b/debian/ngcp-rtpengine-daemon.default @@ -17,6 +17,10 @@ TABLE=0 # PORT_MAX=50000 # REDIS=127.0.0.1:6379 # REDIS_DB=1 +# REDIS_READ=127.0.0.1:6379 +# REDIS_READ_DB=1 +# REDIS_WRITE=127.0.0.1:6379 +# REDIS_WRITE_DB=1 # B2B_URL=http://127.0.0.1:8090/ # LOG_LEVEL=6 # LOG_FACILITY=daemon diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 7be49ea14..ca7f5a7af 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -64,6 +64,10 @@ fi [ -z "$PORT_MAX" ] || OPTIONS="$OPTIONS --port-max=$PORT_MAX" [ -z "$REDIS" ] || OPTIONS="$OPTIONS --redis=$REDIS" [ -z "$REDIS_DB" ] || OPTIONS="$OPTIONS --redis-db=$REDIS_DB" +[ -z "$REDIS_READ" ] || OPTIONS="$OPTIONS --redis-read=$REDIS_READ" +[ -z "$REDIS_READ_DB" ] || OPTIONS="$OPTIONS --redis-read-db=$REDIS_READ_DB" +[ -z "$REDIS_WRITE" ] || OPTIONS="$OPTIONS --redis-write=$REDIS_WRITE" +[ -z "$REDIS_WRITE_DB" ] || OPTIONS="$OPTIONS --redis-write-db=$REDIS_WRITE_DB" [ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL" [ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback" OPTIONS="$OPTIONS --table=$TABLE"