From 1eb0c5e13e74c589fb4069ae7aff4908fec2b800 Mon Sep 17 00:00:00 2001 From: Donat Zenichev Date: Sat, 23 Nov 2024 11:00:43 +0100 Subject: [PATCH] MT#61556 redis: re-resolve when re-connecting When re-connecting to the remote redis server try to re-resolve if the redist hostname was an FQDN and not IP address. Change-Id: Ie80e1d1a1ea76811c54123201ad4fe8cb64fc748 --- daemon/main.c | 58 +++++++++++++++++++++++++++++++++++++--------- daemon/redis.c | 37 ++++++++++++++++++++++------- etc/rtpengine.conf | 1 + include/main.h | 3 +++ include/redis.h | 5 +++- lib/socket.h | 1 - 6 files changed, 84 insertions(+), 21 deletions(-) diff --git a/daemon/main.c b/daemon/main.c index 54f17f58c..3dd7c0ea9 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -364,9 +364,10 @@ static int if_addr_parse(intf_config_q *q, char *s, struct ifaddrs *ifas) { -static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth_env, char *s) { - char *sl; +static int redis_ep_parse(endpoint_t *ep, int *db, char **hostname, char **auth, const char *auth_env, char *s) { + char *sl, *sp; long l; + char buf[255]; // max length due to RFC standards sl = strrchr(s, '@'); if (sl) { @@ -390,6 +391,14 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth if (l < 0) return -1; *db = l; + + /* copy for the case with re-resolve during re-connections */ + sp = strrchr(s, ':'); /* make sure to not take port into the value of hostname */ + if (sp) + *hostname = g_strdup_printf("%.*s", (int)(sp - s), s); + else + *hostname = g_strdup(s); + if (endpoint_parse_any_getaddrinfo_full(ep, s)) return -1; return 0; @@ -561,6 +570,7 @@ static void options(int *argc, char ***argv) { { "port-max", 'M', 0, G_OPTION_ARG_INT, &rtpe_config.port_max, "Highest port to use for RTP", "INT" }, { "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "[PW@]IP:PORT/INT" }, { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, + { "redis-resolve-on-reconnect", 0,0, G_OPTION_ARG_NONE, &rtpe_config.redis_resolve_on_reconnect, "Re-resolve given FQDN on each re-connect to the redis server.", NULL }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_expires_secs, "Expire time in seconds for redis keys", "INT" }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &rtpe_config.no_redis_required, "Start no matter of redis connection state", NULL }, @@ -866,14 +876,21 @@ static void options(int *argc, char ***argv) { if (rtpe_config.rtcp_interval <= 0) rtpe_config.rtcp_interval = 5000; - if (redisps) - if (redis_ep_parse(&rtpe_config.redis_ep, &rtpe_config.redis_db, &rtpe_config.redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps)) + if (redisps) { + if (redis_ep_parse(&rtpe_config.redis_ep, &rtpe_config.redis_db, &rtpe_config.redis_hostname, + &rtpe_config.redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps)) + { die("Invalid Redis endpoint [IP:PORT/INT] '%s' (--redis)", redisps); + } + } - if (redisps_write) - if (redis_ep_parse(&rtpe_config.redis_write_ep, &rtpe_config.redis_write_db, &rtpe_config.redis_write_auth, - "RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write)) + if (redisps_write) { + if (redis_ep_parse(&rtpe_config.redis_write_ep, &rtpe_config.redis_write_db, &rtpe_config.redis_write_hostname, + &rtpe_config.redis_write_auth, "RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write)) + { die("Invalid Redis endpoint [IP:PORT/INT] '%s' (--redis-write)", redisps_write); + } + } if (rtpe_config.fmt > 2) die("Invalid XMLRPC format"); @@ -1358,22 +1375,41 @@ static void create_everything(void) { if (!is_addr_unspecified(&rtpe_config.redis_write_ep.address)) { rtpe_redis_write = redis_new(&rtpe_config.redis_write_ep, - rtpe_config.redis_write_db, rtpe_config.redis_write_auth, - ANY_REDIS_ROLE, rtpe_config.no_redis_required); + rtpe_config.redis_write_db, + rtpe_config.redis_write_hostname, + rtpe_config.redis_write_auth, + ANY_REDIS_ROLE, + rtpe_config.no_redis_required, + rtpe_config.redis_resolve_on_reconnect); + if (!rtpe_redis_write) die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&rtpe_config.redis_write_ep)); } if (!is_addr_unspecified(&rtpe_config.redis_ep.address)) { - rtpe_redis = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required); + rtpe_redis = redis_new(&rtpe_config.redis_ep, + rtpe_config.redis_db, + rtpe_config.redis_hostname, + rtpe_config.redis_auth, + (rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE), + rtpe_config.no_redis_required, + rtpe_config.redis_resolve_on_reconnect); + if (!rtpe_redis) die("Cannot start up without running Redis %s database! " "See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&rtpe_config.redis_ep)); if (rtpe_config.redis_subscribed_keyspaces.length) { - rtpe_redis_notify = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required); + rtpe_redis_notify = redis_new(&rtpe_config.redis_ep, + rtpe_config.redis_db, + rtpe_config.redis_hostname, + rtpe_config.redis_auth, + (rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE), + rtpe_config.no_redis_required, + rtpe_config.redis_resolve_on_reconnect); + if (!rtpe_redis_notify) die("Cannot start up without running notification Redis %s database! " "See also NO_REDIS_REQUIRED parameter.", diff --git a/daemon/redis.c b/daemon/redis.c index 99f0706f2..b626a502a 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -112,7 +112,7 @@ static int redis_ports_release_balance = 0; // negative = releasers, positive = static int redis_check_conn(struct redis *r); static void json_restore_call(struct redis *r, const str *id, bool foreign); -static int redis_connect(struct redis *r, int wait); +static int redis_connect(struct redis *r, int wait, bool resolve); static int json_build_ssrc(struct call_monologue *ml, parser_arg arg); @@ -244,7 +244,8 @@ int redis_set_timeout(struct redis* r, int timeout) { int redis_reconnect(struct redis* r) { int rval; LOCK(&r->lock); - rval = redis_connect(r,1); + + rval = redis_connect(r, 1, r->update_resolve); if (rval) r->state = REDIS_STATE_DISCONNECTED; return rval; @@ -261,11 +262,12 @@ static int redis_select_db(struct redis *r, int db) { } /* called with r->lock held if necessary */ -static int redis_connect(struct redis *r, int wait) { +static int redis_connect(struct redis *r, int wait, bool resolve) { struct timeval tv; redisReply *rp; char *s; int cmd_timeout, connect_timeout; + sockaddr_t a; if (r->ctx) redisFree(r->ctx); @@ -277,6 +279,17 @@ static int redis_connect(struct redis *r, int wait) { tv.tv_sec = (int) connect_timeout / 1000; tv.tv_usec = (int) (connect_timeout % 1000) * 1000; + + /* re-resolve if asked */ + if (resolve && r->hostname) { + if (sockaddr_getaddrinfo(&a, r->hostname)) + ilog(LOG_WARN, "Failed to re-resolve remote server hostname: '%s'. Just use older one: '%s'.", + r->hostname, r->host); + else + sockaddr_print(&a, r->host, sizeof(r->host)); + r->endpoint.address = a; + } + r->ctx = redisConnectWithTimeout(r->host, r->endpoint.port, tv); if (!r->ctx) @@ -868,8 +881,8 @@ void redis_notify_loop(void *d) { r->async_ctx = NULL; } -struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, - enum redis_role role, int no_redis_required) { +struct redis *redis_new(const endpoint_t *ep, int db, const char *hostname, const char *auth, + enum redis_role role, int no_redis_required, bool update_resolve) { struct redis *r; r = g_slice_alloc0(sizeof(*r)); @@ -877,14 +890,16 @@ struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, sockaddr_print(&ep->address, r->host, sizeof(r->host)); r->db = db; r->auth = auth; + r->hostname = hostname; r->role = role; r->state = REDIS_STATE_DISCONNECTED; r->no_redis_required = no_redis_required; r->restore_tick = 0; r->consecutive_errors = 0; + r->update_resolve = update_resolve; mutex_init(&r->lock); - if (redis_connect(r, 10)) { + if (redis_connect(r, 10, false)) { if (r->no_redis_required) { rlog(LOG_WARN, "Starting with no initial connection to Redis %s !", endpoint_print_buf(&r->endpoint)); @@ -906,7 +921,13 @@ err: } struct redis *redis_dup(const struct redis *r, int db) { - return redis_new(&r->endpoint, db >= 0 ? db : r->db, r->auth, r->role, r->no_redis_required); + return redis_new(&r->endpoint, + (db >= 0 ? db : r->db), + r->hostname, + r->auth, + r->role, + r->no_redis_required, + r->update_resolve); } void redis_close(struct redis *r) { @@ -972,7 +993,7 @@ static int redis_check_conn(struct redis *r) { } // try redis reconnect => will free current r->ctx - if (redis_connect(r, 1)) { + if (redis_connect(r, 1, r->update_resolve)) { // redis is disconnected redis_count_err_and_disable(r); return REDIS_STATE_DISCONNECTED; diff --git a/etc/rtpengine.conf b/etc/rtpengine.conf index d405b0b90..ec6fc20b9 100644 --- a/etc/rtpengine.conf +++ b/etc/rtpengine.conf @@ -72,6 +72,7 @@ recording-method = proc # redis-disable-time = 10 # redis-cmd-timeout = 0 # redis-connect-timeout = 1000 +# redis-resolve-on-reconnect = false # b2b-url = http://127.0.0.1:8090/ # xmlrpc-format = 0 diff --git a/include/main.h b/include/main.h index ae3cdc87d..eb7f4ee68 100644 --- a/include/main.h +++ b/include/main.h @@ -111,12 +111,15 @@ enum endpoint_learning { X(jb_clock_drift) \ X(player_cache) \ X(poller_per_thread) \ + X(redis_resolve_on_reconnect) \ X(measure_rtp) #define RTPE_CONFIG_CHARP_PARAMS \ X(b2b_url) \ X(redis_auth) \ X(redis_write_auth) \ + X(redis_hostname) \ + X(redis_write_hostname) \ X(spooldir) \ X(rec_method) \ X(rec_format) \ diff --git a/include/redis.h b/include/redis.h index 99373ea57..1e67f0c63 100644 --- a/include/redis.h +++ b/include/redis.h @@ -43,6 +43,7 @@ enum subscribe_action { struct redis { endpoint_t endpoint; char host[64]; + const char *hostname; /* can be a hostname or IP address */ enum redis_role role; redisContext *ctx; @@ -62,6 +63,8 @@ struct redis { mutex_t async_lock; GQueue async_queue; int async_last; + + bool update_resolve; }; struct redis_hash { @@ -88,7 +91,7 @@ void redis_notify_loop(void *d); void redis_delete_async_loop(void *d); -struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int); +struct redis *redis_new(const endpoint_t *, int, const char *, const char *, enum redis_role, int, bool); struct redis *redis_dup(const struct redis *r, int db); void redis_close(struct redis *r); int redis_restore(struct redis *, bool foreign, int db); diff --git a/lib/socket.h b/lib/socket.h index fbb4a4025..f8632762c 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -13,7 +13,6 @@ - enum socket_families { SF_IP4 = 0, SF_IP6,