From 9196bb5f5d7f2a125c29eddf63cd1029eebbb25d Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 25 Nov 2025 08:21:39 -0400 Subject: [PATCH] MT#55283 track poller stats Count number of wakeups and number of items per wakeup Change-Id: I75eb53804f9690722a3908339bf9f606c8a8f4a9 --- daemon/main.c | 21 +++++++++++++++++---- lib/poller.c | 8 +++++++- lib/poller.h | 13 ++++++++++++- lib/uring.c | 4 +++- lib/uring.h | 2 +- perf-tester/main.c | 3 ++- 6 files changed, 42 insertions(+), 9 deletions(-) diff --git a/daemon/main.c b/daemon/main.c index ff5a9fef8..6e80875b0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -69,6 +69,7 @@ struct poller **rtpe_pollers; struct poller *rtpe_control_poller; static unsigned int num_rtpe_pollers; static unsigned int num_poller_threads; +struct poller_thread *rtpe_poller_threads; unsigned int num_media_pollers; unsigned int rtpe_poller_rr_iter; @@ -1682,6 +1683,8 @@ static void create_everything(void) { } rtpe_control_poller = rtpe_pollers[num_rtpe_pollers - 1]; + rtpe_poller_threads = g_new0(struct poller_thread, num_poller_threads); + if (call_init()) abort(); @@ -1840,7 +1843,10 @@ static void uring_thread_waker(struct thread_waker *wk) { struct poller *p = wk->arg; uring_poller_wake(p); } -static void uring_poller_loop(struct poller *p) { +static void uring_poller_loop(struct poller_thread *pt) { + struct poller *p = pt->poller; + pt->pid = gettid(); + uring_poller_add_waker(p); struct thread_waker wk = {.func = uring_thread_waker, .arg = p}; @@ -1848,9 +1854,12 @@ static void uring_poller_loop(struct poller *p) { while (!rtpe_shutdown) { rtpe_now = now_us(); - uring_poller_poll(p); + unsigned int events = uring_poller_poll(p); append_thread_lpr_to_glob_lpr(); log_info_reset(); + + atomic64_inc_na(&pt->wakeups); + atomic64_add_na(&pt->items, events); } thread_waker_del(&wk); uring_poller_clear(p); @@ -1927,15 +1936,18 @@ int main(int argc, char **argv) { websocket_start(); - for (unsigned int idx = 0; idx < num_poller_threads; ++idx) + for (unsigned int idx = 0; idx < num_poller_threads; ++idx) { + rtpe_poller_threads[idx].poller = rtpe_pollers[idx % num_rtpe_pollers]; + thread_create_detach_prio( #ifdef HAVE_LIBURING rtpe_config.common.io_uring ? uring_poller_loop : #endif poller_loop, - rtpe_pollers[idx % num_rtpe_pollers], + &rtpe_poller_threads[idx], rtpe_config.scheduling, rtpe_config.priority, idx < rtpe_config.num_threads ? "poller" : "cpoller"); + } media_player_launch(); send_timer_launch(); @@ -2011,6 +2023,7 @@ int main(int argc, char **argv) { #endif poller_free(&rtpe_pollers[idx]); g_free(rtpe_pollers); + g_free(rtpe_poller_threads); release_closed_sockets(); interfaces_free(); #ifndef WITHOUT_NFTABLES diff --git a/lib/poller.c b/lib/poller.c index ac87efd67..e9339fa15 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -325,7 +325,9 @@ out: return ret; } -void poller_loop(struct poller *p) { +void poller_loop(struct poller_thread *pt) { + struct poller *p = pt->poller; + pt->pid = gettid(); int poller_size = rtpe_common_config_ptr->poller_size; struct epoll_event *evs; @@ -337,6 +339,10 @@ void poller_loop(struct poller *p) { int ret = poller_poll(p, thread_sleep_time, evs, poller_size); if (ret < 0) usleep(20 * 1000); + else { + atomic64_inc_na(&pt->wakeups); + atomic64_add_na(&pt->items, ret); + } uring_methods.thread_loop(); } diff --git a/lib/poller.h b/lib/poller.h index 19b2cc09f..2967f57c1 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -9,6 +9,7 @@ #include #include #include "compat.h" +#include "auxlib.h" #define MAX_RTP_PACKET_SIZE 8192 @@ -35,6 +36,16 @@ struct poller_item { struct poller; +struct poller_thread { + pid_t pid; + + struct poller *poller; + + // stats + atomic64 wakeups; + atomic64 items; // per wakeup +}; + struct poller *poller_new(void); void poller_free(struct poller **); bool poller_add_item(struct poller *, struct poller_item *); @@ -45,7 +56,7 @@ void poller_blocked(struct poller *, void *); bool poller_isblocked(struct poller *, void *); void poller_error(struct poller *, void *); -void poller_loop(struct poller *); +void poller_loop(struct poller_thread *); extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *); extern bool (*rtpe_poller_del_item)(struct poller *, int); diff --git a/lib/uring.c b/lib/uring.c index fa5074742..965576c85 100644 --- a/lib/uring.c +++ b/lib/uring.c @@ -593,7 +593,7 @@ void uring_poller_add_waker(struct poller *p) { }); } -void uring_poller_poll(struct poller *p) { +unsigned int uring_poller_poll(struct poller *p) { uring_poller_do_reqs(p); unsigned int events = __uring_thread_loop(); @@ -604,6 +604,8 @@ void uring_poller_poll(struct poller *p) { io_uring_wait_cqe(&rtpe_uring, &cqe); // maybe not a cancellation point thread_cancel_disable(); } + + return events; } void uring_poller_clear(struct poller *p) { diff --git a/lib/uring.h b/lib/uring.h index 33661f337..55b979ae2 100644 --- a/lib/uring.h +++ b/lib/uring.h @@ -47,7 +47,7 @@ struct poller *uring_poller_new(void); void uring_poller_free(struct poller **pp); void uring_poller_add_waker(struct poller *p); void uring_poller_wake(struct poller *p); -void uring_poller_poll(struct poller *); +unsigned int uring_poller_poll(struct poller *); void uring_poller_clear(struct poller *); bool uring_poller_add_item(struct poller *p, struct poller_item *i); diff --git a/perf-tester/main.c b/perf-tester/main.c index db9e4f0c7..a6b0c8c6a 100644 --- a/perf-tester/main.c +++ b/perf-tester/main.c @@ -325,7 +325,8 @@ static void *worker(struct worker *p) { LOCK(&other_threads_lock); g_hash_table_insert(worker_threads, GINT_TO_POINTER(worker_self->pid), NULL); } - poller_loop(rtpe_poller); + struct poller_thread pt = { .poller = rtpe_poller }; + poller_loop(&pt); return NULL; }