diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 00daf4699..a7fba21cd 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -556,7 +556,7 @@ void control_ng_free(void *p) { g_hash_table_destroy(rtpe_cngs_hash); rtpe_cngs_hash = NULL; } - poller_del_item(rtpe_poller, c->udp_listener.fd); + poller_del_item(rtpe_control_poller, c->udp_listener.fd); close_socket(&c->udp_listener); streambuf_listener_shutdown(&c->tcp_listener); if (tcp_connections_hash) diff --git a/daemon/main.c b/daemon/main.c index fd83c658f..59c98f048 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -60,8 +60,13 @@ -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +static unsigned int num_rtpe_pollers; +static unsigned int num_poller_threads; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; + struct rtpengine_config initial_rtpe_config; static GQueue rtpe_tcp = G_QUEUE_INIT; @@ -1258,13 +1263,23 @@ static void create_everything(void) { kernel_setup(); - rtpe_poller = poller_new(); - if (!rtpe_poller) - die("poller creation failed"); - - rtpe_poller_map = poller_map_new(); - if (!rtpe_poller_map) - die("poller map creation failed"); + // either one global poller, or one per thread for media sockets plus one for control sockets + if (!rtpe_config.poller_per_thread) { + num_media_pollers = num_rtpe_pollers = 1; + num_poller_threads = rtpe_config.num_threads; + } + else { + num_media_pollers = rtpe_config.num_threads; + num_rtpe_pollers = num_media_pollers + 1; + num_poller_threads = num_rtpe_pollers; + } + rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers); + for (unsigned int i = 0; i < num_rtpe_pollers; i++) { + rtpe_pollers[i] = poller_new(); + if (!rtpe_pollers[i]) + die("poller creation failed"); + } + rtpe_control_poller = rtpe_pollers[num_rtpe_pollers - 1]; if (call_init()) abort(); @@ -1365,8 +1380,6 @@ static void do_redis_restore(void) { int main(int argc, char **argv) { - int idx; - early_init(); options(&argc, &argv); init_everything(); @@ -1426,15 +1439,10 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); - for (idx = 0; idx < rtpe_config.num_threads; ++idx) { - if (!rtpe_config.poller_per_thread) - thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); - else - thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller"); - } - - if (rtpe_config.poller_per_thread) - thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); + for (unsigned int idx = 0; idx < num_poller_threads; ++idx) + thread_create_detach_prio(poller_loop, rtpe_pollers[idx % num_rtpe_pollers], + rtpe_config.scheduling, rtpe_config.priority, + idx < rtpe_config.num_threads ? "poller" : "cpoller"); media_player_launch(); send_timer_launch(); @@ -1500,8 +1508,9 @@ int main(int argc, char **argv) { release_listeners(&rtpe_tcp); release_listeners(&rtpe_control_ng); release_listeners(&rtpe_control_ng_tcp); - poller_free(&rtpe_poller); - poller_map_free(&rtpe_poller_map); + for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx) + poller_free(&rtpe_pollers[idx]); + g_free(rtpe_pollers); interfaces_free(); #ifndef WITHOUT_NFTABLES nftables_shutdown(rtpe_config.nftables_chain, rtpe_config.nftables_base_chain, diff --git a/daemon/media_socket.c b/daemon/media_socket.c index dd49d63ac..f17eb4779 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -3250,7 +3250,6 @@ static void stream_fd_free(void *p) { stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { stream_fd *sfd; struct poller_item pi; - struct poller *p = rtpe_poller; sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); sfd->unique_id = t_queue_get_length(&call->stream_fds); @@ -3269,14 +3268,11 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { pi.closed = stream_fd_closed; if (sfd->socket.fd != -1) { - if (rtpe_config.poller_per_thread) - p = poller_map_get(rtpe_poller_map); - if (p) { - if (poller_add_item(p, &pi)) - ilog(LOG_ERR, "Failed to add stream_fd to poller"); - else - sfd->poller = p; - } + struct poller *p = rtpe_get_poller(); + if (poller_add_item(p, &pi)) + ilog(LOG_ERR, "Failed to add stream_fd to poller"); + else + sfd->poller = p; RWLOCK_W(&local_media_socket_endpoints_lock); t_hash_table_replace(local_media_socket_endpoints, &sfd->socket.local, obj_get(sfd)); diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index d41a41f54..7f4eab8e7 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -66,7 +66,7 @@ static void __tlc_free(void *p) { obj_put_o(cb->p); } -int tcp_listener_init(socket_t *sock, const endpoint_t *ep, +static int tcp_listener_init(socket_t *sock, const endpoint_t *ep, tcp_listener_callback_t func, struct obj *obj) { struct poller_item i; @@ -87,7 +87,7 @@ int tcp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = tcp_listener_closed; i.readable = tcp_listener_incoming; i.obj = &cb->obj; - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); @@ -122,7 +122,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) { mutex_lock(&l->lock); bool ret = t_hash_table_remove(l->streams, s); mutex_unlock(&l->lock); - poller_del_item(rtpe_poller, s->sock.fd); + poller_del_item(rtpe_control_poller, s->sock.fd); if (ret) obj_put(s); } @@ -163,8 +163,8 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a s = obj_alloc0("streambuf_stream", sizeof(*s), streambuf_stream_free); s->sock = *newsock; - s->inbuf = streambuf_new(rtpe_poller, newsock->fd); - s->outbuf = streambuf_new(rtpe_poller, newsock->fd); + s->inbuf = streambuf_new(rtpe_control_poller, newsock->fd); + s->outbuf = streambuf_new(rtpe_control_poller, newsock->fd); s->listener = listener; s->cb = obj_get(cb); s->parent = obj_get_o(cb->parent); @@ -186,7 +186,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a t_hash_table_insert(listener->streams, s, s); // hand over ref mutex_unlock(&listener->lock); - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(s); @@ -242,7 +242,7 @@ fail: void streambuf_listener_shutdown(struct streambuf_listener *listener) { if (!listener) return; - poller_del_item(rtpe_poller, listener->listener.fd); + poller_del_item(rtpe_control_poller, listener->listener.fd); close_socket(&listener->listener); t_hash_table_destroy_ptr(&listener->streams); } diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index de7d385a7..632566b86 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -95,7 +95,7 @@ int udp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = udp_listener_closed; i.readable = udp_listener_incoming; i.obj = &cb->obj; - if (poller_add_item(rtpe_poller, &i)) + if (poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); diff --git a/include/main.h b/include/main.h index 703fcc3e6..bdc68868f 100644 --- a/include/main.h +++ b/include/main.h @@ -197,7 +197,6 @@ struct rtpengine_config { struct poller; -struct poller_map; /** * Main global poller instance. @@ -205,9 +204,15 @@ struct poller_map; * * TODO: convert to struct instead of pointer? */ -extern struct poller *rtpe_poller; -/* Used when the poller-per-thread option is set */ -extern struct poller_map *rtpe_poller_map; +extern struct poller **rtpe_pollers; // at least one poller, in an array +extern struct poller *rtpe_control_poller; // poller for control sockets (maybe rtpe_pollers[0]) +extern unsigned int num_media_pollers; // for media sockets, >= 1 +extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread + +INLINE struct poller *rtpe_get_poller(void) { + // XXX optimise this for num_media_pollers == 1 ? + return rtpe_pollers[g_atomic_int_add(&rtpe_poller_rr_iter, 1) % num_media_pollers]; +} extern struct rtpengine_config rtpe_config; extern struct rtpengine_config initial_rtpe_config; diff --git a/include/tcp_listener.h b/include/tcp_listener.h index 374695a3e..7d244a759 100644 --- a/include/tcp_listener.h +++ b/include/tcp_listener.h @@ -33,8 +33,6 @@ struct streambuf_stream { }; -int tcp_listener_init(socket_t *, const endpoint_t *, tcp_listener_callback_t, struct obj *); - int streambuf_listener_init(struct streambuf_listener *listener, const endpoint_t *ep, streambuf_callback_t newconn_func, streambuf_callback_t newdata_func, diff --git a/lib/poller.c b/lib/poller.c index 9f75bd7ca..05eb156c3 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -35,66 +35,6 @@ struct poller { GPtrArray *items; }; -struct poller_map { - mutex_t lock; - GHashTable *table; -}; - -struct poller_map *poller_map_new(void) { - struct poller_map *p; - - p = g_slice_alloc0(sizeof(*p)); - mutex_init(&p->lock); - p->table = g_hash_table_new(g_direct_hash, g_direct_equal); - - return p; -} - -static void poller_map_add(struct poller_map *map) { - pthread_t tid = -1; - struct poller *p; - if (!map) - return; - tid = pthread_self(); - - LOCK(&map->lock); - p = poller_new(); - if (p) - g_hash_table_insert(map->table, (gpointer)tid, p); -} - -struct poller *poller_map_get(struct poller_map *map) { - if (!map) - return NULL; - - struct poller *p = NULL; - pthread_t tid = pthread_self(); - LOCK(&map->lock); - p = g_hash_table_lookup(map->table, (gpointer)tid); - if (!p) { - gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL); - p = g_hash_table_lookup(map->table, arr[ssl_random() % g_hash_table_size(map->table)]); - g_free(arr); - } - return p; -} - -static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) { - struct poller *p = (struct poller *)v; - poller_free(&p); -} - -void poller_map_free(struct poller_map **map) { - struct poller_map *m = *map; - if (!m) - return; - g_hash_table_foreach(m->table, poller_map_free_poller, NULL); - g_hash_table_destroy(m->table); - mutex_destroy(&m->lock); - g_slice_free1(sizeof(*m), m); - *map = NULL; -} - static void poller_free_item(struct poller_item_int *ele) { if (ele) obj_put(ele); @@ -369,14 +309,6 @@ out: } void poller_loop(void *d) { - struct poller_map *map = d; - poller_map_add(map); - struct poller *p = poller_map_get(map); - - poller_loop2(p); -} - -void poller_loop2(void *d) { struct poller *p = d; int poller_size = rtpe_common_config_ptr->poller_size; struct epoll_event *evs; diff --git a/lib/poller.h b/lib/poller.h index dde9bf40d..98eec1c01 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -27,12 +27,8 @@ struct poller_item { }; struct poller; -struct poller_map; struct poller *poller_new(void); -struct poller_map *poller_map_new(void); -struct poller *poller_map_get(struct poller_map *); -void poller_map_free(struct poller_map **); void poller_free(struct poller **); int poller_add_item(struct poller *, struct poller_item *); int poller_del_item(struct poller *, int); @@ -42,7 +38,6 @@ int poller_isblocked(struct poller *, void *); void poller_error(struct poller *, void *); void poller_loop(void *); -void poller_loop2(void *); #endif diff --git a/perf-tester/main.c b/perf-tester/main.c index cba514459..7d9e5594a 100644 --- a/perf-tester/main.c +++ b/perf-tester/main.c @@ -289,7 +289,7 @@ static void *worker(void *p) { LOCK(&other_threads_lock); g_hash_table_insert(worker_threads, GINT_TO_POINTER(worker_self->pid), NULL); } - poller_loop2(rtpe_poller); + poller_loop(rtpe_poller); return NULL; } diff --git a/t/test-stats.c b/t/test-stats.c index 552290f20..52a91042f 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -15,8 +15,10 @@ struct rtpengine_config rtpe_config = { .dtls_rsa_key_size = 2048, }; struct rtpengine_config initial_rtpe_config; -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT; @@ -64,7 +66,6 @@ int main(void) { endpoint_parse_any(&rtpe_config.graphite_ep, "1.2.3.4:4567"); rtpe_ssl_init(); - rtpe_poller = poller_new(); call_init(); statistics_init(); call_interfaces_init(); diff --git a/t/test-transcode.c b/t/test-transcode.c index 3d6f576eb..b7f724b72 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -11,8 +11,10 @@ int _log_facility_cdr; int _log_facility_dtmf; struct rtpengine_config rtpe_config; struct rtpengine_config initial_rtpe_config; -struct poller *rtpe_poller; -struct poller_map *rtpe_poller_map; +struct poller **rtpe_pollers; +struct poller *rtpe_control_poller; +unsigned int num_media_pollers; +unsigned int rtpe_poller_rr_iter; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT;