diff --git a/daemon/main.c b/daemon/main.c index 79341fc24..d8018ffe7 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -53,6 +53,7 @@ struct poller *rtpe_poller; +struct poller_map *rtpe_poller_map; struct rtpengine_config initial_rtpe_config; static struct control_tcp *rtpe_tcp; @@ -930,6 +931,10 @@ no_kernel: if (!rtpe_poller) die("poller creation failed"); + rtpe_poller_map = poller_map_new(); + if (!rtpe_poller_map) + die("poller map creation failed"); + dtls_timer(rtpe_poller); if (call_init()) @@ -1076,8 +1081,9 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); for (idx = 0; idx < rtpe_config.num_threads; ++idx) - thread_create_detach_prio(poller_loop, rtpe_poller, rtpe_config.scheduling, - rtpe_config.priority, "poller"); + thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller"); + + thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); if (rtpe_config.media_num_threads < 0) rtpe_config.media_num_threads = rtpe_config.num_threads; @@ -1135,7 +1141,6 @@ int main(int argc, char **argv) { codeclib_free(); statistics_free(); call_interfaces_free(); - interfaces_free(); ice_free(); dtls_cert_free(); control_ng_cleanup(); @@ -1157,6 +1162,8 @@ int main(int argc, char **argv) { obj_release(rtpe_tcp); obj_release(rtpe_control_ng); poller_free(&rtpe_poller); + poller_map_free(&rtpe_poller_map); + interfaces_free(); return 0; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 8a3b65b2f..a9520b56d 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2223,6 +2223,7 @@ static void stream_fd_free(void *p) { struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif) { struct stream_fd *sfd; struct poller_item pi; + struct poller *p; sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); sfd->unique_id = g_queue_get_length(&call->stream_fds); @@ -2240,8 +2241,11 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo pi.readable = stream_fd_readable; pi.closed = stream_fd_closed; - if (poller_add_item(rtpe_poller, &pi)) - ilog(LOG_ERR, "Failed to add stream_fd to poller"); + p = poller_map_get(rtpe_poller_map); + if (p) { + if (poller_add_item(p, &pi)) + ilog(LOG_ERR, "Failed to add stream_fd to poller"); + } return sfd; } diff --git a/daemon/poller.c b/daemon/poller.c index 535b8a1eb..71b4dc46a 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -50,9 +51,71 @@ struct poller { GSList *timers_del; }; +struct poller_map { + mutex_t lock; + GHashTable *table; +}; + +struct poller_map *poller_map_new(void) { + struct poller_map *p; + + p = malloc(sizeof(*p)); + memset(p, 0, sizeof(*p)); + mutex_init(&p->lock); + p->table = g_hash_table_new(g_direct_hash, g_direct_equal); + return p; +} + +long poller_map_add(struct poller_map *map) { + long tid = -1; + struct poller *p; + if (!map) + return tid; + tid = syscall(SYS_gettid); + + mutex_lock(&map->lock); + p = poller_new(); + g_hash_table_insert(map->table, (gpointer)tid, p); + mutex_unlock(&map->lock); + return tid; +} + +struct poller *poller_map_get(struct poller_map *map) { + if (!map) + return NULL; + + struct poller *p = NULL; + long tid = syscall(SYS_gettid); + mutex_lock(&map->lock); + p = g_hash_table_lookup(map->table, (gpointer)tid); + if (!p) { + gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL); + GRand *rnd = g_rand_new(); + p = g_hash_table_lookup(map->table, arr[g_rand_int_range(rnd, 0, g_hash_table_size(map->table))]); + g_rand_free(rnd); + } + mutex_unlock(&map->lock); + return p; +} +static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) { + struct poller *p = (struct poller *)v; + poller_free(&p); +} +void poller_map_free(struct poller_map **map) { + struct poller_map *m = *map; + if (!m) + return; + mutex_lock(&m->lock); + g_hash_table_foreach(m->table, poller_map_free_poller, NULL); + g_hash_table_destroy(m->table); + mutex_unlock(&m->lock); + mutex_destroy(&m->lock); + free(m); + *map = NULL; +} struct poller *poller_new(void) { struct poller *p; @@ -537,7 +600,31 @@ now: } } +static void sleep_ms(int ms) { + struct timespec deadline; + long next_tick; + clock_gettime(CLOCK_MONOTONIC, &deadline); + + next_tick = (deadline.tv_sec * 1000000000L + deadline.tv_nsec) + ms * 1000000; + deadline.tv_sec = next_tick / 1000000000L; + deadline.tv_nsec = next_tick % 1000000000L; + + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline, NULL); +} + void poller_loop(void *d) { + struct poller_map *map = d; + poller_map_add(map); + struct poller *p = poller_map_get(map); + + while (!rtpe_shutdown) { + int ret = poller_poll(p, 100); + if (ret < 0) + sleep_ms(10); + } +} + +void poller_loop2(void *d) { struct poller *p = d; while (!rtpe_shutdown) diff --git a/include/main.h b/include/main.h index 280d09c5e..8bebd5351 100644 --- a/include/main.h +++ b/include/main.h @@ -120,8 +120,10 @@ struct rtpengine_config { struct poller; -extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer? +struct poller_map; +extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer? +extern struct poller_map *rtpe_poller_map; extern struct rtpengine_config rtpe_config; extern struct rtpengine_config initial_rtpe_config; diff --git a/include/poller.h b/include/poller.h index d708eb7e8..8aa15942f 100644 --- a/include/poller.h +++ b/include/poller.h @@ -28,9 +28,12 @@ struct poller_item { }; struct poller; - +struct poller_map; struct poller *poller_new(void); +struct poller_map *poller_map_new(void); +struct poller *poller_map_get(struct poller_map *); +void poller_map_free(struct poller_map **); void poller_free(struct poller **); int poller_add_item(struct poller *, struct poller_item *); int poller_update_item(struct poller *, struct poller_item *); @@ -43,6 +46,7 @@ void poller_error(struct poller *, void *); int poller_poll(struct poller *, int); void poller_timer_loop(void *); void poller_loop(void *); +void poller_loop2(void *); int poller_add_timer(struct poller *, void (*)(void *), struct obj *); int poller_del_timer(struct poller *, void (*)(void *), struct obj *); diff --git a/t/test-transcode.c b/t/test-transcode.c index bd1c1bb6d..d3b49b248 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -10,6 +10,7 @@ int _log_facility_cdr; int _log_facility_dtmf; struct rtpengine_config rtpe_config; struct poller *rtpe_poller; +struct poller_map *rtpe_poller_map; GString *dtmf_logs; static str *sdup(char *s) {