From 58cbd2f21cb435ac759e135aff7a8239b10f7a08 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 19 Apr 2024 09:14:46 -0400 Subject: [PATCH] MT#55283 delegate closing sockets to poller To support asynchronous pollers which may hold references on underlying sockets, let the poller close the socket after it has released its references. This prevents cases of file descriptor re-use while an underlying socket is still open. Add reset_socket() to be used in place of close_socket() which does the same thing except the actual closing of the socket. Add poller_del_item_callback() for cases where more action than just closing the file descriptor is needed. Change-Id: Iefda1487ecb89263729120ecb964436dd79b2a0e --- daemon/control_ng.c | 2 +- daemon/media_socket.c | 24 ++++++++++++++++-------- daemon/tcp_listener.c | 4 ++-- lib/poller.c | 10 +++++++++- lib/poller.h | 1 + lib/socket.c | 18 ++++++++++++------ lib/socket.h | 1 + perf-tester/main.c | 2 +- 8 files changed, 43 insertions(+), 19 deletions(-) diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 49adc7774..7170ea96e 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -648,7 +648,7 @@ void control_ng_free(void *p) { rtpe_cngs_hash = NULL; } poller_del_item(rtpe_control_poller, c->udp_listener.fd); - close_socket(&c->udp_listener); + reset_socket(&c->udp_listener); streambuf_listener_shutdown(&c->tcp_listener); if (tcp_connections_hash) g_hash_table_destroy(tcp_connections_hash); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index d3bfecc8c..7629689be 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -934,14 +934,26 @@ static int add_socket(socket_t *r, unsigned int port, struct intf_spec *spec, co /** * Pushing ports into the `ports_to_release` queue. */ -static void release_port(socket_t *r, struct intf_spec *spec) { +static void release_port_push(void *p) { + struct late_port_release *lpr = p; + __C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port); + t_queue_push_tail(&ports_to_release, lpr); +} +static void release_port_poller(socket_t *r, struct intf_spec *spec, struct poller *poller) { if (!r->local.port || r->fd == -1) return; - __C_DBG("Adding the port '%u' to late-release list", r->local.port); struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); move_socket(&lpr->socket, r); lpr->spec = spec; - t_queue_push_tail(&ports_to_release, lpr); + if (!poller) + release_port_push(lpr); + else { + __C_DBG("Adding late-release callback for port '%u'", lpr->socket.local.port); + poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); + } +} +static void release_port(socket_t *r, struct intf_spec *spec) { + release_port_poller(r, spec, NULL); } static void free_port(socket_t *r, struct intf_spec *spec) { release_port(r, spec); @@ -3306,11 +3318,7 @@ void stream_fd_release(stream_fd *sfd) { &sfd->socket.local); // releases reference } - if (sfd->poller) - poller_del_item(sfd->poller, sfd->socket.fd); - sfd->poller = NULL; - - release_port(&sfd->socket, sfd->local_intf->spec); + release_port_poller(&sfd->socket, sfd->local_intf->spec, sfd->poller); } diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index bc17535f6..dc1d8917e 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -101,7 +101,6 @@ fail: static void streambuf_stream_free(void *p) { struct streambuf_stream *s = p; - close_socket(&s->sock); streambuf_destroy(s->inbuf); streambuf_destroy(s->outbuf); obj_put(s->cb); @@ -123,6 +122,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) { bool ret = t_hash_table_remove(l->streams, s); mutex_unlock(&l->lock); poller_del_item(rtpe_control_poller, s->sock.fd); + reset_socket(&s->sock); if (ret) obj_put(s); } @@ -243,7 +243,7 @@ void streambuf_listener_shutdown(struct streambuf_listener *listener) { if (!listener) return; poller_del_item(rtpe_control_poller, listener->listener.fd); - close_socket(&listener->listener); + reset_socket(&listener->listener); t_hash_table_destroy_ptr(&listener->streams); } diff --git a/lib/poller.c b/lib/poller.c index fc4e03ab5..fb3843fec 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -132,7 +132,7 @@ bool poller_add_item(struct poller *p, struct poller_item *i) { } -bool poller_del_item(struct poller *p, int fd) { +bool poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg) { struct poller_item_int *it; if (!p || fd < 0) @@ -158,8 +158,16 @@ bool poller_del_item(struct poller *p, int fd) { obj_put(it); + if (callback) + callback(arg); + else + close(fd); + return true; } +bool poller_del_item(struct poller *p, int fd) { + return poller_del_item_callback(p, fd, NULL, NULL); +} static int poller_poll(struct poller *p, int timeout, struct epoll_event *evs, int poller_size) { diff --git a/lib/poller.h b/lib/poller.h index f79663123..ff415ffcf 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -32,6 +32,7 @@ struct poller *poller_new(void); void poller_free(struct poller **); bool poller_add_item(struct poller *, struct poller_item *); bool poller_del_item(struct poller *, int); +bool poller_del_item_callback(struct poller *, int, void (*)(void *), void *); void poller_blocked(struct poller *, void *); int poller_isblocked(struct poller *, void *); diff --git a/lib/socket.c b/lib/socket.c index 8c4b14326..dd74077f6 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -874,6 +874,16 @@ int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep) { return connect_socket_retry(r); } +int reset_socket(socket_t *r) { + if (!r) + return -1; + + r->fd = -1; + ZERO(r->local); + ZERO(r->remote); + + return 0; +} int close_socket(socket_t *r) { if (!r) { __C_DBG("close() syscall not called, no socket"); @@ -891,9 +901,7 @@ int close_socket(socket_t *r) { __C_DBG("close() syscall success, fd=%d", r->fd); - r->fd = -1; - ZERO(r->local); - ZERO(r->remote); + reset_socket(r); return 0; } @@ -904,9 +912,7 @@ int close_socket(socket_t *r) { // does not actually close the socket void move_socket(socket_t *dst, socket_t *src) { *dst = *src; - src->fd = -1; - ZERO(src->local); - ZERO(src->remote); + reset_socket(src); } diff --git a/lib/socket.h b/lib/socket.h index 454aab78c..184bca8be 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -247,6 +247,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep); int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress int connect_socket_retry(socket_t *r); // retries connect() while in progress int close_socket(socket_t *r); +int reset_socket(socket_t *r); void move_socket(socket_t *dst, socket_t *src); void dummy_socket(socket_t *r, const sockaddr_t *); diff --git a/perf-tester/main.c b/perf-tester/main.c index 350e49024..e189bc416 100644 --- a/perf-tester/main.c +++ b/perf-tester/main.c @@ -434,7 +434,6 @@ static void kill_threads(uint num) { static void stream_free(void *p) { struct stream *s = p; - close(s->timer_fd); close(s->output_fd); dump_close(s); if (s->encoder) @@ -587,6 +586,7 @@ static void del_stream(void) { return; poller_del_item(rtpe_poller, s->timer_fd); + s->timer_fd = -1; obj_put(s); }