diff --git a/daemon/call.c b/daemon/call.c index 352a23d99..be318de6b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -99,6 +99,7 @@ static int call_timer_delete_monologues(struct call *c) { struct call_monologue *ml; int ret = 0; time_t min_deleted = 0; + bool update = false; /* we need a write lock here */ rwlock_unlock_r(&c->master_lock); @@ -119,12 +120,15 @@ static int call_timer_delete_monologues(struct call *c) { ret = 1; /* destroy call */ goto out; } + update = true; } out: c->ml_deleted = min_deleted; rwlock_unlock_w(&c->master_lock); + if (update) + redis_update_onekey(c, rtpe_redis_write); rwlock_lock_r(&c->master_lock); // coverity[missing_unlock : FALSE] @@ -736,6 +740,8 @@ next: interval /= 2; ilog(LOG_INFO, "Decreasing timer run interval to %llu seconds", interval / 1000000); } + + release_closed_sockets(); } #undef DS @@ -4195,6 +4201,7 @@ int call_delete_branch(const str *callid, const str *branch, int ret; const str *match_tag; GList *i; + bool update = false; if (delete_delay < 0) delete_delay = rtpe_config.delete_delay; @@ -4273,6 +4280,7 @@ do_delete: STR_FMT_M(&ml->tag), STR_FMT0_M(branch)); if (monologue_destroy(ml)) goto del_all; + update = true; } goto success_unlock; @@ -4309,7 +4317,10 @@ err: goto out; out: - if (c) + if (c) { + if (update) + redis_update_onekey(c, rtpe_redis_write); obj_put(c); + } return ret; } diff --git a/daemon/cli.c b/daemon/cli.c index 9d002aabe..373012ab1 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -1243,6 +1243,7 @@ static void cli_stream_readable(struct streambuf_stream *s) { void cli_handle(str *instr, struct cli_writer *cw) { ilogs(control, LOG_INFO, "Got CLI command: " STR_FORMAT_M, STR_FMT_M(instr)); cli_handler_do(cli_top_handlers, instr, cw); + release_closed_sockets(); } static void cli_free(void *p) { diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 6d3c02b2e..3673c8363 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -398,6 +398,7 @@ send_only: out: ng_buffer_release(ngbuf); + release_closed_sockets(); log_info_pop(); return funcret; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index d9adfe44d..3c9dbf0d5 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -76,6 +76,13 @@ struct packet_handler_ctx { // output: struct media_packet mp; // passed to handlers }; +struct late_port_release { + socket_t socket; + struct intf_spec *spec; +}; + + +static __thread GQueue ports_to_release = G_QUEUE_INIT; static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *); @@ -817,7 +824,7 @@ static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec, cons return 0; } -static void release_port(socket_t *r, struct intf_spec *spec) { +static void release_port_now(socket_t *r, struct intf_spec *spec) { unsigned int port = r->local.port; struct port_pool *pp = &spec->port_pool; @@ -840,10 +847,26 @@ static void release_port(socket_t *r, struct intf_spec *spec) { __C_DBG("port %u is NOT released", port); } } +static void release_port(socket_t *r, struct intf_spec *spec) { + if (!r->local.port || r->fd == -1) + return; + __C_DBG("adding port %u to late-release list", r->local.port); + struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); + move_socket(&lpr->socket, r); + lpr->spec = spec; + g_queue_push_tail(&ports_to_release, lpr); +} static void free_port(socket_t *r, struct intf_spec *spec) { release_port(r, spec); g_slice_free1(sizeof(*r), r); } +void release_closed_sockets(void) { + struct late_port_release *lpr; + while ((lpr = g_queue_pop_head(&ports_to_release))) { + release_port_now(&lpr->socket, lpr->spec); + g_slice_free1(sizeof(*lpr), lpr); + } +} diff --git a/daemon/redis.c b/daemon/redis.c index 28c1b1734..c3633115a 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -376,6 +376,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) if (IS_FOREIGN_CALL(c)) { c->redis_hosted_db = rtpe_redis_write->db; // don't delete from foreign DB call_destroy(c); + release_closed_sockets(); } else { rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: " STR_FORMAT "\n", STR_FMT(&callid)); @@ -411,6 +412,7 @@ err: obj_put(c); mutex_unlock(&r->lock); + release_closed_sockets(); log_info_reset(); } @@ -2026,6 +2028,7 @@ static void restore_thread(void *call_p, void *ctx_p) { mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); mutex_unlock(&ctx->r_m); + release_closed_sockets(); } int redis_restore(struct redis *r, bool foreign, int db) { diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index 80cc5da9a..7d04138ca 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -8,6 +8,7 @@ #include "aux.h" #include "log.h" #include "streambuf.h" +#include "media_socket.h" struct tcp_listener_callback { struct obj obj; @@ -138,6 +139,8 @@ static void streambuf_stream_readable(int fd, void *p, uintptr_t u) { if (ret == -2) goto close; + release_closed_sockets(); + return; close: diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 16a53903c..ff97a43bb 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -60,6 +60,8 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { obj_put(udp_buf); udp_buf = NULL; } + + release_closed_sockets(); } obj_put(udp_buf); } diff --git a/daemon/websocket.c b/daemon/websocket.c index 56620bec7..89f6a417c 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -758,6 +758,8 @@ static int websocket_http(struct lws *wsi, enum lws_callback_reasons reason, voi break; } + release_closed_sockets(); + return 0; } @@ -827,6 +829,8 @@ static int websocket_protocol(struct lws *wsi, enum lws_callback_reasons reason, break; } + release_closed_sockets(); + return 0; } diff --git a/include/media_socket.h b/include/media_socket.h index dd27f0046..084151254 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -177,6 +177,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif); struct stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(struct stream_fd *); +void release_closed_sockets(void); void free_intf_list(struct intf_list *il); void free_release_intf_list(struct intf_list *il); diff --git a/lib/socket.c b/lib/socket.c index 38057b548..a740840c5 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -800,6 +800,17 @@ int close_socket(socket_t *r) { return 0; } +// moves the contents of the socket object: +// dst must be initialised +// src will be reset and cleared, as if it was closed +// does not actually close the socket +void move_socket(socket_t *dst, socket_t *src) { + *dst = *src; + src->fd = -1; + ZERO(src->local); + ZERO(src->remote); +} + diff --git a/lib/socket.h b/lib/socket.h index d46533945..d57b30b31 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -217,6 +217,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep); int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress int connect_socket_retry(socket_t *r); // retries connect() while in progress int close_socket(socket_t *r); +void move_socket(socket_t *dst, socket_t *src); void dummy_socket(socket_t *r, const sockaddr_t *); sockfamily_t *get_socket_family_rfc(const str *s);