From 749a7da7b064adbdf3e479cff5e0ee8cae7d21a6 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Fri, 9 Dec 2016 09:05:25 +0100 Subject: [PATCH] Implements redis onekey concept The redis onekey concepts is introduced to reduce traffic to redis and redis notification traffic. It modifies the current structure for one call in redis, which are multiple keys with pre- and postfixes and the callid in between to one key with the structure "json-". The value is a json formatted string with the previous multi-key identifiers in it. --- README.md | 7 +- daemon/Makefile | 4 + daemon/call.c | 9 +- daemon/call.h | 7 +- daemon/call_interfaces.c | 24 +- daemon/main.c | 3 + daemon/media_socket.c | 9 +- daemon/redis.c | 1252 +++++++++++++++++++++++++++-- daemon/redis.h | 8 +- debian/control | 1 + debian/ngcp-rtpengine-daemon.init | 1 + 11 files changed, 1261 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index bbed85197..73d7c7c02 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,8 @@ option and which are reproduced below: -w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database -k, --subscribe-keyspace Subscription keyspace list --redis-num-threads=INT Number of Redis restore threads - --redis-expires=INT Expire time in seconds for redis keys + --redis-expires=INT Expire time in seconds for redis keys + --redis-multikey=INT Use multiple redis keys for storing the call (old behaviour) DEPRECATED -q, --no-redis-required Start even if can't connect to redis databases -b, --b2b-url=STRING XMLRPC URL of B2B UA -L, --log-level=INT Mask log priorities above this level @@ -426,6 +427,10 @@ The options are described in more detail below. Expire time in seconds for redis keys. Default is 86400. +* --redis-multikey + + Use multiple redis keys for storing the call (old behaviour) DEPRECATED + * -q, --no-redis-required When this paramter is present or NO_REDIS_REQUIRED='yes' or '1' in config file, rtpengine starts even if there is no initial connection to redis databases(either to -r or to -w or to both redis). diff --git a/daemon/Makefile b/daemon/Makefile index 7eca49501..c3f6dcf72 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -8,6 +8,7 @@ CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` CFLAGS+= `pkg-config --cflags libevent_pthreads` CFLAGS+= `pcre-config --cflags` +CFLAGS+= `pkg-config --cflags json-glib-1.0` CFLAGS+= -I. -I../kernel-module/ -I../lib/ CFLAGS+= -D_GNU_SOURCE @@ -26,10 +27,13 @@ LDFLAGS+= `pkg-config --libs libpcre` LDFLAGS+= `pkg-config --libs libcrypto` LDFLAGS+= `pkg-config --libs openssl` LDFLAGS+= `pkg-config --libs libevent_pthreads` +LDFLAGS+= `pkg-config --libs` LDFLAGS+= -lpcap LDFLAGS+= `pcre-config --libs` LDFLAGS+= `xmlrpc-c-config client --libs` LDFLAGS+= -lhiredis +LDFLAGS+= `pkg-config --libs libcurl` +LDFLAGS+= `pkg-config --libs json-glib-1.0` include ../lib/lib.Makefile diff --git a/daemon/call.c b/daemon/call.c index c3b0a10d9..fc08a4390 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -594,8 +594,13 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); - if (update) - redis_update(ps->call, m->conf.redis_write); + if (update) { + if (m->conf.redis_multikey) { + redis_update(ps->call, m->conf.redis_write); + } else { + redis_update_onekey(ps->call, m->conf.redis_write); + } + } next: g_hash_table_remove(hlp.addr_sfd, &ep); diff --git a/daemon/call.h b/daemon/call.h index d8f19c7aa..2dc84b3b9 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -5,7 +5,8 @@ /* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */ - +#include +#include #include #include @@ -443,6 +444,7 @@ struct call { unsigned int foreign_call; // created_via_redis_notify call struct recording *recording; + JsonReader *root_reader; }; struct callmaster_config { @@ -460,7 +462,8 @@ struct callmaster_config { struct event_base *redis_notify_event_base; GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; - unsigned int redis_expires_secs; + unsigned int redis_expires_secs; + unsigned int redis_multikey; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index e08e708f4..8bd2fc992 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,7 +186,11 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } gettimeofday(&(monologue->started), NULL); @@ -334,7 +338,11 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -764,10 +772,16 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster } rwlock_unlock_w(&call->master_lock); - if (!flags.no_redis_update) - redis_update(call, m->conf.redis_write); - else + + if (!flags.no_redis_update) { + if (m->conf.redis_multikey) { + redis_update(call, m->conf.redis_write); + } else { + redis_update_onekey(call,m->conf.redis_write); + } + } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); + } obj_put(call); gettimeofday(&(monologue->started), NULL); diff --git a/daemon/main.c b/daemon/main.c index 32698170d..5b0e3181d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -62,6 +62,7 @@ static unsigned int timeout; static unsigned int silent_timeout; static unsigned int final_timeout; static unsigned int redis_expires = 86400; +static unsigned int redis_multikey = 0; static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; @@ -298,6 +299,7 @@ static void options(int *argc, char ***argv) { { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, + { "redis-multikey", 0, 0, G_OPTION_ARG_INT, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", "INT" }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -621,6 +623,7 @@ no_kernel: } mc.redis_expires_secs = redis_expires; + mc.redis_multikey = redis_multikey; ctx->m->conf = mc; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5195c6a6c..ffd482029 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1471,8 +1471,13 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { out: ca = sfd->call ? : NULL; - if (ca && update) - redis_update(ca, ca->callmaster->conf.redis_write); + if (ca && update) { + if (ca->callmaster->conf.redis_multikey) { + redis_update(ca, ca->callmaster->conf.redis_write); + } else { + redis_update_onekey(ca, ca->callmaster->conf.redis_write); + } + } done: log_info_clear(); } diff --git a/daemon/redis.c b/daemon/redis.c index 49fb976fc..7823e9154 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -25,10 +25,9 @@ #include "recording.h" #include "rtplib.h" - - - - +#include +#include +#include INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -62,9 +61,23 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define STR(x) (x)->s, (size_t) (x)->len #define STR_R(x) (x)->str, (size_t) (x)->len #define S_LEN(s,l) (s), (size_t) (l) +#define STRSTR(x) (x)->s #endif +static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type); +static int redis_check_conn(struct redis *r); +static void json_restore_call(struct redis *r, struct callmaster *m, redisReply *id, enum call_type type); + +static int str_cut(char *str, int begin, int len) { + int l = strlen(str); + + if (len < 0) len = l - begin; + if (begin + len > l) len = l - begin; + memmove(str + begin, str + begin + len, l - len + 1); + + return len; +} static void redis_pipe(struct redis *r, const char *fmt, ...) { va_list ap; @@ -105,7 +118,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) { } - /* called with r->lock held */ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) { redisReply *rp; @@ -124,8 +136,6 @@ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type } - - /* called with r->lock held */ static void redis_consume(struct redis *r) { redisReply *rp; @@ -138,8 +148,6 @@ static void redis_consume(struct redis *r) { } - - /* called with r->lock held if necessary */ static int redis_connect(struct redis *r, int wait) { struct timeval tv; @@ -233,20 +241,8 @@ err: return -1; } -int str_cut(char *str, int begin, int len) { - int l = strlen(str); - - if (len < 0) len = l - begin; - if (begin + len > l) len = l - begin; - memmove(str + begin, str + begin + len, l - len + 1); - - return len; -} - -static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type); -static int redis_check_conn(struct redis *r); -void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { +void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) { struct callmaster *cm = privdata; struct redis *r = 0; @@ -258,7 +254,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { // sanity checks if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on onRedisNotification"); + rlog(LOG_ERROR, "Struct callmaster is NULL on on_redis_notification"); return; } @@ -283,13 +279,22 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { if (rr->elements != 4) goto err; - char *pch = strstr(rr->element[2]->str, "notifier-"); - if (pch == NULL) { - rlog(LOG_ERROR,"Redis-Notifier: The substring 'notifier-' has not been found in the redis notification !\n"); - goto err; + char *pch =0; + if (cm->conf.redis_multikey) { + pch = strstr(rr->element[2]->str, "notifier-"); + if (pch == NULL) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n"); + goto err; + } + pch += strlen("notifier-"); + } else { + pch = strstr(rr->element[2]->str, "json-"); + if (pch == NULL) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); + goto err; + } } - // extract from __keyspace@__ prefix p = strstr(rr->element[2]->str, "@"); ++p; @@ -312,16 +317,30 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } - pch += strlen("notifier-"); str_cut(rr->element[2]->str,0,pch-rr->element[2]->str); rr->element[2]->len = strlen(rr->element[2]->str); rlog(LOG_DEBUG,"Redis-Notifier:%s:%d: Processing call with callid: %s\n", rr->element[3]->str, r->db, rr->element[2]->str); - str_init(&callid,rr->element[2]->str); + // strip off json- prefix from callid + str_init_len(&callid,rr->element[2]->str+strlen("json-"),strlen(rr->element[2]->str)-strlen("json-")); - if (strncmp(rr->element[3]->str,"sadd",4)==0) + if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) redis_restore_call(r, cm, rr->element[2], CT_FOREIGN_CALL); + if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + c = call_get(&callid, cm); + if (c) { + rwlock_unlock_w(&c->master_lock); + if (IS_FOREIGN_CALL(c)) + call_destroy(c); + else { + rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: %s\n", rr->element[2]->str); + goto err; + } + } + json_restore_call(r, cm, rr->element[2], CT_FOREIGN_CALL); + } + if (strncmp(rr->element[3]->str,"del",3)==0) { c = call_get(&callid, cm); if (!c) { @@ -329,6 +348,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } rwlock_unlock_w(&c->master_lock); + if (!IS_FOREIGN_CALL(c)) { + rlog(LOG_WARN, "Redis-Notifier: Ignoring DEL received for an OWN call: %s\n", rr->element[2]->str); + goto err; + } call_destroy(c); } @@ -458,21 +481,22 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return -1; } - switch (action) { + if (cm->conf.redis_multikey) { + switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void *) cm, "punsubscribe") != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); return -1; } @@ -480,6 +504,31 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a default: rlog(LOG_ERROR, "No subscribe action found: %d", action); return -1; + } + } else { + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i*:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i*:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); + return -1; + } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; + } } return 0; @@ -647,7 +696,6 @@ err: } - static void redis_close(struct redis *r) { if (r->ctx) redisFree(r->ctx); @@ -656,7 +704,6 @@ static void redis_close(struct redis *r) { } - /* must be called with r->lock held */ static int redis_check_conn(struct redis *r) { // try redis connection @@ -698,6 +745,12 @@ static void redis_delete_list(struct redis *r, const str *callid, const char *pr redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i); } +/* called with r->lock held and c->master_lock held */ +static void redis_delete_call_json(struct call *c, struct redis *r) { + redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); + redis_consume(r); +} + /* called with r->lock held and c->master_lock held */ static void redis_delete_call(struct call *c, struct redis *r) { redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); @@ -719,7 +772,90 @@ static void redis_delete_call(struct call *c, struct redis *r) { redis_consume(r); } +// stolen from libhiredis +extern redisReply *createReplyObject(int type); +/* Create a reply object */ +inline redisReply *createReplyObject(int type) { + redisReply *r = calloc(1,sizeof(*r)); + + if (r == NULL) + return NULL; + + r->type = type; + return r; +} + +static void free_func(void* p){ + free(p); +} + +static int json_get_hash(struct redis_hash *out, struct call* c, + const char *key, const redisReply *which, + unsigned int id) +{ + char key_concatted[256]; + memset(key_concatted,0,256); + + if (!c) + goto err; + + if (id == -1) { + sprintf(key_concatted, "%s-%s",key,which->str); + } else { + sprintf(key_concatted, "%s-%s-%u",key,which->str,id); + } + + + if (!json_reader_read_member (c->root_reader, key_concatted)) { + rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); + goto err; + } + + out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free_func, free_func); + if (!out->ht) + goto err; + out->rr = 0; + + gchar **members = json_reader_list_members(c->root_reader); + gchar **orig_members = members; + for (int i=0; i < json_reader_count_members (c->root_reader); ++i) { + + if (!json_reader_read_member (c->root_reader, *members)) { + rlog(LOG_ERROR, "Could not read json member: %s",*members); + goto err3; + } + out->rr = createReplyObject(REDIS_REPLY_STRING); + if (!out->rr) { + rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); + goto err3; + } + out->rr->str = (char*)json_reader_get_string_value(c->root_reader); + out->rr->len = strlen(out->rr->str); + + char* tmp = strdup(*members); + + if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + goto err3; + } + + json_reader_end_member(c->root_reader); + + ++members; + } // for + g_strfreev(orig_members); + json_reader_end_member (c->root_reader); + + return 0; + +err3: + g_strfreev(members); + if (out->rr) + free(out->rr); + g_hash_table_destroy(out->ht); +err: + return -1; +} static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const redisReply *which, @@ -751,7 +887,8 @@ static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *k return 0; err3: - freeReplyObject(out->rr); + if (out->rr) + freeReplyObject(out->rr); err2: g_hash_table_destroy(out->ht); err: @@ -760,7 +897,9 @@ err: static void redis_destroy_hash(struct redis_hash *rh) { - freeReplyObject(rh->rr); + + if (rh->rr) + freeReplyObject(rh->rr); g_hash_table_destroy(rh->ht); } static void redis_destroy_list(struct redis_list *rl) { @@ -773,7 +912,19 @@ static void redis_destroy_list(struct redis_list *rl) { free(rl->ptrs); } +static void json_destroy_hash(struct redis_hash *rh) { + g_hash_table_destroy(rh->ht); +} +static void json_destroy_list(struct redis_list *rl) { + unsigned int i; + + for (i = 0; i < rl->len; i++) { + json_destroy_hash(&rl->rh[i]); + } + free(rl->rh); + free(rl->ptrs); +} static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { redisReply *r; @@ -876,6 +1027,40 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh, return NULL; return redis_list_get_idx_ptr(list, idx); } + +static int json_build_list_cb(GQueue *q, struct call *c, const char *key, const str *callid, + unsigned int idx, struct redis_list *list, + int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) +{ + str s; + char key_concatted[256]; + memset(&key_concatted,0,256); + + sprintf(key_concatted, "%s-%s-%u",key,callid->s,idx); + + if (!json_reader_read_member (c->root_reader, key_concatted)) + rlog(LOG_ERROR,"Key in json not found:%s",key_concatted); + for (int jidx=0; jidx < json_reader_count_elements(c->root_reader); ++jidx) { + if (!json_reader_read_element(c->root_reader,jidx)) + rlog(LOG_ERROR,"Element in array not found."); + // if (json_reader_get_type() != G_TYPE_STRING) { + // return -1; + // } + const char* value = json_reader_get_string_value(c->root_reader); + if (!value) + rlog(LOG_ERROR,"String in json not found."); + str_init_len(&s, (char*)value , strlen(value)); + if (cb(&s, q, list, ptr)) { + return -1; + } + json_reader_end_element(c->root_reader); + } + json_reader_end_member (c->root_reader); + + return 0; +} + + static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list, int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) @@ -909,11 +1094,53 @@ static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) g_queue_push_tail(q, redis_list_get_idx_ptr(list, (unsigned) j)); return 0; } + +static int json_build_list(GQueue *q, struct call *c, const char *key, const str *callid, + unsigned int idx, struct redis_list *list) +{ + return json_build_list_cb(q, c, key, callid, idx, list, rbl_cb_simple, NULL); +} + static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list) { return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); } + +static int json_get_list_hash(struct redis_list *out, struct call* c, + const char *key, const redisReply *id, + const struct redis_hash *rh, const char *rh_num_key) +{ + unsigned int i; + + if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) + return -1; + out->rh = malloc(sizeof(*out->rh) * out->len); + memset(out->rh,0,sizeof(*out->rh) * out->len); + if (!out->rh) + return -1; + out->ptrs = malloc(sizeof(*out->ptrs) * out->len); + if (!out->ptrs) + goto err1; + + for (i = 0; i < out->len; i++) { + if (json_get_hash(&out->rh[i], c, key, id, i)) + goto err2; + } + + return 0; + +err2: + free(out->ptrs); + while (i) { + i--; + json_destroy_hash(&out->rh[i]); + } +err1: + free(out->rh); + return -1; +} + static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const redisReply *id, const struct redis_hash *rh, const char *rh_num_key) { @@ -998,8 +1225,6 @@ static int redis_hash_get_crypto_context(struct crypto_context *out, const struc return 0; } - - static int redis_sfds(struct call *c, struct redis_list *sfds) { unsigned int i; str family, intf_name; @@ -1243,6 +1468,35 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) return 0; } +static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_list *medias) +{ + unsigned int i; + struct call_monologue *ml, *other_ml; + GQueue q = G_QUEUE_INIT; + GList *l; + + for (i = 0; i < tags->len; i++) { + ml = tags->ptrs[i]; + + ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); + + if (json_build_list(&q, c, "other_tags", &c->callid, i, tags)) + return -1; + for (l = q.head; l; l = l->next) { + other_ml = l->data; + if (!other_ml) + return -1; + g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); + } + g_queue_clear(&q); + + if (json_build_list(&ml->medias, c, "medias", &c->callid, i, medias)) + return -1; + } + + return 0; +} + static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) { unsigned int i; @@ -1272,6 +1526,31 @@ static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *t return 0; } +static int json_link_streams(struct call *c, struct redis_list *streams, + struct redis_list *sfds, struct redis_list *medias) +{ + unsigned int i; + struct packet_stream *ps; + + for (i = 0; i < streams->len; i++) { + ps = streams->ptrs[i]; + + ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); + ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); + ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); + ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); + ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); + + if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds)) + return -1; + + if (ps->media) + __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); + } + + return 0; +} + static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias) { @@ -1297,6 +1576,26 @@ static int redis_link_streams(struct redis *r, struct call *c, struct redis_list return 0; } +static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, + struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) +{ + unsigned int i; + struct call_media *med; + + for (i = 0; i < medias->len; i++) { + med = medias->ptrs[i]; + + med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); + if (!med->monologue) + return -1; + if (json_build_list(&med->streams, c, "streams", &c->callid, i, streams)) + return -1; + if (json_build_list(&med->endpoint_maps, c, "maps", &c->callid, i, maps)) + return -1; + } + return 0; +} + static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1346,6 +1645,21 @@ static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *pt return 0; } +static int json_link_maps(struct redis *r, struct call *c, struct redis_list *maps, + struct redis_list *sfds) +{ + unsigned int i; + struct endpoint_map *em; + + for (i = 0; i < maps->len; i++) { + em = maps->ptrs[i]; + + if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", &c->callid, em->unique_id, sfds, + rbl_cb_intf_sfds, em)) + return -1; + } + return 0; +} static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, struct redis_list *sfds) @@ -1363,6 +1677,160 @@ static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *m return 0; } +static void json_restore_call(struct redis *r, struct callmaster *m, redisReply *id, enum call_type type) { + redisReply* rr_jsonStr; + struct redis_hash call; + struct redis_list tags, sfds, streams, medias, maps; + struct call *c = NULL; + str s ; + const char *err = 0; + int i; +#if !(GLIB_CHECK_VERSION(2,36,0)) + g_type_init(); +#endif + JsonReader *root_reader =0; + JsonParser *parser =0; + char orig_json_callid[512]; memset(orig_json_callid,0,512); + memcpy(orig_json_callid,id->str,strlen(id->str)); + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET %s",id->str); + if (!rr_jsonStr) { + rlog(LOG_ERR, "Could not retrieve json data from redis for callid: %s", id->str); + goto err1; + } + + // strip off json- prefix from callid + str_cut(id->str, 0, strlen("json-")); + id->len = strlen(id->str); + str_init_len(&s, id->str, id->len); + + parser = json_parser_new(); + if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) { + rlog(LOG_DEBUG, "Could not parse json data !"); + goto err1; + } + root_reader = json_reader_new (json_parser_get_root (parser)); + if (!root_reader) { + rlog(LOG_DEBUG, "Could not read json data !"); + goto err1; + } + + c = call_get_or_create(&s, m, type); + err = "failed to create call struct"; + if (!c) + goto err1; + + c->root_reader = root_reader; // attach the json to the call in order to restore data from there + + err = "call already exists"; + if (c->last_signal) + goto err2; + err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", id, -1)) + goto err2; + err = "'tags' incomplete"; + if (json_get_list_hash(&tags, c, "tag", id, &call, "num_tags")) + goto err3; + err = "'sfds' incomplete"; + if (json_get_list_hash(&sfds, c, "sfd", id, &call, "num_sfds")) + goto err4; + err = "'streams' incomplete"; + if (json_get_list_hash(&streams, c, "stream", id, &call, "num_streams")) + goto err5; + err = "'medias' incomplete"; + if (json_get_list_hash(&medias, c, "media", id, &call, "num_medias")) + goto err6; + err = "'maps' incomplete"; + if (json_get_list_hash(&maps, c, "map", id, &call, "num_maps")) + goto err7; + + err = "missing 'created' timestamp"; + if (redis_hash_get_time_t(&c->created, &call, "created")) + goto err8; + err = "missing 'last signal' timestamp"; + if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) + goto err8; + if (redis_hash_get_int(&i, &call, "tos")) + c->tos = 184; + else + c->tos = i; + redis_hash_get_time_t(&c->deleted, &call, "deleted"); + redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); + if (!redis_hash_get_str(&s, &call, "created_from")) + c->created_from = call_strdup(c, s.s); + if (!redis_hash_get_str(&s, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &s); + + err = "missing 'redis_hosted_db' value"; + if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) + goto err8; + + err = "failed to create sfds"; + if (redis_sfds(c, &sfds)) + goto err8; + err = "failed to create streams"; + if (redis_streams(c, &streams)) + goto err8; + err = "failed to create tags"; + if (redis_tags(c, &tags)) + goto err8; + err = "failed to create medias"; + if (redis_medias(r, c, &medias)) + goto err8; + err = "failed to create maps"; + if (redis_maps(c, &maps)) + goto err8; + + err = "failed to link sfds"; + if (redis_link_sfds(&sfds, &streams)) + goto err8; + err = "failed to link streams"; + if (json_link_streams(c, &streams, &sfds, &medias)) + goto err8; + err = "failed to link tags"; + if (json_link_tags(c, &tags, &medias)) + goto err8; + err = "failed to link medias"; + if (json_link_medias(r, c, &medias, &streams, &maps, &tags)) + goto err8; + err = "failed to link maps"; + if (json_link_maps(r, c, &maps, &sfds)) + goto err8; + + err = NULL; + +err8: + json_destroy_list(&maps); +err7: + json_destroy_list(&medias); +err6: + json_destroy_list(&streams); +err5: + json_destroy_list(&sfds); +err4: + json_destroy_list(&tags); +err3: + json_destroy_hash(&call); +err2: + rwlock_unlock_w(&c->master_lock); +err1: + if (root_reader) + g_object_unref (root_reader); + if (parser) + g_object_unref (parser); + if (rr_jsonStr) + freeReplyObject(rr_jsonStr); + log_info_clear(); + if (err) { + rlog(LOG_WARNING, "Failed to restore call ID '%.*s' from Redis: %s", REDIS_FMT(id), err); + if (c) + call_destroy(c); + else + redisCommandNR(m->conf.redis_write->ctx, "DEL %s", orig_json_callid); + } + if (c) + obj_put(c); +} static void redis_restore_recording(struct call *c, struct redis_hash *call) { str s; @@ -1519,7 +1987,11 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - redis_restore_call(r, ctx->m, call, CT_OWN_CALL); + if (ctx->m->conf.redis_multikey) { + redis_restore_call(r, ctx->m, call, CT_OWN_CALL); + } else { + json_restore_call(r, ctx->m, call, CT_OWN_CALL); + } mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -1547,7 +2019,11 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + if (m->conf.redis_multikey) { + calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + } else { + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); + } if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -1582,7 +2058,66 @@ err: } +static int json_update_crypto_params(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const char *key, const struct crypto_params *p) +{ + char tmp[2048]; ZERO(tmp); + + if (!p->crypto_suite) + return -1; + + sprintf(tmp,"%s-crypto_suite",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + json_builder_add_string_value (builder, p->crypto_suite->name); + + sprintf(tmp,"%s-master_key",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + snprintf(tmp,sizeof(p->master_key), "%s",p->master_key); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + sprintf(tmp,"%s-master_salt",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + snprintf(tmp,sizeof(p->master_salt), "%s",p->master_salt); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + sprintf(tmp,"%s-unenc-srtp",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + sprintf(tmp,"%i",p->session_params.unencrypted_srtp); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + sprintf(tmp,"%s-unenc-srtcp",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + sprintf(tmp,"%i",p->session_params.unencrypted_srtcp); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + sprintf(tmp,"%s-unauth-srtp",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + sprintf(tmp,"%i",p->session_params.unauthenticated_srtp); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + if (p->mki) { + sprintf(tmp,"%s-mki",key); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + snprintf(tmp,p->mki_len, "%s",p->mki); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + return 0; +} static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, @@ -1607,6 +2142,27 @@ static int redis_update_crypto_params(struct redis *r, const char *pref, const s return 0; } + +static void json_update_crypto_context(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const struct crypto_context *c) +{ + char tmp[2048]; ZERO(tmp); + + if (json_update_crypto_params(builder, pref, callid, unique_id, "", &c->params)) + return; + + json_builder_set_member_name (builder, "last_index"); + sprintf(tmp,"%lu",c->last_index); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "ssrc"); + sprintf(tmp,"%u",(unsigned) c->ssrc); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); +} + static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct crypto_context *c) @@ -1634,6 +2190,25 @@ static void redis_update_stats(struct redis *r, const char *pref, const str *cal key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), key, atomic64_get(&s->errors)); } + +static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const struct dtls_fingerprint *f) +{ + char tmp[2048]; ZERO(tmp); + + if (!f->hash_func) + return; + + json_builder_set_member_name (builder, "hash_func"); + json_builder_add_string_value (builder, f->hash_func->name); + + json_builder_set_member_name (builder, "fingerprint"); + snprintf(tmp,sizeof(f->digest), "%s",f->digest); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); +} + static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct dtls_fingerprint *f) @@ -1645,6 +2220,590 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con f->hash_func->name, S_LEN(f->digest, sizeof(f->digest))); } +/** + * encodes the few (k,v) pairs for one call under one json structure + */ + +char* redis_encode_json(struct call *c) { + + GList *l=0,*k=0, *m=0, *n=0; + struct endpoint_map *ep; + struct call_media *media; + struct rtp_payload_type *pt; + struct stream_fd *sfd; + struct packet_stream *ps; + struct intf_list *il; + struct call_monologue *ml, *ml2; +#if !(GLIB_CHECK_VERSION(2,36,0)) + g_type_init(); +#endif + JsonBuilder *builder = json_builder_new (); + + char tmp[2048]; ZERO(tmp); + + json_builder_begin_object (builder); + { + sprintf(tmp,"json-%s",STRSTR(&c->callid)); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + json_builder_set_member_name (builder, "created"); + sprintf(tmp,"%ld",(long int) c->created); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "last_signal"); + sprintf(tmp,"%ld",(long int) c->last_signal); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "tos"); + sprintf(tmp,"%u",(int) c->tos); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "deleted"); + sprintf(tmp,"%ld",(long int) c->deleted); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_sfds"); + sprintf(tmp,"%u",g_queue_get_length(&c->stream_fds)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_streams"); + sprintf(tmp,"%u", g_queue_get_length(&c->streams)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_medias"); + sprintf(tmp,"%u", g_queue_get_length(&c->medias)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_tags"); + sprintf(tmp,"%u", g_queue_get_length(&c->monologues)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_maps"); + sprintf(tmp,"%u", g_queue_get_length(&c->endpoint_maps)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "ml_deleted"); + sprintf(tmp,"%ld", (long int) c->ml_deleted); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "created_from"); + sprintf(tmp,"%s",c->created_from); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "created_from_addr"); + sprintf(tmp,"%s",sockaddr_print_buf(&c->created_from_addr)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "redis_hosted_db"); + sprintf(tmp,"%u",c->redis_hosted_db); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + } + + json_builder_end_object (builder); + + for (l = c->stream_fds.head; l; l = l->next) { + sfd = l->data; + + sprintf(tmp,"sfd-%s-%u",STRSTR(&c->callid),sfd->unique_id); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + json_builder_set_member_name (builder, "pref_family"); + sprintf(tmp,"%s",sfd->local_intf->logical->preferred_family->rfc_name); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "localport"); + sprintf(tmp,"%u",sfd->socket.local.port); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "logical_intf"); + sprintf(tmp,"%s",STRSTR(&sfd->local_intf->logical->name)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "local_intf_uid"); + sprintf(tmp,"%u",sfd->local_intf->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "stream"); + sprintf(tmp,"%u",sfd->stream->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_update_crypto_context(builder, "sfd", &c->callid, sfd->unique_id, &sfd->crypto); + + } + json_builder_end_object (builder); + + } // --- for + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + sprintf(tmp,"stream-%s-%u",STRSTR(&c->callid),ps->unique_id); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + json_builder_set_member_name (builder, "media"); + sprintf(tmp,"%u",ps->media->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "sfd"); + sprintf(tmp,"%u",ps->selected_sfd ? ps->selected_sfd->unique_id : -1); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "rtp_sink"); + sprintf(tmp,"%u",ps->rtp_sink ? ps->rtp_sink->unique_id : -1); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "rtcp_sink"); + sprintf(tmp,"%u",ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "rtcp_sibling"); + sprintf(tmp,"%u",ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "last_packet"); + sprintf(tmp,UINT64F,atomic64_get(&ps->last_packet)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "ps_flags"); + sprintf(tmp,"%u",ps->ps_flags); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "component"); + sprintf(tmp,"%u",ps->component); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + // this is redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "endpoint", &ps->endpoint); + json_builder_set_member_name (builder, "endpoint"); + sprintf(tmp,"%s",endpoint_print_buf(&ps->endpoint)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + // next is: + // redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "advertised_endpoint", + // &ps->advertised_endpoint); + json_builder_set_member_name (builder, "advertised_endpoint"); + sprintf(tmp,"%s",endpoint_print_buf(&ps->advertised_endpoint)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + // next is: + // redis_pipe(r, "HMSET %s-"PB"-%u %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"", + // pref, STR(callid), unique_id, + // key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), + // key, atomic64_get(&s->errors)); + // redis_update_stats(r, "stream", &c->callid, ps->unique_id, "stats", &ps->stats); + + json_builder_set_member_name (builder, "stats-packets"); + sprintf(tmp,"%ld",atomic64_get(&ps->stats.packets)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "stats-bytes"); + sprintf(tmp,"%ld",atomic64_get(&ps->stats.bytes)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "stats-errors"); + sprintf(tmp,"%ld",atomic64_get(&ps->stats.errors)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_update_crypto_context(builder, "stream", &c->callid, ps->unique_id, &ps->crypto); + + } + + json_builder_end_object (builder); + + // stream_sfds was here before + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + + } // --- for streams.head + + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + sprintf(tmp,"stream_sfds-%s-%u",STRSTR(&c->callid),ps->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (k = ps->sfds.head; k; k = k->next) { + sfd = k->data; + sprintf(tmp,"%u",sfd->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + } + + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + sprintf(tmp,"tag-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + json_builder_set_member_name (builder, "created"); + sprintf(tmp,"%llu",(long long unsigned) ml->created); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "active"); + sprintf(tmp,"%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "deleted"); + sprintf(tmp,"%llu",(long long unsigned) ml->deleted); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + if (ml->tag.s) { + json_builder_set_member_name (builder, "tag"); + sprintf(tmp,"%s",STRSTR(&ml->tag)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + if (ml->viabranch.s) { + json_builder_set_member_name (builder, "via-branch"); + sprintf(tmp,"%s",STRSTR(&ml->viabranch)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + } + json_builder_end_object (builder); + + // other_tags and medias- was here before + + } // --- for monologues.head + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + // -- we do it again here since the jsonbuilder is linear straight forward + k = g_hash_table_get_values(ml->other_tags); + sprintf(tmp,"other_tags-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = k; m; m = m->next) { + ml2 = m->data; + sprintf(tmp,"%u",ml2->unique_id); + json_builder_add_string_value(builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + g_list_free(k); + + sprintf(tmp,"medias-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (k = ml->medias.head; k; k = k->next) { + media = k->data; + sprintf(tmp,"%u",media->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + } + + + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + sprintf(tmp,"media-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + json_builder_set_member_name (builder, "tag"); + sprintf(tmp,"%u",media->monologue->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "index"); + sprintf(tmp,"%u",media->index); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "type"); + sprintf(tmp,"%s",STRSTR(&media->type)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "protocol"); + sprintf(tmp,"%s",media->protocol ? media->protocol->name : ""); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "desired_family"); + sprintf(tmp,"%s",media->desired_family ? media->desired_family->rfc_name : ""); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "sdes_in_tag"); + sprintf(tmp,"%u",media->sdes_in.tag); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "sdes_out_tag"); + sprintf(tmp,"%u",media->sdes_out.tag); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "logical_intf"); + sprintf(tmp,"%s",STRSTR(&media->logical_intf->name)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "media_flags"); + sprintf(tmp,"%u",media->media_flags); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_update_crypto_params(builder, "media", &c->callid, media->unique_id, "sdes_in", + &media->sdes_in.params); + json_update_crypto_params(builder, "media", &c->callid, media->unique_id, "sdes_out", + &media->sdes_out.params); + json_update_dtls_fingerprint(builder, "media", &c->callid, media->unique_id, &media->fingerprint); + + // streams and maps- and payload_types- was here before + + } + json_builder_end_object (builder); + + } // --- for medias.head + + // -- we do it again here since the jsonbuilder is linear straight forward + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + sprintf(tmp,"streams-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = media->streams.head; m; m = m->next) { + ps = m->data; + sprintf(tmp,"%u",ps->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + sprintf(tmp,"maps-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = media->endpoint_maps.head; m; m = m->next) { + ep = m->data; + sprintf(tmp,"%u",ep->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + k = g_hash_table_get_values(media->rtp_payload_types); + sprintf(tmp,"payload_types-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = k; m; m = m->next) { + pt = m->data; + sprintf(tmp,"%u/%s/%u/%s", + pt->payload_type, STRSTR(&pt->encoding), + pt->clock_rate, STRSTR(&pt->encoding_parameters)); + json_builder_add_string_value(builder, tmp); + } + json_builder_end_array (builder); + + g_list_free(k); + + } + + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + sprintf(tmp,"map-%s-%u",STRSTR(&c->callid),ep->unique_id); + json_builder_set_member_name (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + + json_builder_set_member_name (builder, "wildcard"); + sprintf(tmp,"%i",ep->wildcard); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "num_ports"); + sprintf(tmp,"%u",ep->num_ports); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "intf_preferred_family"); + sprintf(tmp,"%s",ep->logical_intf->preferred_family->rfc_name); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + json_builder_set_member_name (builder, "logical_intf"); + sprintf(tmp,"%s",STRSTR(&ep->logical_intf->name)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + // next is: + // redis_update_endpoint(r, "map", &c->callid, ep->unique_id, "endpoint", &ep->endpoint); + json_builder_set_member_name (builder, "endpoint"); + sprintf(tmp,"%s",endpoint_print_buf(&ep->endpoint)); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + + // map_sfds was here before !!! + + } + json_builder_end_object (builder); + + } // --- for c->endpoint_maps.head + + // -- we do it again here since the jsonbuilder is linear straight forward + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + sprintf(tmp,"map_sfds-%s-%u",STRSTR(&c->callid),ep->unique_id); + json_builder_set_member_name (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = ep->intf_sfds.head; m; m = m->next) { + il = m->data; + sprintf(tmp,"loc-%u",il->local_intf->unique_id); + json_builder_add_string_value(builder, tmp); + ZERO(tmp); + for (n = il->list.head; n; n = n->next) { + sfd = n->data; + sprintf(tmp,"%u",sfd->unique_id); + json_builder_add_string_value (builder, tmp); + ZERO(tmp); + } + } + json_builder_end_array (builder); + } + } + json_builder_end_object (builder); + + JsonGenerator *gen = json_generator_new (); + JsonNode * root = json_builder_get_root (builder); + json_generator_set_root (gen, root); + char* result = json_generator_to_data (gen, NULL); + + json_node_free (root); + g_object_unref (gen); + g_object_unref (builder); + + return result; + +} + + +void redis_update_onekey(struct call *c, struct redis *r) { + unsigned int redis_expires_s; + + if (!r) + return; + + mutex_lock(&r->lock); + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } + + rwlock_lock_r(&c->master_lock); + + redis_expires_s = c->callmaster->conf.redis_expires_secs; + + c->redis_hosted_db = r->db; + if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { + rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); + goto err; + } + + char* result = redis_encode_json(c); + if (!result) + goto err; + + redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); + redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); + + redis_consume(r); + + if (result) + free(result); + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + + return; +err: + + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + if (r->ctx->err) + rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); + redisFree(r->ctx); + r->ctx = NULL; + +} static void redis_update_recording(struct redis *r, struct call *c) { struct recording *rec; @@ -1658,7 +2817,6 @@ static void redis_update_recording(struct redis *r, struct call *c) { } - /* * Redis data structure: * @@ -1984,7 +3142,11 @@ void redis_delete(struct call *c, struct redis *r) { if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) goto err; - redis_delete_call(c, r); + if (c->callmaster->conf.redis_multikey) { + redis_delete_call(c, r); + } else { + redis_delete_call_json(c, r); + } rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index 0321abd77..a1be63291 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -91,23 +91,17 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #endif - - - - #define rlog(l, x...) ilog(l | LOG_FLAG_RESTORE, x) - - #define REDIS_FMT(x) (x)->len, (x)->str - void redis_notify_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); int redis_restore(struct callmaster *, struct redis *); void redis_update(struct call *, struct redis *); +void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); diff --git a/debian/control b/debian/control index 6cf0a4e73..36a91c787 100644 --- a/debian/control +++ b/debian/control @@ -13,6 +13,7 @@ Build-Depends: debhelper (>= 5), libevent-dev (>= 2.0), libglib2.0-dev (>= 2.30), libhiredis-dev, + libjson-glib-dev, libpcap0.8-dev | libpcap-dev, libpcre3-dev, libssl-dev (>= 1.0.1), diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 432a06082..7b770ce52 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -74,6 +74,7 @@ fi [ -z "$REDIS_WRITE_AUTH_PW" ] || export RTPENGINE_REDIS_WRITE_AUTH_PW="$REDIS_WRITE_AUTH_PW" [ -z "$REDIS_NUM_THREADS" ] || OPTIONS="$OPTIONS --redis-num-threads=$REDIS_NUM_THREADS" [ -z "$REDIS_EXPIRES" ] || OPTIONS="$OPTIONS --redis-expires=$REDIS_EXPIRES" +[ -z "$REDIS_MULTIKEY" ] || OPTIONS="$OPTIONS --redis-multikey=$REDIS_MULTIKEY" [ -z "$NO_REDIS_REQUIRED" -o \( "$NO_REDIS_REQUIRED" != "1" -a "$NO_REDIS_REQUIRED" != "yes" \) ] || OPTIONS="$OPTIONS --no-redis-required" [ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL" [ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback"