From d96dbdea17d0a181bea423d44d01a95f2fc7a221 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 27 Feb 2019 11:49:27 -0500 Subject: [PATCH] TT#50652 split timer thread functions into separate file Change-Id: I445f3e2556ce647bf7e245e9612f4babcf04e388 --- daemon/Makefile | 2 +- daemon/ice.c | 116 ++++++++++-------------------------------- daemon/log_funcs.h | 4 +- daemon/timerthread.c | 87 +++++++++++++++++++++++++++++++ include/ice.h | 6 +-- include/timerthread.h | 33 ++++++++++++ t/.gitignore | 1 + t/Makefile | 4 +- 8 files changed, 155 insertions(+), 98 deletions(-) create mode 100644 daemon/timerthread.c create mode 100644 include/timerthread.h diff --git a/daemon/Makefile b/daemon/Makefile index 1ec226c51..ddbfb4d07 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -123,7 +123,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c + codec.c load.c dtmf.c timerthread.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/ice.c b/daemon/ice.c index 8fc0d04f7..350119145 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -10,6 +10,7 @@ #include "stun.h" #include "poller.h" #include "log_funcs.h" +#include "timerthread.h" @@ -50,14 +51,13 @@ static void __agent_schedule_abs(struct ice_agent *ag, const struct timeval *tv) static void __agent_deschedule(struct ice_agent *ag); static void __ice_agent_free_components(struct ice_agent *ag); static void __agent_shutdown(struct ice_agent *ag); +static void ice_agents_timer_run(void *); static u_int64_t tie_breaker; -static mutex_t ice_agents_timers_lock = MUTEX_STATIC_INIT; -static cond_t ice_agents_timers_cond = COND_STATIC_INIT; -static GTree *ice_agents_timers; +static struct timerthread ice_agents_timer_thread; static const char ice_chars[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; @@ -248,6 +248,7 @@ static struct ice_agent *__ice_agent_new(struct call_media *media) { struct call *call = media->call; ag = obj_alloc0("ice_agent", sizeof(*ag), __ice_agent_free); + ag->tt_obj.tt = &ice_agents_timer_thread; ag->call = obj_get(call); ag->media = media; mutex_init(&ag->lock); @@ -280,7 +281,7 @@ static void __ice_reset(struct ice_agent *ag) { __ice_agent_free_components(ag); ZERO(ag->active_components); ZERO(ag->start_nominating); - ZERO(ag->last_run); + ZERO(ag->tt_obj.last_run); __ice_agent_initialize(ag); } @@ -478,7 +479,7 @@ void ice_shutdown(struct ice_agent **agp) { __agent_deschedule(ag); *agp = NULL; - obj_put(ag); + obj_put(&ag->tt_obj); } static void __ice_agent_free_components(struct ice_agent *ag) { if (!ag) { @@ -527,56 +528,29 @@ static void __agent_schedule_abs(struct ice_agent *ag, const struct timeval *tv) struct timeval nxt; long long diff; - if (!ag) { - ilog(LOG_ERR, "ice ag is NULL"); + if (!ag) return; - } nxt = *tv; - mutex_lock(&ice_agents_timers_lock); - if (ag->last_run.tv_sec) { + mutex_lock(&ice_agents_timer_thread.lock); + if (ag->tt_obj.last_run.tv_sec) { /* make sure we don't run more often than we should */ - diff = timeval_diff(&nxt, &ag->last_run); + diff = timeval_diff(&nxt, &ag->tt_obj.last_run); if (diff < TIMER_RUN_INTERVAL * 1000) timeval_add_usec(&nxt, TIMER_RUN_INTERVAL * 1000 - diff); } - if (ag->next_check.tv_sec && timeval_cmp(&ag->next_check, &nxt) <= 0) - goto nope; /* already scheduled sooner */ - if (!g_tree_remove(ice_agents_timers, ag)) - obj_hold(ag); /* if it wasn't removed, we make a new reference */ - ag->next_check = nxt; - g_tree_insert(ice_agents_timers, ag, ag); - cond_broadcast(&ice_agents_timers_cond); -nope: - mutex_unlock(&ice_agents_timers_lock); + timerthread_obj_schedule_abs_nl(&ag->tt_obj, &nxt); + mutex_unlock(&ice_agents_timer_thread.lock); } static void __agent_deschedule(struct ice_agent *ag) { - int ret; - - if (!ag) { - ilog(LOG_ERR, "ice ag is NULL"); - return; - } - - mutex_lock(&ice_agents_timers_lock); - if (!ag->next_check.tv_sec) - goto nope; /* already descheduled */ - ret = g_tree_remove(ice_agents_timers, ag); - ZERO(ag->next_check); - if (ret) - obj_put(ag); -nope: - mutex_unlock(&ice_agents_timers_lock); + if (ag) + timerthread_obj_deschedule(&ag->tt_obj); } -static int __ice_agent_timer_cmp(const void *a, const void *b) { - const struct ice_agent *A = a, *B = b; - return timeval_cmp_ptr(&A->next_check, &B->next_check); -} void ice_init(void) { random_string((void *) &tie_breaker, sizeof(tie_breaker)); - ice_agents_timers = g_tree_new(__ice_agent_timer_cmp); + timerthread_init(&ice_agents_timer_thread, ice_agents_timer_run); } @@ -1299,58 +1273,22 @@ err: void ice_thread_run(void *p) { - struct ice_agent *ag; + timerthread_run(&ice_agents_timer_thread); +} +static void ice_agents_timer_run(void *ptr) { + struct ice_agent *ag = ptr; struct call *call; - long long sleeptime; - struct timeval tv; - - mutex_lock(&ice_agents_timers_lock); - - while (!rtpe_shutdown) { - gettimeofday(&rtpe_now, NULL); - - /* lock our list and get the first element */ - ag = g_tree_find_first(ice_agents_timers, NULL, NULL); - /* scheduled to run? if not, we just go to sleep, otherwise we remove it from the tree, - * steal the reference and run it */ - if (!ag) - goto sleep; - if (timeval_cmp(&rtpe_now, &ag->next_check) < 0) - goto sleep; - g_tree_remove(ice_agents_timers, ag); - ZERO(ag->next_check); - ag->last_run = rtpe_now; - mutex_unlock(&ice_agents_timers_lock); + call = ag->call; + log_info_ice_agent(ag); + rwlock_lock_r(&call->master_lock); - /* this agent is scheduled to run right now */ - - /* lock the call */ - call = ag->call; - log_info_ice_agent(ag); - rwlock_lock_r(&call->master_lock); - - /* and run our checks */ - __do_ice_checks(ag); - - /* finally, release our reference and start over */ - log_info_clear(); - rwlock_unlock_r(&call->master_lock); - obj_put(ag); - mutex_lock(&ice_agents_timers_lock); - continue; - -sleep: - /* figure out how long we should sleep */ - sleeptime = ag ? timeval_diff(&ag->next_check, &rtpe_now) : 100000; - sleeptime = MIN(100000, sleeptime); /* 100 ms at the most */ - tv = rtpe_now; - timeval_add_usec(&tv, sleeptime); - cond_timedwait(&ice_agents_timers_cond, &ice_agents_timers_lock, &tv); - continue; - } + /* and run our checks */ + __do_ice_checks(ag); - mutex_unlock(&ice_agents_timers_lock); + /* finally, release our reference and start over */ + log_info_clear(); + rwlock_unlock_r(&call->master_lock); } static void random_ice_string(char *buf, int len) { diff --git a/daemon/log_funcs.h b/daemon/log_funcs.h index a8330ca07..cfea369e1 100644 --- a/daemon/log_funcs.h +++ b/daemon/log_funcs.h @@ -19,7 +19,7 @@ INLINE void log_info_clear(void) { obj_put(log_info.u.stream_fd); break; case LOG_INFO_ICE_AGENT: - obj_put(log_info.u.ice_agent); + obj_put(&log_info.u.ice_agent->tt_obj); break; case LOG_INFO_STR: case LOG_INFO_C_STRING: @@ -61,7 +61,7 @@ INLINE void log_info_ice_agent(struct ice_agent *ag) { if (!ag) return; log_info.e = LOG_INFO_ICE_AGENT; - log_info.u.ice_agent = obj_get(ag); + log_info.u.ice_agent = obj_get(&ag->tt_obj); } diff --git a/daemon/timerthread.c b/daemon/timerthread.c new file mode 100644 index 000000000..86e6ccca5 --- /dev/null +++ b/daemon/timerthread.c @@ -0,0 +1,87 @@ +#include "timerthread.h" +#include "aux.h" + + +static int tt_obj_cmp(const void *a, const void *b) { + const struct timerthread_obj *A = a, *B = b; + return timeval_cmp_ptr(&A->next_check, &B->next_check); +} + +void timerthread_init(struct timerthread *tt, void (*func)(void *)) { + tt->tree = g_tree_new(tt_obj_cmp); + mutex_init(&tt->lock); + cond_init(&tt->cond); + tt->func = func; +} + +void timerthread_run(void *p) { + struct timerthread *tt = p; + + mutex_lock(&tt->lock); + + while (!rtpe_shutdown) { + gettimeofday(&rtpe_now, NULL); + + /* lock our list and get the first element */ + struct timerthread_obj *tt_obj = g_tree_find_first(tt->tree, NULL, NULL); + /* scheduled to run? if not, we just go to sleep, otherwise we remove it from the tree, + * steal the reference and run it */ + if (!tt_obj) + goto sleep; + if (timeval_cmp(&rtpe_now, &tt_obj->next_check) < 0) + goto sleep; + + // steal reference + g_tree_remove(tt->tree, tt_obj); + ZERO(tt_obj->next_check); + tt_obj->last_run = rtpe_now; + mutex_unlock(&tt->lock); + + // run and release + tt->func(tt_obj); + obj_put(tt_obj); + + mutex_lock(&tt->lock); + continue; + +sleep:; + /* figure out how long we should sleep */ + long long sleeptime = tt_obj ? timeval_diff(&tt_obj->next_check, &rtpe_now) : 100000; + sleeptime = MIN(100000, sleeptime); /* 100 ms at the most */ + struct timeval tv = rtpe_now; + timeval_add_usec(&tv, sleeptime); + cond_timedwait(&tt->cond, &tt->lock, &tv); + } + + mutex_unlock(&tt->lock); +} + +void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struct timeval *tv) { + if (!tt_obj) + return; + + struct timerthread *tt = tt_obj->tt; + if (tt_obj->next_check.tv_sec && timeval_cmp(&tt_obj->next_check, tv) <= 0) + return; /* already scheduled sooner */ + if (!g_tree_remove(tt->tree, tt_obj)) + obj_hold(tt_obj); /* if it wasn't removed, we make a new reference */ + tt_obj->next_check = *tv; + g_tree_insert(tt->tree, tt_obj, tt_obj); + cond_broadcast(&tt->cond); +} + +void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) { + if (!tt_obj) + return; + + struct timerthread *tt = tt_obj->tt; + mutex_lock(&tt->lock); + if (!tt_obj->next_check.tv_sec) + goto nope; /* already descheduled */ + int ret = g_tree_remove(tt->tree, tt_obj); + ZERO(tt_obj->next_check); + if (ret) + obj_put(tt_obj); +nope: + mutex_unlock(&tt->lock); +} diff --git a/include/ice.h b/include/ice.h index 660333164..b76a5dfaf 100644 --- a/include/ice.h +++ b/include/ice.h @@ -12,6 +12,7 @@ #include "aux.h" #include "media_socket.h" #include "socket.h" +#include "timerthread.h" @@ -101,7 +102,7 @@ struct ice_candidate_pair { /* these are protected by the call's master_lock */ struct ice_agent { - struct obj obj; + struct timerthread_obj tt_obj; struct call *call; /* main reference */ struct call_media *media; const struct logical_intf *logical_intf; @@ -128,9 +129,6 @@ struct ice_agent { str ufrag[2]; /* 0 = remote, 1 = local */ str pwd[2]; /* ditto */ volatile unsigned int agent_flags; - - struct timeval next_check; /* protected by ice_agents_timers_lock */ - struct timeval last_run; /* ditto */ }; diff --git a/include/timerthread.h b/include/timerthread.h new file mode 100644 index 000000000..7ea4f299b --- /dev/null +++ b/include/timerthread.h @@ -0,0 +1,33 @@ +#ifndef _TIMERTHREAD_H_ +#define _TIMERTHREAD_H_ + +#include "obj.h" +#include +#include +#include "auxlib.h" + + +struct timerthread { + GTree *tree; + mutex_t lock; + cond_t cond; + void (*func)(void *); +}; + +struct timerthread_obj { + struct obj obj; + + struct timerthread *tt; + struct timeval next_check; /* protected by ->lock */ + struct timeval last_run; /* ditto */ +}; + + +void timerthread_init(struct timerthread *, void (*)(void *)); +void timerthread_run(void *); + +void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *); +void timerthread_obj_deschedule(struct timerthread_obj *); + + +#endif diff --git a/t/.gitignore b/t/.gitignore index 84d496f41..e377840f1 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -46,3 +46,4 @@ payload-tracker-test dtmf.c const_str_hash-test.strhash tests-preload.so +timerthread.c diff --git a/t/Makefile b/t/Makefile index ab425e92d..8d24eeab4 100644 --- a/t/Makefile +++ b/t/Makefile @@ -65,7 +65,7 @@ endif LIBSRCS+= codeclib.c resample.c socket.c streambuf.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ - cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c + cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -116,7 +116,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \ rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ - streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o + streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o