Browse Source

MT#55283 looper thread bool return type

Make it possible for a looper thread function to break out of the loop
by returning an appropriate status code.

Change-Id: I22e7789270eed4bf3340e7dae941929de58700ea
pull/1676/head
Richard Fuchs 3 years ago
parent
commit
b90edf0ecf
6 changed files with 23 additions and 10 deletions
  1. +7
    -3
      daemon/aux.c
  2. +2
    -1
      daemon/ice.c
  3. +5
    -2
      daemon/media_socket.c
  4. +6
    -1
      include/aux.h
  5. +1
    -1
      include/ice.h
  6. +2
    -2
      include/media_socket.h

+ 7
- 3
daemon/aux.c View File

@ -34,7 +34,7 @@ struct scheduler {
int nice; int nice;
}; };
struct looper_thread { struct looper_thread {
void (*f)(void);
enum thread_looper_action (*f)(void);
const char *name; const char *name;
long long interval_us; long long interval_us;
}; };
@ -322,7 +322,7 @@ static void thread_looper_helper(void *fp) {
while (!rtpe_shutdown) { while (!rtpe_shutdown) {
gettimeofday(&rtpe_now, NULL); gettimeofday(&rtpe_now, NULL);
lh.f();
enum thread_looper_action ret = lh.f();
struct timeval stop; struct timeval stop;
gettimeofday(&stop, NULL); gettimeofday(&stop, NULL);
@ -335,13 +335,17 @@ static void thread_looper_helper(void *fp) {
warn_limit_pct, warn_limit_pct,
warn_limit_us / 1000000, warn_limit_us % 1000000); warn_limit_us / 1000000, warn_limit_us % 1000000);
if (ret == TLA_BREAK)
break;
thread_cancel_enable(); thread_cancel_enable();
usleep(interval_us); usleep(interval_us);
thread_cancel_disable(); thread_cancel_disable();
} }
} }
void thread_create_looper(void (*f)(void), const char *scheduler, int priority, const char *name,
void thread_create_looper(enum thread_looper_action (*f)(void), const char *scheduler, int priority,
const char *name,
long long interval_us) long long interval_us)
{ {
struct looper_thread *lh = g_slice_alloc(sizeof(*lh)); struct looper_thread *lh = g_slice_alloc(sizeof(*lh));


+ 2
- 1
daemon/ice.c View File

@ -758,8 +758,9 @@ void ice_free(void) {
mutex_destroy(&sdp_fragments_lock); mutex_destroy(&sdp_fragments_lock);
} }
void ice_slow_timer(void) {
enum thread_looper_action ice_slow_timer(void) {
fragments_cleanup(false); fragments_cleanup(false);
return TLA_CONTINUE;
} }
static void __fail_pair(struct ice_candidate_pair *pair) { static void __fail_pair(struct ice_candidate_pair *pair) {


+ 5
- 2
daemon/media_socket.c View File

@ -972,7 +972,7 @@ static void release_port_now(socket_t *r, struct intf_spec *spec) {
/** /**
* Sockets releaser. * Sockets releaser.
*/ */
void release_closed_sockets(void) {
enum thread_looper_action release_closed_sockets(void) {
struct late_port_release * lpr; struct late_port_release * lpr;
/* for the separate releaser thread (one working with `sockets_releaser()`) /* for the separate releaser thread (one working with `sockets_releaser()`)
@ -992,6 +992,8 @@ void release_closed_sockets(void) {
g_slice_free1(sizeof(*lpr), lpr); g_slice_free1(sizeof(*lpr), lpr);
} }
} }
return TLA_CONTINUE;
} }
/** /**
* Appends thread scope (local) sockets to the global releasing list. * Appends thread scope (local) sockets to the global releasing list.
@ -3327,7 +3329,7 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_
/** /**
* Ports iterations (stats update from the kernel) functionality. * Ports iterations (stats update from the kernel) functionality.
*/ */
void kernel_stats_updater(void) {
enum thread_looper_action kernel_stats_updater(void) {
struct rtpengine_list_entry *ke; struct rtpengine_list_entry *ke;
struct packet_stream *ps; struct packet_stream *ps;
int j; int j;
@ -3493,4 +3495,5 @@ next:
log_info_pop(); log_info_pop();
} }
return TLA_CONTINUE;
} }

+ 6
- 1
include/aux.h View File

@ -322,12 +322,17 @@ struct thread_waker {
mutex_t *lock; mutex_t *lock;
cond_t *cond; cond_t *cond;
}; };
enum thread_looper_action {
TLA_CONTINUE,
TLA_BREAK,
};
void thread_waker_add(struct thread_waker *); void thread_waker_add(struct thread_waker *);
void thread_waker_del(struct thread_waker *); void thread_waker_del(struct thread_waker *);
void threads_join_all(bool cancel); void threads_join_all(bool cancel);
void thread_create_detach_prio(void (*)(void *), void *, const char *, int, const char *); void thread_create_detach_prio(void (*)(void *), void *, const char *, int, const char *);
void thread_create_looper(void (*f)(void), const char *scheduler, int priority, const char *name, long long);
void thread_create_looper(enum thread_looper_action (*f)(void), const char *scheduler, int priority,
const char *name, long long);
INLINE void thread_create_detach(void (*f)(void *), void *a, const char *name) { INLINE void thread_create_detach(void (*f)(void *), void *a, const char *name) {
thread_create_detach_prio(f, a, NULL, 0, name); thread_create_detach_prio(f, a, NULL, 0, name);
} }


+ 1
- 1
include/ice.h View File

@ -174,7 +174,7 @@ void dequeue_sdp_fragments(struct call_monologue *);
bool trickle_ice_update(struct ng_buffer *ngbuf, struct call *call, struct sdp_ng_flags *flags, bool trickle_ice_update(struct ng_buffer *ngbuf, struct call *call, struct sdp_ng_flags *flags,
GQueue *streams); GQueue *streams);
void ice_slow_timer(void);
enum thread_looper_action ice_slow_timer(void);
#include "call.h" #include "call.h"


+ 2
- 2
include/media_socket.h View File

@ -299,7 +299,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_intf *lif); struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_intf *lif);
struct stream_fd *stream_fd_lookup(const endpoint_t *); struct stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(struct stream_fd *); void stream_fd_release(struct stream_fd *);
void release_closed_sockets(void);
enum thread_looper_action release_closed_sockets(void);
void append_thread_lpr_to_glob_lpr(void); void append_thread_lpr_to_glob_lpr(void);
void free_intf_list(struct intf_list *il); void free_intf_list(struct intf_list *il);
@ -333,7 +333,7 @@ const struct transport_protocol *transport_protocol(const str *s);
//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered); //void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered);
void play_buffered(struct jb_packet *cp); void play_buffered(struct jb_packet *cp);
void kernel_stats_updater(void);
enum thread_looper_action kernel_stats_updater(void);
INLINE int proto_is_rtp(const struct transport_protocol *protocol) { INLINE int proto_is_rtp(const struct transport_protocol *protocol) {
// known to be RTP? therefore unknown is not RTP // known to be RTP? therefore unknown is not RTP


Loading…
Cancel
Save