Browse Source

MT#57268 Move release closed sockets to a separate thread

To do the work more efficiently and not be dependent on
the `call_timer` runs by poller, we should move
the releasing of sockets to a separate thread, to make it
faster and not be dependent on what happens in the `call_timer`
at all. Since it has nothing to do with the call timers.

Since now we have two queues:
- thread scope (local): ports_to_release
- global one: ports_to_release_glob

`sockets_releaser()` uses the ports_to_release_glob,
meanwhile appending in the `call_timer()` happens using the
ports_to_release.

Change-Id: Iadd966ac895b2dd64f81269d4fdf5d83747fe0b7
pull/1646/head
Donat Zenichev 3 years ago
parent
commit
a236f465e5
4 changed files with 62 additions and 8 deletions
  1. +2
    -1
      daemon/call.c
  2. +12
    -1
      daemon/main.c
  3. +46
    -6
      daemon/media_socket.c
  4. +2
    -0
      include/media_socket.h

+ 2
- 1
daemon/call.c View File

@ -759,7 +759,8 @@ next:
ilog(LOG_INFO, "Decreasing timer run interval to %llu seconds", interval / 1000000);
}
release_closed_sockets();
/* add thread scope (local) sockets to the global list, in order to release them later */
append_thread_lpr_to_glob_lpr();
}
#undef DS


+ 12
- 1
daemon/main.c View File

@ -1335,9 +1335,20 @@ int main(int argc, char **argv) {
ilog(LOG_INFO, "Startup complete, version %s", RTPENGINE_VERSION);
thread_create_detach(sighandler, NULL, "signal handler");
/* a single thread which is running a poller_timer_loop,
* it calls each second a loop and if it finds something it does some work.
*/
thread_create_detach_prio(poller_timer_loop, rtpe_poller, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "poller timer");
thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "load monitor");
/* load monitoring thread */
thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "load monitor");
/* separate thread for releasing ports (sockets), which are scheduled for clearing */
thread_create_detach_prio(sockets_releaser, NULL, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "release closed sockets");
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async)
thread_create_detach(redis_delete_async_loop, NULL, "redis async");


+ 46
- 6
daemon/media_socket.c View File

@ -86,8 +86,11 @@ struct interface_stats_interval {
};
/* thread scope (local) queue for sockets to be released, only appending here */
static __thread GQueue ports_to_release = G_QUEUE_INIT;
/* global queue for sockets to be released, releasing by `sockets_releaser()` is done using that */
static GQueue ports_to_release_glob = G_QUEUE_INIT;
mutex_t ports_to_release_glob_lock = MUTEX_STATIC_INIT;
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *);
@ -952,11 +955,49 @@ static void release_port_now(socket_t *r, struct intf_spec *spec) {
ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port);
}
}
/**
* Sockets releaser.
*/
void release_closed_sockets(void) {
struct late_port_release *lpr;
while ((lpr = g_queue_pop_head(&ports_to_release))) {
release_port_now(&lpr->socket, lpr->spec);
g_slice_free1(sizeof(*lpr), lpr);
struct late_port_release * lpr;
/* for the separate releaser thread (one working with `sockets_releaser()`)
* it does no job. But only for those threads related to calls processing.
*/
if (ports_to_release.head)
append_thread_lpr_to_glob_lpr();
if (ports_to_release_glob.head) {
mutex_lock(&ports_to_release_glob_lock);
GQueue ports_left = ports_to_release_glob;
g_queue_init(&ports_to_release_glob);
mutex_unlock(&ports_to_release_glob_lock);
while ((lpr = g_queue_pop_head(&ports_left))) {
release_port_now(&lpr->socket, lpr->spec);
g_slice_free1(sizeof(*lpr), lpr);
}
}
}
/**
* Appends thread scope (local) sockets to the global releasing list.
*/
void append_thread_lpr_to_glob_lpr(void) {
mutex_lock(&ports_to_release_glob_lock);
g_queue_move(&ports_to_release_glob, &ports_to_release); /* dst, src */
mutex_unlock(&ports_to_release_glob_lock);
}
/**
* Separate thread for releasing sockets scheduled for closing.
*/
void sockets_releaser(void * dummy) {
while (!rtpe_shutdown) {
release_closed_sockets();
thread_cancel_enable();
usleep(1000000); /* sleep for 1 second in each iteration */
thread_cancel_disable();
}
}
@ -3052,7 +3093,6 @@ done:
static void stream_fd_free(void *p) {
struct stream_fd *f = p;
release_port(&f->socket, f->local_intf->spec);
crypto_cleanup(&f->crypto);
dtls_connection_cleanup(&f->dtls);


+ 2
- 0
include/media_socket.h View File

@ -299,6 +299,8 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_in
struct stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(struct stream_fd *);
void release_closed_sockets(void);
void sockets_releaser(void * dummy);
void append_thread_lpr_to_glob_lpr(void);
void free_intf_list(struct intf_list *il);
void free_release_intf_list(struct intf_list *il);


Loading…
Cancel
Save