diff --git a/daemon/call.c b/daemon/call.c index f56fa4885..f700cfea4 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -759,7 +759,8 @@ next: ilog(LOG_INFO, "Decreasing timer run interval to %llu seconds", interval / 1000000); } - release_closed_sockets(); + /* add thread scope (local) sockets to the global list, in order to release them later */ + append_thread_lpr_to_glob_lpr(); } #undef DS diff --git a/daemon/main.c b/daemon/main.c index 28f4f4cab..f16d069db 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1335,9 +1335,20 @@ int main(int argc, char **argv) { ilog(LOG_INFO, "Startup complete, version %s", RTPENGINE_VERSION); thread_create_detach(sighandler, NULL, "signal handler"); + + /* a single thread which is running a poller_timer_loop, + * it calls each second a loop and if it finds something it does some work. + */ thread_create_detach_prio(poller_timer_loop, rtpe_poller, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "poller timer"); - thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "load monitor"); + + /* load monitoring thread */ + thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling, + rtpe_config.idle_priority, "load monitor"); + + /* separate thread for releasing ports (sockets), which are scheduled for clearing */ + thread_create_detach_prio(sockets_releaser, NULL, rtpe_config.idle_scheduling, + rtpe_config.idle_priority, "release closed sockets"); if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async) thread_create_detach(redis_delete_async_loop, NULL, "redis async"); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index ca2fe07c6..a03c89118 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -86,8 +86,11 @@ struct interface_stats_interval { }; +/* thread scope (local) queue for sockets to be released, only appending here */ static __thread GQueue ports_to_release = G_QUEUE_INIT; - +/* global queue for sockets to be released, releasing by `sockets_releaser()` is done using that */ +static GQueue ports_to_release_glob = G_QUEUE_INIT; +mutex_t ports_to_release_glob_lock = MUTEX_STATIC_INIT; static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *); @@ -952,11 +955,49 @@ static void release_port_now(socket_t *r, struct intf_spec *spec) { ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port); } } +/** + * Sockets releaser. + */ void release_closed_sockets(void) { - struct late_port_release *lpr; - while ((lpr = g_queue_pop_head(&ports_to_release))) { - release_port_now(&lpr->socket, lpr->spec); - g_slice_free1(sizeof(*lpr), lpr); + struct late_port_release * lpr; + + /* for the separate releaser thread (one working with `sockets_releaser()`) + * it does no job. But only for those threads related to calls processing. + */ + if (ports_to_release.head) + append_thread_lpr_to_glob_lpr(); + + if (ports_to_release_glob.head) { + mutex_lock(&ports_to_release_glob_lock); + GQueue ports_left = ports_to_release_glob; + g_queue_init(&ports_to_release_glob); + mutex_unlock(&ports_to_release_glob_lock); + + while ((lpr = g_queue_pop_head(&ports_left))) { + release_port_now(&lpr->socket, lpr->spec); + g_slice_free1(sizeof(*lpr), lpr); + } + } +} +/** + * Appends thread scope (local) sockets to the global releasing list. + */ +void append_thread_lpr_to_glob_lpr(void) { + mutex_lock(&ports_to_release_glob_lock); + g_queue_move(&ports_to_release_glob, &ports_to_release); /* dst, src */ + mutex_unlock(&ports_to_release_glob_lock); +} + +/** + * Separate thread for releasing sockets scheduled for closing. + */ +void sockets_releaser(void * dummy) { + while (!rtpe_shutdown) { + release_closed_sockets(); + + thread_cancel_enable(); + usleep(1000000); /* sleep for 1 second in each iteration */ + thread_cancel_disable(); } } @@ -3052,7 +3093,6 @@ done: static void stream_fd_free(void *p) { struct stream_fd *f = p; - release_port(&f->socket, f->local_intf->spec); crypto_cleanup(&f->crypto); dtls_connection_cleanup(&f->dtls); diff --git a/include/media_socket.h b/include/media_socket.h index a322ab1b5..23ce1a4dd 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -299,6 +299,8 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_in struct stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(struct stream_fd *); void release_closed_sockets(void); +void sockets_releaser(void * dummy); +void append_thread_lpr_to_glob_lpr(void); void free_intf_list(struct intf_list *il); void free_release_intf_list(struct intf_list *il);