diff --git a/README.md b/README.md index bbed85197..9ead94c14 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,8 @@ option and which are reproduced below: -w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database -k, --subscribe-keyspace Subscription keyspace list --redis-num-threads=INT Number of Redis restore threads - --redis-expires=INT Expire time in seconds for redis keys + --redis-expires=INT Expire time in seconds for redis keys + --redis-multikey Use multiple redis keys for storing the call (old behaviour) DEPRECATED -q, --no-redis-required Start even if can't connect to redis databases -b, --b2b-url=STRING XMLRPC URL of B2B UA -L, --log-level=INT Mask log priorities above this level @@ -426,6 +427,10 @@ The options are described in more detail below. Expire time in seconds for redis keys. Default is 86400. +* --redis-multikey + + Use multiple redis keys for storing the call (old behaviour) DEPRECATED + * -q, --no-redis-required When this paramter is present or NO_REDIS_REQUIRED='yes' or '1' in config file, rtpengine starts even if there is no initial connection to redis databases(either to -r or to -w or to both redis). diff --git a/daemon/Makefile b/daemon/Makefile index 7eca49501..9843099a7 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -8,6 +8,7 @@ CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` CFLAGS+= `pkg-config --cflags libevent_pthreads` CFLAGS+= `pcre-config --cflags` +CFLAGS+= `pkg-config --cflags json-glib-1.0` CFLAGS+= -I. -I../kernel-module/ -I../lib/ CFLAGS+= -D_GNU_SOURCE @@ -30,6 +31,7 @@ LDFLAGS+= -lpcap LDFLAGS+= `pcre-config --libs` LDFLAGS+= `xmlrpc-c-config client --libs` LDFLAGS+= -lhiredis +LDFLAGS+= `pkg-config --libs json-glib-1.0` include ../lib/lib.Makefile diff --git a/daemon/call.c b/daemon/call.c index c3b0a10d9..fc08a4390 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -594,8 +594,13 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); - if (update) - redis_update(ps->call, m->conf.redis_write); + if (update) { + if (m->conf.redis_multikey) { + redis_update(ps->call, m->conf.redis_write); + } else { + redis_update_onekey(ps->call, m->conf.redis_write); + } + } next: g_hash_table_remove(hlp.addr_sfd, &ep); diff --git a/daemon/call.h b/daemon/call.h index d8f19c7aa..2dc84b3b9 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -5,7 +5,8 @@ /* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */ - +#include +#include #include #include @@ -443,6 +444,7 @@ struct call { unsigned int foreign_call; // created_via_redis_notify call struct recording *recording; + JsonReader *root_reader; }; struct callmaster_config { @@ -460,7 +462,8 @@ struct callmaster_config { struct event_base *redis_notify_event_base; GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; - unsigned int redis_expires_secs; + unsigned int redis_expires_secs; + unsigned int redis_multikey; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index e08e708f4..8bd2fc992 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,7 +186,11 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } gettimeofday(&(monologue->started), NULL); @@ -334,7 +338,11 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -764,10 +772,16 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster } rwlock_unlock_w(&call->master_lock); - if (!flags.no_redis_update) - redis_update(call, m->conf.redis_write); - else + + if (!flags.no_redis_update) { + if (m->conf.redis_multikey) { + redis_update(call, m->conf.redis_write); + } else { + redis_update_onekey(call,m->conf.redis_write); + } + } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); + } obj_put(call); gettimeofday(&(monologue->started), NULL); diff --git a/daemon/main.c b/daemon/main.c index 32698170d..d125ce179 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -62,6 +62,7 @@ static unsigned int timeout; static unsigned int silent_timeout; static unsigned int final_timeout; static unsigned int redis_expires = 86400; +static unsigned int redis_multikey = 0; static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; @@ -298,6 +299,7 @@ static void options(int *argc, char ***argv) { { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, + { "redis-multikey", 0, 0, G_OPTION_ARG_NONE, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", NULL }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -502,6 +504,11 @@ static void init_everything() { #if !GLIB_CHECK_VERSION(2,32,0) g_thread_init(NULL); #endif + +#if !(GLIB_CHECK_VERSION(2,36,0)) + g_type_init(); +#endif + if (!_log_stderr) openlog("rtpengine", LOG_PID | LOG_NDELAY, _log_facility); signals(); @@ -621,6 +628,7 @@ no_kernel: } mc.redis_expires_secs = redis_expires; + mc.redis_multikey = redis_multikey; ctx->m->conf = mc; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5195c6a6c..ffd482029 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1471,8 +1471,13 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { out: ca = sfd->call ? : NULL; - if (ca && update) - redis_update(ca, ca->callmaster->conf.redis_write); + if (ca && update) { + if (ca->callmaster->conf.redis_multikey) { + redis_update(ca, ca->callmaster->conf.redis_write); + } else { + redis_update_onekey(ca, ca->callmaster->conf.redis_write); + } + } done: log_info_clear(); } diff --git a/daemon/redis.c b/daemon/redis.c index 49fb976fc..a51eb3e8e 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -24,11 +24,11 @@ #include "dtls.h" #include "recording.h" #include "rtplib.h" +#include "str.h" - - - - +#include +#include +#include INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -65,6 +65,12 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #endif +#define REDIS_FMT(x) (x)->len, (x)->str + +static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type); +static int redis_check_conn(struct redis *r); +static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); + static void redis_pipe(struct redis *r, const char *fmt, ...) { va_list ap; @@ -105,7 +111,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) { } - /* called with r->lock held */ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) { redisReply *rp; @@ -124,8 +129,6 @@ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type } - - /* called with r->lock held */ static void redis_consume(struct redis *r) { redisReply *rp; @@ -138,8 +141,6 @@ static void redis_consume(struct redis *r) { } - - /* called with r->lock held if necessary */ static int redis_connect(struct redis *r, int wait) { struct timeval tv; @@ -233,32 +234,18 @@ err: return -1; } -int str_cut(char *str, int begin, int len) { - int l = strlen(str); - - if (len < 0) len = l - begin; - if (begin + len > l) len = l - begin; - memmove(str + begin, str + begin + len, l - len + 1); - - return len; -} - -static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type); -static int redis_check_conn(struct redis *r); -void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { +void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) { struct callmaster *cm = privdata; struct redis *r = 0; struct call *c = NULL; str callid; - char db_str[16]; memset(&db_str, 0, 8); - char *pdbstr = db_str; - char *p = 0; + str keyspace_id; // sanity checks if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on onRedisNotification"); + rlog(LOG_ERROR, "Struct callmaster is NULL on on_redis_notification"); return; } @@ -283,25 +270,36 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { if (rr->elements != 4) goto err; - char *pch = strstr(rr->element[2]->str, "notifier-"); - if (pch == NULL) { - rlog(LOG_ERROR,"Redis-Notifier: The substring 'notifier-' has not been found in the redis notification !\n"); + // format: __keyspace@__: + str_init_len(&keyspace_id, rr->element[2]->str, rr->element[2]->len); + + if (str_shift_cmp(&keyspace_id, "__keyspace@")) goto err; - } + // extract + char *endp; + r->db = strtoul(keyspace_id.s, &endp, 10); + if (endp == keyspace_id.s || *endp != '_') + goto err; + if (str_shift(&keyspace_id, endp - keyspace_id.s + 3)) + goto err; + if (keyspace_id.s[-1] != ':') + goto err; + // now at - // extract from __keyspace@__ prefix - p = strstr(rr->element[2]->str, "@"); - ++p; - while (isdigit(*p)) { - *pdbstr = *p; - ++pdbstr; ++p; - if (pdbstr-db_str>15) { - rlog(LOG_ERROR, "Could not extract keyspace db from notification."); + if (cm->conf.redis_multikey) { + if (str_shift_cmp(&keyspace_id, "notifier-")) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n"); + goto err; + } + } else { + if (str_shift_cmp(&keyspace_id, "json-")) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); goto err; } } - r->db = atoi(db_str); + + callid = keyspace_id; // select the right db for restoring the call if (redisCommandNR(r->ctx, "SELECT %i", r->db)) { @@ -312,15 +310,22 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } - pch += strlen("notifier-"); - str_cut(rr->element[2]->str,0,pch-rr->element[2]->str); - rr->element[2]->len = strlen(rr->element[2]->str); - rlog(LOG_DEBUG,"Redis-Notifier:%s:%d: Processing call with callid: %s\n", rr->element[3]->str, r->db, rr->element[2]->str); - - str_init(&callid,rr->element[2]->str); + if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) + redis_restore_call(r, cm, &callid, CT_FOREIGN_CALL); - if (strncmp(rr->element[3]->str,"sadd",4)==0) - redis_restore_call(r, cm, rr->element[2], CT_FOREIGN_CALL); + if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + c = call_get(&callid, cm); + if (c) { + rwlock_unlock_w(&c->master_lock); + if (IS_FOREIGN_CALL(c)) + call_destroy(c); + else { + rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: %s\n", rr->element[2]->str); + goto err; + } + } + json_restore_call(r, cm, &callid, CT_FOREIGN_CALL); + } if (strncmp(rr->element[3]->str,"del",3)==0) { c = call_get(&callid, cm); @@ -329,6 +334,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } rwlock_unlock_w(&c->master_lock); + if (!IS_FOREIGN_CALL(c)) { + rlog(LOG_WARN, "Redis-Notifier: Ignoring DEL received for an OWN call: %s\n", rr->element[2]->str); + goto err; + } call_destroy(c); } @@ -458,21 +467,22 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return -1; } - switch (action) { + if (cm->conf.redis_multikey) { + switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void *) cm, "punsubscribe") != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); return -1; } @@ -480,6 +490,31 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a default: rlog(LOG_ERROR, "No subscribe action found: %d", action); return -1; + } + } else { + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); + return -1; + } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; + } } return 0; @@ -647,7 +682,6 @@ err: } - static void redis_close(struct redis *r) { if (r->ctx) redisFree(r->ctx); @@ -656,7 +690,6 @@ static void redis_close(struct redis *r) { } - /* must be called with r->lock held */ static int redis_check_conn(struct redis *r) { // try redis connection @@ -698,6 +731,12 @@ static void redis_delete_list(struct redis *r, const str *callid, const char *pr redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i); } +/* called with r->lock held and c->master_lock held */ +static void redis_delete_call_json(struct call *c, struct redis *r) { + redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); + redis_consume(r); +} + /* called with r->lock held and c->master_lock held */ static void redis_delete_call(struct call *c, struct redis *r) { redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); @@ -719,10 +758,104 @@ static void redis_delete_call(struct call *c, struct redis *r) { redis_consume(r); } +INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const char* tmp, int len) { + char enc[len * 3 + 1]; + str_uri_encode_len(enc, tmp, len); + json_builder_add_string_value(builder,enc); +} +INLINE char* json_reader_get_string_value_uri_enc(JsonReader *root_reader, int *lenp) { + const char *str = json_reader_get_string_value(root_reader); + char *ret; + int len = str_uri_decode_len(&ret, str, strlen(str)); + if (lenp) + *lenp = len; + return ret; // must be free'd +} + +// stolen from libhiredis +/* Create a reply object */ +INLINE redisReply *createReplyObject(int type) { + redisReply *r = calloc(1,sizeof(*r)); + + if (r == NULL) + return NULL; + + r->type = type; + return r; +} + +static int json_get_hash(struct redis_hash *out, struct call* c, + const char *key, + unsigned int id) +{ + static unsigned int MAXKEYLENGTH = 512; + char key_concatted[MAXKEYLENGTH]; + int rc=0; + + if (!c) + goto err; + + if (id == -1) { + rc = snprintf(key_concatted, MAXKEYLENGTH, "%s",key); + } else { + rc = snprintf(key_concatted, MAXKEYLENGTH, "%s-%u",key,id); + } + if (rc>=MAXKEYLENGTH) + rlog(LOG_ERROR,"Json key too long."); + + + if (!json_reader_read_member(c->root_reader, key_concatted)) { + rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); + goto err; + } + + out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject); + if (!out->ht) + goto err; + out->rr = 0; + + gchar **members = json_reader_list_members(c->root_reader); + gchar **orig_members = members; + + for (int i=0; i < json_reader_count_members (c->root_reader); ++i) { + + if (!json_reader_read_member(c->root_reader, *members)) { + rlog(LOG_ERROR, "Could not read json member: %s",*members); + goto err3; + } + out->rr = createReplyObject(REDIS_REPLY_STRING); + if (!out->rr) { + rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); + goto err3; + } + out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader, &out->rr->len); + + char* tmp = strdup(*members); + + if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + goto err3; + } + + json_reader_end_member(c->root_reader); + + ++members; + } // for + g_strfreev(orig_members); + json_reader_end_member (c->root_reader); + + return 0; +err3: + g_strfreev(members); + if (out->rr) + freeReplyObject(out->rr); + g_hash_table_destroy(out->ht); +err: + return -1; +} -static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const redisReply *which, +static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const str *which, unsigned int id) { redisReply *k, *v; @@ -732,9 +865,9 @@ static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *k if (!out->ht) goto err; if (id == -1) - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR_R(which)); + out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR(which)); else - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR_R(which), id); + out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR(which), id); if (!out->rr) goto err2; @@ -751,7 +884,8 @@ static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *k return 0; err3: - freeReplyObject(out->rr); + if (out->rr) + freeReplyObject(out->rr); err2: g_hash_table_destroy(out->ht); err: @@ -760,7 +894,9 @@ err: static void redis_destroy_hash(struct redis_hash *rh) { - freeReplyObject(rh->rr); + + if (rh->rr) + freeReplyObject(rh->rr); g_hash_table_destroy(rh->ht); } static void redis_destroy_list(struct redis_list *rl) { @@ -773,7 +909,19 @@ static void redis_destroy_list(struct redis_list *rl) { free(rl->ptrs); } +static void json_destroy_hash(struct redis_hash *rh) { + g_hash_table_destroy(rh->ht); +} + +static void json_destroy_list(struct redis_list *rl) { + unsigned int i; + for (i = 0; i < rl->len; i++) { + json_destroy_hash(&rl->rh[i]); + } + free(rl->rh); + free(rl->ptrs); +} static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { redisReply *r; @@ -876,6 +1024,37 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh, return NULL; return redis_list_get_idx_ptr(list, idx); } + +static int json_build_list_cb(GQueue *q, struct call *c, const char *key, + unsigned int idx, struct redis_list *list, + int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) +{ + str s; + char key_concatted[256]; + + snprintf(key_concatted, 256, "%s-%u", key, idx); + + if (!json_reader_read_member(c->root_reader, key_concatted)) + rlog(LOG_ERROR,"Key in json not found:%s",key_concatted); + for (int jidx=0; jidx < json_reader_count_elements(c->root_reader); ++jidx) { + if (!json_reader_read_element(c->root_reader,jidx)) + rlog(LOG_ERROR,"Element in array not found."); + char *strp = s.s = json_reader_get_string_value_uri_enc(c->root_reader, &s.len); + if (!strp) + rlog(LOG_ERROR,"String in json not found."); + if (cb(&s, q, list, ptr)) { + free(s.s); + return -1; + } + free(strp); + json_reader_end_element(c->root_reader); + } + json_reader_end_member (c->root_reader); + + return 0; +} + + static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list, int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) @@ -909,12 +1088,53 @@ static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) g_queue_push_tail(q, redis_list_get_idx_ptr(list, (unsigned) j)); return 0; } + +static int json_build_list(GQueue *q, struct call *c, const char *key, const str *callid, + unsigned int idx, struct redis_list *list) +{ + return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL); +} + static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list) { return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); } -static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const redisReply *id, + +static int json_get_list_hash(struct redis_list *out, struct call* c, + const char *key, + const struct redis_hash *rh, const char *rh_num_key) +{ + unsigned int i; + + if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) + return -1; + out->rh = malloc(sizeof(*out->rh) * out->len); + if (!out->rh) + return -1; + out->ptrs = malloc(sizeof(*out->ptrs) * out->len); + if (!out->ptrs) + goto err1; + + for (i = 0; i < out->len; i++) { + if (json_get_hash(&out->rh[i], c, key, i)) + goto err2; + } + + return 0; + +err2: + free(out->ptrs); + while (i) { + i--; + json_destroy_hash(&out->rh[i]); + } +err1: + free(out->rh); + return -1; +} + +static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const str *id, const struct redis_hash *rh, const char *rh_num_key) { unsigned int i; @@ -953,19 +1173,24 @@ err1: static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) { str s; int i; + const char *err; if (redis_hash_get_str_f(&s, h, "%s-crypto_suite", k)) return 1; out->crypto_suite = crypto_find_suite(&s); + err = "crypto suite not known"; if (!out->crypto_suite) - return -1; + goto err; + err = "master_key"; if (redis_hash_get_c_buf_f(out->master_key, h, "%s-master_key", k)) - return -1; + goto err; + err = "master_salt"; if (redis_hash_get_c_buf_f(out->master_salt, h, "%s-master_salt", k)) - return -1; + goto err; if (!redis_hash_get_str_f(&s, h, "%s-mki", k)) { + err = "mki too long"; if (s.len > 255) return -1; out->mki = malloc(s.len); @@ -981,6 +1206,10 @@ static int redis_hash_get_crypto_params(struct crypto_params *out, const struct out->session_params.unauthenticated_srtp = i; return 0; + +err: + rlog(LOG_ERR, "Crypto params error: %s", err); + return -1; } static int redis_hash_get_crypto_context(struct crypto_context *out, const struct redis_hash *h) { int ret; @@ -998,8 +1227,6 @@ static int redis_hash_get_crypto_context(struct crypto_context *out, const struc return 0; } - - static int redis_sfds(struct call *c, struct redis_list *sfds) { unsigned int i; str family, intf_name; @@ -1012,42 +1239,57 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) { struct stream_fd *sfd; socket_t *sock; int port; + const char *err; for (i = 0; i < sfds->len; i++) { rh = &sfds->rh[i]; + err = "'localport' key not present"; if (redis_hash_get_int(&port, rh, "localport")) - return -1; + goto err; + err = "'pref_family' key not present"; if (redis_hash_get_str(&family, rh, "pref_family")) - return -1; + goto err; + err = "'logical_intf' key not present"; if (redis_hash_get_str(&intf_name, rh, "logical_intf")) - return -1; + goto err; + err = "'local_intf_uid' key not present"; if (redis_hash_get_unsigned(&loc_uid, rh, "local_intf_uid")) - return -1; + goto err; + err = "socket family not known"; fam = get_socket_family_rfc(&family); if (!fam) - return -1; + goto err; + err = "logical interface not known"; lif = get_logical_interface(&intf_name, fam, 0); if (!lif) - return -1; + goto err; + err = "not enough local interfaces"; loc = g_queue_peek_nth(&lif->list, loc_uid); if (!loc) - return -1; + goto err; + err = "failed to open ports"; if (__get_consecutive_ports(&q, 1, port, loc->spec)) - return -1; + goto err; + err = "no port returned"; sock = g_queue_pop_head(&q); if (!sock) - return -1; + goto err; sfd = stream_fd_new(sock, c, loc); // XXX tos + err = "failed to config crypto context"; if (redis_hash_get_crypto_context(&sfd->crypto, rh)) - return -1; + goto err; sfds->ptrs[i] = sfd; } return 0; + +err: + rlog(LOG_ERR, "Error creating sfd: %s", err); + return -1; } static int redis_streams(struct call *c, struct redis_list *streams) { @@ -1134,6 +1376,62 @@ static int rbl_cb_plts(str *s, GQueue *q, struct redis_list *list, void *ptr) { g_hash_table_replace(med->rtp_payload_types, &pt->payload_type, pt); return 0; } +static int json_medias(struct redis *r, struct call *c, struct redis_list *medias) { + unsigned int i; + struct redis_hash *rh; + struct call_media *med; + str s; + + for (i = 0; i < medias->len; i++) { + rh = &medias->rh[i]; + + /* from call.c:__get_media() */ + med = uid_slice_alloc0(med, &c->medias); + med->call = c; + med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, + __payload_type_free); + + if (redis_hash_get_unsigned(&med->index, rh, "index")) + return -1; + if (redis_hash_get_str(&s, rh, "type")) + return -1; + call_str_cpy(c, &med->type, &s); + + if (redis_hash_get_str(&s, rh, "protocol")) + return -1; + med->protocol = transport_protocol(&s); + + if (redis_hash_get_str(&s, rh, "desired_family")) + return -1; + med->desired_family = get_socket_family_rfc(&s); + + if (redis_hash_get_str(&s, rh, "logical_intf") + || !(med->logical_intf = get_logical_interface(&s, med->desired_family, 0))) + { + rlog(LOG_ERR, "unable to find specified local interface"); + med->logical_intf = get_logical_interface(NULL, med->desired_family, 0); + } + + if (redis_hash_get_unsigned(&med->sdes_in.tag, rh, "sdes_in_tag")) + return -1; + if (redis_hash_get_unsigned(&med->sdes_out.tag, rh, "sdes_out_tag")) + return -1; + if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, rh, + "media_flags")) + return -1; + if (redis_hash_get_crypto_params(&med->sdes_in.params, rh, "sdes_in") < 0) + return -1; + if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0) + return -1; + + json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts, med); + /* XXX dtls */ + + medias->ptrs[i] = med; + } + + return 0; +} static int redis_medias(struct redis *r, struct call *c, struct redis_list *medias) { unsigned int i; struct redis_hash *rh; @@ -1243,7 +1541,7 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) return 0; } -static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) +static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_list *medias) { unsigned int i; struct call_monologue *ml, *other_ml; @@ -1255,7 +1553,7 @@ static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *t ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); - if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags)) + if (json_build_list(&q, c, "other_tags", &c->callid, i, tags)) return -1; for (l = q.head; l; l = l->next) { other_ml = l->data; @@ -1265,21 +1563,75 @@ static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *t } g_queue_clear(&q); - if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias)) + if (json_build_list(&ml->medias, c, "medias", &c->callid, i, medias)) return -1; } return 0; } -static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias) +static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) { unsigned int i; - struct packet_stream *ps; + struct call_monologue *ml, *other_ml; + GQueue q = G_QUEUE_INIT; + GList *l; - for (i = 0; i < streams->len; i++) { - ps = streams->ptrs[i]; + for (i = 0; i < tags->len; i++) { + ml = tags->ptrs[i]; + + ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); + + if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags)) + return -1; + for (l = q.head; l; l = l->next) { + other_ml = l->data; + if (!other_ml) + return -1; + g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); + } + g_queue_clear(&q); + + if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias)) + return -1; + } + + return 0; +} + +static int json_link_streams(struct call *c, struct redis_list *streams, + struct redis_list *sfds, struct redis_list *medias) +{ + unsigned int i; + struct packet_stream *ps; + + for (i = 0; i < streams->len; i++) { + ps = streams->ptrs[i]; + + ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); + ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); + ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); + ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); + ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); + + if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds)) + return -1; + + if (ps->media) + __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); + } + + return 0; +} + +static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, + struct redis_list *sfds, struct redis_list *medias) +{ + unsigned int i; + struct packet_stream *ps; + + for (i = 0; i < streams->len; i++) { + ps = streams->ptrs[i]; ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); @@ -1297,6 +1649,26 @@ static int redis_link_streams(struct redis *r, struct call *c, struct redis_list return 0; } +static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, + struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) +{ + unsigned int i; + struct call_media *med; + + for (i = 0; i < medias->len; i++) { + med = medias->ptrs[i]; + + med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); + if (!med->monologue) + return -1; + if (json_build_list(&med->streams, c, "streams", &c->callid, i, streams)) + return -1; + if (json_build_list(&med->endpoint_maps, c, "maps", &c->callid, i, maps)) + return -1; + } + return 0; +} + static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1346,6 +1718,21 @@ static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *pt return 0; } +static int json_link_maps(struct redis *r, struct call *c, struct redis_list *maps, + struct redis_list *sfds) +{ + unsigned int i; + struct endpoint_map *em; + + for (i = 0; i < maps->len; i++) { + em = maps->ptrs[i]; + + if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds, + rbl_cb_intf_sfds, em)) + return -1; + } + return 0; +} static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, struct redis_list *sfds) @@ -1363,6 +1750,152 @@ static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *m return 0; } +static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { + redisReply* rr_jsonStr; + struct redis_hash call; + struct redis_list tags, sfds, streams, medias, maps; + struct call *c = NULL; + const char *err = 0; + int i; + str s; + JsonReader *root_reader =0; + JsonParser *parser =0; + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(callid)); + if (!rr_jsonStr) { + rlog(LOG_ERR, "Could not retrieve json data from redis for callid: " STR_FORMAT, + STR_FMT(callid)); + goto err1; + } + + parser = json_parser_new(); + if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) { + rlog(LOG_DEBUG, "Could not parse json data !"); + goto err1; + } + root_reader = json_reader_new (json_parser_get_root (parser)); + if (!root_reader) { + rlog(LOG_DEBUG, "Could not read json data !"); + goto err1; + } + + c = call_get_or_create(callid, m, type); + err = "failed to create call struct"; + if (!c) + goto err1; + + c->root_reader = root_reader; // attach the json to the call in order to restore data from there + + err = "call already exists"; + if (c->last_signal) + goto err2; + err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", -1)) + goto err2; + err = "'tags' incomplete"; + if (json_get_list_hash(&tags, c, "tag", &call, "num_tags")) + goto err3; + err = "'sfds' incomplete"; + if (json_get_list_hash(&sfds, c, "sfd", &call, "num_sfds")) + goto err4; + err = "'streams' incomplete"; + if (json_get_list_hash(&streams, c, "stream", &call, "num_streams")) + goto err5; + err = "'medias' incomplete"; + if (json_get_list_hash(&medias, c, "media", &call, "num_medias")) + goto err6; + err = "'maps' incomplete"; + if (json_get_list_hash(&maps, c, "map", &call, "num_maps")) + goto err7; + + err = "missing 'created' timestamp"; + if (redis_hash_get_time_t(&c->created, &call, "created")) + goto err8; + err = "missing 'last signal' timestamp"; + if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) + goto err8; + if (redis_hash_get_int(&i, &call, "tos")) + c->tos = 184; + else + c->tos = i; + redis_hash_get_time_t(&c->deleted, &call, "deleted"); + redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); + if (!redis_hash_get_str(&s, &call, "created_from")) + c->created_from = call_strdup(c, s.s); + if (!redis_hash_get_str(&s, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &s); + + err = "missing 'redis_hosted_db' value"; + if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) + goto err8; + + err = "failed to create sfds"; + if (redis_sfds(c, &sfds)) + goto err8; + err = "failed to create streams"; + if (redis_streams(c, &streams)) + goto err8; + err = "failed to create tags"; + if (redis_tags(c, &tags)) + goto err8; + err = "failed to create medias"; + if (json_medias(r, c, &medias)) + goto err8; + err = "failed to create maps"; + if (redis_maps(c, &maps)) + goto err8; + + err = "failed to link sfds"; + if (redis_link_sfds(&sfds, &streams)) + goto err8; + err = "failed to link streams"; + if (json_link_streams(c, &streams, &sfds, &medias)) + goto err8; + err = "failed to link tags"; + if (json_link_tags(c, &tags, &medias)) + goto err8; + err = "failed to link medias"; + if (json_link_medias(r, c, &medias, &streams, &maps, &tags)) + goto err8; + err = "failed to link maps"; + if (json_link_maps(r, c, &maps, &sfds)) + goto err8; + + err = NULL; + +err8: + json_destroy_list(&maps); +err7: + json_destroy_list(&medias); +err6: + json_destroy_list(&streams); +err5: + json_destroy_list(&sfds); +err4: + json_destroy_list(&tags); +err3: + json_destroy_hash(&call); +err2: + rwlock_unlock_w(&c->master_lock); +err1: + if (root_reader) + g_object_unref (root_reader); + if (parser) + g_object_unref (parser); + if (rr_jsonStr) + freeReplyObject(rr_jsonStr); + log_info_clear(); + if (err) { + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(callid), + err); + if (c) + call_destroy(c); + else + redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(callid)); + } + if (c) + obj_put(c); +} static void redis_restore_recording(struct call *c, struct redis_hash *call) { str s; @@ -1378,7 +1911,7 @@ static void redis_restore_recording(struct call *c, struct redis_hash *call) { } -static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type type) { +static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; @@ -1386,10 +1919,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi const char *err; int i; - str_init_len(&s, id->str, id->len); - //s.s = id->str; - //s.len = id->len; - c = call_get_or_create(&s, m, type); + c = call_get_or_create(id, m, type); err = "failed to create call struct"; if (!c) goto err1; @@ -1489,11 +2019,11 @@ err2: err1: log_info_clear(); if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '%.*s' from Redis: %s", REDIS_FMT(id), err); + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(id), err); if (c) call_destroy(c); else - redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR_R(id)); + redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR(id)); } if (c) @@ -1512,6 +2042,7 @@ static void restore_thread(void *call_p, void *ctx_p) { struct thread_ctx *ctx = ctx_p; redisReply *call = call_p; struct redis *r; + str callid; rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call)); @@ -1519,7 +2050,14 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - redis_restore_call(r, ctx->m, call, CT_OWN_CALL); + str_init_len(&callid, call->str, call->len); + + if (ctx->m->conf.redis_multikey) { + redis_restore_call(r, ctx->m, &callid, CT_OWN_CALL); + } else { + if (str_shift_cmp(&callid, "json-") == 0) + json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); + } mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -1547,7 +2085,11 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + if (m->conf.redis_multikey) { + calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + } else { + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); + } if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -1581,8 +2123,55 @@ err: return ret; } +#define JSON_ADD_STRING(f...) do { \ + int len = snprintf(tmp,sizeof(tmp), f); \ + json_builder_add_string_value_uri_enc(builder, tmp, len); \ + } while (0) +#define JSON_SET_NSTRING(a,b,c,d) do { \ + snprintf(tmp,sizeof(tmp), a,b); \ + json_builder_set_member_name(builder, tmp); \ + JSON_ADD_STRING(c, d); \ + } while (0) +#define JSON_SET_NSTRING_CSTR(a,b,d) JSON_SET_NSTRING_LEN(a, b, strlen(d), d) +#define JSON_SET_NSTRING_LEN(a,b,l,d) do { \ + snprintf(tmp,sizeof(tmp), a,b); \ + json_builder_set_member_name(builder, tmp); \ + json_builder_add_string_value_uri_enc(builder, d, l); \ + } while (0) +#define JSON_SET_SIMPLE(a,c,d) do { \ + json_builder_set_member_name(builder, a); \ + JSON_ADD_STRING(c, d); \ + } while (0) +#define JSON_SET_SIMPLE_LEN(a,l,d) do { \ + json_builder_set_member_name(builder, a); \ + json_builder_add_string_value_uri_enc(builder, d, l); \ + } while (0) +#define JSON_SET_SIMPLE_CSTR(a,d) JSON_SET_SIMPLE_LEN(a, strlen(d), d) +#define JSON_SET_SIMPLE_STR(a,d) JSON_SET_SIMPLE_LEN(a, (d)->len, (d)->s) + +static int json_update_crypto_params(JsonBuilder *builder, const char *pref, + unsigned int unique_id, + const char *key, const struct crypto_params *p) +{ + char tmp[2048]; + + if (!p->crypto_suite) + return -1; + + JSON_SET_NSTRING_CSTR("%s-crypto_suite",key,p->crypto_suite->name); + JSON_SET_NSTRING_LEN("%s-master_key",key, sizeof(p->master_key), (char *) p->master_key); + JSON_SET_NSTRING_LEN("%s-master_salt",key, sizeof(p->master_salt), (char *) p->master_salt); + + JSON_SET_NSTRING("%s-unenc-srtp",key,"%i",p->session_params.unencrypted_srtp); + JSON_SET_NSTRING("%s-unenc-srtcp",key,"%i",p->session_params.unencrypted_srtcp); + JSON_SET_NSTRING("%s-unauth-srtp",key,"%i",p->session_params.unauthenticated_srtp); + if (p->mki) { + JSON_SET_NSTRING_LEN("%s-mki",key, p->mki_len, (char *) p->mki); + } + return 0; +} static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, @@ -1607,6 +2196,21 @@ static int redis_update_crypto_params(struct redis *r, const char *pref, const s return 0; } + +static void json_update_crypto_context(JsonBuilder *builder, const char *pref, + unsigned int unique_id, + const struct crypto_context *c) +{ + char tmp[2048]; + + if (json_update_crypto_params(builder, pref, unique_id, "", &c->params)) + return; + + JSON_SET_SIMPLE("last_index","%lu",c->last_index); + JSON_SET_SIMPLE("ssrc","%u",(unsigned) c->ssrc); + +} + static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct crypto_context *c) @@ -1634,6 +2238,18 @@ static void redis_update_stats(struct redis *r, const char *pref, const str *cal key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), key, atomic64_get(&s->errors)); } + +static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, + unsigned int unique_id, + const struct dtls_fingerprint *f) +{ + if (!f->hash_func) + return; + + JSON_SET_SIMPLE_CSTR("hash_func",f->hash_func->name); + JSON_SET_SIMPLE_LEN("fingerprint", sizeof(f->digest), (char *) f->digest); +} + static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct dtls_fingerprint *f) @@ -1645,6 +2261,352 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con f->hash_func->name, S_LEN(f->digest, sizeof(f->digest))); } +/** + * encodes the few (k,v) pairs for one call under one json structure + */ + +char* redis_encode_json(struct call *c) { + + GList *l=0,*k=0, *m=0, *n=0; + struct endpoint_map *ep; + struct call_media *media; + struct rtp_payload_type *pt; + struct stream_fd *sfd; + struct packet_stream *ps; + struct intf_list *il; + struct call_monologue *ml, *ml2; + JsonBuilder *builder = json_builder_new (); + + char tmp[2048]; + + json_builder_begin_object (builder); + { + json_builder_set_member_name(builder, "json"); + + json_builder_begin_object (builder); + + { + JSON_SET_SIMPLE("created","%ld",(long int) c->created); + JSON_SET_SIMPLE("last_signal","%ld",(long int) c->last_signal); + JSON_SET_SIMPLE("tos","%u",(int) c->tos); + JSON_SET_SIMPLE("deleted","%ld",(long int) c->deleted); + JSON_SET_SIMPLE("num_sfds","%u",g_queue_get_length(&c->stream_fds)); + JSON_SET_SIMPLE("num_streams","%u",g_queue_get_length(&c->streams)); + JSON_SET_SIMPLE("num_medias","%u",g_queue_get_length(&c->medias)); + JSON_SET_SIMPLE("num_tags","%u",g_queue_get_length(&c->monologues)); + JSON_SET_SIMPLE("num_maps","%u",g_queue_get_length(&c->endpoint_maps)); + JSON_SET_SIMPLE("ml_deleted","%ld",(long int) c->ml_deleted); + JSON_SET_SIMPLE_CSTR("created_from",c->created_from); + JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr)); + JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); + + } + + json_builder_end_object (builder); + + for (l = c->stream_fds.head; l; l = l->next) { + sfd = l->data; + + snprintf(tmp, sizeof(tmp), "sfd-%u", sfd->unique_id); + json_builder_set_member_name(builder, tmp); + + json_builder_begin_object (builder); + + { + JSON_SET_SIMPLE_CSTR("pref_family",sfd->local_intf->logical->preferred_family->rfc_name); + JSON_SET_SIMPLE("localport","%u",sfd->socket.local.port); + JSON_SET_SIMPLE_STR("logical_intf",&sfd->local_intf->logical->name); + JSON_SET_SIMPLE("local_intf_uid","%u",sfd->local_intf->unique_id); + JSON_SET_SIMPLE("stream","%u",sfd->stream->unique_id); + + json_update_crypto_context(builder, "sfd", sfd->unique_id, &sfd->crypto); + + } + json_builder_end_object (builder); + + } // --- for + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id); + json_builder_set_member_name(builder, tmp); + + json_builder_begin_object (builder); + + { + JSON_SET_SIMPLE("media","%u",ps->media->unique_id); + JSON_SET_SIMPLE("sfd","%u",ps->selected_sfd ? ps->selected_sfd->unique_id : -1); + JSON_SET_SIMPLE("rtp_sink","%u",ps->rtp_sink ? ps->rtp_sink->unique_id : -1); + JSON_SET_SIMPLE("rtcp_sink","%u",ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1); + JSON_SET_SIMPLE("rtcp_sibling","%u",ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1); + JSON_SET_SIMPLE("last_packet",UINT64F,atomic64_get(&ps->last_packet)); + JSON_SET_SIMPLE("ps_flags","%u",ps->ps_flags); + JSON_SET_SIMPLE("component","%u",ps->component); + JSON_SET_SIMPLE_CSTR("endpoint",endpoint_print_buf(&ps->endpoint)); + JSON_SET_SIMPLE_CSTR("advertised_endpoint",endpoint_print_buf(&ps->advertised_endpoint)); + JSON_SET_SIMPLE("stats-packets","%ld",atomic64_get(&ps->stats.packets)); + JSON_SET_SIMPLE("stats-bytes","%ld",atomic64_get(&ps->stats.bytes)); + JSON_SET_SIMPLE("stats-errors","%ld",atomic64_get(&ps->stats.errors)); + + json_update_crypto_context(builder, "stream", ps->unique_id, &ps->crypto); + + } + + json_builder_end_object (builder); + + // stream_sfds was here before + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + + } // --- for streams.head + + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (k = ps->sfds.head; k; k = k->next) { + sfd = k->data; + JSON_ADD_STRING("%u",sfd->unique_id); + } + json_builder_end_array (builder); + + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + } + + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + snprintf(tmp, sizeof(tmp), "tag-%u", ml->unique_id); + json_builder_set_member_name(builder, tmp); + + json_builder_begin_object (builder); + { + + JSON_SET_SIMPLE("created","%llu",(long long unsigned) ml->created); + JSON_SET_SIMPLE("active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1); + JSON_SET_SIMPLE("deleted","%llu",(long long unsigned) ml->deleted); + + if (ml->tag.s) { + JSON_SET_SIMPLE_STR("tag",&ml->tag); + } + if (ml->viabranch.s) { + JSON_SET_SIMPLE_STR("via-branch",&ml->viabranch); + } + } + json_builder_end_object (builder); + + // other_tags and medias- was here before + + } // --- for monologues.head + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + // -- we do it again here since the jsonbuilder is linear straight forward + k = g_hash_table_get_values(ml->other_tags); + snprintf(tmp, sizeof(tmp), "other_tags-%u", ml->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (m = k; m; m = m->next) { + ml2 = m->data; + JSON_ADD_STRING("%u",ml2->unique_id); + } + json_builder_end_array (builder); + + g_list_free(k); + + snprintf(tmp, sizeof(tmp), "medias-%u", ml->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (k = ml->medias.head; k; k = k->next) { + media = k->data; + JSON_ADD_STRING("%u",media->unique_id); + } + json_builder_end_array (builder); + } + + + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id); + json_builder_set_member_name(builder, tmp); + + json_builder_begin_object (builder); + { + JSON_SET_SIMPLE("tag","%u",media->monologue->unique_id); + JSON_SET_SIMPLE("index","%u",media->index); + JSON_SET_SIMPLE_STR("type",&media->type); + JSON_SET_SIMPLE_CSTR("protocol",media->protocol ? media->protocol->name : ""); + JSON_SET_SIMPLE_CSTR("desired_family",media->desired_family ? media->desired_family->rfc_name : ""); + JSON_SET_SIMPLE("sdes_in_tag","%u",media->sdes_in.tag); + JSON_SET_SIMPLE("sdes_out_tag","%u",media->sdes_out.tag); + JSON_SET_SIMPLE_STR("logical_intf",&media->logical_intf->name); + JSON_SET_SIMPLE("media_flags","%u",media->media_flags); + + json_update_crypto_params(builder, "media", media->unique_id, "sdes_in", + &media->sdes_in.params); + json_update_crypto_params(builder, "media", media->unique_id, "sdes_out", + &media->sdes_out.params); + json_update_dtls_fingerprint(builder, "media", media->unique_id, &media->fingerprint); + + // streams and maps- and payload_types- was here before + + } + json_builder_end_object (builder); + + } // --- for medias.head + + // -- we do it again here since the jsonbuilder is linear straight forward + // XXX can this be moved into the above json object? + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + snprintf(tmp, sizeof(tmp), "streams-%u", media->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (m = media->streams.head; m; m = m->next) { + ps = m->data; + JSON_ADD_STRING("%u",ps->unique_id); + } + json_builder_end_array (builder); + + snprintf(tmp, sizeof(tmp), "maps-%u", media->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (m = media->endpoint_maps.head; m; m = m->next) { + ep = m->data; + JSON_ADD_STRING("%u",ep->unique_id); + } + json_builder_end_array (builder); + + k = g_hash_table_get_values(media->rtp_payload_types); + snprintf(tmp, sizeof(tmp), "payload_types-%u", media->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (m = k; m; m = m->next) { + pt = m->data; + JSON_ADD_STRING("%u/" STR_FORMAT "/%u/" STR_FORMAT, + pt->payload_type, STR_FMT(&pt->encoding), + pt->clock_rate, STR_FMT(&pt->encoding_parameters)); + } + json_builder_end_array (builder); + + g_list_free(k); + + } + + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + snprintf(tmp, sizeof(tmp), "map-%u", ep->unique_id); + json_builder_set_member_name(builder, tmp); + + json_builder_begin_object (builder); + { + JSON_SET_SIMPLE("wildcard","%i",ep->wildcard); + JSON_SET_SIMPLE("num_ports","%u",ep->num_ports); + JSON_SET_SIMPLE_CSTR("intf_preferred_family",ep->logical_intf->preferred_family->rfc_name); + JSON_SET_SIMPLE_STR("logical_intf",&ep->logical_intf->name); + JSON_SET_SIMPLE_CSTR("endpoint",endpoint_print_buf(&ep->endpoint)); + + } + json_builder_end_object (builder); + + } // --- for c->endpoint_maps.head + + // -- we do it again here since the jsonbuilder is linear straight forward + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + snprintf(tmp, sizeof(tmp), "map_sfds-%u", ep->unique_id); + json_builder_set_member_name(builder, tmp); + json_builder_begin_array (builder); + for (m = ep->intf_sfds.head; m; m = m->next) { + il = m->data; + JSON_ADD_STRING("loc-%u",il->local_intf->unique_id); + for (n = il->list.head; n; n = n->next) { + sfd = n->data; + JSON_ADD_STRING("%u",sfd->unique_id); + } + } + json_builder_end_array (builder); + } + } + json_builder_end_object (builder); + + JsonGenerator *gen = json_generator_new (); + JsonNode * root = json_builder_get_root (builder); + json_generator_set_root (gen, root); + char* result = json_generator_to_data (gen, NULL); + + json_node_free (root); + g_object_unref (gen); + g_object_unref (builder); + + return result; + +} + + +void redis_update_onekey(struct call *c, struct redis *r) { + unsigned int redis_expires_s; + + if (!r) + return; + + mutex_lock(&r->lock); + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } + + rwlock_lock_r(&c->master_lock); + + redis_expires_s = c->callmaster->conf.redis_expires_secs; + + c->redis_hosted_db = r->db; + if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { + rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); + goto err; + } + + char* result = redis_encode_json(c); + if (!result) + goto err; + + redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); + redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); + + redis_consume(r); + + if (result) + free(result); + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + + return; +err: + + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + if (r->ctx->err) + rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); + redisFree(r->ctx); + r->ctx = NULL; + +} static void redis_update_recording(struct redis *r, struct call *c) { struct recording *rec; @@ -1658,7 +2620,6 @@ static void redis_update_recording(struct redis *r, struct call *c) { } - /* * Redis data structure: * @@ -1984,7 +2945,11 @@ void redis_delete(struct call *c, struct redis *r) { if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) goto err; - redis_delete_call(c, r); + if (c->callmaster->conf.redis_multikey) { + redis_delete_call(c, r); + } else { + redis_delete_call_json(c, r); + } rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index 0321abd77..ce4eddf67 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -91,23 +91,15 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #endif - - - - #define rlog(l, x...) ilog(l | LOG_FLAG_RESTORE, x) - - -#define REDIS_FMT(x) (x)->len, (x)->str - - void redis_notify_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); int redis_restore(struct callmaster *, struct redis *); void redis_update(struct call *, struct redis *); +void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); diff --git a/daemon/str.c b/daemon/str.c index 8a5d94f1a..21d99e81f 100644 --- a/daemon/str.c +++ b/daemon/str.c @@ -48,3 +48,51 @@ char *rand_hex_str(char *rand_str, int num_bytes) { } return rand_str; } + + +static const char *hex_chars = "0123456789abcdef"; +int str_uri_encode_len(char *out, const char *in, int len) { + const char *end = in + len; + char *ori_out = out; + + while (in < end) { + if (*in < ' ' || *in > '~' || *in == '%' || *in == '\\' || *in == '\'' || *in == '"') { + *(out++) = '%'; + *(out++) = hex_chars[(*((unsigned char *) in)) >> 4]; + *(out++) = hex_chars[(*((unsigned char *) in)) & 0xf]; + in++; + continue; + } + + *(out++) = *(in++); + } + + *out = 0; + return out - ori_out; +} + +int str_uri_decode_len(char **out, const char *in, int in_len) { + const char *end = in + in_len; + *out = malloc(in_len + 1); + char *outp = *out; + + while (in < end) { + if (*in != '%') { + *(outp++) = (*in++); + continue; + } + + if (end - in < 3 || !g_ascii_isxdigit(in[1]) || !g_ascii_isxdigit(in[2])) { + free(*out); + *out = NULL; + return -1; + } + + unsigned char c = g_ascii_xdigit_value(in[1]) << 4 | g_ascii_xdigit_value(in[2]); + *(outp++) = c; + in += 3; + } + + *outp = 0; + return outp - *out; +} diff --git a/debian/control b/debian/control index fa2988706..ad42acc8b 100644 --- a/debian/control +++ b/debian/control @@ -13,6 +13,7 @@ Build-Depends: debhelper (>= 5), libevent-dev (>= 2.0), libglib2.0-dev (>= 2.30), libhiredis-dev, + libjson-glib-dev, libpcap0.8-dev | libpcap-dev, libpcre3-dev, libssl-dev (>= 1.0.1), diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 85123cc00..66b81c371 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -74,6 +74,7 @@ fi [ -z "$REDIS_WRITE_AUTH_PW" ] || export RTPENGINE_REDIS_WRITE_AUTH_PW="$REDIS_WRITE_AUTH_PW" [ -z "$REDIS_NUM_THREADS" ] || OPTIONS="$OPTIONS --redis-num-threads=$REDIS_NUM_THREADS" [ -z "$REDIS_EXPIRES" ] || OPTIONS="$OPTIONS --redis-expires=$REDIS_EXPIRES" +[ -z "$REDIS_MULTIKEY" ] || OPTIONS="$OPTIONS --redis-multikey=$REDIS_MULTIKEY" [ -z "$NO_REDIS_REQUIRED" -o \( "$NO_REDIS_REQUIRED" != "1" -a "$NO_REDIS_REQUIRED" != "yes" \) ] || OPTIONS="$OPTIONS --no-redis-required" [ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL" [ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback" diff --git a/lib/str.h b/lib/str.h index dc32fc89d..6eaba3a73 100644 --- a/lib/str.h +++ b/lib/str.h @@ -56,6 +56,8 @@ INLINE str *str_dup(const str *s); INLINE str *str_chunk_insert(GStringChunk *c, const str *s); /* shifts pointer by len chars and decrements len. returns -1 if buffer too short, 0 otherwise */ INLINE int str_shift(str *s, int len); +/* eats the supplied string from the beginning of s. returns -1 if string head doesn't match */ +INLINE int str_shift_cmp(str *s, const char *); /* binary compares str object with memory chunk of equal size */ INLINE int str_memcmp(const str *s, void *m); /* locate a substring within a string, returns character index or -1 */ @@ -85,6 +87,12 @@ gboolean str_equal(gconstpointer a, gconstpointer b); /* destroy function, frees a slice-alloc'd str */ void str_slice_free(void *); +/* saves "in" into "out" pseudo-URI encoded. "out" point to a buffer with sufficient length. returns length */ +int str_uri_encode_len(char *out, const char *in, int in_len); +INLINE int str_uri_encode(char *out, const str *in); +/* reverse of the above. stores newly allocated buffer in *out. returns length */ +int str_uri_decode_len(char **out, const char *in, int in_len); + @@ -105,6 +113,16 @@ INLINE int str_shift(str *s, int len) { s->len -= len; return 0; } +INLINE int str_shift_cmp(str *s, const char *t) { + int len = strlen(t); + if (s->len < len) + return -1; + if (memcmp(s->s, t, len)) + return -1; + s->s += len; + s->len -= len; + return 0; +} INLINE char *str_chr(const str *s, int c) { return memchr(s->s, c, s->len); } @@ -292,6 +310,10 @@ INLINE int str_token(str *new_token, str *ori_and_remainder, int sep) { return 0; } +INLINE int str_uri_encode(char *out, const str *in) { + return str_uri_encode_len(out, in->s, in->len); +} + /* Generates a hex string representing n random bytes. len(rand_str) = 2*num_bytes + 1 */ char *rand_hex_str(char *rand_str, int num_bytes);