From f3364d9e7d65cf07cce35dde1867c1a07e516ca3 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Mon, 20 Feb 2017 16:10:54 +0100 Subject: [PATCH 1/7] Omits redisreply in redis restore and eliminates 'multikey' feature --- daemon/redis.c | 465 +++---------------------------------------------- daemon/redis.h | 17 +- 2 files changed, 33 insertions(+), 449 deletions(-) diff --git a/daemon/redis.c b/daemon/redis.c index a51eb3e8e..405083aca 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -67,7 +67,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define REDIS_FMT(x) (x)->len, (x)->str -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type); static int redis_check_conn(struct redis *r); static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); @@ -310,10 +309,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; } - if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) - redis_restore_call(r, cm, &callid, CT_FOREIGN_CALL); - - if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + if (strncmp(rr->element[3]->str,"set",3)==0) { c = call_get(&callid, cm); if (c) { rwlock_unlock_w(&c->master_lock); @@ -785,8 +781,7 @@ INLINE redisReply *createReplyObject(int type) { } static int json_get_hash(struct redis_hash *out, struct call* c, - const char *key, - unsigned int id) + const char *key, unsigned int id) { static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; @@ -803,7 +798,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c, if (rc>=MAXKEYLENGTH) rlog(LOG_ERROR,"Json key too long."); - if (!json_reader_read_member(c->root_reader, key_concatted)) { rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); goto err; @@ -812,7 +806,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c, out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject); if (!out->ht) goto err; - out->rr = 0; gchar **members = json_reader_list_members(c->root_reader); gchar **orig_members = members; @@ -823,16 +816,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - out->rr = createReplyObject(REDIS_REPLY_STRING); - if (!out->rr) { - rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); - goto err3; - } - out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader, &out->rr->len); - + str_init_len(&out->s,(char*)json_reader_get_string_value(c->root_reader),strlen((char*)json_reader_get_string_value(c->root_reader))); char* tmp = strdup(*members); - if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + if (g_hash_table_insert_check(out->ht, tmp, str_dup(&out->s)) != TRUE) { + ilog(LOG_WARNING,"Key %s already exists", tmp); goto err3; } @@ -847,68 +835,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, err3: g_strfreev(members); - if (out->rr) - freeReplyObject(out->rr); g_hash_table_destroy(out->ht); err: return -1; } - -static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const str *which, - unsigned int id) -{ - redisReply *k, *v; - int i; - - out->ht = g_hash_table_new(g_str_hash, g_str_equal); - if (!out->ht) - goto err; - if (id == -1) - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR(which)); - else - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR(which), id); - if (!out->rr) - goto err2; - - for (i = 1; i < out->rr->elements; i += 2) { - k = out->rr->element[i - 1]; - v = out->rr->element[i]; - if (k->type != REDIS_REPLY_STRING || v->type != REDIS_REPLY_STRING) - continue; - - if (g_hash_table_insert_check(out->ht, k->str, v) != TRUE) - goto err3; - } - - return 0; - -err3: - if (out->rr) - freeReplyObject(out->rr); -err2: - g_hash_table_destroy(out->ht); -err: - return -1; -} - - -static void redis_destroy_hash(struct redis_hash *rh) { - - if (rh->rr) - freeReplyObject(rh->rr); - g_hash_table_destroy(rh->ht); -} -static void redis_destroy_list(struct redis_list *rl) { - unsigned int i; - - for (i = 0; i < rl->len; i++) { - redis_destroy_hash(&rl->rh[i]); - } - free(rl->rh); - free(rl->ptrs); -} - static void json_destroy_hash(struct redis_hash *rh) { g_hash_table_destroy(rh->ht); } @@ -924,7 +855,7 @@ static void json_destroy_list(struct redis_list *rl) { } static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { - redisReply *r; + str *r; r = g_hash_table_lookup(h->ht, k); if (!r) { @@ -932,7 +863,7 @@ static int redis_hash_get_str(str *out, const struct redis_hash *h, const char * out->len = 0; return -1; } - out->s = r->str; + out->s = r->s; out->len = r->len; return 0; } @@ -1054,34 +985,6 @@ static int json_build_list_cb(GQueue *q, struct call *c, const char *key, return 0; } - -static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list, - int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) -{ - redisReply *rr; - int i; - str s; - - rr = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB"-%u 0 -1", key, STR(callid), idx); - if (!rr) - return -1; - - for (i = 0; i < rr->elements; i++) { - if (rr->element[i]->type != REDIS_REPLY_STRING) { - freeReplyObject(rr); - return -1; - } - str_init_len(&s, rr->element[i]->str, rr->element[i]->len); - if (cb(&s, q, list, ptr)) { - freeReplyObject(rr); - return -1; - } - } - - freeReplyObject(rr); - return 0; -} static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) { int j; j = str_to_i(s, 0); @@ -1095,12 +998,6 @@ static int json_build_list(GQueue *q, struct call *c, const char *key, const str return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL); } -static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list) -{ - return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); -} - static int json_get_list_hash(struct redis_list *out, struct call* c, const char *key, const struct redis_hash *rh, const char *rh_num_key) @@ -1134,41 +1031,6 @@ err1: return -1; } -static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const str *id, - const struct redis_hash *rh, const char *rh_num_key) -{ - unsigned int i; - - if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) - return -1; - out->rh = malloc(sizeof(*out->rh) * out->len); - if (!out->rh) - return -1; - out->ptrs = malloc(sizeof(*out->ptrs) * out->len); - if (!out->ptrs) - goto err1; - - for (i = 0; i < out->len; i++) { - if (redis_get_hash(&out->rh[i], r, key, id, i)) - goto err2; - } - - return 0; - -err2: - free(out->ptrs); - while (i) { - i--; - redis_destroy_hash(&out->rh[i]); - } -err1: - free(out->rh); - return -1; -} - - - - /* can return 1, 0 or -1 */ static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) { str s; @@ -1432,62 +1294,6 @@ static int json_medias(struct redis *r, struct call *c, struct redis_list *media return 0; } -static int redis_medias(struct redis *r, struct call *c, struct redis_list *medias) { - unsigned int i; - struct redis_hash *rh; - struct call_media *med; - str s; - - for (i = 0; i < medias->len; i++) { - rh = &medias->rh[i]; - - /* from call.c:__get_media() */ - med = uid_slice_alloc0(med, &c->medias); - med->call = c; - med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, - __payload_type_free); - - if (redis_hash_get_unsigned(&med->index, rh, "index")) - return -1; - if (redis_hash_get_str(&s, rh, "type")) - return -1; - call_str_cpy(c, &med->type, &s); - - if (redis_hash_get_str(&s, rh, "protocol")) - return -1; - med->protocol = transport_protocol(&s); - - if (redis_hash_get_str(&s, rh, "desired_family")) - return -1; - med->desired_family = get_socket_family_rfc(&s); - - if (redis_hash_get_str(&s, rh, "logical_intf") - || !(med->logical_intf = get_logical_interface(&s, med->desired_family, 0))) - { - rlog(LOG_ERR, "unable to find specified local interface"); - med->logical_intf = get_logical_interface(NULL, med->desired_family, 0); - } - - if (redis_hash_get_unsigned(&med->sdes_in.tag, rh, "sdes_in_tag")) - return -1; - if (redis_hash_get_unsigned(&med->sdes_out.tag, rh, "sdes_out_tag")) - return -1; - if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, rh, - "media_flags")) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_in.params, rh, "sdes_in") < 0) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0) - return -1; - - redis_build_list_cb(NULL, r, "payload_types", &c->callid, i, NULL, rbl_cb_plts, med); - /* XXX dtls */ - - medias->ptrs[i] = med; - } - - return 0; -} static int redis_maps(struct call *c, struct redis_list *maps) { unsigned int i; @@ -1570,35 +1376,6 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ return 0; } -static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) -{ - unsigned int i; - struct call_monologue *ml, *other_ml; - GQueue q = G_QUEUE_INIT; - GList *l; - - for (i = 0; i < tags->len; i++) { - ml = tags->ptrs[i]; - - ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); - - if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags)) - return -1; - for (l = q.head; l; l = l->next) { - other_ml = l->data; - if (!other_ml) - return -1; - g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); - } - g_queue_clear(&q); - - if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias)) - return -1; - } - - return 0; -} - static int json_link_streams(struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias) { @@ -1624,31 +1401,6 @@ static int json_link_streams(struct call *c, struct redis_list *streams, return 0; } -static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias) -{ - unsigned int i; - struct packet_stream *ps; - - for (i = 0; i < streams->len; i++) { - ps = streams->ptrs[i]; - - ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); - ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); - ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); - ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); - ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); - - if (redis_build_list(&ps->sfds, r, "stream_sfds", &c->callid, i, sfds)) - return -1; - - if (ps->media) - __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); - } - - return 0; -} - static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1669,26 +1421,6 @@ static int json_link_medias(struct redis *r, struct call *c, struct redis_list * return 0; } -static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, - struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) -{ - unsigned int i; - struct call_media *med; - - for (i = 0; i < medias->len; i++) { - med = medias->ptrs[i]; - - med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); - if (!med->monologue) - return -1; - if (redis_build_list(&med->streams, r, "streams", &c->callid, i, streams)) - return -1; - if (redis_build_list(&med->endpoint_maps, r, "maps", &c->callid, i, maps)) - return -1; - } - return 0; -} - static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *ptr) { int i; struct intf_list *il; @@ -1734,37 +1466,25 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma return 0; } -static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, - struct redis_list *sfds) -{ - unsigned int i; - struct endpoint_map *em; - - for (i = 0; i < maps->len; i++) { - em = maps->ptrs[i]; - - if (redis_build_list_cb(&em->intf_sfds, r, "map_sfds", &c->callid, em->unique_id, sfds, - rbl_cb_intf_sfds, em)) - return -1; - } - return 0; -} - -static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { +static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { redisReply* rr_jsonStr; struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; + str callid; + const char *err = 0; int i; - str s; JsonReader *root_reader =0; JsonParser *parser =0; - rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(callid)); + // TODO: Maybe refactor + str_init_len(&callid, id->s, id->len); + str_shift(&callid,strlen("json-")); + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(&callid)); if (!rr_jsonStr) { - rlog(LOG_ERR, "Could not retrieve json data from redis for callid: " STR_FORMAT, - STR_FMT(callid)); + rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", id->s); goto err1; } @@ -1779,7 +1499,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * goto err1; } - c = call_get_or_create(callid, m, type); + c = call_get_or_create(&callid, m, type); err = "failed to create call struct"; if (!c) goto err1; @@ -1790,6 +1510,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (c->last_signal) goto err2; err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", -1)) goto err2; err = "'tags' incomplete"; @@ -1820,10 +1541,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * c->tos = i; redis_hash_get_time_t(&c->deleted, &call, "deleted"); redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); + if (!redis_hash_get_str(&callid, &call, "created_from")) + c->created_from = call_strdup(c, callid.s); + if (!redis_hash_get_str(&callid, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &callid); err = "missing 'redis_hosted_db' value"; if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) @@ -1886,12 +1607,12 @@ err1: freeReplyObject(rr_jsonStr); log_info_clear(); if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(callid), + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(&callid), err); if (c) call_destroy(c); else - redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(callid)); + redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(&callid)); } if (c) obj_put(c); @@ -1911,127 +1632,6 @@ static void redis_restore_recording(struct call *c, struct redis_hash *call) { } -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { - struct redis_hash call; - struct redis_list tags, sfds, streams, medias, maps; - struct call *c = NULL; - str s; - const char *err; - int i; - - c = call_get_or_create(id, m, type); - err = "failed to create call struct"; - if (!c) - goto err1; - err = "call already exists"; - if (c->last_signal) - goto err2; - err = "'call' data incomplete"; - if (redis_get_hash(&call, r, "call", id, -1)) - goto err2; - err = "'tags' incomplete"; - if (redis_get_list_hash(&tags, r, "tag", id, &call, "num_tags")) - goto err3; - err = "'sfds' incomplete"; - if (redis_get_list_hash(&sfds, r, "sfd", id, &call, "num_sfds")) - goto err4; - err = "'streams' incomplete"; - if (redis_get_list_hash(&streams, r, "stream", id, &call, "num_streams")) - goto err5; - err = "'medias' incomplete"; - if (redis_get_list_hash(&medias, r, "media", id, &call, "num_medias")) - goto err6; - err = "'maps' incomplete"; - if (redis_get_list_hash(&maps, r, "map", id, &call, "num_maps")) - goto err7; - - err = "missing 'created' timestamp"; - if (redis_hash_get_time_t(&c->created, &call, "created")) - goto err8; - err = "missing 'last signal' timestamp"; - if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) - goto err8; - if (redis_hash_get_int(&i, &call, "tos")) - c->tos = 184; - else - c->tos = i; - redis_hash_get_time_t(&c->deleted, &call, "deleted"); - redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); - - err = "missing 'redis_hosted_db' value"; - if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) - goto err8; - - err = "failed to create sfds"; - if (redis_sfds(c, &sfds)) - goto err8; - err = "failed to create streams"; - if (redis_streams(c, &streams)) - goto err8; - err = "failed to create tags"; - if (redis_tags(c, &tags)) - goto err8; - err = "failed to create medias"; - if (redis_medias(r, c, &medias)) - goto err8; - err = "failed to create maps"; - if (redis_maps(c, &maps)) - goto err8; - - err = "failed to link sfds"; - if (redis_link_sfds(&sfds, &streams)) - goto err8; - err = "failed to link streams"; - if (redis_link_streams(r, c, &streams, &sfds, &medias)) - goto err8; - err = "failed to link tags"; - if (redis_link_tags(r, c, &tags, &medias)) - goto err8; - err = "failed to link medias"; - if (redis_link_medias(r, c, &medias, &streams, &maps, &tags)) - goto err8; - err = "failed to link maps"; - if (redis_link_maps(r, c, &maps, &sfds)) - goto err8; - - redis_restore_recording(c, &call); - - err = NULL; - -err8: - redis_destroy_list(&maps); -err7: - redis_destroy_list(&medias); -err6: - redis_destroy_list(&streams); -err5: - redis_destroy_list(&sfds); -err4: - redis_destroy_list(&tags); -err3: - redis_destroy_hash(&call); -err2: - rwlock_unlock_w(&c->master_lock); -err1: - log_info_clear(); - if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(id), err); - if (c) - call_destroy(c); - else - redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR(id)); - } - - if (c) - obj_put(c); -} - - - struct thread_ctx { struct callmaster *m; GQueue r_q; @@ -2043,6 +1643,7 @@ static void restore_thread(void *call_p, void *ctx_p) { redisReply *call = call_p; struct redis *r; str callid; + str_init(&callid,call->str); rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call)); @@ -2050,14 +1651,7 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - str_init_len(&callid, call->str, call->len); - - if (ctx->m->conf.redis_multikey) { - redis_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } else { - if (str_shift_cmp(&callid, "json-") == 0) - json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } + json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -2085,11 +1679,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - if (m->conf.redis_multikey) { - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); - } else { - calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); - } + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -2299,7 +1889,6 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE_CSTR("created_from",c->created_from); JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr)); JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); - } json_builder_end_object (builder); diff --git a/daemon/redis.h b/daemon/redis.h index ce4eddf67..6cc0f9bc4 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -13,6 +13,7 @@ #include #include #include "call.h" +#include "str.h" #define REDIS_RESTORE_NUM_THREADS 4 @@ -62,7 +63,7 @@ struct redis { int no_redis_required; }; struct redis_hash { - redisReply *rr; + str s; GHashTable *ht; }; struct redis_list { @@ -72,12 +73,6 @@ struct redis_list { }; - - - - - - #if !GLIB_CHECK_VERSION(2,40,0) INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) { gboolean ret = TRUE; @@ -129,12 +124,12 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a #define define_get_int_type(name, type, func) \ static int redis_hash_get_ ## name(type *out, const struct redis_hash *h, const char *k) { \ - redisReply *r; \ + str* s; \ \ - r = g_hash_table_lookup(h->ht, k); \ - if (!r) \ + s = g_hash_table_lookup(h->ht, k); \ + if (!s) \ return -1; \ - *out = func(r->str, NULL, 10); \ + *out = func(s->s, NULL, 10); \ return 0; \ } From 14b37ebfe52a031ff36a461aedb9649591c7085a Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 21 Feb 2017 09:24:06 +0100 Subject: [PATCH 2/7] Removes multikey stuff --- daemon/call.c | 4 - daemon/call.h | 1 - daemon/call_interfaces.c | 16 +- daemon/main.c | 3 - daemon/media_socket.c | 6 +- daemon/redis.c | 497 ++------------------------------------- 6 files changed, 25 insertions(+), 502 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index fc08a4390..ffa91bc8b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -595,11 +595,7 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); if (update) { - if (m->conf.redis_multikey) { - redis_update(ps->call, m->conf.redis_write); - } else { redis_update_onekey(ps->call, m->conf.redis_write); - } } next: diff --git a/daemon/call.h b/daemon/call.h index 2dc84b3b9..a5777cee9 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -463,7 +463,6 @@ struct callmaster_config { GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; unsigned int redis_expires_secs; - unsigned int redis_multikey; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 8bd2fc992..f13feb6b9 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,11 +186,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - if (m->conf.redis_multikey) { - redis_update(c, m->conf.redis_write); - } else { - redis_update_onekey(c, m->conf.redis_write); - } + redis_update_onekey(c, m->conf.redis_write); gettimeofday(&(monologue->started), NULL); @@ -338,11 +334,7 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - if (m->conf.redis_multikey) { - redis_update(c, m->conf.redis_write); - } else { - redis_update_onekey(c, m->conf.redis_write); - } + redis_update_onekey(c, m->conf.redis_write); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -774,11 +766,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster rwlock_unlock_w(&call->master_lock); if (!flags.no_redis_update) { - if (m->conf.redis_multikey) { - redis_update(call, m->conf.redis_write); - } else { redis_update_onekey(call,m->conf.redis_write); - } } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); } diff --git a/daemon/main.c b/daemon/main.c index d125ce179..310263304 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -62,7 +62,6 @@ static unsigned int timeout; static unsigned int silent_timeout; static unsigned int final_timeout; static unsigned int redis_expires = 86400; -static unsigned int redis_multikey = 0; static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; @@ -299,7 +298,6 @@ static void options(int *argc, char ***argv) { { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, - { "redis-multikey", 0, 0, G_OPTION_ARG_NONE, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", NULL }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -628,7 +626,6 @@ no_kernel: } mc.redis_expires_secs = redis_expires; - mc.redis_multikey = redis_multikey; ctx->m->conf = mc; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index ffd482029..afafd5210 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1472,11 +1472,7 @@ out: ca = sfd->call ? : NULL; if (ca && update) { - if (ca->callmaster->conf.redis_multikey) { - redis_update(ca, ca->callmaster->conf.redis_write); - } else { - redis_update_onekey(ca, ca->callmaster->conf.redis_write); - } + redis_update_onekey(ca, ca->callmaster->conf.redis_write); } done: log_info_clear(); diff --git a/daemon/redis.c b/daemon/redis.c index 405083aca..81610c569 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -286,16 +286,9 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; // now at - if (cm->conf.redis_multikey) { - if (str_shift_cmp(&keyspace_id, "notifier-")) { - rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n"); - goto err; - } - } else { - if (str_shift_cmp(&keyspace_id, "json-")) { - rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); - goto err; - } + if (str_shift_cmp(&keyspace_id, "json-")) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); + goto err; } callid = keyspace_id; @@ -463,54 +456,28 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return -1; } - if (cm->conf.redis_multikey) { - switch (action) { - case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); - return -1; - } - break; - default: - rlog(LOG_ERROR, "No subscribe action found: %d", action); + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); return -1; } - } else { - switch (action) { - case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); - return -1; - } - break; - default: - rlog(LOG_ERROR, "No subscribe action found: %d", action); + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); return -1; } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; } return 0; @@ -718,42 +685,12 @@ static int redis_check_conn(struct redis *r) { return REDIS_STATE_RECONNECTED; } - - -static void redis_delete_list(struct redis *r, const str *callid, const char *prefix, GQueue *q) { - unsigned int i; - - for (i = 0; i < g_queue_get_length(q); i++) - redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i); -} - /* called with r->lock held and c->master_lock held */ static void redis_delete_call_json(struct call *c, struct redis *r) { redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); redis_consume(r); } -/* called with r->lock held and c->master_lock held */ -static void redis_delete_call(struct call *c, struct redis *r) { - redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); - redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); - redis_pipe(r, "DEL call-"PB"", STR(&c->callid)); - redis_delete_list(r, &c->callid, "sfd", &c->stream_fds); - redis_delete_list(r, &c->callid, "stream", &c->streams); - redis_delete_list(r, &c->callid, "stream_sfds", &c->streams); - redis_delete_list(r, &c->callid, "tag", &c->monologues); - redis_delete_list(r, &c->callid, "other_tags", &c->monologues); - redis_delete_list(r, &c->callid, "medias", &c->monologues); - redis_delete_list(r, &c->callid, "media", &c->medias); - redis_delete_list(r, &c->callid, "streams", &c->medias); - redis_delete_list(r, &c->callid, "maps", &c->medias); - redis_delete_list(r, &c->callid, "payload_types", &c->medias); - redis_delete_list(r, &c->callid, "map", &c->endpoint_maps); - redis_delete_list(r, &c->callid, "map_sfds", &c->endpoint_maps); - - redis_consume(r); -} - INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const char* tmp, int len) { char enc[len * 3 + 1]; str_uri_encode_len(enc, tmp, len); @@ -768,18 +705,6 @@ INLINE char* json_reader_get_string_value_uri_enc(JsonReader *root_reader, int * return ret; // must be free'd } -// stolen from libhiredis -/* Create a reply object */ -INLINE redisReply *createReplyObject(int type) { - redisReply *r = calloc(1,sizeof(*r)); - - if (r == NULL) - return NULL; - - r->type = type; - return r; -} - static int json_get_hash(struct redis_hash *out, struct call* c, const char *key, unsigned int id) { @@ -1763,30 +1688,6 @@ static int json_update_crypto_params(JsonBuilder *builder, const char *pref, return 0; } -static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct crypto_params *p) -{ - if (!p->crypto_suite) - return -1; - redis_pipe(r, "HMSET %s-"PB"-%u %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB" " - "%s-unenc-srtp %i %s-unenc-srtcp %i %s-unauth-srtp %i", - pref, STR(callid), unique_id, - key, p->crypto_suite->name, - key, S_LEN(p->master_key, sizeof(p->master_key)), - key, S_LEN(p->master_salt, sizeof(p->master_salt)), - key, p->session_params.unencrypted_srtp, - key, p->session_params.unencrypted_srtcp, - key, p->session_params.unauthenticated_srtp); - if (p->mki) - redis_pipe(r, "HMSET %s-"PB"-%u %s-mki "PB"", - pref, STR(callid), unique_id, - key, - S_LEN(p->mki, p->mki_len)); - - return 0; -} - static void json_update_crypto_context(JsonBuilder *builder, const char *pref, unsigned int unique_id, const struct crypto_context *c) @@ -1801,34 +1702,6 @@ static void json_update_crypto_context(JsonBuilder *builder, const char *pref, } -static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const struct crypto_context *c) -{ - if (redis_update_crypto_params(r, pref, callid, unique_id, "", &c->params)) - return; - redis_pipe(r, "HMSET %s-"PB"-%u last_index "UINT64F" ssrc %u", - pref, STR(callid), unique_id, - c->last_index, (unsigned) c->ssrc); -} -static void redis_update_endpoint(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct endpoint *e) -{ - redis_pipe(r, "HMSET %s-"PB"-%u %s %s", - pref, STR(callid), unique_id, - key, endpoint_print_buf(e)); -} -static void redis_update_stats(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct stats *s) -{ - redis_pipe(r, "HMSET %s-"PB"-%u %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"", - pref, STR(callid), unique_id, - key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), - key, atomic64_get(&s->errors)); -} - static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, unsigned int unique_id, const struct dtls_fingerprint *f) @@ -1840,17 +1713,6 @@ static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, JSON_SET_SIMPLE_LEN("fingerprint", sizeof(f->digest), (char *) f->digest); } -static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const struct dtls_fingerprint *f) -{ - if (!f->hash_func) - return; - redis_pipe(r, "HMSET %s-"PB"-%u hash_func %s fingerprint "PB"", - pref, STR(callid), unique_id, - f->hash_func->name, - S_LEN(f->digest, sizeof(f->digest))); -} /** * encodes the few (k,v) pairs for one call under one json structure */ @@ -2208,317 +2070,6 @@ static void redis_update_recording(struct redis *r, struct call *c) { STR(&rec->metadata), rec->meta_prefix); } - -/* - * Redis data structure: - * - * SET: calls %s %s %s ... - * - * HASH: call-$callid num_sfds %u num_streams %u num_medias %u num_tags %u num_maps %u - * - * HASH: sfd-$callid-$num stream %u - * - * HASH: stream-$callid-$num media %u sfd %u rtp_sink %u rtcp_sink %u rtcp_sibling %u - * LIST: stream_sfds-$callid-$num %u %u ... - * - * HASH: tag-$callid-$num - * LIST: other_tags-$callid-$num %u %u ... - * LIST: medias-$callid-$num %u %u ... - * - * HASH: media-$callid-$num tag %u - * LIST: streams-$callid-$num %u %u ... - * LIST: maps-$callid-$num %u %u ... - * - * HASH: map-$callid-$num - * LIST: map_sfds-$callid-$num %u %u ... - */ - -/* must be called lock-free */ -void redis_update(struct call *c, struct redis *r) { - GList *l, *n, *k, *m; - struct call_monologue *ml, *ml2; - - struct call_media *media; - struct packet_stream *ps; - struct stream_fd *sfd; - struct intf_list *il; - struct endpoint_map *ep; - struct rtp_payload_type *pt; - unsigned int redis_expires_s; - - if (!r) - return; - - mutex_lock(&r->lock); - if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { - mutex_unlock(&r->lock); - return ; - } - - rwlock_lock_r(&c->master_lock); - - redis_expires_s = c->callmaster->conf.redis_expires_secs; - - c->redis_hosted_db = r->db; - if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { - rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); - goto err; - } - - redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); - redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); - redis_pipe(r, "DEL call-"PB"", STR(&c->callid)); - redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu " - "num_sfds %u num_streams %u num_medias %u num_tags %u " - "num_maps %u " - "ml_deleted %llu created_from %s created_from_addr %s redis_hosted_db %u", - STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal, - (int) c->tos, (long long unsigned) c->deleted, - g_queue_get_length(&c->stream_fds), g_queue_get_length(&c->streams), - g_queue_get_length(&c->medias), g_queue_get_length(&c->monologues), - g_queue_get_length(&c->endpoint_maps), - (long long unsigned) c->ml_deleted, - c->created_from, sockaddr_print_buf(&c->created_from_addr), - c->redis_hosted_db); - /* XXX DTLS cert?? */ - - redis_update_recording(r, c); - - redis_pipe(r, "DEL sfd-"PB"-0", STR(&c->callid)); - - for (l = c->stream_fds.head; l; l = l->next) { - sfd = l->data; - - redis_pipe(r, "HMSET sfd-"PB"-%u pref_family %s localport %u logical_intf "PB" " - "local_intf_uid %u " - "stream %u", - STR(&c->callid), sfd->unique_id, - sfd->local_intf->logical->preferred_family->rfc_name, - sfd->socket.local.port, - STR(&sfd->local_intf->logical->name), - sfd->local_intf->unique_id, - sfd->stream->unique_id); - redis_update_crypto_context(r, "sfd", &c->callid, sfd->unique_id, &sfd->crypto); - /* XXX DTLS?? */ - redis_pipe(r, "EXPIRE sfd-"PB"-%u %u", STR(&c->callid), sfd->unique_id, redis_expires_s); - - redis_pipe(r, "DEL sfd-"PB"-%u", STR(&c->callid), sfd->unique_id + 1); - } - - redis_pipe(r, "DEL stream-"PB"-0 stream_sfds-"PB"-0", STR(&c->callid), STR(&c->callid)); - - for (l = c->streams.head; l; l = l->next) { - ps = l->data; - - mutex_lock(&ps->in_lock); - mutex_lock(&ps->out_lock); - - redis_pipe(r, "HMSET stream-"PB"-%u media %u sfd %u rtp_sink %u " - "rtcp_sink %u rtcp_sibling %u last_packet "UINT64F" " - "ps_flags %u component %u", - STR(&c->callid), ps->unique_id, - ps->media->unique_id, - ps->selected_sfd ? ps->selected_sfd->unique_id : -1, - ps->rtp_sink ? ps->rtp_sink->unique_id : -1, - ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1, - ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1, - atomic64_get(&ps->last_packet), - ps->ps_flags, - ps->component); - redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "endpoint", &ps->endpoint); - redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "advertised_endpoint", - &ps->advertised_endpoint); - redis_update_stats(r, "stream", &c->callid, ps->unique_id, "stats", &ps->stats); - redis_update_crypto_context(r, "stream", &c->callid, ps->unique_id, &ps->crypto); - /* XXX DTLS?? */ - - for (k = ps->sfds.head; k; k = k->next) { - sfd = k->data; - redis_pipe(r, "RPUSH stream_sfds-"PB"-%u %u", - STR(&c->callid), ps->unique_id, - sfd->unique_id); - } - - mutex_unlock(&ps->in_lock); - mutex_unlock(&ps->out_lock); - - redis_pipe(r, "EXPIRE stream-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE stream_sfds-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s); - - redis_pipe(r, "DEL stream-"PB"-%u stream_sfds-"PB"-%u", - STR(&c->callid), ps->unique_id + 1, - STR(&c->callid), ps->unique_id + 1); - } - - redis_pipe(r, "DEL tag-"PB"-0 other_tags-"PB"-0 medias-"PB"-0", - STR(&c->callid), STR(&c->callid), STR(&c->callid)); - - for (l = c->monologues.head; l; l = l->next) { - ml = l->data; - - redis_pipe(r, "HMSET tag-"PB"-%u created %llu active %u deleted %llu", - STR(&c->callid), ml->unique_id, - (long long unsigned) ml->created, - ml->active_dialogue ? ml->active_dialogue->unique_id : -1, - (long long unsigned) ml->deleted); - if (ml->tag.s) - redis_pipe(r, "HMSET tag-"PB"-%u tag "PB"", - STR(&c->callid), ml->unique_id, - STR(&ml->tag)); - if (ml->viabranch.s) - redis_pipe(r, "HMSET tag-"PB"-%u via-branch "PB"", - STR(&c->callid), ml->unique_id, - STR(&ml->viabranch)); - - k = g_hash_table_get_values(ml->other_tags); - for (m = k; m; m = m->next) { - ml2 = m->data; - redis_pipe(r, "RPUSH other_tags-"PB"-%u %u", - STR(&c->callid), ml->unique_id, - ml2->unique_id); - } - g_list_free(k); - - for (k = ml->medias.head; k; k = k->next) { - media = k->data; - redis_pipe(r, "RPUSH medias-"PB"-%u %u", - STR(&c->callid), ml->unique_id, - media->unique_id); - } - - redis_pipe(r, "EXPIRE tag-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE other_tags-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE medias-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - - redis_pipe(r, "DEL tag-"PB"-%u other_tags-"PB"-%u medias-"PB"-%u", - STR(&c->callid), ml->unique_id + 1, - STR(&c->callid), ml->unique_id + 1, - STR(&c->callid), ml->unique_id + 1); - } - - redis_pipe(r, "DEL media-"PB"-0 streams-"PB"-0 maps-"PB"-0 payload_types-"PB"-0", - STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid)); - - for (l = c->medias.head; l; l = l->next) { - media = l->data; - - redis_pipe(r, "HMSET media-"PB"-%u " - "tag %u " - "index %u " - "type "PB" protocol %s desired_family %s " - "sdes_in_tag %u sdes_out_tag %u logical_intf "PB" " - "media_flags %u", - STR(&c->callid), media->unique_id, - media->monologue->unique_id, - media->index, - STR(&media->type), media->protocol ? media->protocol->name : "", - media->desired_family ? media->desired_family->rfc_name : "", - media->sdes_in.tag, media->sdes_out.tag, - STR(&media->logical_intf->name), - media->media_flags); - redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_in", - &media->sdes_in.params); - redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_out", - &media->sdes_out.params); - redis_update_dtls_fingerprint(r, "media", &c->callid, media->unique_id, &media->fingerprint); - - for (m = media->streams.head; m; m = m->next) { - ps = m->data; - redis_pipe(r, "RPUSH streams-"PB"-%u %u", - STR(&c->callid), media->unique_id, - ps->unique_id); - } - - for (m = media->endpoint_maps.head; m; m = m->next) { - ep = m->data; - redis_pipe(r, "RPUSH maps-"PB"-%u %u", - STR(&c->callid), media->unique_id, - ep->unique_id); - } - - k = g_hash_table_get_values(media->rtp_payload_types); - for (m = k; m; m = m->next) { - pt = m->data; - redis_pipe(r, "RPUSH payload_types-"PB"-%u %u/"PB"/%u/"PB"", - STR(&c->callid), media->unique_id, - pt->payload_type, STR(&pt->encoding), - pt->clock_rate, STR(&pt->encoding_parameters)); - } - g_list_free(k); - - redis_pipe(r, "EXPIRE media-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE streams-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE maps-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE payload_types-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - - redis_pipe(r, "DEL media-"PB"-%u streams-"PB"-%u maps-"PB"-%u payload_types-"PB"-%u", - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1); - } - - redis_pipe(r, "DEL map-"PB"-0 map_sfds-"PB"-0", - STR(&c->callid), STR(&c->callid)); - - for (l = c->endpoint_maps.head; l; l = l->next) { - ep = l->data; - - redis_pipe(r, "HMSET map-"PB"-%u wildcard %i num_ports %u intf_preferred_family %s " - "logical_intf "PB"", - STR(&c->callid), ep->unique_id, - ep->wildcard, - ep->num_ports, - ep->logical_intf->preferred_family->rfc_name, - STR(&ep->logical_intf->name)); - redis_update_endpoint(r, "map", &c->callid, ep->unique_id, "endpoint", &ep->endpoint); - - for (m = ep->intf_sfds.head; m; m = m->next) { - il = m->data; - - redis_pipe(r, "RPUSH map_sfds-"PB"-%u loc-%u", - STR(&c->callid), ep->unique_id, - il->local_intf->unique_id); - - for (n = il->list.head; n; n = n->next) { - sfd = n->data; - - redis_pipe(r, "RPUSH map_sfds-"PB"-%u %u", - STR(&c->callid), ep->unique_id, - sfd->unique_id); - } - - } - - redis_pipe(r, "EXPIRE map-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE map_sfds-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s); - - redis_pipe(r, "DEL map-"PB"-%u map_sfds-"PB"-%u", - STR(&c->callid), ep->unique_id + 1, - STR(&c->callid), ep->unique_id + 1); - } - - redis_pipe(r, "EXPIRE call-"PB" %u", STR(&c->callid), redis_expires_s); - redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); - redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); - redis_pipe(r, "EXPIRE notifier-"PB" %u", STR(&c->callid), redis_expires_s); - - redis_consume(r); - - mutex_unlock(&r->lock); - rwlock_unlock_r(&c->master_lock); - - return; -err: - - mutex_unlock(&r->lock); - rwlock_unlock_r(&c->master_lock); - if (r->ctx->err) - rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); - redisFree(r->ctx); - r->ctx = NULL; -} - /* must be called lock-free */ void redis_delete(struct call *c, struct redis *r) { if (!r) @@ -2534,11 +2085,7 @@ void redis_delete(struct call *c, struct redis *r) { if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) goto err; - if (c->callmaster->conf.redis_multikey) { - redis_delete_call(c, r); - } else { - redis_delete_call_json(c, r); - } + redis_delete_call_json(c, r); rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock); From 6985784cead090464d59c43b156bc911b6675828 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 21 Feb 2017 10:41:41 +0100 Subject: [PATCH 3/7] Fixes redis recording flag in onekey concept --- daemon/redis.c | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/daemon/redis.c b/daemon/redis.c index 81610c569..32cb65feb 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1397,6 +1397,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; str callid; + str s; const char *err = 0; int i; @@ -1507,6 +1508,14 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (json_link_maps(r, c, &maps, &sfds)) goto err8; + // presence of this key determines whether we were recording at all + if (!redis_hash_get_str(&s, &call, "recording_meta_prefix")) { + recording_start(c, s.s); + + if (!redis_hash_get_str(&s, &call, "recording_metadata")) + call_str_cpy(c, &c->recording->metadata, &s); + } + err = NULL; err8: @@ -1543,20 +1552,6 @@ err1: obj_put(c); } -static void redis_restore_recording(struct call *c, struct redis_hash *call) { - str s; - - // presence of this key determines whether we were recording at all - if (redis_hash_get_str(&s, call, "recording_meta_prefix")) - return; - - recording_start(c, s.s); - - if (!redis_hash_get_str(&s, call, "recording_metadata")) - call_str_cpy(c, &c->recording->metadata, &s); -} - - struct thread_ctx { struct callmaster *m; GQueue r_q; @@ -1728,6 +1723,7 @@ char* redis_encode_json(struct call *c) { struct intf_list *il; struct call_monologue *ml, *ml2; JsonBuilder *builder = json_builder_new (); + struct recording *rec = 0; char tmp[2048]; @@ -1751,6 +1747,11 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE_CSTR("created_from",c->created_from); JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr)); JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); + + if ((rec = c->recording)) { + JSON_SET_SIMPLE_STR("recording_metadata",&rec->metadata); + JSON_SET_SIMPLE_CSTR("recording_meta_prefix",rec->meta_prefix); + } } json_builder_end_object (builder); @@ -2040,6 +2041,8 @@ void redis_update_onekey(struct call *c, struct redis *r) { redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); +//TODOO redis_update_recording(r, c); + redis_consume(r); if (result) @@ -2059,17 +2062,6 @@ err: } -static void redis_update_recording(struct redis *r, struct call *c) { - struct recording *rec; - - if (!(rec = c->recording)) - return; - - redis_pipe(r, "HMSET call-"PB" recording_metadata "PB" recording_meta_prefix %s ", - STR(&c->callid), - STR(&rec->metadata), rec->meta_prefix); -} - /* must be called lock-free */ void redis_delete(struct call *c, struct redis *r) { if (!r) From d08dd6a5f3b30467467b0c681ae3919c04e2a1fe Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Wed, 22 Feb 2017 09:42:46 +0100 Subject: [PATCH 4/7] Fixes SRTP restore in onekey concept --- daemon/redis.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/redis.c b/daemon/redis.c index 32cb65feb..aff1c0864 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -741,7 +741,7 @@ static int json_get_hash(struct redis_hash *out, struct call* c, rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - str_init_len(&out->s,(char*)json_reader_get_string_value(c->root_reader),strlen((char*)json_reader_get_string_value(c->root_reader))); + str_init(&out->s,(char*)json_reader_get_string_value_uri_enc(c->root_reader,&out->s.len)); char* tmp = strdup(*members); if (g_hash_table_insert_check(out->ht, tmp, str_dup(&out->s)) != TRUE) { From 279e5fa36e4d43a389878660c4208302da043ce2 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Wed, 22 Feb 2017 09:44:10 +0100 Subject: [PATCH 5/7] Removes commented code --- daemon/redis.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/daemon/redis.c b/daemon/redis.c index aff1c0864..ec2b6af6e 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -2041,8 +2041,6 @@ void redis_update_onekey(struct call *c, struct redis *r) { redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); -//TODOO redis_update_recording(r, c); - redis_consume(r); if (result) From d904fb2fe511760a9dac650cd67d2f3f072f898a Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Wed, 22 Feb 2017 10:16:15 +0100 Subject: [PATCH 6/7] Removes 'json-' prefix from redis key (callid) --- daemon/redis.c | 46 ++++++++++++++++++---------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/daemon/redis.c b/daemon/redis.c index ec2b6af6e..b9a0b6a16 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -284,13 +284,8 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; if (keyspace_id.s[-1] != ':') goto err; - // now at - - if (str_shift_cmp(&keyspace_id, "json-")) { - rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); - goto err; - } + // now at callid = keyspace_id; // select the right db for restoring the call @@ -458,13 +453,13 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); return -1; } @@ -687,7 +682,7 @@ static int redis_check_conn(struct redis *r) { /* called with r->lock held and c->master_lock held */ static void redis_delete_call_json(struct call *c, struct redis *r) { - redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); + redis_pipe(r, "DEL "PB"", STR(&c->callid)); redis_consume(r); } @@ -1391,26 +1386,21 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma return 0; } -static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { +static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { redisReply* rr_jsonStr; struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; - str callid; - str s; + str s, id; const char *err = 0; int i; JsonReader *root_reader =0; JsonParser *parser =0; - // TODO: Maybe refactor - str_init_len(&callid, id->s, id->len); - str_shift(&callid,strlen("json-")); - - rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(&callid)); + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); if (!rr_jsonStr) { - rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", id->s); + rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", callid->s); goto err1; } @@ -1425,7 +1415,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * goto err1; } - c = call_get_or_create(&callid, m, type); + c = call_get_or_create(callid, m, type); err = "failed to create call struct"; if (!c) goto err1; @@ -1467,10 +1457,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * c->tos = i; redis_hash_get_time_t(&c->deleted, &call, "deleted"); redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&callid, &call, "created_from")) - c->created_from = call_strdup(c, callid.s); - if (!redis_hash_get_str(&callid, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &callid); + if (!redis_hash_get_str(&id, &call, "created_from")) + c->created_from = call_strdup(c, id.s); + if (!redis_hash_get_str(&id, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &id); err = "missing 'redis_hosted_db' value"; if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) @@ -1541,12 +1531,12 @@ err1: freeReplyObject(rr_jsonStr); log_info_clear(); if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(&callid), + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(callid), err); if (c) call_destroy(c); else - redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(&callid)); + redisCommandNR(m->conf.redis_write->ctx, "DEL " PB, STR(callid)); } if (c) obj_put(c); @@ -1599,7 +1589,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS *"); if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -2038,8 +2028,8 @@ void redis_update_onekey(struct call *c, struct redis *r) { if (!result) goto err; - redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); - redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); + redis_pipe(r, "SET "PB" %s", STR(&c->callid), result); + redis_pipe(r, "EXPIRE "PB" %i", STR(&c->callid), redis_expires_s); redis_consume(r); From 267b57c33f247e3e2e7ee0d0269c3c326ef1e120 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Sun, 5 Mar 2017 10:11:55 +0100 Subject: [PATCH 7/7] Implemented comments from Richard from pull req #323 --- daemon/redis.c | 14 +++++++------- daemon/redis.h | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/daemon/redis.c b/daemon/redis.c index b9a0b6a16..1428aca33 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -706,6 +706,7 @@ static int json_get_hash(struct redis_hash *out, struct call* c, static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; int rc=0; + str tmpstr; if (!c) goto err; @@ -723,7 +724,7 @@ static int json_get_hash(struct redis_hash *out, struct call* c, goto err; } - out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject); + out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); if (!out->ht) goto err; @@ -736,10 +737,10 @@ static int json_get_hash(struct redis_hash *out, struct call* c, rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - str_init(&out->s,(char*)json_reader_get_string_value_uri_enc(c->root_reader,&out->s.len)); + str_init(&tmpstr,(char*)json_reader_get_string_value_uri_enc(c->root_reader,&tmpstr.len)); char* tmp = strdup(*members); - if (g_hash_table_insert_check(out->ht, tmp, str_dup(&out->s)) != TRUE) { + if (g_hash_table_insert_check(out->ht, tmp, str_dup(&tmpstr)) != TRUE) { ilog(LOG_WARNING,"Key %s already exists", tmp); goto err3; } @@ -783,8 +784,7 @@ static int redis_hash_get_str(str *out, const struct redis_hash *h, const char * out->len = 0; return -1; } - out->s = r->s; - out->len = r->len; + *out = *r; return 0; } @@ -1400,7 +1400,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); if (!rr_jsonStr) { - rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", callid->s); + rlog(LOG_ERR, "Could not retrieve json data from redis for key: "STR_FORMAT, STR_FMT(callid)); goto err1; } @@ -1553,7 +1553,7 @@ static void restore_thread(void *call_p, void *ctx_p) { redisReply *call = call_p; struct redis *r; str callid; - str_init(&callid,call->str); + str_init_len(&callid, call->str, call->len); rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call)); diff --git a/daemon/redis.h b/daemon/redis.h index 6cc0f9bc4..1932ea8d3 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -62,10 +62,11 @@ struct redis { int state; int no_redis_required; }; + struct redis_hash { - str s; GHashTable *ht; }; + struct redis_list { unsigned int len; struct redis_hash *rh;