Browse Source

MT#55283 track poller stats

Count number of wakeups and number of items per wakeup

Change-Id: I75eb53804f9690722a3908339bf9f606c8a8f4a9
pull/1993/merge
Richard Fuchs 2 weeks ago
parent
commit
9196bb5f5d
6 changed files with 42 additions and 9 deletions
  1. +17
    -4
      daemon/main.c
  2. +7
    -1
      lib/poller.c
  3. +12
    -1
      lib/poller.h
  4. +3
    -1
      lib/uring.c
  5. +1
    -1
      lib/uring.h
  6. +2
    -1
      perf-tester/main.c

+ 17
- 4
daemon/main.c View File

@ -69,6 +69,7 @@ struct poller **rtpe_pollers;
struct poller *rtpe_control_poller;
static unsigned int num_rtpe_pollers;
static unsigned int num_poller_threads;
struct poller_thread *rtpe_poller_threads;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
@ -1682,6 +1683,8 @@ static void create_everything(void) {
}
rtpe_control_poller = rtpe_pollers[num_rtpe_pollers - 1];
rtpe_poller_threads = g_new0(struct poller_thread, num_poller_threads);
if (call_init())
abort();
@ -1840,7 +1843,10 @@ static void uring_thread_waker(struct thread_waker *wk) {
struct poller *p = wk->arg;
uring_poller_wake(p);
}
static void uring_poller_loop(struct poller *p) {
static void uring_poller_loop(struct poller_thread *pt) {
struct poller *p = pt->poller;
pt->pid = gettid();
uring_poller_add_waker(p);
struct thread_waker wk = {.func = uring_thread_waker, .arg = p};
@ -1848,9 +1854,12 @@ static void uring_poller_loop(struct poller *p) {
while (!rtpe_shutdown) {
rtpe_now = now_us();
uring_poller_poll(p);
unsigned int events = uring_poller_poll(p);
append_thread_lpr_to_glob_lpr();
log_info_reset();
atomic64_inc_na(&pt->wakeups);
atomic64_add_na(&pt->items, events);
}
thread_waker_del(&wk);
uring_poller_clear(p);
@ -1927,15 +1936,18 @@ int main(int argc, char **argv) {
websocket_start();
for (unsigned int idx = 0; idx < num_poller_threads; ++idx)
for (unsigned int idx = 0; idx < num_poller_threads; ++idx) {
rtpe_poller_threads[idx].poller = rtpe_pollers[idx % num_rtpe_pollers];
thread_create_detach_prio(
#ifdef HAVE_LIBURING
rtpe_config.common.io_uring ? uring_poller_loop :
#endif
poller_loop,
rtpe_pollers[idx % num_rtpe_pollers],
&rtpe_poller_threads[idx],
rtpe_config.scheduling, rtpe_config.priority,
idx < rtpe_config.num_threads ? "poller" : "cpoller");
}
media_player_launch();
send_timer_launch();
@ -2011,6 +2023,7 @@ int main(int argc, char **argv) {
#endif
poller_free(&rtpe_pollers[idx]);
g_free(rtpe_pollers);
g_free(rtpe_poller_threads);
release_closed_sockets();
interfaces_free();
#ifndef WITHOUT_NFTABLES


+ 7
- 1
lib/poller.c View File

@ -325,7 +325,9 @@ out:
return ret;
}
void poller_loop(struct poller *p) {
void poller_loop(struct poller_thread *pt) {
struct poller *p = pt->poller;
pt->pid = gettid();
int poller_size = rtpe_common_config_ptr->poller_size;
struct epoll_event *evs;
@ -337,6 +339,10 @@ void poller_loop(struct poller *p) {
int ret = poller_poll(p, thread_sleep_time, evs, poller_size);
if (ret < 0)
usleep(20 * 1000);
else {
atomic64_inc_na(&pt->wakeups);
atomic64_add_na(&pt->items, ret);
}
uring_methods.thread_loop();
}


+ 12
- 1
lib/poller.h View File

@ -9,6 +9,7 @@
#include <glib.h>
#include <stdbool.h>
#include "compat.h"
#include "auxlib.h"
#define MAX_RTP_PACKET_SIZE 8192
@ -35,6 +36,16 @@ struct poller_item {
struct poller;
struct poller_thread {
pid_t pid;
struct poller *poller;
// stats
atomic64 wakeups;
atomic64 items; // per wakeup
};
struct poller *poller_new(void);
void poller_free(struct poller **);
bool poller_add_item(struct poller *, struct poller_item *);
@ -45,7 +56,7 @@ void poller_blocked(struct poller *, void *);
bool poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *);
void poller_loop(struct poller *);
void poller_loop(struct poller_thread *);
extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *);
extern bool (*rtpe_poller_del_item)(struct poller *, int);


+ 3
- 1
lib/uring.c View File

@ -593,7 +593,7 @@ void uring_poller_add_waker(struct poller *p) {
});
}
void uring_poller_poll(struct poller *p) {
unsigned int uring_poller_poll(struct poller *p) {
uring_poller_do_reqs(p);
unsigned int events = __uring_thread_loop();
@ -604,6 +604,8 @@ void uring_poller_poll(struct poller *p) {
io_uring_wait_cqe(&rtpe_uring, &cqe); // maybe not a cancellation point
thread_cancel_disable();
}
return events;
}
void uring_poller_clear(struct poller *p) {


+ 1
- 1
lib/uring.h View File

@ -47,7 +47,7 @@ struct poller *uring_poller_new(void);
void uring_poller_free(struct poller **pp);
void uring_poller_add_waker(struct poller *p);
void uring_poller_wake(struct poller *p);
void uring_poller_poll(struct poller *);
unsigned int uring_poller_poll(struct poller *);
void uring_poller_clear(struct poller *);
bool uring_poller_add_item(struct poller *p, struct poller_item *i);


+ 2
- 1
perf-tester/main.c View File

@ -325,7 +325,8 @@ static void *worker(struct worker *p) {
LOCK(&other_threads_lock);
g_hash_table_insert(worker_threads, GINT_TO_POINTER(worker_self->pid), NULL);
}
poller_loop(rtpe_poller);
struct poller_thread pt = { .poller = rtpe_poller };
poller_loop(&pt);
return NULL;
}


Loading…
Cancel
Save