From e553660e23fab81e0c999c25c811d4bfb5f084cd Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 19 Apr 2024 12:53:49 -0400 Subject: [PATCH] MT#55283 virtualise poller methods This allows us to use other poller implementations Change-Id: Iac7c194c86ecaac550aae4baa047cab205c507b9 --- daemon/control_ng.c | 2 +- daemon/main.c | 6 ++++++ daemon/media_socket.c | 4 ++-- daemon/tcp_listener.c | 8 ++++---- daemon/udp_listener.c | 2 +- include/main.h | 8 ++++++++ t/test-stats.c | 5 +++++ t/test-transcode.c | 5 +++++ 8 files changed, 32 insertions(+), 8 deletions(-) diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 7170ea96e..9c0649ad6 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -647,7 +647,7 @@ void control_ng_free(void *p) { g_hash_table_destroy(rtpe_cngs_hash); rtpe_cngs_hash = NULL; } - poller_del_item(rtpe_control_poller, c->udp_listener.fd); + rtpe_poller_del_item(rtpe_control_poller, c->udp_listener.fd); reset_socket(&c->udp_listener); streambuf_listener_shutdown(&c->tcp_listener); if (tcp_connections_hash) diff --git a/daemon/main.c b/daemon/main.c index 579f8cc9c..8391377e0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -67,6 +67,12 @@ static unsigned int num_poller_threads; unsigned int num_media_pollers; unsigned int rtpe_poller_rr_iter; +bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; +bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; +bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; +void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; +void (*rtpe_poller_error)(struct poller *, void *) = poller_error; + struct rtpengine_config initial_rtpe_config; static GQueue rtpe_tcp = G_QUEUE_INIT; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index f0a38a514..5a4730286 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -949,7 +949,7 @@ static void release_port_poller(socket_t *r, struct intf_spec *spec, struct poll release_port_push(lpr); else { __C_DBG("Adding late-release callback for port '%u'", lpr->socket.local.port); - poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); + rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); } } static void release_port(socket_t *r, struct intf_spec *spec) { @@ -3283,7 +3283,7 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { if (sfd->socket.fd != -1) { struct poller *p = call->poller; - if (!poller_add_item(p, &pi)) + if (!rtpe_poller_add_item(p, &pi)) ilog(LOG_ERR, "Failed to add stream_fd to poller"); else sfd->poller = p; diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index 09d82e2ec..c58a34616 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -87,7 +87,7 @@ static int tcp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = tcp_listener_closed; i.readable = tcp_listener_incoming; i.obj = &cb->obj; - if (!poller_add_item(rtpe_control_poller, &i)) + if (!rtpe_poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); @@ -121,7 +121,7 @@ static void streambuf_stream_closed(int fd, void *p) { mutex_lock(&l->lock); bool ret = t_hash_table_remove(l->streams, s); mutex_unlock(&l->lock); - poller_del_item(rtpe_control_poller, s->sock.fd); + rtpe_poller_del_item(rtpe_control_poller, s->sock.fd); reset_socket(&s->sock); if (ret) obj_put(s); @@ -186,7 +186,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a t_hash_table_insert(listener->streams, s, s); // hand over ref mutex_unlock(&listener->lock); - if (!poller_add_item(rtpe_control_poller, &i)) + if (!rtpe_poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(s); @@ -242,7 +242,7 @@ fail: void streambuf_listener_shutdown(struct streambuf_listener *listener) { if (!listener) return; - poller_del_item(rtpe_control_poller, listener->listener.fd); + rtpe_poller_del_item(rtpe_control_poller, listener->listener.fd); reset_socket(&listener->listener); t_hash_table_destroy_ptr(&listener->streams); } diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 39764f1e1..8cad47f69 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -95,7 +95,7 @@ int udp_listener_init(socket_t *sock, const endpoint_t *ep, i.closed = udp_listener_closed; i.readable = udp_listener_incoming; i.obj = &cb->obj; - if (!poller_add_item(rtpe_control_poller, &i)) + if (!rtpe_poller_add_item(rtpe_control_poller, &i)) goto fail; obj_put(cb); diff --git a/include/main.h b/include/main.h index 15010b827..dfcbedbf6 100644 --- a/include/main.h +++ b/include/main.h @@ -213,6 +213,14 @@ extern struct poller *rtpe_control_poller; // poller for control sockets (maybe extern unsigned int num_media_pollers; // for media sockets, >= 1 extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread +struct poller_item; +extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *); +extern bool (*rtpe_poller_del_item)(struct poller *, int); +extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *); +extern void (*rtpe_poller_blocked)(struct poller *, void *); +extern void (*rtpe_poller_error)(struct poller *, void *); + + INLINE struct poller *rtpe_get_poller(void) { // XXX optimise this for num_media_pollers == 1 ? return rtpe_pollers[g_atomic_int_add(&rtpe_poller_rr_iter, 1) % num_media_pollers]; diff --git a/t/test-stats.c b/t/test-stats.c index 842015a02..de566d442 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -19,6 +19,11 @@ struct poller **rtpe_pollers = (struct poller *[]) {NULL}; struct poller *rtpe_control_poller; unsigned int num_media_pollers = 1; unsigned int rtpe_poller_rr_iter; +bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; +bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; +bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; +void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; +void (*rtpe_poller_error)(struct poller *, void *) = poller_error; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT; diff --git a/t/test-transcode.c b/t/test-transcode.c index b7f724b72..50dfc157e 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -15,6 +15,11 @@ struct poller **rtpe_pollers; struct poller *rtpe_control_poller; unsigned int num_media_pollers; unsigned int rtpe_poller_rr_iter; +bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; +bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; +bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; +void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; +void (*rtpe_poller_error)(struct poller *, void *) = poller_error; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT;