diff --git a/daemon/aux.c b/daemon/aux.c index 02b6cfc93..6e6aa65fc 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -34,7 +34,7 @@ struct scheduler { int nice; }; struct looper_thread { - void (*f)(void); + enum thread_looper_action (*f)(void); const char *name; long long interval_us; }; @@ -322,7 +322,7 @@ static void thread_looper_helper(void *fp) { while (!rtpe_shutdown) { gettimeofday(&rtpe_now, NULL); - lh.f(); + enum thread_looper_action ret = lh.f(); struct timeval stop; gettimeofday(&stop, NULL); @@ -335,13 +335,17 @@ static void thread_looper_helper(void *fp) { warn_limit_pct, warn_limit_us / 1000000, warn_limit_us % 1000000); + if (ret == TLA_BREAK) + break; + thread_cancel_enable(); usleep(interval_us); thread_cancel_disable(); } } -void thread_create_looper(void (*f)(void), const char *scheduler, int priority, const char *name, +void thread_create_looper(enum thread_looper_action (*f)(void), const char *scheduler, int priority, + const char *name, long long interval_us) { struct looper_thread *lh = g_slice_alloc(sizeof(*lh)); diff --git a/daemon/ice.c b/daemon/ice.c index ae2c9f050..0368a8bb7 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -758,8 +758,9 @@ void ice_free(void) { mutex_destroy(&sdp_fragments_lock); } -void ice_slow_timer(void) { +enum thread_looper_action ice_slow_timer(void) { fragments_cleanup(false); + return TLA_CONTINUE; } static void __fail_pair(struct ice_candidate_pair *pair) { diff --git a/daemon/media_socket.c b/daemon/media_socket.c index d0c0dea57..393119e69 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -972,7 +972,7 @@ static void release_port_now(socket_t *r, struct intf_spec *spec) { /** * Sockets releaser. */ -void release_closed_sockets(void) { +enum thread_looper_action release_closed_sockets(void) { struct late_port_release * lpr; /* for the separate releaser thread (one working with `sockets_releaser()`) @@ -992,6 +992,8 @@ void release_closed_sockets(void) { g_slice_free1(sizeof(*lpr), lpr); } } + + return TLA_CONTINUE; } /** * Appends thread scope (local) sockets to the global releasing list. @@ -3327,7 +3329,7 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_ /** * Ports iterations (stats update from the kernel) functionality. */ -void kernel_stats_updater(void) { +enum thread_looper_action kernel_stats_updater(void) { struct rtpengine_list_entry *ke; struct packet_stream *ps; int j; @@ -3493,4 +3495,5 @@ next: log_info_pop(); } + return TLA_CONTINUE; } diff --git a/include/aux.h b/include/aux.h index fb684b9ba..31660448c 100644 --- a/include/aux.h +++ b/include/aux.h @@ -322,12 +322,17 @@ struct thread_waker { mutex_t *lock; cond_t *cond; }; +enum thread_looper_action { + TLA_CONTINUE, + TLA_BREAK, +}; void thread_waker_add(struct thread_waker *); void thread_waker_del(struct thread_waker *); void threads_join_all(bool cancel); void thread_create_detach_prio(void (*)(void *), void *, const char *, int, const char *); -void thread_create_looper(void (*f)(void), const char *scheduler, int priority, const char *name, long long); +void thread_create_looper(enum thread_looper_action (*f)(void), const char *scheduler, int priority, + const char *name, long long); INLINE void thread_create_detach(void (*f)(void *), void *a, const char *name) { thread_create_detach_prio(f, a, NULL, 0, name); } diff --git a/include/ice.h b/include/ice.h index 8a4af0fed..7a9fcf58e 100644 --- a/include/ice.h +++ b/include/ice.h @@ -174,7 +174,7 @@ void dequeue_sdp_fragments(struct call_monologue *); bool trickle_ice_update(struct ng_buffer *ngbuf, struct call *call, struct sdp_ng_flags *flags, GQueue *streams); -void ice_slow_timer(void); +enum thread_looper_action ice_slow_timer(void); #include "call.h" diff --git a/include/media_socket.h b/include/media_socket.h index 45d911da4..574ca49ad 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -299,7 +299,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_intf *lif); struct stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(struct stream_fd *); -void release_closed_sockets(void); +enum thread_looper_action release_closed_sockets(void); void append_thread_lpr_to_glob_lpr(void); void free_intf_list(struct intf_list *il); @@ -333,7 +333,7 @@ const struct transport_protocol *transport_protocol(const str *s); //void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered); void play_buffered(struct jb_packet *cp); -void kernel_stats_updater(void); +enum thread_looper_action kernel_stats_updater(void); INLINE int proto_is_rtp(const struct transport_protocol *protocol) { // known to be RTP? therefore unknown is not RTP