diff --git a/daemon/main.c b/daemon/main.c index 79341fc24..61f1d7685 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -53,6 +53,7 @@ struct poller *rtpe_poller; +struct poller_map *rtpe_poller_map; struct rtpengine_config initial_rtpe_config; static struct control_tcp *rtpe_tcp; @@ -478,6 +479,7 @@ static void options(int *argc, char ***argv) { { "https-key", 0,0, G_OPTION_ARG_STRING, &rtpe_config.https_key, "Private key for HTTPS and WSS","FILE"}, { "http-threads", 0,0, G_OPTION_ARG_INT, &rtpe_config.http_threads,"Number of worker threads for HTTP and WS","INT"}, { "software-id", 0,0, G_OPTION_ARG_STRING, &rtpe_config.software_id,"Identification string of this software presented to external systems","STRING"}, + { "poller-per-thread", 0,0, G_OPTION_ARG_NONE, &rtpe_config.poller_per_thread, "Use poller per thread", NULL }, #ifdef WITH_TRANSCODING { "dtx-delay", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_delay, "Delay in milliseconds to trigger DTX handling","INT"}, { "max-dtx", 0,0, G_OPTION_ARG_INT, &rtpe_config.max_dtx, "Maximum duration of DTX handling", "INT"}, @@ -930,6 +932,10 @@ no_kernel: if (!rtpe_poller) die("poller creation failed"); + rtpe_poller_map = poller_map_new(); + if (!rtpe_poller_map) + die("poller map creation failed"); + dtls_timer(rtpe_poller); if (call_init()) @@ -1075,9 +1081,15 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); - for (idx = 0; idx < rtpe_config.num_threads; ++idx) - thread_create_detach_prio(poller_loop, rtpe_poller, rtpe_config.scheduling, - rtpe_config.priority, "poller"); + for (idx = 0; idx < rtpe_config.num_threads; ++idx) { + if (!rtpe_config.poller_per_thread) + thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); + else + thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller"); + } + + if (!rtpe_config.poller_per_thread) + thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); if (rtpe_config.media_num_threads < 0) rtpe_config.media_num_threads = rtpe_config.num_threads; @@ -1135,7 +1147,6 @@ int main(int argc, char **argv) { codeclib_free(); statistics_free(); call_interfaces_free(); - interfaces_free(); ice_free(); dtls_cert_free(); control_ng_cleanup(); @@ -1157,6 +1168,8 @@ int main(int argc, char **argv) { obj_release(rtpe_tcp); obj_release(rtpe_control_ng); poller_free(&rtpe_poller); + poller_map_free(&rtpe_poller_map); + interfaces_free(); return 0; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 8a3b65b2f..495e18581 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2223,6 +2223,7 @@ static void stream_fd_free(void *p) { struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif) { struct stream_fd *sfd; struct poller_item pi; + struct poller *p = rtpe_poller; sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); sfd->unique_id = g_queue_get_length(&call->stream_fds); @@ -2240,8 +2241,12 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo pi.readable = stream_fd_readable; pi.closed = stream_fd_closed; - if (poller_add_item(rtpe_poller, &pi)) - ilog(LOG_ERR, "Failed to add stream_fd to poller"); + if (rtpe_config.poller_per_thread) + p = poller_map_get(rtpe_poller_map); + if (p) { + if (poller_add_item(p, &pi)) + ilog(LOG_ERR, "Failed to add stream_fd to poller"); + } return sfd; } diff --git a/daemon/poller.c b/daemon/poller.c index 535b8a1eb..2b1a1de32 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -50,9 +50,69 @@ struct poller { GSList *timers_del; }; +struct poller_map { + mutex_t lock; + GHashTable *table; +}; + +struct poller_map *poller_map_new(void) { + struct poller_map *p; + + p = malloc(sizeof(*p)); + memset(p, 0, sizeof(*p)); + mutex_init(&p->lock); + p->table = g_hash_table_new(g_direct_hash, g_direct_equal); + + return p; +} + +void poller_map_add(struct poller_map *map) { + pthread_t tid = -1; + struct poller *p; + if (!map) + return; + tid = pthread_self(); + mutex_lock(&map->lock); + p = poller_new(); + g_hash_table_insert(map->table, (gpointer)tid, p); + mutex_unlock(&map->lock); +} +struct poller *poller_map_get(struct poller_map *map) { + if (!map) + return NULL; + + struct poller *p = NULL; + pthread_t tid = pthread_self(); + mutex_lock(&map->lock); + p = g_hash_table_lookup(map->table, (gpointer)tid); + if (!p) { + gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL); + p = g_hash_table_lookup(map->table, arr[ssl_random() % g_hash_table_size(map->table)]); + g_free(arr); + } + mutex_unlock(&map->lock); + return p; +} +static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) { + struct poller *p = (struct poller *)v; + poller_free(&p); +} + +void poller_map_free(struct poller_map **map) { + struct poller_map *m = *map; + if (!m) + return; + mutex_lock(&m->lock); + g_hash_table_foreach(m->table, poller_map_free_poller, NULL); + g_hash_table_destroy(m->table); + mutex_unlock(&m->lock); + mutex_destroy(&m->lock); + free(m); + *map = NULL; +} struct poller *poller_new(void) { struct poller *p; @@ -538,8 +598,19 @@ now: } void poller_loop(void *d) { + struct poller_map *map = d; + poller_map_add(map); + struct poller *p = poller_map_get(map); + + poller_loop2(p); +} + +void poller_loop2(void *d) { struct poller *p = d; - while (!rtpe_shutdown) - poller_poll(p, 100); + while (!rtpe_shutdown) { + int ret = poller_poll(p, 100); + if (ret < 0) + usleep(20 * 1000); + } } diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index ba49fa7f3..fb0427ede 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -848,6 +848,14 @@ information. Always sets the option B in answer messages as described in the F. +=item B<--poller-per-thread> + +Enable 'poller per thread' functionality: for every worker thread (see the +--num-threads option) a poller will be created. With this option on, it is +guaranteed that only a single thread will ever read from a particular socket, +thus maintaining the order of the packets. Might help when having issues with +DTMF packets (RFC 2833). + =back =head1 INTERFACES diff --git a/include/main.h b/include/main.h index 280d09c5e..f861e453e 100644 --- a/include/main.h +++ b/include/main.h @@ -116,12 +116,15 @@ struct rtpengine_config { str cn_payload; int reorder_codecs; char *software_id; + int poller_per_thread; }; struct poller; -extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer? +struct poller_map; +extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer? +extern struct poller_map *rtpe_poller_map; extern struct rtpengine_config rtpe_config; extern struct rtpengine_config initial_rtpe_config; diff --git a/include/poller.h b/include/poller.h index d708eb7e8..8aa15942f 100644 --- a/include/poller.h +++ b/include/poller.h @@ -28,9 +28,12 @@ struct poller_item { }; struct poller; - +struct poller_map; struct poller *poller_new(void); +struct poller_map *poller_map_new(void); +struct poller *poller_map_get(struct poller_map *); +void poller_map_free(struct poller_map **); void poller_free(struct poller **); int poller_add_item(struct poller *, struct poller_item *); int poller_update_item(struct poller *, struct poller_item *); @@ -43,6 +46,7 @@ void poller_error(struct poller *, void *); int poller_poll(struct poller *, int); void poller_timer_loop(void *); void poller_loop(void *); +void poller_loop2(void *); int poller_add_timer(struct poller *, void (*)(void *), struct obj *); int poller_del_timer(struct poller *, void (*)(void *), struct obj *); diff --git a/t/test-transcode.c b/t/test-transcode.c index bd1c1bb6d..d3b49b248 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -10,6 +10,7 @@ int _log_facility_cdr; int _log_facility_dtmf; struct rtpengine_config rtpe_config; struct poller *rtpe_poller; +struct poller_map *rtpe_poller_map; GString *dtmf_logs; static str *sdup(char *s) {