diff --git a/daemon/call.c b/daemon/call.c index 067f685f5..464fc2f33 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -741,7 +741,7 @@ next: kill_calls_timer(hlp.del_scheduled, NULL); kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url); - call_interfaces_timer(); + ice_slow_timer(); struct timeval tv_stop; gettimeofday(&tv_stop, NULL); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index ce3d4a919..026ce1efa 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -33,17 +33,6 @@ #include "dtmf.h" -struct fragment_key { - str call_id; - str from_tag; -}; -struct sdp_fragment { - struct ng_buffer *ngbuf; - struct timeval received; - GQueue streams; - struct sdp_ng_flags flags; -}; - static pcre *info_re; static pcre_extra *info_ree; static pcre *streams_re; @@ -52,9 +41,6 @@ static pcre_extra *streams_ree; int trust_address_def; int dtls_passive_def; -static mutex_t sdp_fragments_lock; -static GHashTable *sdp_fragments; - INLINE int call_ng_flags_prefix(struct sdp_ng_flags *out, str *s_ori, const char *prefix, void (*cb)(struct sdp_ng_flags *, str *, void *), void *ptr); @@ -1938,98 +1924,6 @@ static enum load_limit_reasons call_offer_session_limit(void) { return ret; } -static void fragment_free(struct sdp_fragment *frag) { - sdp_streams_free(&frag->streams); - call_ng_free_flags(&frag->flags); - obj_put(frag->ngbuf); - g_slice_free1(sizeof(*frag), frag); -} -static void fragment_key_free(void *p) { - struct fragment_key *k = p; - g_free(k->call_id.s); - g_free(k->from_tag.s); - g_slice_free1(sizeof(*k), k); -} -static void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct sdp_ng_flags *flags) { - ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, - STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag)); - - struct fragment_key *k = g_slice_alloc0(sizeof(*k)); - str_init_dup_str(&k->call_id, &flags->call_id); - str_init_dup_str(&k->from_tag, &flags->from_tag); - - struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag)); - frag->received = rtpe_now; - frag->ngbuf = obj_get(ngbuf); - frag->streams = *streams; - frag->flags = *flags; - g_queue_init(streams); - ZERO(*flags); - - mutex_lock(&sdp_fragments_lock); - GQueue *frags = g_hash_table_lookup_queue_new(sdp_fragments, k, fragment_key_free); - g_queue_push_tail(frags, frag); - mutex_unlock(&sdp_fragments_lock); -} -#define MAX_FRAG_AGE 3000000 -static void dequeue_sdp_fragments(struct call_monologue *dialogue[2]) { - struct fragment_key k; - ZERO(k); - k.call_id = dialogue[0]->call->callid; - k.from_tag = dialogue[0]->tag; - - GQueue *frags = NULL; - - { - LOCK(&sdp_fragments_lock); - g_hash_table_steal_extended(sdp_fragments, &k, NULL, (void **) &frags); - if (!frags) - return; - - // we own the queue now - } - - struct sdp_fragment *frag; - while ((frag = g_queue_pop_head(frags))) { - if (timeval_diff(&rtpe_now, &frag->received) > MAX_FRAG_AGE) - goto next; - - ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, - STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); - - monologue_offer_answer(dialogue, &frag->streams, &frag->flags); - -next: - fragment_free(frag); - } - - g_queue_free(frags); -} -static gboolean fragment_check_cleanup(void *k, void *v, void *p) { - int all = GPOINTER_TO_INT(p); - struct fragment_key *key = k; - GQueue *frags = v; - if (!key || !frags) - return TRUE; - while (frags->length) { - struct sdp_fragment *frag = frags->head->data; - if (!all && timeval_diff(&rtpe_now, &frag->received) <= MAX_FRAG_AGE) - break; - g_queue_pop_head(frags); - fragment_free(frag); - } - if (!frags->length) { - g_queue_free(frags); - return TRUE; - } - return FALSE; -} -static void fragments_cleanup(int all) { - mutex_lock(&sdp_fragments_lock); - g_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all)); - mutex_unlock(&sdp_fragments_lock); -} - void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams) { str_free_dup(&ml->last_in_sdp); @@ -3659,26 +3553,6 @@ void call_interfaces_free() { pcre_free_study(streams_ree); streams_ree = NULL; } - - fragments_cleanup(1); - g_hash_table_destroy(sdp_fragments); - sdp_fragments = NULL; - mutex_destroy(&sdp_fragments_lock); -} - -void call_interfaces_timer() { - fragments_cleanup(0); -} - -static unsigned int frag_key_hash(const void *A) { - const struct fragment_key *a = A; - return str_hash(&a->call_id) ^ str_hash(&a->from_tag); -} -static int frag_key_eq(const void *A, const void *B) { - const struct fragment_key *a = A; - const struct fragment_key *b = B; - return str_equal(&a->call_id, &b->call_id) - && str_equal(&a->from_tag, &b->from_tag); } int call_interfaces_init() { @@ -3695,8 +3569,5 @@ int call_interfaces_init() { return -1; streams_ree = pcre_study(streams_re, 0, &errptr); - sdp_fragments = g_hash_table_new_full(frag_key_hash, frag_key_eq, fragment_key_free, NULL); - mutex_init(&sdp_fragments_lock); - return 0; } diff --git a/daemon/ice.c b/daemon/ice.c index ee0f8e59d..1a1f6a163 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -11,6 +11,7 @@ #include "poller.h" #include "log_funcs.h" #include "timerthread.h" +#include "call_interfaces.h" @@ -38,6 +39,19 @@ +struct fragment_key { + str call_id; + str from_tag; +}; +struct sdp_fragment { + struct ng_buffer *ngbuf; + struct timeval received; + GQueue streams; + struct sdp_ng_flags flags; +}; + + + static void __ice_agent_free(void *p); static void create_random_ice_string(struct call *call, str *s, int len); static void __do_ice_checks(struct ice_agent *ag); @@ -79,6 +93,114 @@ const char * const ice_type_strings[] = { +static mutex_t sdp_fragments_lock; +static GHashTable *sdp_fragments; + + + +static unsigned int frag_key_hash(const void *A) { + const struct fragment_key *a = A; + return str_hash(&a->call_id) ^ str_hash(&a->from_tag); +} +static int frag_key_eq(const void *A, const void *B) { + const struct fragment_key *a = A; + const struct fragment_key *b = B; + return str_equal(&a->call_id, &b->call_id) + && str_equal(&a->from_tag, &b->from_tag); +} + +static void fragment_free(struct sdp_fragment *frag) { + sdp_streams_free(&frag->streams); + call_ng_free_flags(&frag->flags); + obj_put(frag->ngbuf); + g_slice_free1(sizeof(*frag), frag); +} +static void fragment_key_free(void *p) { + struct fragment_key *k = p; + g_free(k->call_id.s); + g_free(k->from_tag.s); + g_slice_free1(sizeof(*k), k); +} +void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct sdp_ng_flags *flags) { + ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, + STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag)); + + struct fragment_key *k = g_slice_alloc0(sizeof(*k)); + str_init_dup_str(&k->call_id, &flags->call_id); + str_init_dup_str(&k->from_tag, &flags->from_tag); + + struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag)); + frag->received = rtpe_now; + frag->ngbuf = obj_get(ngbuf); + frag->streams = *streams; + frag->flags = *flags; + g_queue_init(streams); + ZERO(*flags); + + mutex_lock(&sdp_fragments_lock); + GQueue *frags = g_hash_table_lookup_queue_new(sdp_fragments, k, fragment_key_free); + g_queue_push_tail(frags, frag); + mutex_unlock(&sdp_fragments_lock); +} +#define MAX_FRAG_AGE 3000000 +void dequeue_sdp_fragments(struct call_monologue *dialogue[2]) { + struct fragment_key k; + ZERO(k); + k.call_id = dialogue[0]->call->callid; + k.from_tag = dialogue[0]->tag; + + GQueue *frags = NULL; + + { + LOCK(&sdp_fragments_lock); + g_hash_table_steal_extended(sdp_fragments, &k, NULL, (void **) &frags); + if (!frags) + return; + + // we own the queue now + } + + struct sdp_fragment *frag; + while ((frag = g_queue_pop_head(frags))) { + if (timeval_diff(&rtpe_now, &frag->received) > MAX_FRAG_AGE) + goto next; + + ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, + STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); + + monologue_offer_answer(dialogue, &frag->streams, &frag->flags); + +next: + fragment_free(frag); + } + + g_queue_free(frags); +} +static gboolean fragment_check_cleanup(void *k, void *v, void *p) { + bool all = GPOINTER_TO_INT(p); + struct fragment_key *key = k; + GQueue *frags = v; + if (!key || !frags) + return TRUE; + while (frags->length) { + struct sdp_fragment *frag = frags->head->data; + if (!all && timeval_diff(&rtpe_now, &frag->received) <= MAX_FRAG_AGE) + break; + g_queue_pop_head(frags); + fragment_free(frag); + } + if (!frags->length) { + g_queue_free(frags); + return TRUE; + } + return FALSE; +} +static void fragments_cleanup(bool all) { + mutex_lock(&sdp_fragments_lock); + g_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all)); + mutex_unlock(&sdp_fragments_lock); +} + enum ice_candidate_type ice_candidate_type(const str *s) { @@ -566,10 +688,22 @@ static void __agent_deschedule(struct ice_agent *ag) { void ice_init(void) { random_string((void *) &tie_breaker, sizeof(tie_breaker)); timerthread_init(&ice_agents_timer_thread, ice_agents_timer_run); + + sdp_fragments = g_hash_table_new_full(frag_key_hash, frag_key_eq, fragment_key_free, NULL); + mutex_init(&sdp_fragments_lock); } void ice_free(void) { timerthread_free(&ice_agents_timer_thread); + + fragments_cleanup(true); + g_hash_table_destroy(sdp_fragments); + sdp_fragments = NULL; + mutex_destroy(&sdp_fragments_lock); +} + +void ice_slow_timer(void) { + fragments_cleanup(false); } diff --git a/include/ice.h b/include/ice.h index 90d758a00..af10e51b8 100644 --- a/include/ice.h +++ b/include/ice.h @@ -66,6 +66,9 @@ struct call_media; struct call; struct stream_params; struct stun_attrs; +struct ng_buffer; +struct call_monologue; +struct sdp_ng_flags; @@ -167,6 +170,10 @@ int ice_request(struct stream_fd *, const endpoint_t *, struct stun_attrs *); int ice_response(struct stream_fd *, const endpoint_t *src, struct stun_attrs *attrs, void *transaction); +void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct sdp_ng_flags *flags); +void dequeue_sdp_fragments(struct call_monologue *dialogue[2]); +void ice_slow_timer(void); + #include "call.h" diff --git a/t/test-stats.c b/t/test-stats.c index 284a03ba4..73d903524 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -6,6 +6,7 @@ #include "control_ng.h" #include "call_interfaces.h" #include "ssllib.h" +#include "ice.h" int _log_facility_rtcp; int _log_facility_cdr; @@ -67,6 +68,7 @@ int main(void) { call_init(); statistics_init(); call_interfaces_init(); + ice_init(); control_ng_init(); dtls_init(); @@ -7463,6 +7465,7 @@ int main(void) { call_interfaces_free(); control_ng_cleanup(); dtls_cert_free(); + ice_free(); return 0; }