Browse Source

MT#55283 fix/rework poller-per-thread feature

The poller-per-thread feature was broken with a division by zero. Take
the opportunity to rework it and eliminate the poller_map object. Use a
simple array of pollers for media sockets, plus one global poller for
control sockets. In the regular case only one poller is created and
everything points to that poller. In the poller-per-thread case, one
poller per thread is created, plus one poller (also with its own single
thread) for control connections. All control sockets use the single
control poller, while all media sockets get assigned one poller from the
pool in a round-robin fashion.

closes #1801

Change-Id: Iae91a3e10b7206455c6df33b1a472254c700ce21
pull/1802/head
Richard Fuchs 2 years ago
parent
commit
90aa63a97c
12 changed files with 63 additions and 125 deletions
  1. +1
    -1
      daemon/control_ng.c
  2. +31
    -22
      daemon/main.c
  3. +5
    -9
      daemon/media_socket.c
  4. +7
    -7
      daemon/tcp_listener.c
  5. +1
    -1
      daemon/udp_listener.c
  6. +9
    -4
      include/main.h
  7. +0
    -2
      include/tcp_listener.h
  8. +0
    -68
      lib/poller.c
  9. +0
    -5
      lib/poller.h
  10. +1
    -1
      perf-tester/main.c
  11. +4
    -3
      t/test-stats.c
  12. +4
    -2
      t/test-transcode.c

+ 1
- 1
daemon/control_ng.c View File

@ -556,7 +556,7 @@ void control_ng_free(void *p) {
g_hash_table_destroy(rtpe_cngs_hash);
rtpe_cngs_hash = NULL;
}
poller_del_item(rtpe_poller, c->udp_listener.fd);
poller_del_item(rtpe_control_poller, c->udp_listener.fd);
close_socket(&c->udp_listener);
streambuf_listener_shutdown(&c->tcp_listener);
if (tcp_connections_hash)


+ 31
- 22
daemon/main.c View File

@ -60,8 +60,13 @@
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
struct poller **rtpe_pollers;
struct poller *rtpe_control_poller;
static unsigned int num_rtpe_pollers;
static unsigned int num_poller_threads;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
struct rtpengine_config initial_rtpe_config;
static GQueue rtpe_tcp = G_QUEUE_INIT;
@ -1258,13 +1263,23 @@ static void create_everything(void) {
kernel_setup();
rtpe_poller = poller_new();
if (!rtpe_poller)
die("poller creation failed");
rtpe_poller_map = poller_map_new();
if (!rtpe_poller_map)
die("poller map creation failed");
// either one global poller, or one per thread for media sockets plus one for control sockets
if (!rtpe_config.poller_per_thread) {
num_media_pollers = num_rtpe_pollers = 1;
num_poller_threads = rtpe_config.num_threads;
}
else {
num_media_pollers = rtpe_config.num_threads;
num_rtpe_pollers = num_media_pollers + 1;
num_poller_threads = num_rtpe_pollers;
}
rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers);
for (unsigned int i = 0; i < num_rtpe_pollers; i++) {
rtpe_pollers[i] = poller_new();
if (!rtpe_pollers[i])
die("poller creation failed");
}
rtpe_control_poller = rtpe_pollers[num_rtpe_pollers - 1];
if (call_init())
abort();
@ -1365,8 +1380,6 @@ static void do_redis_restore(void) {
int main(int argc, char **argv) {
int idx;
early_init();
options(&argc, &argv);
init_everything();
@ -1426,15 +1439,10 @@ int main(int argc, char **argv) {
service_notify("READY=1\n");
for (idx = 0; idx < rtpe_config.num_threads; ++idx) {
if (!rtpe_config.poller_per_thread)
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
else
thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller");
}
if (rtpe_config.poller_per_thread)
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
for (unsigned int idx = 0; idx < num_poller_threads; ++idx)
thread_create_detach_prio(poller_loop, rtpe_pollers[idx % num_rtpe_pollers],
rtpe_config.scheduling, rtpe_config.priority,
idx < rtpe_config.num_threads ? "poller" : "cpoller");
media_player_launch();
send_timer_launch();
@ -1500,8 +1508,9 @@ int main(int argc, char **argv) {
release_listeners(&rtpe_tcp);
release_listeners(&rtpe_control_ng);
release_listeners(&rtpe_control_ng_tcp);
poller_free(&rtpe_poller);
poller_map_free(&rtpe_poller_map);
for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx)
poller_free(&rtpe_pollers[idx]);
g_free(rtpe_pollers);
interfaces_free();
#ifndef WITHOUT_NFTABLES
nftables_shutdown(rtpe_config.nftables_chain, rtpe_config.nftables_base_chain,


+ 5
- 9
daemon/media_socket.c View File

@ -3250,7 +3250,6 @@ static void stream_fd_free(void *p) {
stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
stream_fd *sfd;
struct poller_item pi;
struct poller *p = rtpe_poller;
sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free);
sfd->unique_id = t_queue_get_length(&call->stream_fds);
@ -3269,14 +3268,11 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
pi.closed = stream_fd_closed;
if (sfd->socket.fd != -1) {
if (rtpe_config.poller_per_thread)
p = poller_map_get(rtpe_poller_map);
if (p) {
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
else
sfd->poller = p;
}
struct poller *p = rtpe_get_poller();
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
else
sfd->poller = p;
RWLOCK_W(&local_media_socket_endpoints_lock);
t_hash_table_replace(local_media_socket_endpoints, &sfd->socket.local, obj_get(sfd));


+ 7
- 7
daemon/tcp_listener.c View File

@ -66,7 +66,7 @@ static void __tlc_free(void *p) {
obj_put_o(cb->p);
}
int tcp_listener_init(socket_t *sock, const endpoint_t *ep,
static int tcp_listener_init(socket_t *sock, const endpoint_t *ep,
tcp_listener_callback_t func, struct obj *obj)
{
struct poller_item i;
@ -87,7 +87,7 @@ int tcp_listener_init(socket_t *sock, const endpoint_t *ep,
i.closed = tcp_listener_closed;
i.readable = tcp_listener_incoming;
i.obj = &cb->obj;
if (poller_add_item(rtpe_poller, &i))
if (poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(cb);
@ -122,7 +122,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) {
mutex_lock(&l->lock);
bool ret = t_hash_table_remove(l->streams, s);
mutex_unlock(&l->lock);
poller_del_item(rtpe_poller, s->sock.fd);
poller_del_item(rtpe_control_poller, s->sock.fd);
if (ret)
obj_put(s);
}
@ -163,8 +163,8 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
s = obj_alloc0("streambuf_stream", sizeof(*s), streambuf_stream_free);
s->sock = *newsock;
s->inbuf = streambuf_new(rtpe_poller, newsock->fd);
s->outbuf = streambuf_new(rtpe_poller, newsock->fd);
s->inbuf = streambuf_new(rtpe_control_poller, newsock->fd);
s->outbuf = streambuf_new(rtpe_control_poller, newsock->fd);
s->listener = listener;
s->cb = obj_get(cb);
s->parent = obj_get_o(cb->parent);
@ -186,7 +186,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
t_hash_table_insert(listener->streams, s, s); // hand over ref
mutex_unlock(&listener->lock);
if (poller_add_item(rtpe_poller, &i))
if (poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(s);
@ -242,7 +242,7 @@ fail:
void streambuf_listener_shutdown(struct streambuf_listener *listener) {
if (!listener)
return;
poller_del_item(rtpe_poller, listener->listener.fd);
poller_del_item(rtpe_control_poller, listener->listener.fd);
close_socket(&listener->listener);
t_hash_table_destroy_ptr(&listener->streams);
}


+ 1
- 1
daemon/udp_listener.c View File

@ -95,7 +95,7 @@ int udp_listener_init(socket_t *sock, const endpoint_t *ep,
i.closed = udp_listener_closed;
i.readable = udp_listener_incoming;
i.obj = &cb->obj;
if (poller_add_item(rtpe_poller, &i))
if (poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(cb);


+ 9
- 4
include/main.h View File

@ -197,7 +197,6 @@ struct rtpengine_config {
struct poller;
struct poller_map;
/**
* Main global poller instance.
@ -205,9 +204,15 @@ struct poller_map;
*
* TODO: convert to struct instead of pointer?
*/
extern struct poller *rtpe_poller;
/* Used when the poller-per-thread option is set */
extern struct poller_map *rtpe_poller_map;
extern struct poller **rtpe_pollers; // at least one poller, in an array
extern struct poller *rtpe_control_poller; // poller for control sockets (maybe rtpe_pollers[0])
extern unsigned int num_media_pollers; // for media sockets, >= 1
extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread
INLINE struct poller *rtpe_get_poller(void) {
// XXX optimise this for num_media_pollers == 1 ?
return rtpe_pollers[g_atomic_int_add(&rtpe_poller_rr_iter, 1) % num_media_pollers];
}
extern struct rtpengine_config rtpe_config;
extern struct rtpengine_config initial_rtpe_config;


+ 0
- 2
include/tcp_listener.h View File

@ -33,8 +33,6 @@ struct streambuf_stream {
};
int tcp_listener_init(socket_t *, const endpoint_t *, tcp_listener_callback_t, struct obj *);
int streambuf_listener_init(struct streambuf_listener *listener, const endpoint_t *ep,
streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func,


+ 0
- 68
lib/poller.c View File

@ -35,66 +35,6 @@ struct poller {
GPtrArray *items;
};
struct poller_map {
mutex_t lock;
GHashTable *table;
};
struct poller_map *poller_map_new(void) {
struct poller_map *p;
p = g_slice_alloc0(sizeof(*p));
mutex_init(&p->lock);
p->table = g_hash_table_new(g_direct_hash, g_direct_equal);
return p;
}
static void poller_map_add(struct poller_map *map) {
pthread_t tid = -1;
struct poller *p;
if (!map)
return;
tid = pthread_self();
LOCK(&map->lock);
p = poller_new();
if (p)
g_hash_table_insert(map->table, (gpointer)tid, p);
}
struct poller *poller_map_get(struct poller_map *map) {
if (!map)
return NULL;
struct poller *p = NULL;
pthread_t tid = pthread_self();
LOCK(&map->lock);
p = g_hash_table_lookup(map->table, (gpointer)tid);
if (!p) {
gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL);
p = g_hash_table_lookup(map->table, arr[ssl_random() % g_hash_table_size(map->table)]);
g_free(arr);
}
return p;
}
static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) {
struct poller *p = (struct poller *)v;
poller_free(&p);
}
void poller_map_free(struct poller_map **map) {
struct poller_map *m = *map;
if (!m)
return;
g_hash_table_foreach(m->table, poller_map_free_poller, NULL);
g_hash_table_destroy(m->table);
mutex_destroy(&m->lock);
g_slice_free1(sizeof(*m), m);
*map = NULL;
}
static void poller_free_item(struct poller_item_int *ele) {
if (ele)
obj_put(ele);
@ -369,14 +309,6 @@ out:
}
void poller_loop(void *d) {
struct poller_map *map = d;
poller_map_add(map);
struct poller *p = poller_map_get(map);
poller_loop2(p);
}
void poller_loop2(void *d) {
struct poller *p = d;
int poller_size = rtpe_common_config_ptr->poller_size;
struct epoll_event *evs;


+ 0
- 5
lib/poller.h View File

@ -27,12 +27,8 @@ struct poller_item {
};
struct poller;
struct poller_map;
struct poller *poller_new(void);
struct poller_map *poller_map_new(void);
struct poller *poller_map_get(struct poller_map *);
void poller_map_free(struct poller_map **);
void poller_free(struct poller **);
int poller_add_item(struct poller *, struct poller_item *);
int poller_del_item(struct poller *, int);
@ -42,7 +38,6 @@ int poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *);
void poller_loop(void *);
void poller_loop2(void *);
#endif

+ 1
- 1
perf-tester/main.c View File

@ -289,7 +289,7 @@ static void *worker(void *p) {
LOCK(&other_threads_lock);
g_hash_table_insert(worker_threads, GINT_TO_POINTER(worker_self->pid), NULL);
}
poller_loop2(rtpe_poller);
poller_loop(rtpe_poller);
return NULL;
}


+ 4
- 3
t/test-stats.c View File

@ -15,8 +15,10 @@ struct rtpengine_config rtpe_config = {
.dtls_rsa_key_size = 2048,
};
struct rtpengine_config initial_rtpe_config;
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
struct poller **rtpe_pollers;
struct poller *rtpe_control_poller;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT;
@ -64,7 +66,6 @@ int main(void) {
endpoint_parse_any(&rtpe_config.graphite_ep, "1.2.3.4:4567");
rtpe_ssl_init();
rtpe_poller = poller_new();
call_init();
statistics_init();
call_interfaces_init();


+ 4
- 2
t/test-transcode.c View File

@ -11,8 +11,10 @@ int _log_facility_cdr;
int _log_facility_dtmf;
struct rtpengine_config rtpe_config;
struct rtpengine_config initial_rtpe_config;
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
struct poller **rtpe_pollers;
struct poller *rtpe_control_poller;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT;


Loading…
Cancel
Save