Browse Source

Implement poller per thread in order to fix packet order.

pull/1200/head
Damir Nedžibović 5 years ago
parent
commit
eded168a38
6 changed files with 112 additions and 7 deletions
  1. +10
    -3
      daemon/main.c
  2. +6
    -2
      daemon/media_socket.c
  3. +87
    -0
      daemon/poller.c
  4. +3
    -1
      include/main.h
  5. +5
    -1
      include/poller.h
  6. +1
    -0
      t/test-transcode.c

+ 10
- 3
daemon/main.c View File

@ -53,6 +53,7 @@
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
struct rtpengine_config initial_rtpe_config;
static struct control_tcp *rtpe_tcp;
@ -930,6 +931,10 @@ no_kernel:
if (!rtpe_poller)
die("poller creation failed");
rtpe_poller_map = poller_map_new();
if (!rtpe_poller_map)
die("poller map creation failed");
dtls_timer(rtpe_poller);
if (call_init())
@ -1076,8 +1081,9 @@ int main(int argc, char **argv) {
service_notify("READY=1\n");
for (idx = 0; idx < rtpe_config.num_threads; ++idx)
thread_create_detach_prio(poller_loop, rtpe_poller, rtpe_config.scheduling,
rtpe_config.priority, "poller");
thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller");
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
if (rtpe_config.media_num_threads < 0)
rtpe_config.media_num_threads = rtpe_config.num_threads;
@ -1135,7 +1141,6 @@ int main(int argc, char **argv) {
codeclib_free();
statistics_free();
call_interfaces_free();
interfaces_free();
ice_free();
dtls_cert_free();
control_ng_cleanup();
@ -1157,6 +1162,8 @@ int main(int argc, char **argv) {
obj_release(rtpe_tcp);
obj_release(rtpe_control_ng);
poller_free(&rtpe_poller);
poller_map_free(&rtpe_poller_map);
interfaces_free();
return 0;
}

+ 6
- 2
daemon/media_socket.c View File

@ -2223,6 +2223,7 @@ static void stream_fd_free(void *p) {
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif) {
struct stream_fd *sfd;
struct poller_item pi;
struct poller *p;
sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free);
sfd->unique_id = g_queue_get_length(&call->stream_fds);
@ -2240,8 +2241,11 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo
pi.readable = stream_fd_readable;
pi.closed = stream_fd_closed;
if (poller_add_item(rtpe_poller, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
p = poller_map_get(rtpe_poller_map);
if (p) {
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
}
return sfd;
}


+ 87
- 0
daemon/poller.c View File

@ -11,6 +11,7 @@
#include <errno.h>
#include <sys/epoll.h>
#include <glib.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <main.h>
#include <redis.h>
@ -50,9 +51,71 @@ struct poller {
GSList *timers_del;
};
struct poller_map {
mutex_t lock;
GHashTable *table;
};
struct poller_map *poller_map_new(void) {
struct poller_map *p;
p = malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
mutex_init(&p->lock);
p->table = g_hash_table_new(g_direct_hash, g_direct_equal);
return p;
}
long poller_map_add(struct poller_map *map) {
long tid = -1;
struct poller *p;
if (!map)
return tid;
tid = syscall(SYS_gettid);
mutex_lock(&map->lock);
p = poller_new();
g_hash_table_insert(map->table, (gpointer)tid, p);
mutex_unlock(&map->lock);
return tid;
}
struct poller *poller_map_get(struct poller_map *map) {
if (!map)
return NULL;
struct poller *p = NULL;
long tid = syscall(SYS_gettid);
mutex_lock(&map->lock);
p = g_hash_table_lookup(map->table, (gpointer)tid);
if (!p) {
gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL);
GRand *rnd = g_rand_new();
p = g_hash_table_lookup(map->table, arr[g_rand_int_range(rnd, 0, g_hash_table_size(map->table))]);
g_rand_free(rnd);
}
mutex_unlock(&map->lock);
return p;
}
static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) {
struct poller *p = (struct poller *)v;
poller_free(&p);
}
void poller_map_free(struct poller_map **map) {
struct poller_map *m = *map;
if (!m)
return;
mutex_lock(&m->lock);
g_hash_table_foreach(m->table, poller_map_free_poller, NULL);
g_hash_table_destroy(m->table);
mutex_unlock(&m->lock);
mutex_destroy(&m->lock);
free(m);
*map = NULL;
}
struct poller *poller_new(void) {
struct poller *p;
@ -537,7 +600,31 @@ now:
}
}
static void sleep_ms(int ms) {
struct timespec deadline;
long next_tick;
clock_gettime(CLOCK_MONOTONIC, &deadline);
next_tick = (deadline.tv_sec * 1000000000L + deadline.tv_nsec) + ms * 1000000;
deadline.tv_sec = next_tick / 1000000000L;
deadline.tv_nsec = next_tick % 1000000000L;
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline, NULL);
}
void poller_loop(void *d) {
struct poller_map *map = d;
poller_map_add(map);
struct poller *p = poller_map_get(map);
while (!rtpe_shutdown) {
int ret = poller_poll(p, 100);
if (ret < 0)
sleep_ms(10);
}
}
void poller_loop2(void *d) {
struct poller *p = d;
while (!rtpe_shutdown)


+ 3
- 1
include/main.h View File

@ -120,8 +120,10 @@ struct rtpengine_config {
struct poller;
extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer?
struct poller_map;
extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer?
extern struct poller_map *rtpe_poller_map;
extern struct rtpengine_config rtpe_config;
extern struct rtpengine_config initial_rtpe_config;


+ 5
- 1
include/poller.h View File

@ -28,9 +28,12 @@ struct poller_item {
};
struct poller;
struct poller_map;
struct poller *poller_new(void);
struct poller_map *poller_map_new(void);
struct poller *poller_map_get(struct poller_map *);
void poller_map_free(struct poller_map **);
void poller_free(struct poller **);
int poller_add_item(struct poller *, struct poller_item *);
int poller_update_item(struct poller *, struct poller_item *);
@ -43,6 +46,7 @@ void poller_error(struct poller *, void *);
int poller_poll(struct poller *, int);
void poller_timer_loop(void *);
void poller_loop(void *);
void poller_loop2(void *);
int poller_add_timer(struct poller *, void (*)(void *), struct obj *);
int poller_del_timer(struct poller *, void (*)(void *), struct obj *);


+ 1
- 0
t/test-transcode.c View File

@ -10,6 +10,7 @@ int _log_facility_cdr;
int _log_facility_dtmf;
struct rtpengine_config rtpe_config;
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
GString *dtmf_logs;
static str *sdup(char *s) {


Loading…
Cancel
Save