Browse Source

Specific REDIS read and write databases

Able to specify now, in the config, the redis databases from which rtpengine
should read and write session information.
pull/158/head
smititelu 10 years ago
parent
commit
72cf55b6fb
9 changed files with 172 additions and 48 deletions
  1. +6
    -2
      README.md
  2. +45
    -7
      daemon/call.c
  3. +4
    -0
      daemon/call.h
  4. +15
    -3
      daemon/call_interfaces.c
  5. +49
    -8
      daemon/main.c
  6. +37
    -23
      daemon/redis.c
  7. +8
    -5
      daemon/redis.h
  8. +4
    -0
      debian/ngcp-rtpengine-daemon.default
  9. +4
    -0
      debian/ngcp-rtpengine-daemon.init

+ 6
- 2
README.md View File

@ -169,6 +169,10 @@ option and which are reproduced below:
-M, --port-max=INT Highest port to use for RTP
-r, --redis=IP:PORT Connect to Redis database
-R, --redis-db=INT Which Redis DB to use
-z, --redis-read=IP:PORT Connect to Redis read database
-Z, --redis-read-db=INT Which Redis read DB to use
-w, --redis-write=IP:PORT Connect to Redis write database
-W, --redis-write-db=INT Which Redis write DB to use
-b, --b2b-url=STRING XMLRPC URL of B2B UA
-L, --log-level=INT Mask log priorities above this level
--log-facility=daemon|local0|... Syslog facility to use for logging
@ -181,7 +185,7 @@ option and which are reproduced below:
--sip-source Use SIP source address by default
--dtls-passive Always prefer DTLS passive role
-g, --graphite=[IP46:]PORT TCP address of graphite statistics server
-w, --graphite-interval=INT Graphite data statistics send interval
-G, --graphite-interval=INT Graphite data statistics send interval
--graphite-prefix=STRING Graphite prefix for every line
--max-sessions=INT Limit the number of maximum concurrent sessions
@ -356,7 +360,7 @@ The options are described in more detail below.
Delete the call from memory after the specified delay from memory. Can be set to zero for
immediate call deletion.
* -r, --redis, -R, --redis-db, -b, --b2b-url
* -r, --redis, -R, --redis-db, -z, --redis-read, -Z, --redis-read-db, -w, --redis-write, -W, --redis-write-db, -b, --b2b-url
NGCP-specific options


+ 45
- 7
daemon/call.c View File

@ -1033,8 +1033,13 @@ got_dst:
out:
ca = sfd->call ? : NULL;
if (ca && update)
redis_update(ca, sfd->call->callmaster->conf.redis);
if (ca && update) {
if (sfd->call->callmaster->conf.redis_write) {
redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE);
} else if (sfd->call->callmaster->conf.redis) {
redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE);
}
}
done:
log_info_clear();
}
@ -1474,8 +1479,13 @@ static void callmaster_timer(void *ptr) {
rwlock_unlock_r(&sfd->call->master_lock);
if (update)
redis_update(ps->call, m->conf.redis);
if (update) {
if (m->conf.redis_write) {
redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE);
}
}
next:
hlp.ports[ke->target.target_port] = NULL;
@ -2819,7 +2829,11 @@ void call_destroy(struct call *c) {
obj_put(c);
redis_delete(c, m->conf.redis);
if (m->conf.redis_write) {
redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE);
}
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */
@ -3714,7 +3728,11 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) {
struct call *c = val;
struct callmaster *m = c->callmaster;
redis_update(c, m->conf.redis);
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
}
}
void calls_dump_redis(struct callmaster *m) {
@ -3722,11 +3740,31 @@ void calls_dump_redis(struct callmaster *m) {
return;
ilog(LOG_DEBUG, "Start dumping all call data to Redis...\n");
redis_wipe(m->conf.redis);
redis_wipe(m->conf.redis, MASTER_REDIS_ROLE);
g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL);
ilog(LOG_DEBUG, "Finished dumping all call data to Redis\n");
}
void calls_dump_redis_read(struct callmaster *m) {
if (!m->conf.redis_read)
return;
ilog(LOG_DEBUG, "Start dumping all call data to read Redis...\n");
redis_wipe(m->conf.redis_read, ANY_REDIS_ROLE);
g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL);
ilog(LOG_DEBUG, "Finished dumping all call data to read Redis\n");
}
void calls_dump_redis_write(struct callmaster *m) {
if (!m->conf.redis_write)
return;
ilog(LOG_DEBUG, "Start dumping all call data to write Redis...\n");
redis_wipe(m->conf.redis_write, ANY_REDIS_ROLE);
g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL);
ilog(LOG_DEBUG, "Finished dumping all call data to write Redis\n");
}
const struct transport_protocol *transport_protocol(const str *s) {
int i;


+ 4
- 0
daemon/call.h View File

@ -439,6 +439,8 @@ struct callmaster_config {
unsigned int silent_timeout;
unsigned int delete_delay;
struct redis *redis;
struct redis *redis_read;
struct redis *redis_write;
char *b2b_url;
unsigned char default_tos;
enum xmlrpc_format fmt;
@ -497,6 +499,8 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
struct timeval *iv_start, struct timeval *iv_duration);
void calls_dump_redis(struct callmaster *);
void calls_dump_redis_read(struct callmaster *);
void calls_dump_redis_write(struct callmaster *);
struct call_monologue *__monologue_create(struct call *call);
void __monologue_tag(struct call_monologue *ml, const str *tag);
void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);


+ 15
- 3
daemon/call_interfaces.c View File

@ -189,7 +189,11 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP);
rwlock_unlock_w(&c->master_lock);
redis_update(c, m->conf.redis);
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
}
gettimeofday(&(monologue->started), NULL);
@ -341,7 +345,11 @@ out2:
rwlock_unlock_w(&c->master_lock);
streams_free(&s);
redis_update(c, m->conf.redis);
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
}
ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret));
obj_put(c);
@ -714,7 +722,11 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags);
rwlock_unlock_w(&call->master_lock);
redis_update(call, m->conf.redis);
if (m->conf.redis_write) {
redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_update(call, m->conf.redis, MASTER_REDIS_ROLE);
}
obj_put(call);
gettimeofday(&(monologue->started), NULL);


+ 49
- 8
daemon/main.c View File

@ -76,6 +76,12 @@ static int max_sessions = 0;
static u_int32_t redis_ip;
static u_int16_t redis_port;
static int redis_db = -1;
static u_int32_t redis_read_ip;
static u_int32_t redis_write_ip;
static u_int16_t redis_read_port;
static u_int16_t redis_write_port;
static int redis_read_db = -1;
static int redis_write_db = -1;
static char *b2b_url;
static enum xmlrpc_format xmlrpc_fmt = XF_SEMS;
static int num_threads;
@ -229,6 +235,7 @@ static void options(int *argc, char ***argv) {
char *graphitep = NULL;
char *graphite_prefix_s = NULL;
char *redisps = NULL;
char *redisps_read = NULL, *redisps_write = NULL;
char *log_facility_s = NULL;
char *log_facility_cdr_s = NULL;
char *log_facility_rtcp_s = NULL;
@ -243,9 +250,9 @@ static void options(int *argc, char ***argv) {
{ "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" },
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" },
{ "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" },
{ "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"},
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
@ -256,6 +263,10 @@ static void options(int *argc, char ***argv) {
{ "port-max", 'M', 0, G_OPTION_ARG_INT, &port_max, "Highest port to use for RTP", "INT" },
{ "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "IP:PORT" },
{ "redis-db", 'R', 0, G_OPTION_ARG_INT, &redis_db, "Which Redis DB to use", "INT" },
{ "redis-read", 'z', 0, G_OPTION_ARG_STRING, &redisps_read, "Connect to Redis read database", "IP:PORT" },
{ "redis-read-db", 'Z', 0, G_OPTION_ARG_INT, &redis_read_db, "Which Redis read DB to use", "INT" },
{ "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "IP:PORT" },
{ "redis-write-db", 'W', 0, G_OPTION_ARG_INT, &redis_write_db,"Which Redis write DB to use", "INT" },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" },
{ "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"},
@ -332,7 +343,21 @@ static void options(int *argc, char ***argv) {
if (redis_db < 0)
die("Must specify Redis DB number (--redis-db) when using Redis");
}
if (redisps_read) {
if (parse_ip_port(&redis_read_ip, &redis_read_port, redisps_read) || !redis_read_ip)
die("Invalid Redis read IP or port (--redis-read)");
if (redis_read_db < 0)
die("Must specify Redis read DB number (--redis-read-db) when using Redis");
}
if (redisps_write) {
if (parse_ip_port(&redis_write_ip, &redis_write_port, redisps_write) || !redis_write_ip)
die("Invalid Redis write IP or port (--redis-write)");
if (redis_write_db < 0)
die("Must specify Redis write DB number (--redis-write-db) when using Redis");
}
if (xmlrpc_fmt > 1)
die("Invalid XMLRPC format");
@ -548,11 +573,23 @@ no_kernel:
}
if (redis_ip) {
mc.redis = redis_new(redis_ip, redis_port, redis_db);
mc.redis = redis_new(redis_ip, redis_port, redis_db, MASTER_REDIS_ROLE);
if (!mc.redis)
die("Cannot start up without Redis database");
}
if (redis_read_ip) {
mc.redis_read = redis_new(redis_read_ip, redis_read_port, redis_read_db, ANY_REDIS_ROLE);
if (!mc.redis_read)
die("Cannot start up without Redis read database");
}
if (redis_write_ip) {
mc.redis_write = redis_new(redis_write_ip, redis_write_port, redis_write_db, ANY_REDIS_ROLE);
if (!mc.redis_write)
die("Cannot start up without Redis write database");
}
ctx->m->conf = mc;
callmaster_config_init(ctx->m);
@ -563,9 +600,13 @@ no_kernel:
// start redis restore timer
gettimeofday(&redis_start, NULL);
// restore
if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database");
if (mc.redis_read) {
if (redis_restore(ctx->m, mc.redis_read, ANY_REDIS_ROLE))
die("Refusing to continue without working Redis read database");
} else if (mc.redis) {
if (redis_restore(ctx->m, mc.redis, MASTER_REDIS_ROLE))
die("Refusing to continue without working Redis database");
}
// stop redis restore timer
gettimeofday(&redis_stop, NULL);


+ 37
- 23
daemon/redis.c View File

@ -123,7 +123,7 @@ static void redis_consume(struct redis *r) {
/* called with r->lock held if necessary */
static int redis_connect(struct redis *r, int wait) {
static int redis_connect(struct redis *r, int wait, int role) {
struct timeval tv;
redisReply *rp;
char *s;
@ -150,22 +150,37 @@ static int redis_connect(struct redis *r, int wait) {
while (wait-- >= 0) {
ilog(LOG_INFO, "Asking Redis whether it's master or slave...");
rp = redisCommand(r->ctx, "INFO");
if (!rp)
if (!rp) {
goto err2;
}
s = strstr(rp->str, "role:");
if (!s)
if (!s) {
goto err3;
if (!memcmp(s, "role:master", 9))
goto done;
else if (!memcmp(s, "role:slave", 8))
goto next;
else
}
if (!memcmp(s, "role:master", 9)) {
if (role == MASTER_REDIS_ROLE || role == ANY_REDIS_ROLE) {
ilog(LOG_INFO, "Connected to Redis in master mode");
goto done;
} else if (role == SLAVE_REDIS_ROLE) {
ilog(LOG_INFO, "Connected to Redis in master mode, but wanted mode is slave; retrying...");
goto next;
}
} else if (!memcmp(s, "role:slave", 8)) {
if (role == SLAVE_REDIS_ROLE || role == ANY_REDIS_ROLE) {
ilog(LOG_INFO, "Connected to Redis in slave mode");
goto done;
} else if (role == MASTER_REDIS_ROLE) {
ilog(LOG_INFO, "Connected to Redis in slave mode, but wanted mode is master; retrying...");
goto next;
}
} else {
goto err3;
}
next:
freeReplyObject(rp);
rlog(LOG_INFO, "Connected to Redis, but it's in slave mode");
usleep(1000000);
}
@ -174,7 +189,6 @@ next:
done:
freeReplyObject(rp);
redis_check_type(r, "calls", NULL, "set");
ilog(LOG_INFO, "Connected to Redis");
return 0;
err3:
@ -191,7 +205,7 @@ err:
struct redis *redis_new(u_int32_t ip, u_int16_t port, int db) {
struct redis *redis_new(u_int32_t ip, u_int16_t port, int db, int role) {
struct redis *r;
r = g_slice_alloc0(sizeof(*r));
@ -202,7 +216,7 @@ struct redis *redis_new(u_int32_t ip, u_int16_t port, int db) {
r->db = db;
mutex_init(&r->lock);
if (redis_connect(r, 10))
if (redis_connect(r, 10, role))
goto err;
return r;
@ -225,11 +239,11 @@ static void redis_close(struct redis *r) {
/* called with r->lock held if necessary */
static void redis_check_conn(struct redis *r) {
static void redis_check_conn(struct redis *r, int role) {
if (redisCommandNR(r->ctx, "PING") == 0)
return;
rlog(LOG_INFO, "Lost connection to Redis");
if (redis_connect(r, 1))
if (redis_connect(r, 1, role))
abort();
}
@ -985,7 +999,7 @@ static void restore_thread(void *call_p, void *ctx_p) {
mutex_unlock(&ctx->r_m);
}
int redis_restore(struct callmaster *m, struct redis *r) {
int redis_restore(struct callmaster *m, struct redis *r, int role) {
redisReply *calls, *call;
int i, ret = -1;
GThreadPool *gtp;
@ -997,7 +1011,7 @@ int redis_restore(struct callmaster *m, struct redis *r) {
log_level |= LOG_FLAG_RESTORE;
rlog(LOG_DEBUG, "Restoring calls from Redis...");
redis_check_conn(r);
redis_check_conn(r, role);
calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls");
@ -1010,7 +1024,7 @@ int redis_restore(struct callmaster *m, struct redis *r) {
mutex_init(&ctx.r_m);
g_queue_init(&ctx.r_q);
for (i = 0; i < RESTORE_NUM_THREADS; i++)
g_queue_push_tail(&ctx.r_q, redis_new(r->ip, r->port, r->db));
g_queue_push_tail(&ctx.r_q, redis_new(r->ip, r->port, r->db, role));
gtp = g_thread_pool_new(restore_thread, &ctx, RESTORE_NUM_THREADS, TRUE, NULL);
for (i = 0; i < calls->elements; i++) {
@ -1099,7 +1113,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi
/* must be called lock-free */
void redis_update(struct call *c, struct redis *r) {
void redis_update(struct call *c, struct redis *r, int role) {
GSList *l, *n;
GList *pt_list, *pt_iter;
GList *k, *m;
@ -1116,7 +1130,7 @@ void redis_update(struct call *c, struct redis *r) {
return;
mutex_lock(&r->lock);
redis_check_conn(r);
redis_check_conn(r, role);
rwlock_lock_r(&c->master_lock);
@ -1295,12 +1309,12 @@ void redis_update(struct call *c, struct redis *r) {
/* must be called lock-free */
void redis_delete(struct call *c, struct redis *r) {
void redis_delete(struct call *c, struct redis *r, int role) {
if (!r)
return;
mutex_lock(&r->lock);
redis_check_conn(r);
redis_check_conn(r, role);
rwlock_lock_r(&c->master_lock);
redis_delete_call(c, r);
@ -1313,12 +1327,12 @@ void redis_delete(struct call *c, struct redis *r) {
void redis_wipe(struct redis *r) {
void redis_wipe(struct redis *r, int role) {
if (!r)
return;
mutex_lock(&r->lock);
redis_check_conn(r);
redis_check_conn(r, role);
redisCommandNR(r->ctx, "DEL calls");
mutex_unlock(&r->lock);
}

+ 8
- 5
daemon/redis.h View File

@ -11,6 +11,9 @@
#include <hiredis/hiredis.h>
#define MASTER_REDIS_ROLE 0
#define SLAVE_REDIS_ROLE 1
#define ANY_REDIS_ROLE 2
struct callmaster;
struct call;
@ -74,11 +77,11 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
struct redis *redis_new(u_int32_t, u_int16_t, int);
int redis_restore(struct callmaster *, struct redis *);
void redis_update(struct call *, struct redis *);
void redis_delete(struct call *, struct redis *);
void redis_wipe(struct redis *);
struct redis *redis_new(u_int32_t, u_int16_t, int, int);
int redis_restore(struct callmaster *, struct redis *, int);
void redis_update(struct call *, struct redis *, int);
void redis_delete(struct call *, struct redis *, int);
void redis_wipe(struct redis *, int);


+ 4
- 0
debian/ngcp-rtpengine-daemon.default View File

@ -17,6 +17,10 @@ TABLE=0
# PORT_MAX=50000
# REDIS=127.0.0.1:6379
# REDIS_DB=1
# REDIS_READ=127.0.0.1:6379
# REDIS_READ_DB=1
# REDIS_WRITE=127.0.0.1:6379
# REDIS_WRITE_DB=1
# B2B_URL=http://127.0.0.1:8090/
# LOG_LEVEL=6
# LOG_FACILITY=daemon


+ 4
- 0
debian/ngcp-rtpengine-daemon.init View File

@ -64,6 +64,10 @@ fi
[ -z "$PORT_MAX" ] || OPTIONS="$OPTIONS --port-max=$PORT_MAX"
[ -z "$REDIS" ] || OPTIONS="$OPTIONS --redis=$REDIS"
[ -z "$REDIS_DB" ] || OPTIONS="$OPTIONS --redis-db=$REDIS_DB"
[ -z "$REDIS_READ" ] || OPTIONS="$OPTIONS --redis-read=$REDIS_READ"
[ -z "$REDIS_READ_DB" ] || OPTIONS="$OPTIONS --redis-read-db=$REDIS_READ_DB"
[ -z "$REDIS_WRITE" ] || OPTIONS="$OPTIONS --redis-write=$REDIS_WRITE"
[ -z "$REDIS_WRITE_DB" ] || OPTIONS="$OPTIONS --redis-write-db=$REDIS_WRITE_DB"
[ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL"
[ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback"
OPTIONS="$OPTIONS --table=$TABLE"


Loading…
Cancel
Save