From 24550a1c4e8874c7f2871096a77ad8dc321df46f Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 8 Jun 2022 12:40:10 -0400 Subject: [PATCH] TT#156900 fix master/slave race condition with early closed ports When ports are closed early (while the call is still running), we must first update a slave rtpengine with this new information (that these ports are now closed) before actually releasing the ports ourselves. Not doing so leads to a race condition where the master instance re-uses a port that was just closed before the slave instance knows about the port being closed. We implement this using a thread-local list to keep track of ports that were released while processing a control message, and process this list to actually close the ports only after Redis has been updated. Additional calls to the function to close the ports are placed in strategic locations to make sure this is triggered in every code path. closes #1495 Change-Id: I803f4594f30ca315da0b84c6e76893f54ca3a7c9 (cherry picked from commit 17bda4b1e865b4b535690b0a397226fdc2f1915a) --- daemon/call.c | 13 ++++++++++++- daemon/cli.c | 1 + daemon/control_ng.c | 1 + daemon/media_socket.c | 25 ++++++++++++++++++++++++- daemon/redis.c | 3 +++ daemon/tcp_listener.c | 3 +++ daemon/udp_listener.c | 2 ++ daemon/websocket.c | 4 ++++ include/media_socket.h | 1 + lib/socket.c | 11 +++++++++++ lib/socket.h | 1 + 11 files changed, 63 insertions(+), 2 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 352a23d99..be318de6b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -99,6 +99,7 @@ static int call_timer_delete_monologues(struct call *c) { struct call_monologue *ml; int ret = 0; time_t min_deleted = 0; + bool update = false; /* we need a write lock here */ rwlock_unlock_r(&c->master_lock); @@ -119,12 +120,15 @@ static int call_timer_delete_monologues(struct call *c) { ret = 1; /* destroy call */ goto out; } + update = true; } out: c->ml_deleted = min_deleted; rwlock_unlock_w(&c->master_lock); + if (update) + redis_update_onekey(c, rtpe_redis_write); rwlock_lock_r(&c->master_lock); // coverity[missing_unlock : FALSE] @@ -736,6 +740,8 @@ next: interval /= 2; ilog(LOG_INFO, "Decreasing timer run interval to %llu seconds", interval / 1000000); } + + release_closed_sockets(); } #undef DS @@ -4195,6 +4201,7 @@ int call_delete_branch(const str *callid, const str *branch, int ret; const str *match_tag; GList *i; + bool update = false; if (delete_delay < 0) delete_delay = rtpe_config.delete_delay; @@ -4273,6 +4280,7 @@ do_delete: STR_FMT_M(&ml->tag), STR_FMT0_M(branch)); if (monologue_destroy(ml)) goto del_all; + update = true; } goto success_unlock; @@ -4309,7 +4317,10 @@ err: goto out; out: - if (c) + if (c) { + if (update) + redis_update_onekey(c, rtpe_redis_write); obj_put(c); + } return ret; } diff --git a/daemon/cli.c b/daemon/cli.c index 9d002aabe..373012ab1 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -1243,6 +1243,7 @@ static void cli_stream_readable(struct streambuf_stream *s) { void cli_handle(str *instr, struct cli_writer *cw) { ilogs(control, LOG_INFO, "Got CLI command: " STR_FORMAT_M, STR_FMT_M(instr)); cli_handler_do(cli_top_handlers, instr, cw); + release_closed_sockets(); } static void cli_free(void *p) { diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 6d3c02b2e..3673c8363 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -398,6 +398,7 @@ send_only: out: ng_buffer_release(ngbuf); + release_closed_sockets(); log_info_pop(); return funcret; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index d9adfe44d..3c9dbf0d5 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -76,6 +76,13 @@ struct packet_handler_ctx { // output: struct media_packet mp; // passed to handlers }; +struct late_port_release { + socket_t socket; + struct intf_spec *spec; +}; + + +static __thread GQueue ports_to_release = G_QUEUE_INIT; static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *); @@ -817,7 +824,7 @@ static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec, cons return 0; } -static void release_port(socket_t *r, struct intf_spec *spec) { +static void release_port_now(socket_t *r, struct intf_spec *spec) { unsigned int port = r->local.port; struct port_pool *pp = &spec->port_pool; @@ -840,10 +847,26 @@ static void release_port(socket_t *r, struct intf_spec *spec) { __C_DBG("port %u is NOT released", port); } } +static void release_port(socket_t *r, struct intf_spec *spec) { + if (!r->local.port || r->fd == -1) + return; + __C_DBG("adding port %u to late-release list", r->local.port); + struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); + move_socket(&lpr->socket, r); + lpr->spec = spec; + g_queue_push_tail(&ports_to_release, lpr); +} static void free_port(socket_t *r, struct intf_spec *spec) { release_port(r, spec); g_slice_free1(sizeof(*r), r); } +void release_closed_sockets(void) { + struct late_port_release *lpr; + while ((lpr = g_queue_pop_head(&ports_to_release))) { + release_port_now(&lpr->socket, lpr->spec); + g_slice_free1(sizeof(*lpr), lpr); + } +} diff --git a/daemon/redis.c b/daemon/redis.c index 28c1b1734..c3633115a 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -376,6 +376,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) if (IS_FOREIGN_CALL(c)) { c->redis_hosted_db = rtpe_redis_write->db; // don't delete from foreign DB call_destroy(c); + release_closed_sockets(); } else { rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: " STR_FORMAT "\n", STR_FMT(&callid)); @@ -411,6 +412,7 @@ err: obj_put(c); mutex_unlock(&r->lock); + release_closed_sockets(); log_info_reset(); } @@ -2026,6 +2028,7 @@ static void restore_thread(void *call_p, void *ctx_p) { mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); mutex_unlock(&ctx->r_m); + release_closed_sockets(); } int redis_restore(struct redis *r, bool foreign, int db) { diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index 80cc5da9a..7d04138ca 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -8,6 +8,7 @@ #include "aux.h" #include "log.h" #include "streambuf.h" +#include "media_socket.h" struct tcp_listener_callback { struct obj obj; @@ -138,6 +139,8 @@ static void streambuf_stream_readable(int fd, void *p, uintptr_t u) { if (ret == -2) goto close; + release_closed_sockets(); + return; close: diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 16a53903c..ff97a43bb 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -60,6 +60,8 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { obj_put(udp_buf); udp_buf = NULL; } + + release_closed_sockets(); } obj_put(udp_buf); } diff --git a/daemon/websocket.c b/daemon/websocket.c index 56620bec7..89f6a417c 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -758,6 +758,8 @@ static int websocket_http(struct lws *wsi, enum lws_callback_reasons reason, voi break; } + release_closed_sockets(); + return 0; } @@ -827,6 +829,8 @@ static int websocket_protocol(struct lws *wsi, enum lws_callback_reasons reason, break; } + release_closed_sockets(); + return 0; } diff --git a/include/media_socket.h b/include/media_socket.h index dd27f0046..084151254 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -177,6 +177,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif); struct stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(struct stream_fd *); +void release_closed_sockets(void); void free_intf_list(struct intf_list *il); void free_release_intf_list(struct intf_list *il); diff --git a/lib/socket.c b/lib/socket.c index 38057b548..a740840c5 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -800,6 +800,17 @@ int close_socket(socket_t *r) { return 0; } +// moves the contents of the socket object: +// dst must be initialised +// src will be reset and cleared, as if it was closed +// does not actually close the socket +void move_socket(socket_t *dst, socket_t *src) { + *dst = *src; + src->fd = -1; + ZERO(src->local); + ZERO(src->remote); +} + diff --git a/lib/socket.h b/lib/socket.h index d46533945..d57b30b31 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -217,6 +217,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep); int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress int connect_socket_retry(socket_t *r); // retries connect() while in progress int close_socket(socket_t *r); +void move_socket(socket_t *dst, socket_t *src); void dummy_socket(socket_t *r, const sockaddr_t *); sockfamily_t *get_socket_family_rfc(const str *s);