From 90aa63a97c9c43907a6f93201735262fcb81e667 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 28 Feb 2024 16:31:14 -0500 Subject: [PATCH] MT#55283 fix/rework poller-per-thread feature The poller-per-thread feature was broken with a division by zero. Take the opportunity to rework it and eliminate the poller_map object. Use a simple array of pollers for media sockets, plus one global poller for control sockets. In the regular case only one poller is created and everything points to that poller. In the poller-per-thread case, one poller per thread is created, plus one poller (also with its own single thread) for control connections. All control sockets use the single control poller, while all media sockets get assigned one poller from the pool in a round-robin fashion. closes #1801 Change-Id: Iae91a3e10b7206455c6df33b1a472254c700ce21 --- daemon/control_ng.c | 2 +- daemon/main.c | 53 ++++++++++++++++++-------------- daemon/media_socket.c | 14 ++++----- daemon/tcp_listener.c | 14 ++++----- daemon/udp_listener.c | 2 +- include/main.h | 13 +++++--- include/tcp_listener.h | 2 -- lib/poller.c | 68 ------------------------------------------ lib/poller.h | 5 ---- perf-tester/main.c | 2 +- t/test-stats.c | 7 +++-- t/test-transcode.c | 6 ++-- 12 files changed, 63 insertions(+), 125 deletions(-) diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 00daf4699..a7fba21cd 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -556,7 +556,7 @@ void control_ng_free(void *p) { g_hash_table_destroy(rtpe_cngs_hash); rtpe_cngs_hash = NULL; } - poller_del_item(rtpe_poller, c->udp_listener.fd); + poller_del_item(rtpe_control_poller, c->udp_listener.fd); close_socket(&c->udp_listener); streambuf_listener_shutdown(&c->tcp_listener); if (tcp_connections_hash) diff --git a/daemon/main.c b/daemon/main.c index fd83c658f..59c98f048 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -60,8 +60,13 @@ -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +static unsigned int num_rtpe_pollers; +static unsigned int num_poller_threads; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; + struct rtpengine_config initial_rtpe_config; static GQueue rtpe_tcp = G_QUEUE_INIT; @@ -1258,13 +1263,23 @@ static void create_everything(void) { kernel_setup(); - rtpe_poller = poller_new(); - if (!rtpe_poller) - die("poller creation failed"); - - rtpe_poller_map = poller_map_new(); - if (!rtpe_poller_map) - die("poller map creation failed"); + // either one global poller, or one per thread for media sockets plus one for control sockets + if (!rtpe_config.poller_per_thread) { + num_media_pollers = num_rtpe_pollers = 1; + num_poller_threads = rtpe_config.num_threads; + } + else { + num_media_pollers = rtpe_config.num_threads; + num_rtpe_pollers = num_media_pollers + 1; + num_poller_threads = num_rtpe_pollers; + } + rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers); + for (unsigned int i = 0; i < num_rtpe_pollers; i++) { + rtpe_pollers[i] = poller_new(); + if (!rtpe_pollers[i]) + die("poller creation failed"); + } + rtpe_control_poller = rtpe_pollers[num_rtpe_pollers - 1]; if (call_init()) abort(); @@ -1365,8 +1380,6 @@ static void do_redis_restore(void) { int main(int argc, char **argv) { - int idx; - early_init(); options(&argc, &argv); init_everything(); @@ -1426,15 +1439,10 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); - for (idx = 0; idx < rtpe_config.num_threads; ++idx) { - if (!rtpe_config.poller_per_thread) - thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); - else - thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller"); - } - - if (rtpe_config.poller_per_thread) - thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); + for (unsigned int idx = 0; idx < num_poller_threads; ++idx) + thread_create_detach_prio(poller_loop, rtpe_pollers[idx % num_rtpe_pollers], + rtpe_config.scheduling, rtpe_config.priority, + idx < rtpe_config.num_threads ? "poller" : "cpoller"); media_player_launch(); send_timer_launch(); @@ -1500,8 +1508,9 @@ int main(int argc, char **argv) { release_listeners(&rtpe_tcp); release_listeners(&rtpe_control_ng); release_listeners(&rtpe_control_ng_tcp); - poller_free(&rtpe_poller); - poller_map_free(&rtpe_poller_map); + for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx) + poller_free(&rtpe_pollers[idx]); + g_free(rtpe_pollers); interfaces_free(); #ifndef WITHOUT_NFTABLES nftables_shutdown(rtpe_config.nftables_chain, rtpe_config.nftables_base_chain, diff --git a/daemon/media_socket.c b/daemon/media_socket.c index dd49d63ac..f17eb4779 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -3250,7 +3250,6 @@ static void stream_fd_free(void *p) { stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { stream_fd *sfd; struct poller_item pi; - struct poller *p = rtpe_poller; sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); sfd->unique_id = t_queue_get_length(&call->stream_fds); @@ -3269,14 +3268,11 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { pi.closed = stream_fd_closed; if (sfd->socket.fd != -1) { - if (rtpe_config.poller_per_thread) - p = poller_map_get(rtpe_poller_map); - if (p) { - if (poller_add_item(p, &pi)) - ilog(LOG_ERR, "Failed to add stream_fd to poller"); - else - sfd->poller = p; - } + struct poller *p = rtpe_get_poller(); + if (poller_add_item(p, &pi)) + ilog(LOG_ERR, "Failed to add stream_fd to poller"); + else + sfd->poller = p; RWLOCK_W(&local_media_socket_endpoints_lock); t_hash_table_replace(local_media_socket_endpoints, &sfd->socket.local, obj_get(sfd)); diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index d41a41f54..7f4eab8e7 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -66,7 +66,7 @@ static void __tlc_free(void *p) { obj_put_o(cb->p); } -int tcp_listener_init(socket_t *sock, const endpoint_t *ep, +static int tcp_listener_init(socket_t *sock, const endpoint_t *ep, tcp_listener_callback_t func, struct obj *obj) { struct poller_item i; @@ -87,7 +87,7 @@ int tcp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = tcp_listener_closed; i.readable = tcp_listener_incoming; i.obj = &cb->obj; - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); @@ -122,7 +122,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) { mutex_lock(&l->lock); bool ret = t_hash_table_remove(l->streams, s); mutex_unlock(&l->lock); - poller_del_item(rtpe_poller, s->sock.fd); + poller_del_item(rtpe_control_poller, s->sock.fd); if (ret) obj_put(s); } @@ -163,8 +163,8 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a s = obj_alloc0("streambuf_stream", sizeof(*s), streambuf_stream_free); s->sock = *newsock; - s->inbuf = streambuf_new(rtpe_poller, newsock->fd); - s->outbuf = streambuf_new(rtpe_poller, newsock->fd); + s->inbuf = streambuf_new(rtpe_control_poller, newsock->fd); + s->outbuf = streambuf_new(rtpe_control_poller, newsock->fd); s->listener = listener; s->cb = obj_get(cb); s->parent = obj_get_o(cb->parent); @@ -186,7 +186,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a t_hash_table_insert(listener->streams, s, s); // hand over ref mutex_unlock(&listener->lock); - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(s); @@ -242,7 +242,7 @@ fail: void streambuf_listener_shutdown(struct streambuf_listener *listener) { if (!listener) return; - poller_del_item(rtpe_poller, listener->listener.fd); + poller_del_item(rtpe_control_poller, listener->listener.fd); close_socket(&listener->listener); t_hash_table_destroy_ptr(&listener->streams); } diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index de7d385a7..632566b86 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -95,7 +95,7 @@ int udp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = udp_listener_closed; i.readable = udp_listener_incoming; i.obj = &cb->obj; - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); diff --git a/include/main.h b/include/main.h index 703fcc3e6..bdc68868f 100644 --- a/include/main.h +++ b/include/main.h @@ -197,7 +197,6 @@ struct rtpengine_config { struct poller; -struct poller_map; /** * Main global poller instance. @@ -205,9 +204,15 @@ struct poller_map; * * TODO: convert to struct instead of pointer? */ -extern struct poller *rtpe_poller; -/* Used when the poller-per-thread option is set */ -extern struct poller_map *rtpe_poller_map; +extern struct poller **rtpe_pollers; // at least one poller, in an array +extern struct poller *rtpe_control_poller; // poller for control sockets (maybe rtpe_pollers[0]) +extern unsigned int num_media_pollers; // for media sockets, >= 1 +extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread + +INLINE struct poller *rtpe_get_poller(void) { + // XXX optimise this for num_media_pollers == 1 ? + return rtpe_pollers[g_atomic_int_add(&rtpe_poller_rr_iter, 1) % num_media_pollers]; +} extern struct rtpengine_config rtpe_config; extern struct rtpengine_config initial_rtpe_config; diff --git a/include/tcp_listener.h b/include/tcp_listener.h index 374695a3e..7d244a759 100644 --- a/include/tcp_listener.h +++ b/include/tcp_listener.h @@ -33,8 +33,6 @@ struct streambuf_stream { }; -int tcp_listener_init(socket_t *, const endpoint_t *, tcp_listener_callback_t, struct obj *); - int streambuf_listener_init(struct streambuf_listener *listener, const endpoint_t *ep, streambuf_callback_t newconn_func, streambuf_callback_t newdata_func, diff --git a/lib/poller.c b/lib/poller.c index 9f75bd7ca..05eb156c3 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -35,66 +35,6 @@ struct poller { GPtrArray *items; }; -struct poller_map { - mutex_t lock; - GHashTable *table; -}; - -struct poller_map *poller_map_new(void) { - struct poller_map *p; - - p = g_slice_alloc0(sizeof(*p)); - mutex_init(&p->lock); - p->table = g_hash_table_new(g_direct_hash, g_direct_equal); - - return p; -} - -static void poller_map_add(struct poller_map *map) { - pthread_t tid = -1; - struct poller *p; - if (!map) - return; - tid = pthread_self(); - - LOCK(&map->lock); - p = poller_new(); - if (p) - g_hash_table_insert(map->table, (gpointer)tid, p); -} - -struct poller *poller_map_get(struct poller_map *map) { - if (!map) - return NULL; - - struct poller *p = NULL; - pthread_t tid = pthread_self(); - LOCK(&map->lock); - p = g_hash_table_lookup(map->table, (gpointer)tid); - if (!p) { - gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL); - p = g_hash_table_lookup(map->table, arr[ssl_random() % g_hash_table_size(map->table)]); - g_free(arr); - } - return p; -} - -static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) { - struct poller *p = (struct poller *)v; - poller_free(&p); -} - -void poller_map_free(struct poller_map **map) { - struct poller_map *m = *map; - if (!m) - return; - g_hash_table_foreach(m->table, poller_map_free_poller, NULL); - g_hash_table_destroy(m->table); - mutex_destroy(&m->lock); - g_slice_free1(sizeof(*m), m); - *map = NULL; -} - static void poller_free_item(struct poller_item_int *ele) { if (ele) obj_put(ele); @@ -369,14 +309,6 @@ out: } void poller_loop(void *d) { - struct poller_map *map = d; - poller_map_add(map); - struct poller *p = poller_map_get(map); - - poller_loop2(p); -} - -void poller_loop2(void *d) { struct poller *p = d; int poller_size = rtpe_common_config_ptr->poller_size; struct epoll_event *evs; diff --git a/lib/poller.h b/lib/poller.h index dde9bf40d..98eec1c01 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -27,12 +27,8 @@ struct poller_item { }; struct poller; -struct poller_map; struct poller *poller_new(void); -struct poller_map *poller_map_new(void); -struct poller *poller_map_get(struct poller_map *); -void poller_map_free(struct poller_map **); void poller_free(struct poller **); int poller_add_item(struct poller *, struct poller_item *); int poller_del_item(struct poller *, int); @@ -42,7 +38,6 @@ int poller_isblocked(struct poller *, void *); void poller_error(struct poller *, void *); void poller_loop(void *); -void poller_loop2(void *); #endif diff --git a/perf-tester/main.c b/perf-tester/main.c index cba514459..7d9e5594a 100644 --- a/perf-tester/main.c +++ b/perf-tester/main.c @@ -289,7 +289,7 @@ static void *worker(void *p) { LOCK(&other_threads_lock); g_hash_table_insert(worker_threads, GINT_TO_POINTER(worker_self->pid), NULL); } - poller_loop2(rtpe_poller); + poller_loop(rtpe_poller); return NULL; } diff --git a/t/test-stats.c b/t/test-stats.c index 552290f20..52a91042f 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -15,8 +15,10 @@ struct rtpengine_config rtpe_config = { .dtls_rsa_key_size = 2048, }; struct rtpengine_config initial_rtpe_config; -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT; @@ -64,7 +66,6 @@ int main(void) { endpoint_parse_any(&rtpe_config.graphite_ep, "1.2.3.4:4567"); rtpe_ssl_init(); - rtpe_poller = poller_new(); call_init(); statistics_init(); call_interfaces_init(); diff --git a/t/test-transcode.c b/t/test-transcode.c index 3d6f576eb..b7f724b72 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -11,8 +11,10 @@ int _log_facility_cdr; int _log_facility_dtmf; struct rtpengine_config rtpe_config; struct rtpengine_config initial_rtpe_config; -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT;