Browse Source

MT#55283 use abstract parser for Redis restore

Change-Id: Ic4b61bee33b71a759b5a352393e2bc2738d87b86
pull/1853/head
Richard Fuchs 1 year ago
parent
commit
ef787fb119
3 changed files with 123 additions and 144 deletions
  1. +5
    -1
      daemon/control_ng.c
  2. +116
    -143
      daemon/redis.c
  3. +2
    -0
      include/control_ng.h

+ 5
- 1
daemon/control_ng.c View File

@ -270,7 +270,11 @@ static parser_arg json_dict_get_expect(JsonNode *dict, const char *entry, bencod
return (parser_arg) NULL;
switch (type) {
case BENCODE_LIST:
if (json_node_get_value_type(n) != JSON_NODE_ARRAY)
if (json_node_get_node_type(n) != JSON_NODE_ARRAY)
return (parser_arg) NULL;
return (parser_arg) n;
case BENCODE_DICTIONARY:
if (json_node_get_node_type(n) != JSON_NODE_OBJECT)
return (parser_arg) NULL;
return (parser_arg) n;
default:


+ 116
- 143
daemon/redis.c View File

@ -50,6 +50,9 @@ struct redis *rtpe_redis_write_disabled;
struct redis *rtpe_redis_notify;
static const ng_parser_t *redis_parser = &ng_parser_json;
INLINE redisReply *redis_expect(int type, redisReply *r) {
if (!r)
return NULL;
@ -106,7 +109,7 @@ static int redis_ports_release_balance = 0; // negative = releasers, positive =
static int redis_check_conn(struct redis *r);
static void json_restore_call(struct redis *r, const str *id, bool foreign);
static int redis_connect(struct redis *r, int wait);
static int json_build_ssrc(struct call_monologue *ml, JsonReader *root_reader);
static int json_build_ssrc(struct call_monologue *ml, parser_arg arg);
// mutually exclusive multi-A multi-B lock
@ -1005,31 +1008,32 @@ INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const ch
str_uri_encode_len(enc, tmp, len);
json_builder_add_string_value(builder,enc);
}
INLINE str *json_reader_get_string_value_uri_enc(JsonReader *root_reader) {
const char *s = json_reader_get_string_value(root_reader);
if (!s)
return NULL;
str *out = str_uri_decode_len(s, strlen(s));
return out; // must be free'd
}
// XXX rework restore procedure to use functions like this everywhere and eliminate the GHashTable
INLINE long long json_reader_get_ll(JsonReader *root_reader, const char *key) {
if (!json_reader_read_member(root_reader, key))
return -1;
str *ret = json_reader_get_string_value_uri_enc(root_reader);
long long r = strtoll(ret->s, NULL, 10);
free(ret);
json_reader_end_member(root_reader);
return r;
INLINE long long parser_get_ll(parser_arg arg, const char *key) {
return redis_parser->dict_get_int_str(arg, key, -1);
}
static void json_get_hash_iter(const ng_parser_t *parser, str *key, parser_arg val_a, helper_arg arg) {
str val;
if (!parser->get_str(val_a, &val)) {
rlog(LOG_ERROR, "Could not read json member: " STR_FORMAT, STR_FMT(key));
return;
}
// XXX convert to proper str ht
char *tmp = g_memdup2(key->s, key->len + 1);
tmp[key->len] = '\0';
// XXX eliminate string dup? eliminate URI decode?
if (g_hash_table_insert(arg.ht, tmp, str_uri_decode_len(val.s, val.len)) != TRUE)
rlog(LOG_WARNING,"Key %s already exists", tmp);
}
static int json_get_hash(struct redis_hash *out,
const char *key, unsigned int id, JsonReader *root_reader)
const char *key, unsigned int id, parser_arg root)
{
static unsigned int MAXKEYLENGTH = 512;
char key_concatted[MAXKEYLENGTH];
int rc=0;
g_autoptr(char_p) orig_members = NULL;
if (id == -1) {
rc = snprintf(key_concatted, MAXKEYLENGTH, "%s",key);
@ -1038,48 +1042,22 @@ static int json_get_hash(struct redis_hash *out,
}
if (rc>=MAXKEYLENGTH) {
rlog(LOG_ERROR,"Json key too long.");
goto err;
return -1;
}
if (!json_reader_read_member(root_reader, key_concatted)) {
parser_arg dict = redis_parser->dict_get_expect(root, key_concatted, BENCODE_DICTIONARY);
if (!dict.gen) {
rlog(LOG_ERROR, "Could not read json member: %s",key_concatted);
goto err;
return -1;
}
out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
if (!out->ht)
goto err;
gchar **members = json_reader_list_members(root_reader);
orig_members = members;
int nmemb = json_reader_count_members (root_reader);
for (int i=0; i < nmemb; ++i) {
if (!json_reader_read_member(root_reader, *members)) {
rlog(LOG_ERROR, "Could not read json member: %s",*members);
goto err3;
}
str *val = json_reader_get_string_value_uri_enc(root_reader);
char* tmp = strdup(*members);
if (g_hash_table_insert(out->ht, tmp, val) != TRUE) {
rlog(LOG_WARNING,"Key %s already exists", tmp);
goto err3;
}
json_reader_end_member(root_reader);
return -1;
++members;
} // for
json_reader_end_member (root_reader);
redis_parser->dict_iter(redis_parser, dict, json_get_hash_iter, out->ht);
return 0;
err3:
g_hash_table_destroy(out->ht);
err:
return -1;
}
static void json_destroy_hash(struct redis_hash *rh) {
@ -1205,38 +1183,40 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh,
return redis_list_get_idx_ptr(list, idx);
}
struct cb_iter_ptrs { // XXX remove this?
int (*cb)(str *, callback_arg_t, struct redis_list *, void *);
callback_arg_t cb_arg;
struct redis_list *list;
void *ptr;
};
static void json_build_list_cb_iter(str *val, unsigned int i, helper_arg arg) {
struct cb_iter_ptrs *args = arg.generic;
str *s = str_uri_decode_len(val->s, val->len);
args->cb(s, args->cb_arg, args->list, args->ptr);
g_free(s);
}
static int json_build_list_cb(callback_arg_t q, call_t *c, const char *key,
unsigned int idx, struct redis_list *list,
int (*cb)(str *, callback_arg_t, struct redis_list *, void *), void *ptr, JsonReader *root_reader)
int (*cb)(str *, callback_arg_t, struct redis_list *, void *), void *ptr, parser_arg arg)
{
char key_concatted[256];
snprintf(key_concatted, 256, "%s-%u", key, idx);
if (!json_reader_read_member(root_reader, key_concatted)) {
parser_arg r_list = redis_parser->dict_get_expect(arg, key_concatted, BENCODE_LIST);
if (!r_list.gen) {
rlog(LOG_ERROR,"Key in json not found:%s",key_concatted);
return -1;
}
int nmemb = json_reader_count_elements(root_reader);
for (int jidx=0; jidx < nmemb; ++jidx) {
if (!json_reader_read_element(root_reader,jidx)) {
rlog(LOG_ERROR,"Element in array not found.");
return -1;
}
str *s = json_reader_get_string_value_uri_enc(root_reader);
if (!s) {
rlog(LOG_ERROR,"String in json not found.");
return -1;
}
if (cb(s, q, list, ptr)) {
free(s);
return -1;
}
free(s);
json_reader_end_element(root_reader);
}
json_reader_end_member (root_reader);
struct cb_iter_ptrs args = {
.cb = cb,
.cb_arg = q,
.list = list,
.ptr = ptr,
};
redis_parser->list_iter(redis_parser, r_list, json_build_list_cb_iter, NULL, &args);
return 0;
}
@ -1257,20 +1237,20 @@ static int rbpa_cb_simple(str *s, callback_arg_t pap, struct redis_list *list, v
}
static int json_build_list(callback_arg_t q, call_t *c, const char *key,
unsigned int idx, struct redis_list *list, JsonReader *root_reader)
unsigned int idx, struct redis_list *list, parser_arg arg)
{
return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL, root_reader);
return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL, arg);
}
static int json_build_ptra(medias_arr *q, call_t *c, const char *key,
unsigned int idx, struct redis_list *list, JsonReader *root_reader)
unsigned int idx, struct redis_list *list, parser_arg arg)
{
return json_build_list_cb(q, c, key, idx, list, rbpa_cb_simple, NULL, root_reader);
return json_build_list_cb(q, c, key, idx, list, rbpa_cb_simple, NULL, arg);
}
static int json_get_list_hash(struct redis_list *out,
const char *key,
const struct redis_hash *rh, const char *rh_num_key, JsonReader *root_reader)
const struct redis_hash *rh, const char *rh_num_key, parser_arg arg)
{
unsigned int i;
@ -1284,7 +1264,7 @@ static int json_get_list_hash(struct redis_list *out,
goto err1;
for (i = 0; i < out->len; i++) {
if (json_get_hash(&out->rh[i], key, i, root_reader))
if (json_get_hash(&out->rh[i], key, i, arg))
goto err2;
}
@ -1476,7 +1456,7 @@ static int redis_streams(call_t *c, struct redis_list *streams) {
return 0;
}
static int redis_tags(call_t *c, struct redis_list *tags, JsonReader *root_reader) {
static int redis_tags(call_t *c, struct redis_list *tags, parser_arg arg) {
unsigned int i;
int ii;
atomic64 a64;
@ -1564,7 +1544,7 @@ static int redis_tags(call_t *c, struct redis_list *tags, JsonReader *root_reade
ml->logical_intf = get_logical_interface(NULL, ml->desired_family, 0);
}
if (json_build_ssrc(ml, root_reader))
if (json_build_ssrc(ml, arg))
return -1;
tags->ptrs[i] = ml;
@ -1594,7 +1574,7 @@ static int rbl_cb_plts_r(str *s, callback_arg_t dummy, struct redis_list *list,
return 0;
}
static int json_medias(call_t *c, struct redis_list *medias, struct redis_list *tags,
JsonReader *root_reader)
parser_arg arg)
{
unsigned int i;
int ii;
@ -1653,7 +1633,7 @@ static int json_medias(call_t *c, struct redis_list *medias, struct redis_list *
med->bandwidth_rr = (!redis_hash_get_int(&ii, rh, "bandwidth_rr")) ? ii : -1;
med->bandwidth_rs = (!redis_hash_get_int(&ii, rh, "bandwidth_rs")) ? ii : -1;
json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts_r, med, root_reader);
json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts_r, med, arg);
/* XXX dtls */
/* link monologue */
@ -1760,7 +1740,7 @@ static int rbl_subs_cb(str *s, callback_arg_t dummy, struct redis_list *list, vo
return 0;
}
static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list *medias, JsonReader *root_reader)
static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list *medias, parser_arg arg)
{
unsigned int i;
struct call_monologue *ml, *other_ml;
@ -1777,20 +1757,17 @@ static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list
snprintf(key_subscriptions_noa, 256, "subscriptions-noa-%u", i);
/* Legacy */
if (json_reader_read_member(root_reader, key_subscriptions))
if (redis_parser->dict_contains(arg, key_subscriptions))
rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
json_reader_end_member(root_reader);
if (json_reader_read_member(root_reader, key_subscriptions_oa))
if (redis_parser->dict_contains(arg, key_subscriptions_oa))
rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
json_reader_end_member(root_reader);
if (json_reader_read_member(root_reader, key_subscriptions_noa))
if (redis_parser->dict_contains(arg, key_subscriptions_noa))
rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
json_reader_end_member(root_reader);
/* associated tags */
if (json_build_list(&q, c, "associated_tags", i, tags, root_reader))
if (json_build_list(&q, c, "associated_tags", i, tags, arg))
return -1;
for (l = q.head; l; l = l->next)
{
@ -1801,7 +1778,7 @@ static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list
}
g_queue_clear(&q);
if (json_build_ptra(ml->medias, c, "medias", i, medias, root_reader))
if (json_build_ptra(ml->medias, c, "medias", i, medias, arg))
return -1;
}
@ -1827,7 +1804,7 @@ static struct media_subscription *__find_media_subscriber(struct call_media *med
}
static int json_link_streams(call_t *c, struct redis_list *streams,
struct redis_list *sfds, struct redis_list *medias, JsonReader *root_reader)
struct redis_list *sfds, struct redis_list *medias, parser_arg arg)
{
unsigned int i;
struct packet_stream *ps;
@ -1842,10 +1819,10 @@ static int json_link_streams(call_t *c, struct redis_list *streams,
ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd");
ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling");
if (json_build_list(&ps->sfds, c, "stream_sfds", i, sfds, root_reader))
if (json_build_list(&ps->sfds, c, "stream_sfds", i, sfds, arg))
return -1;
if (json_build_list(&q, c, "rtp_sinks", i, streams, root_reader))
if (json_build_list(&q, c, "rtp_sinks", i, streams, arg))
return -1;
for (l = q.head; l; l = l->next) {
struct packet_stream *sink = l->data;
@ -1866,7 +1843,7 @@ static int json_link_streams(call_t *c, struct redis_list *streams,
__add_sink_handler(&ps->rtp_sinks, sink, NULL);
}
if (json_build_list(&q, c, "rtcp_sinks", i, streams, root_reader))
if (json_build_list(&q, c, "rtcp_sinks", i, streams, arg))
return -1;
for (l = q.head; l; l = l->next) {
struct packet_stream *sink = l->data;
@ -1893,16 +1870,16 @@ static int json_link_streams(call_t *c, struct redis_list *streams,
}
static int json_link_medias(call_t *c, struct redis_list *medias,
struct redis_list *streams, struct redis_list *maps, JsonReader *root_reader)
struct redis_list *streams, struct redis_list *maps, parser_arg arg)
{
for (unsigned int i = 0; i < medias->len; i++)
{
struct call_media *med = medias->ptrs[i];
if (!med || !med->monologue)
continue;
if (json_build_list(&med->streams, c, "streams", i, streams, root_reader))
if (json_build_list(&med->streams, c, "streams", i, streams, arg))
return -1;
if (json_build_list(&med->endpoint_maps, c, "maps", i, maps, root_reader))
if (json_build_list(&med->endpoint_maps, c, "maps", i, maps, arg))
return -1;
if (med->media_id.s)
@ -1910,7 +1887,7 @@ static int json_link_medias(call_t *c, struct redis_list *medias,
/* find the pair media to subscribe */
if (!json_build_list_cb(NULL, c, "media-subscriptions", med->unique_id,
medias, rbl_subs_cb, med, root_reader))
medias, rbl_subs_cb, med, arg))
{
rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag));
}
@ -1949,7 +1926,7 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list,
}
static int json_link_maps(call_t *c, struct redis_list *maps,
struct redis_list *sfds, JsonReader *root_reader)
struct redis_list *sfds, parser_arg arg)
{
unsigned int i;
struct endpoint_map *em;
@ -1958,41 +1935,39 @@ static int json_link_maps(call_t *c, struct redis_list *maps,
em = maps->ptrs[i];
if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds,
rbl_cb_intf_sfds, em, root_reader))
rbl_cb_intf_sfds, em, arg))
return -1;
}
return 0;
}
static int json_build_ssrc(struct call_monologue *ml, JsonReader *root_reader) {
static void json_build_ssrc_iter(const ng_parser_t *parser, parser_arg dict, helper_arg arg) {
struct call_monologue *ml = arg.ml;
uint32_t ssrc = parser_get_ll(dict, "ssrc");
struct ssrc_entry_call *se = get_ssrc(ssrc, ml->ssrc_hash);
if (!se)
return;
atomic_set_na(&se->input_ctx.stats->ext_seq, parser_get_ll(dict, "in_srtp_index"));
atomic_set_na(&se->input_ctx.stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index"));
payload_tracker_add(&se->input_ctx.tracker, parser_get_ll(dict, "in_payload_type"));
atomic_set_na(&se->output_ctx.stats->ext_seq, parser_get_ll(dict, "out_srtp_index"));
atomic_set_na(&se->output_ctx.stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index"));
payload_tracker_add(&se->output_ctx.tracker, parser_get_ll(dict, "out_payload_type"));
obj_put(&se->h);
}
static int json_build_ssrc(struct call_monologue *ml, parser_arg arg) {
char tmp[2048];
snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id);
if (!json_reader_read_member(root_reader, "ssrc_table")) {
parser_arg list = redis_parser->dict_get_expect(arg, tmp, BENCODE_LIST);
if (!list.gen) {
// non-fatal for backwards compatibility
json_reader_end_member(root_reader);
return 0;
}
int nmemb = json_reader_count_elements(root_reader);
for (int jidx=0; jidx < nmemb; ++jidx) {
if (!json_reader_read_element(root_reader, jidx))
return -1;
uint32_t ssrc = json_reader_get_ll(root_reader, "ssrc");
struct ssrc_entry_call *se = get_ssrc(ssrc, ml->ssrc_hash);
if (!se)
goto next;
atomic_set_na(&se->input_ctx.stats->ext_seq, json_reader_get_ll(root_reader, "in_srtp_index"));
atomic_set_na(&se->input_ctx.stats->rtcp_seq, json_reader_get_ll(root_reader, "in_srtcp_index"));
payload_tracker_add(&se->input_ctx.tracker, json_reader_get_ll(root_reader, "in_payload_type"));
atomic_set_na(&se->output_ctx.stats->ext_seq, json_reader_get_ll(root_reader, "out_srtp_index"));
atomic_set_na(&se->output_ctx.stats->rtcp_seq, json_reader_get_ll(root_reader, "out_srtcp_index"));
payload_tracker_add(&se->output_ctx.tracker, json_reader_get_ll(root_reader, "out_payload_type"));
obj_put(&se->h);
next:
json_reader_end_element(root_reader);
}
json_reader_end_member (root_reader);
redis_parser->list_iter(redis_parser, list, NULL, json_build_ssrc_iter, ml);
return 0;
}
@ -2007,7 +1982,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
const char *err = 0;
int i;
atomic64 a64;
JsonReader *root_reader =0;
JsonNode *json_root = NULL;
JsonParser *parser =0;
mutex_lock(&r->lock);
@ -2025,11 +2000,11 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
err = "could not parse JSON data";
if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL))
goto err1;
root_reader = json_reader_new (json_parser_get_root (parser));
json_root = json_parser_get_root(parser);
err = "could not read JSON data";
if (!root_reader)
if (!json_root)
goto err1;
parser_arg root = {.json = json_root};
c = call_get_or_create(callid, false);
err = "failed to create call struct";
@ -2037,7 +2012,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
goto err1;
err = "'call' data incomplete";
if (json_get_hash(&call, "json", -1, root_reader))
if (json_get_hash(&call, "json", -1, root))
goto err2;
err = "missing 'last signal' timestamp";
@ -2056,19 +2031,19 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
}
err = "'tags' incomplete";
if (json_get_list_hash(&tags, "tag", &call, "num_tags", root_reader))
if (json_get_list_hash(&tags, "tag", &call, "num_tags", root))
goto err3;
err = "'sfds' incomplete";
if (json_get_list_hash(&sfds, "sfd", &call, "num_sfds", root_reader))
if (json_get_list_hash(&sfds, "sfd", &call, "num_sfds", root))
goto err4;
err = "'streams' incomplete";
if (json_get_list_hash(&streams, "stream", &call, "num_streams", root_reader))
if (json_get_list_hash(&streams, "stream", &call, "num_streams", root))
goto err5;
err = "'medias' incomplete";
if (json_get_list_hash(&medias, "media", &call, "num_medias", root_reader))
if (json_get_list_hash(&medias, "media", &call, "num_medias", root))
goto err6;
err = "'maps' incomplete";
if (json_get_list_hash(&maps, "map", &call, "num_maps", root_reader))
if (json_get_list_hash(&maps, "map", &call, "num_maps", root))
goto err7;
err = "missing 'created' timestamp";
@ -2102,10 +2077,10 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
if (redis_streams(c, &streams))
goto err8;
err = "failed to create tags";
if (redis_tags(c, &tags, root_reader))
if (redis_tags(c, &tags, root))
goto err8;
err = "failed to create medias";
if (json_medias(c, &medias, &tags, root_reader))
if (json_medias(c, &medias, &tags, root))
goto err8;
err = "failed to create maps";
if (redis_maps(c, &maps))
@ -2115,16 +2090,16 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
if (redis_link_sfds(&sfds, &streams))
goto err8;
err = "failed to link streams";
if (json_link_streams(c, &streams, &sfds, &medias, root_reader))
if (json_link_streams(c, &streams, &sfds, &medias, root))
goto err8;
err = "failed to link tags";
if (json_link_tags(c, &tags, &medias, root_reader))
if (json_link_tags(c, &tags, &medias, root))
goto err8;
err = "failed to link medias";
if (json_link_medias(c, &medias, &streams, &maps, root_reader))
if (json_link_medias(c, &medias, &streams, &maps, root))
goto err8;
err = "failed to link maps";
if (json_link_maps(c, &maps, &sfds, root_reader))
if (json_link_maps(c, &maps, &sfds, root))
goto err8;
// presence of this key determines whether we were recording at all
@ -2168,8 +2143,6 @@ err3:
err2:
rwlock_unlock_w(&c->master_lock);
err1:
if (root_reader)
g_object_unref (root_reader);
if (parser)
g_object_unref (parser);
if (rr_jsonStr)


+ 2
- 0
include/control_ng.h View File

@ -112,6 +112,8 @@ typedef union {
str *strs;
sdp_ng_flags *flags;
void (**call_fn)(call_t *);
GHashTable *ht;
struct call_monologue *ml;
void *generic;
} helper_arg __attribute__ ((__transparent_union__));


Loading…
Cancel
Save