diff --git a/daemon/call.c b/daemon/call.c index fc08a4390..ffa91bc8b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -595,11 +595,7 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); if (update) { - if (m->conf.redis_multikey) { - redis_update(ps->call, m->conf.redis_write); - } else { redis_update_onekey(ps->call, m->conf.redis_write); - } } next: diff --git a/daemon/call.h b/daemon/call.h index 2dc84b3b9..a5777cee9 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -463,7 +463,6 @@ struct callmaster_config { GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; unsigned int redis_expires_secs; - unsigned int redis_multikey; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 8bd2fc992..f13feb6b9 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,11 +186,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - if (m->conf.redis_multikey) { - redis_update(c, m->conf.redis_write); - } else { - redis_update_onekey(c, m->conf.redis_write); - } + redis_update_onekey(c, m->conf.redis_write); gettimeofday(&(monologue->started), NULL); @@ -338,11 +334,7 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - if (m->conf.redis_multikey) { - redis_update(c, m->conf.redis_write); - } else { - redis_update_onekey(c, m->conf.redis_write); - } + redis_update_onekey(c, m->conf.redis_write); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -774,11 +766,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster rwlock_unlock_w(&call->master_lock); if (!flags.no_redis_update) { - if (m->conf.redis_multikey) { - redis_update(call, m->conf.redis_write); - } else { redis_update_onekey(call,m->conf.redis_write); - } } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); } diff --git a/daemon/main.c b/daemon/main.c index d125ce179..310263304 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -62,7 +62,6 @@ static unsigned int timeout; static unsigned int silent_timeout; static unsigned int final_timeout; static unsigned int redis_expires = 86400; -static unsigned int redis_multikey = 0; static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; @@ -299,7 +298,6 @@ static void options(int *argc, char ***argv) { { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, - { "redis-multikey", 0, 0, G_OPTION_ARG_NONE, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", NULL }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -628,7 +626,6 @@ no_kernel: } mc.redis_expires_secs = redis_expires; - mc.redis_multikey = redis_multikey; ctx->m->conf = mc; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index ffd482029..afafd5210 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1472,11 +1472,7 @@ out: ca = sfd->call ? : NULL; if (ca && update) { - if (ca->callmaster->conf.redis_multikey) { - redis_update(ca, ca->callmaster->conf.redis_write); - } else { - redis_update_onekey(ca, ca->callmaster->conf.redis_write); - } + redis_update_onekey(ca, ca->callmaster->conf.redis_write); } done: log_info_clear(); diff --git a/daemon/redis.c b/daemon/redis.c index a51eb3e8e..1428aca33 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -67,7 +67,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define REDIS_FMT(x) (x)->len, (x)->str -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type); static int redis_check_conn(struct redis *r); static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); @@ -285,20 +284,8 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; if (keyspace_id.s[-1] != ':') goto err; - // now at - - if (cm->conf.redis_multikey) { - if (str_shift_cmp(&keyspace_id, "notifier-")) { - rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n"); - goto err; - } - } else { - if (str_shift_cmp(&keyspace_id, "json-")) { - rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); - goto err; - } - } + // now at callid = keyspace_id; // select the right db for restoring the call @@ -310,10 +297,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; } - if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) - redis_restore_call(r, cm, &callid, CT_FOREIGN_CALL); - - if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + if (strncmp(rr->element[3]->str,"set",3)==0) { c = call_get(&callid, cm); if (c) { rwlock_unlock_w(&c->master_lock); @@ -467,54 +451,28 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return -1; } - if (cm->conf.redis_multikey) { - switch (action) { - case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); - return -1; - } - break; - default: - rlog(LOG_ERROR, "No subscribe action found: %d", action); + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); return -1; } - } else { - switch (action) { - case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); - return -1; - } - break; - case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { - rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); - return -1; - } - break; - default: - rlog(LOG_ERROR, "No subscribe action found: %d", action); + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); return -1; } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); + return -1; + } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; } return 0; @@ -722,39 +680,9 @@ static int redis_check_conn(struct redis *r) { return REDIS_STATE_RECONNECTED; } - - -static void redis_delete_list(struct redis *r, const str *callid, const char *prefix, GQueue *q) { - unsigned int i; - - for (i = 0; i < g_queue_get_length(q); i++) - redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i); -} - /* called with r->lock held and c->master_lock held */ static void redis_delete_call_json(struct call *c, struct redis *r) { - redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); - redis_consume(r); -} - -/* called with r->lock held and c->master_lock held */ -static void redis_delete_call(struct call *c, struct redis *r) { - redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); - redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); - redis_pipe(r, "DEL call-"PB"", STR(&c->callid)); - redis_delete_list(r, &c->callid, "sfd", &c->stream_fds); - redis_delete_list(r, &c->callid, "stream", &c->streams); - redis_delete_list(r, &c->callid, "stream_sfds", &c->streams); - redis_delete_list(r, &c->callid, "tag", &c->monologues); - redis_delete_list(r, &c->callid, "other_tags", &c->monologues); - redis_delete_list(r, &c->callid, "medias", &c->monologues); - redis_delete_list(r, &c->callid, "media", &c->medias); - redis_delete_list(r, &c->callid, "streams", &c->medias); - redis_delete_list(r, &c->callid, "maps", &c->medias); - redis_delete_list(r, &c->callid, "payload_types", &c->medias); - redis_delete_list(r, &c->callid, "map", &c->endpoint_maps); - redis_delete_list(r, &c->callid, "map_sfds", &c->endpoint_maps); - + redis_pipe(r, "DEL "PB"", STR(&c->callid)); redis_consume(r); } @@ -772,25 +700,13 @@ INLINE char* json_reader_get_string_value_uri_enc(JsonReader *root_reader, int * return ret; // must be free'd } -// stolen from libhiredis -/* Create a reply object */ -INLINE redisReply *createReplyObject(int type) { - redisReply *r = calloc(1,sizeof(*r)); - - if (r == NULL) - return NULL; - - r->type = type; - return r; -} - static int json_get_hash(struct redis_hash *out, struct call* c, - const char *key, - unsigned int id) + const char *key, unsigned int id) { static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; int rc=0; + str tmpstr; if (!c) goto err; @@ -803,16 +719,14 @@ static int json_get_hash(struct redis_hash *out, struct call* c, if (rc>=MAXKEYLENGTH) rlog(LOG_ERROR,"Json key too long."); - if (!json_reader_read_member(c->root_reader, key_concatted)) { rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); goto err; } - out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject); + out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); if (!out->ht) goto err; - out->rr = 0; gchar **members = json_reader_list_members(c->root_reader); gchar **orig_members = members; @@ -823,16 +737,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - out->rr = createReplyObject(REDIS_REPLY_STRING); - if (!out->rr) { - rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); - goto err3; - } - out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader, &out->rr->len); - + str_init(&tmpstr,(char*)json_reader_get_string_value_uri_enc(c->root_reader,&tmpstr.len)); char* tmp = strdup(*members); - if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + if (g_hash_table_insert_check(out->ht, tmp, str_dup(&tmpstr)) != TRUE) { + ilog(LOG_WARNING,"Key %s already exists", tmp); goto err3; } @@ -847,68 +756,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, err3: g_strfreev(members); - if (out->rr) - freeReplyObject(out->rr); - g_hash_table_destroy(out->ht); -err: - return -1; -} - - -static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const str *which, - unsigned int id) -{ - redisReply *k, *v; - int i; - - out->ht = g_hash_table_new(g_str_hash, g_str_equal); - if (!out->ht) - goto err; - if (id == -1) - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR(which)); - else - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR(which), id); - if (!out->rr) - goto err2; - - for (i = 1; i < out->rr->elements; i += 2) { - k = out->rr->element[i - 1]; - v = out->rr->element[i]; - if (k->type != REDIS_REPLY_STRING || v->type != REDIS_REPLY_STRING) - continue; - - if (g_hash_table_insert_check(out->ht, k->str, v) != TRUE) - goto err3; - } - - return 0; - -err3: - if (out->rr) - freeReplyObject(out->rr); -err2: g_hash_table_destroy(out->ht); err: return -1; } - -static void redis_destroy_hash(struct redis_hash *rh) { - - if (rh->rr) - freeReplyObject(rh->rr); - g_hash_table_destroy(rh->ht); -} -static void redis_destroy_list(struct redis_list *rl) { - unsigned int i; - - for (i = 0; i < rl->len; i++) { - redis_destroy_hash(&rl->rh[i]); - } - free(rl->rh); - free(rl->ptrs); -} - static void json_destroy_hash(struct redis_hash *rh) { g_hash_table_destroy(rh->ht); } @@ -924,7 +776,7 @@ static void json_destroy_list(struct redis_list *rl) { } static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { - redisReply *r; + str *r; r = g_hash_table_lookup(h->ht, k); if (!r) { @@ -932,8 +784,7 @@ static int redis_hash_get_str(str *out, const struct redis_hash *h, const char * out->len = 0; return -1; } - out->s = r->str; - out->len = r->len; + *out = *r; return 0; } @@ -1054,34 +905,6 @@ static int json_build_list_cb(GQueue *q, struct call *c, const char *key, return 0; } - -static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list, - int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) -{ - redisReply *rr; - int i; - str s; - - rr = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB"-%u 0 -1", key, STR(callid), idx); - if (!rr) - return -1; - - for (i = 0; i < rr->elements; i++) { - if (rr->element[i]->type != REDIS_REPLY_STRING) { - freeReplyObject(rr); - return -1; - } - str_init_len(&s, rr->element[i]->str, rr->element[i]->len); - if (cb(&s, q, list, ptr)) { - freeReplyObject(rr); - return -1; - } - } - - freeReplyObject(rr); - return 0; -} static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) { int j; j = str_to_i(s, 0); @@ -1095,12 +918,6 @@ static int json_build_list(GQueue *q, struct call *c, const char *key, const str return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL); } -static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list) -{ - return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); -} - static int json_get_list_hash(struct redis_list *out, struct call* c, const char *key, const struct redis_hash *rh, const char *rh_num_key) @@ -1134,41 +951,6 @@ err1: return -1; } -static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const str *id, - const struct redis_hash *rh, const char *rh_num_key) -{ - unsigned int i; - - if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) - return -1; - out->rh = malloc(sizeof(*out->rh) * out->len); - if (!out->rh) - return -1; - out->ptrs = malloc(sizeof(*out->ptrs) * out->len); - if (!out->ptrs) - goto err1; - - for (i = 0; i < out->len; i++) { - if (redis_get_hash(&out->rh[i], r, key, id, i)) - goto err2; - } - - return 0; - -err2: - free(out->ptrs); - while (i) { - i--; - redis_destroy_hash(&out->rh[i]); - } -err1: - free(out->rh); - return -1; -} - - - - /* can return 1, 0 or -1 */ static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) { str s; @@ -1432,62 +1214,6 @@ static int json_medias(struct redis *r, struct call *c, struct redis_list *media return 0; } -static int redis_medias(struct redis *r, struct call *c, struct redis_list *medias) { - unsigned int i; - struct redis_hash *rh; - struct call_media *med; - str s; - - for (i = 0; i < medias->len; i++) { - rh = &medias->rh[i]; - - /* from call.c:__get_media() */ - med = uid_slice_alloc0(med, &c->medias); - med->call = c; - med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, - __payload_type_free); - - if (redis_hash_get_unsigned(&med->index, rh, "index")) - return -1; - if (redis_hash_get_str(&s, rh, "type")) - return -1; - call_str_cpy(c, &med->type, &s); - - if (redis_hash_get_str(&s, rh, "protocol")) - return -1; - med->protocol = transport_protocol(&s); - - if (redis_hash_get_str(&s, rh, "desired_family")) - return -1; - med->desired_family = get_socket_family_rfc(&s); - - if (redis_hash_get_str(&s, rh, "logical_intf") - || !(med->logical_intf = get_logical_interface(&s, med->desired_family, 0))) - { - rlog(LOG_ERR, "unable to find specified local interface"); - med->logical_intf = get_logical_interface(NULL, med->desired_family, 0); - } - - if (redis_hash_get_unsigned(&med->sdes_in.tag, rh, "sdes_in_tag")) - return -1; - if (redis_hash_get_unsigned(&med->sdes_out.tag, rh, "sdes_out_tag")) - return -1; - if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, rh, - "media_flags")) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_in.params, rh, "sdes_in") < 0) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0) - return -1; - - redis_build_list_cb(NULL, r, "payload_types", &c->callid, i, NULL, rbl_cb_plts, med); - /* XXX dtls */ - - medias->ptrs[i] = med; - } - - return 0; -} static int redis_maps(struct call *c, struct redis_list *maps) { unsigned int i; @@ -1570,35 +1296,6 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ return 0; } -static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) -{ - unsigned int i; - struct call_monologue *ml, *other_ml; - GQueue q = G_QUEUE_INIT; - GList *l; - - for (i = 0; i < tags->len; i++) { - ml = tags->ptrs[i]; - - ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); - - if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags)) - return -1; - for (l = q.head; l; l = l->next) { - other_ml = l->data; - if (!other_ml) - return -1; - g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); - } - g_queue_clear(&q); - - if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias)) - return -1; - } - - return 0; -} - static int json_link_streams(struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias) { @@ -1624,31 +1321,6 @@ static int json_link_streams(struct call *c, struct redis_list *streams, return 0; } -static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias) -{ - unsigned int i; - struct packet_stream *ps; - - for (i = 0; i < streams->len; i++) { - ps = streams->ptrs[i]; - - ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); - ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); - ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); - ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); - ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); - - if (redis_build_list(&ps->sfds, r, "stream_sfds", &c->callid, i, sfds)) - return -1; - - if (ps->media) - __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); - } - - return 0; -} - static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1669,26 +1341,6 @@ static int json_link_medias(struct redis *r, struct call *c, struct redis_list * return 0; } -static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, - struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) -{ - unsigned int i; - struct call_media *med; - - for (i = 0; i < medias->len; i++) { - med = medias->ptrs[i]; - - med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); - if (!med->monologue) - return -1; - if (redis_build_list(&med->streams, r, "streams", &c->callid, i, streams)) - return -1; - if (redis_build_list(&med->endpoint_maps, r, "maps", &c->callid, i, maps)) - return -1; - } - return 0; -} - static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *ptr) { int i; struct intf_list *il; @@ -1734,37 +1386,21 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma return 0; } -static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, - struct redis_list *sfds) -{ - unsigned int i; - struct endpoint_map *em; - - for (i = 0; i < maps->len; i++) { - em = maps->ptrs[i]; - - if (redis_build_list_cb(&em->intf_sfds, r, "map_sfds", &c->callid, em->unique_id, sfds, - rbl_cb_intf_sfds, em)) - return -1; - } - return 0; -} - static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { redisReply* rr_jsonStr; struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; + str s, id; + const char *err = 0; int i; - str s; JsonReader *root_reader =0; JsonParser *parser =0; - rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(callid)); + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, STR(callid)); if (!rr_jsonStr) { - rlog(LOG_ERR, "Could not retrieve json data from redis for callid: " STR_FORMAT, - STR_FMT(callid)); + rlog(LOG_ERR, "Could not retrieve json data from redis for key: "STR_FORMAT, STR_FMT(callid)); goto err1; } @@ -1790,6 +1426,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (c->last_signal) goto err2; err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", -1)) goto err2; err = "'tags' incomplete"; @@ -1820,10 +1457,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * c->tos = i; redis_hash_get_time_t(&c->deleted, &call, "deleted"); redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); + if (!redis_hash_get_str(&id, &call, "created_from")) + c->created_from = call_strdup(c, id.s); + if (!redis_hash_get_str(&id, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &id); err = "missing 'redis_hosted_db' value"; if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) @@ -1861,6 +1498,14 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (json_link_maps(r, c, &maps, &sfds)) goto err8; + // presence of this key determines whether we were recording at all + if (!redis_hash_get_str(&s, &call, "recording_meta_prefix")) { + recording_start(c, s.s); + + if (!redis_hash_get_str(&s, &call, "recording_metadata")) + call_str_cpy(c, &c->recording->metadata, &s); + } + err = NULL; err8: @@ -1891,147 +1536,12 @@ err1: if (c) call_destroy(c); else - redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(callid)); + redisCommandNR(m->conf.redis_write->ctx, "DEL " PB, STR(callid)); } if (c) obj_put(c); } -static void redis_restore_recording(struct call *c, struct redis_hash *call) { - str s; - - // presence of this key determines whether we were recording at all - if (redis_hash_get_str(&s, call, "recording_meta_prefix")) - return; - - recording_start(c, s.s); - - if (!redis_hash_get_str(&s, call, "recording_metadata")) - call_str_cpy(c, &c->recording->metadata, &s); -} - - -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { - struct redis_hash call; - struct redis_list tags, sfds, streams, medias, maps; - struct call *c = NULL; - str s; - const char *err; - int i; - - c = call_get_or_create(id, m, type); - err = "failed to create call struct"; - if (!c) - goto err1; - err = "call already exists"; - if (c->last_signal) - goto err2; - err = "'call' data incomplete"; - if (redis_get_hash(&call, r, "call", id, -1)) - goto err2; - err = "'tags' incomplete"; - if (redis_get_list_hash(&tags, r, "tag", id, &call, "num_tags")) - goto err3; - err = "'sfds' incomplete"; - if (redis_get_list_hash(&sfds, r, "sfd", id, &call, "num_sfds")) - goto err4; - err = "'streams' incomplete"; - if (redis_get_list_hash(&streams, r, "stream", id, &call, "num_streams")) - goto err5; - err = "'medias' incomplete"; - if (redis_get_list_hash(&medias, r, "media", id, &call, "num_medias")) - goto err6; - err = "'maps' incomplete"; - if (redis_get_list_hash(&maps, r, "map", id, &call, "num_maps")) - goto err7; - - err = "missing 'created' timestamp"; - if (redis_hash_get_time_t(&c->created, &call, "created")) - goto err8; - err = "missing 'last signal' timestamp"; - if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) - goto err8; - if (redis_hash_get_int(&i, &call, "tos")) - c->tos = 184; - else - c->tos = i; - redis_hash_get_time_t(&c->deleted, &call, "deleted"); - redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); - - err = "missing 'redis_hosted_db' value"; - if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) - goto err8; - - err = "failed to create sfds"; - if (redis_sfds(c, &sfds)) - goto err8; - err = "failed to create streams"; - if (redis_streams(c, &streams)) - goto err8; - err = "failed to create tags"; - if (redis_tags(c, &tags)) - goto err8; - err = "failed to create medias"; - if (redis_medias(r, c, &medias)) - goto err8; - err = "failed to create maps"; - if (redis_maps(c, &maps)) - goto err8; - - err = "failed to link sfds"; - if (redis_link_sfds(&sfds, &streams)) - goto err8; - err = "failed to link streams"; - if (redis_link_streams(r, c, &streams, &sfds, &medias)) - goto err8; - err = "failed to link tags"; - if (redis_link_tags(r, c, &tags, &medias)) - goto err8; - err = "failed to link medias"; - if (redis_link_medias(r, c, &medias, &streams, &maps, &tags)) - goto err8; - err = "failed to link maps"; - if (redis_link_maps(r, c, &maps, &sfds)) - goto err8; - - redis_restore_recording(c, &call); - - err = NULL; - -err8: - redis_destroy_list(&maps); -err7: - redis_destroy_list(&medias); -err6: - redis_destroy_list(&streams); -err5: - redis_destroy_list(&sfds); -err4: - redis_destroy_list(&tags); -err3: - redis_destroy_hash(&call); -err2: - rwlock_unlock_w(&c->master_lock); -err1: - log_info_clear(); - if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(id), err); - if (c) - call_destroy(c); - else - redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR(id)); - } - - if (c) - obj_put(c); -} - - - struct thread_ctx { struct callmaster *m; GQueue r_q; @@ -2043,6 +1553,7 @@ static void restore_thread(void *call_p, void *ctx_p) { redisReply *call = call_p; struct redis *r; str callid; + str_init_len(&callid, call->str, call->len); rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call)); @@ -2050,14 +1561,7 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - str_init_len(&callid, call->str, call->len); - - if (ctx->m->conf.redis_multikey) { - redis_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } else { - if (str_shift_cmp(&callid, "json-") == 0) - json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } + json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -2085,11 +1589,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - if (m->conf.redis_multikey) { - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); - } else { - calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); - } + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS *"); if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -2173,30 +1673,6 @@ static int json_update_crypto_params(JsonBuilder *builder, const char *pref, return 0; } -static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct crypto_params *p) -{ - if (!p->crypto_suite) - return -1; - redis_pipe(r, "HMSET %s-"PB"-%u %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB" " - "%s-unenc-srtp %i %s-unenc-srtcp %i %s-unauth-srtp %i", - pref, STR(callid), unique_id, - key, p->crypto_suite->name, - key, S_LEN(p->master_key, sizeof(p->master_key)), - key, S_LEN(p->master_salt, sizeof(p->master_salt)), - key, p->session_params.unencrypted_srtp, - key, p->session_params.unencrypted_srtcp, - key, p->session_params.unauthenticated_srtp); - if (p->mki) - redis_pipe(r, "HMSET %s-"PB"-%u %s-mki "PB"", - pref, STR(callid), unique_id, - key, - S_LEN(p->mki, p->mki_len)); - - return 0; -} - static void json_update_crypto_context(JsonBuilder *builder, const char *pref, unsigned int unique_id, const struct crypto_context *c) @@ -2211,34 +1687,6 @@ static void json_update_crypto_context(JsonBuilder *builder, const char *pref, } -static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const struct crypto_context *c) -{ - if (redis_update_crypto_params(r, pref, callid, unique_id, "", &c->params)) - return; - redis_pipe(r, "HMSET %s-"PB"-%u last_index "UINT64F" ssrc %u", - pref, STR(callid), unique_id, - c->last_index, (unsigned) c->ssrc); -} -static void redis_update_endpoint(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct endpoint *e) -{ - redis_pipe(r, "HMSET %s-"PB"-%u %s %s", - pref, STR(callid), unique_id, - key, endpoint_print_buf(e)); -} -static void redis_update_stats(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const char *key, const struct stats *s) -{ - redis_pipe(r, "HMSET %s-"PB"-%u %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"", - pref, STR(callid), unique_id, - key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), - key, atomic64_get(&s->errors)); -} - static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, unsigned int unique_id, const struct dtls_fingerprint *f) @@ -2250,17 +1698,6 @@ static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, JSON_SET_SIMPLE_LEN("fingerprint", sizeof(f->digest), (char *) f->digest); } -static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid, - unsigned int unique_id, - const struct dtls_fingerprint *f) -{ - if (!f->hash_func) - return; - redis_pipe(r, "HMSET %s-"PB"-%u hash_func %s fingerprint "PB"", - pref, STR(callid), unique_id, - f->hash_func->name, - S_LEN(f->digest, sizeof(f->digest))); -} /** * encodes the few (k,v) pairs for one call under one json structure */ @@ -2276,6 +1713,7 @@ char* redis_encode_json(struct call *c) { struct intf_list *il; struct call_monologue *ml, *ml2; JsonBuilder *builder = json_builder_new (); + struct recording *rec = 0; char tmp[2048]; @@ -2300,6 +1738,10 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr)); JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); + if ((rec = c->recording)) { + JSON_SET_SIMPLE_STR("recording_metadata",&rec->metadata); + JSON_SET_SIMPLE_CSTR("recording_meta_prefix",rec->meta_prefix); + } } json_builder_end_object (builder); @@ -2586,8 +2028,8 @@ void redis_update_onekey(struct call *c, struct redis *r) { if (!result) goto err; - redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); - redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); + redis_pipe(r, "SET "PB" %s", STR(&c->callid), result); + redis_pipe(r, "EXPIRE "PB" %i", STR(&c->callid), redis_expires_s); redis_consume(r); @@ -2608,328 +2050,6 @@ err: } -static void redis_update_recording(struct redis *r, struct call *c) { - struct recording *rec; - - if (!(rec = c->recording)) - return; - - redis_pipe(r, "HMSET call-"PB" recording_metadata "PB" recording_meta_prefix %s ", - STR(&c->callid), - STR(&rec->metadata), rec->meta_prefix); -} - - -/* - * Redis data structure: - * - * SET: calls %s %s %s ... - * - * HASH: call-$callid num_sfds %u num_streams %u num_medias %u num_tags %u num_maps %u - * - * HASH: sfd-$callid-$num stream %u - * - * HASH: stream-$callid-$num media %u sfd %u rtp_sink %u rtcp_sink %u rtcp_sibling %u - * LIST: stream_sfds-$callid-$num %u %u ... - * - * HASH: tag-$callid-$num - * LIST: other_tags-$callid-$num %u %u ... - * LIST: medias-$callid-$num %u %u ... - * - * HASH: media-$callid-$num tag %u - * LIST: streams-$callid-$num %u %u ... - * LIST: maps-$callid-$num %u %u ... - * - * HASH: map-$callid-$num - * LIST: map_sfds-$callid-$num %u %u ... - */ - -/* must be called lock-free */ -void redis_update(struct call *c, struct redis *r) { - GList *l, *n, *k, *m; - struct call_monologue *ml, *ml2; - - struct call_media *media; - struct packet_stream *ps; - struct stream_fd *sfd; - struct intf_list *il; - struct endpoint_map *ep; - struct rtp_payload_type *pt; - unsigned int redis_expires_s; - - if (!r) - return; - - mutex_lock(&r->lock); - if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { - mutex_unlock(&r->lock); - return ; - } - - rwlock_lock_r(&c->master_lock); - - redis_expires_s = c->callmaster->conf.redis_expires_secs; - - c->redis_hosted_db = r->db; - if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { - rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); - goto err; - } - - redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); - redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); - redis_pipe(r, "DEL call-"PB"", STR(&c->callid)); - redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu " - "num_sfds %u num_streams %u num_medias %u num_tags %u " - "num_maps %u " - "ml_deleted %llu created_from %s created_from_addr %s redis_hosted_db %u", - STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal, - (int) c->tos, (long long unsigned) c->deleted, - g_queue_get_length(&c->stream_fds), g_queue_get_length(&c->streams), - g_queue_get_length(&c->medias), g_queue_get_length(&c->monologues), - g_queue_get_length(&c->endpoint_maps), - (long long unsigned) c->ml_deleted, - c->created_from, sockaddr_print_buf(&c->created_from_addr), - c->redis_hosted_db); - /* XXX DTLS cert?? */ - - redis_update_recording(r, c); - - redis_pipe(r, "DEL sfd-"PB"-0", STR(&c->callid)); - - for (l = c->stream_fds.head; l; l = l->next) { - sfd = l->data; - - redis_pipe(r, "HMSET sfd-"PB"-%u pref_family %s localport %u logical_intf "PB" " - "local_intf_uid %u " - "stream %u", - STR(&c->callid), sfd->unique_id, - sfd->local_intf->logical->preferred_family->rfc_name, - sfd->socket.local.port, - STR(&sfd->local_intf->logical->name), - sfd->local_intf->unique_id, - sfd->stream->unique_id); - redis_update_crypto_context(r, "sfd", &c->callid, sfd->unique_id, &sfd->crypto); - /* XXX DTLS?? */ - redis_pipe(r, "EXPIRE sfd-"PB"-%u %u", STR(&c->callid), sfd->unique_id, redis_expires_s); - - redis_pipe(r, "DEL sfd-"PB"-%u", STR(&c->callid), sfd->unique_id + 1); - } - - redis_pipe(r, "DEL stream-"PB"-0 stream_sfds-"PB"-0", STR(&c->callid), STR(&c->callid)); - - for (l = c->streams.head; l; l = l->next) { - ps = l->data; - - mutex_lock(&ps->in_lock); - mutex_lock(&ps->out_lock); - - redis_pipe(r, "HMSET stream-"PB"-%u media %u sfd %u rtp_sink %u " - "rtcp_sink %u rtcp_sibling %u last_packet "UINT64F" " - "ps_flags %u component %u", - STR(&c->callid), ps->unique_id, - ps->media->unique_id, - ps->selected_sfd ? ps->selected_sfd->unique_id : -1, - ps->rtp_sink ? ps->rtp_sink->unique_id : -1, - ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1, - ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1, - atomic64_get(&ps->last_packet), - ps->ps_flags, - ps->component); - redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "endpoint", &ps->endpoint); - redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "advertised_endpoint", - &ps->advertised_endpoint); - redis_update_stats(r, "stream", &c->callid, ps->unique_id, "stats", &ps->stats); - redis_update_crypto_context(r, "stream", &c->callid, ps->unique_id, &ps->crypto); - /* XXX DTLS?? */ - - for (k = ps->sfds.head; k; k = k->next) { - sfd = k->data; - redis_pipe(r, "RPUSH stream_sfds-"PB"-%u %u", - STR(&c->callid), ps->unique_id, - sfd->unique_id); - } - - mutex_unlock(&ps->in_lock); - mutex_unlock(&ps->out_lock); - - redis_pipe(r, "EXPIRE stream-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE stream_sfds-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s); - - redis_pipe(r, "DEL stream-"PB"-%u stream_sfds-"PB"-%u", - STR(&c->callid), ps->unique_id + 1, - STR(&c->callid), ps->unique_id + 1); - } - - redis_pipe(r, "DEL tag-"PB"-0 other_tags-"PB"-0 medias-"PB"-0", - STR(&c->callid), STR(&c->callid), STR(&c->callid)); - - for (l = c->monologues.head; l; l = l->next) { - ml = l->data; - - redis_pipe(r, "HMSET tag-"PB"-%u created %llu active %u deleted %llu", - STR(&c->callid), ml->unique_id, - (long long unsigned) ml->created, - ml->active_dialogue ? ml->active_dialogue->unique_id : -1, - (long long unsigned) ml->deleted); - if (ml->tag.s) - redis_pipe(r, "HMSET tag-"PB"-%u tag "PB"", - STR(&c->callid), ml->unique_id, - STR(&ml->tag)); - if (ml->viabranch.s) - redis_pipe(r, "HMSET tag-"PB"-%u via-branch "PB"", - STR(&c->callid), ml->unique_id, - STR(&ml->viabranch)); - - k = g_hash_table_get_values(ml->other_tags); - for (m = k; m; m = m->next) { - ml2 = m->data; - redis_pipe(r, "RPUSH other_tags-"PB"-%u %u", - STR(&c->callid), ml->unique_id, - ml2->unique_id); - } - g_list_free(k); - - for (k = ml->medias.head; k; k = k->next) { - media = k->data; - redis_pipe(r, "RPUSH medias-"PB"-%u %u", - STR(&c->callid), ml->unique_id, - media->unique_id); - } - - redis_pipe(r, "EXPIRE tag-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE other_tags-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE medias-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s); - - redis_pipe(r, "DEL tag-"PB"-%u other_tags-"PB"-%u medias-"PB"-%u", - STR(&c->callid), ml->unique_id + 1, - STR(&c->callid), ml->unique_id + 1, - STR(&c->callid), ml->unique_id + 1); - } - - redis_pipe(r, "DEL media-"PB"-0 streams-"PB"-0 maps-"PB"-0 payload_types-"PB"-0", - STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid)); - - for (l = c->medias.head; l; l = l->next) { - media = l->data; - - redis_pipe(r, "HMSET media-"PB"-%u " - "tag %u " - "index %u " - "type "PB" protocol %s desired_family %s " - "sdes_in_tag %u sdes_out_tag %u logical_intf "PB" " - "media_flags %u", - STR(&c->callid), media->unique_id, - media->monologue->unique_id, - media->index, - STR(&media->type), media->protocol ? media->protocol->name : "", - media->desired_family ? media->desired_family->rfc_name : "", - media->sdes_in.tag, media->sdes_out.tag, - STR(&media->logical_intf->name), - media->media_flags); - redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_in", - &media->sdes_in.params); - redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_out", - &media->sdes_out.params); - redis_update_dtls_fingerprint(r, "media", &c->callid, media->unique_id, &media->fingerprint); - - for (m = media->streams.head; m; m = m->next) { - ps = m->data; - redis_pipe(r, "RPUSH streams-"PB"-%u %u", - STR(&c->callid), media->unique_id, - ps->unique_id); - } - - for (m = media->endpoint_maps.head; m; m = m->next) { - ep = m->data; - redis_pipe(r, "RPUSH maps-"PB"-%u %u", - STR(&c->callid), media->unique_id, - ep->unique_id); - } - - k = g_hash_table_get_values(media->rtp_payload_types); - for (m = k; m; m = m->next) { - pt = m->data; - redis_pipe(r, "RPUSH payload_types-"PB"-%u %u/"PB"/%u/"PB"", - STR(&c->callid), media->unique_id, - pt->payload_type, STR(&pt->encoding), - pt->clock_rate, STR(&pt->encoding_parameters)); - } - g_list_free(k); - - redis_pipe(r, "EXPIRE media-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE streams-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE maps-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE payload_types-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s); - - redis_pipe(r, "DEL media-"PB"-%u streams-"PB"-%u maps-"PB"-%u payload_types-"PB"-%u", - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1, - STR(&c->callid), media->unique_id + 1); - } - - redis_pipe(r, "DEL map-"PB"-0 map_sfds-"PB"-0", - STR(&c->callid), STR(&c->callid)); - - for (l = c->endpoint_maps.head; l; l = l->next) { - ep = l->data; - - redis_pipe(r, "HMSET map-"PB"-%u wildcard %i num_ports %u intf_preferred_family %s " - "logical_intf "PB"", - STR(&c->callid), ep->unique_id, - ep->wildcard, - ep->num_ports, - ep->logical_intf->preferred_family->rfc_name, - STR(&ep->logical_intf->name)); - redis_update_endpoint(r, "map", &c->callid, ep->unique_id, "endpoint", &ep->endpoint); - - for (m = ep->intf_sfds.head; m; m = m->next) { - il = m->data; - - redis_pipe(r, "RPUSH map_sfds-"PB"-%u loc-%u", - STR(&c->callid), ep->unique_id, - il->local_intf->unique_id); - - for (n = il->list.head; n; n = n->next) { - sfd = n->data; - - redis_pipe(r, "RPUSH map_sfds-"PB"-%u %u", - STR(&c->callid), ep->unique_id, - sfd->unique_id); - } - - } - - redis_pipe(r, "EXPIRE map-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s); - redis_pipe(r, "EXPIRE map_sfds-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s); - - redis_pipe(r, "DEL map-"PB"-%u map_sfds-"PB"-%u", - STR(&c->callid), ep->unique_id + 1, - STR(&c->callid), ep->unique_id + 1); - } - - redis_pipe(r, "EXPIRE call-"PB" %u", STR(&c->callid), redis_expires_s); - redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); - redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); - redis_pipe(r, "EXPIRE notifier-"PB" %u", STR(&c->callid), redis_expires_s); - - redis_consume(r); - - mutex_unlock(&r->lock); - rwlock_unlock_r(&c->master_lock); - - return; -err: - - mutex_unlock(&r->lock); - rwlock_unlock_r(&c->master_lock); - if (r->ctx->err) - rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); - redisFree(r->ctx); - r->ctx = NULL; -} - /* must be called lock-free */ void redis_delete(struct call *c, struct redis *r) { if (!r) @@ -2945,11 +2065,7 @@ void redis_delete(struct call *c, struct redis *r) { if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) goto err; - if (c->callmaster->conf.redis_multikey) { - redis_delete_call(c, r); - } else { - redis_delete_call_json(c, r); - } + redis_delete_call_json(c, r); rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index ce4eddf67..1932ea8d3 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -13,6 +13,7 @@ #include #include #include "call.h" +#include "str.h" #define REDIS_RESTORE_NUM_THREADS 4 @@ -61,10 +62,11 @@ struct redis { int state; int no_redis_required; }; + struct redis_hash { - redisReply *rr; GHashTable *ht; }; + struct redis_list { unsigned int len; struct redis_hash *rh; @@ -72,12 +74,6 @@ struct redis_list { }; - - - - - - #if !GLIB_CHECK_VERSION(2,40,0) INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) { gboolean ret = TRUE; @@ -129,12 +125,12 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a #define define_get_int_type(name, type, func) \ static int redis_hash_get_ ## name(type *out, const struct redis_hash *h, const char *k) { \ - redisReply *r; \ + str* s; \ \ - r = g_hash_table_lookup(h->ht, k); \ - if (!r) \ + s = g_hash_table_lookup(h->ht, k); \ + if (!s) \ return -1; \ - *out = func(r->str, NULL, 10); \ + *out = func(s->s, NULL, 10); \ return 0; \ }