diff --git a/daemon/call.c b/daemon/call.c index 1e66f7334..5825393bc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -697,6 +697,8 @@ next: kill_calls_timer(hlp.del_scheduled, NULL); kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url); + call_interfaces_timer(); + struct timeval tv_stop; gettimeofday(&tv_stop, NULL); long long duration = timeval_diff(&tv_stop, &tv_start); @@ -2109,6 +2111,10 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, if (flags && flags->fragment) { // trickle ICE SDP fragment. don't do anything other than update // the ICE stuff. + if (!MEDIA_ISSET(other_media, TRICKLE_ICE)) + return ERROR_NO_ICE_AGENT; + if (!other_media->ice_agent) + return ERROR_NO_ICE_AGENT; ice_update(other_media->ice_agent, sp); continue; } diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 904a74e54..2094dd082 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -31,6 +31,17 @@ #include "dtmf.h" +struct fragment_key { + str call_id; + str from_tag; +}; +struct sdp_fragment { + struct ng_buffer *ngbuf; + struct timeval received; + GQueue streams; + struct sdp_ng_flags flags; +}; + static pcre *info_re; static pcre_extra *info_ree; static pcre *streams_re; @@ -39,6 +50,9 @@ static pcre_extra *streams_ree; int trust_address_def; int dtls_passive_def; +static mutex_t sdp_fragments_lock; +static GHashTable *sdp_fragments; + INLINE int call_ng_flags_prefix(struct sdp_ng_flags *out, str *s_ori, const char *prefix, void (*cb)(struct sdp_ng_flags *, str *, void *), void *ptr); @@ -1169,6 +1183,98 @@ static enum load_limit_reasons call_offer_session_limit(void) { return ret; } +static void fragment_free(struct sdp_fragment *frag) { + streams_free(&frag->streams); + call_ng_free_flags(&frag->flags); + obj_put(frag->ngbuf); + g_slice_free1(sizeof(*frag), frag); +} +static void fragment_key_free(void *p) { + struct fragment_key *k = p; + free(k->call_id.s); + free(k->from_tag.s); + g_slice_free1(sizeof(*k), k); +} +static void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct sdp_ng_flags *flags) { + ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, + STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag)); + + struct fragment_key *k = g_slice_alloc0(sizeof(*k)); + str_init_dup_str(&k->call_id, &flags->call_id); + str_init_dup_str(&k->from_tag, &flags->from_tag); + + struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag)); + frag->received = rtpe_now; + frag->ngbuf = obj_get(ngbuf); + frag->streams = *streams; + frag->flags = *flags; + g_queue_init(streams); + ZERO(*flags); + + mutex_lock(&sdp_fragments_lock); + GQueue *frags = g_hash_table_lookup_queue_new(sdp_fragments, k, fragment_key_free); + g_queue_push_tail(frags, frag); + mutex_unlock(&sdp_fragments_lock); +} +#define MAX_FRAG_AGE 3000000 +static void dequeue_sdp_fragments(struct call_monologue *monologue) { + struct fragment_key k; + ZERO(k); + k.call_id = monologue->call->callid; + k.from_tag = monologue->tag; + + mutex_lock(&sdp_fragments_lock); + GQueue *frags = g_hash_table_lookup(sdp_fragments, &k); + if (!frags) { + mutex_unlock(&sdp_fragments_lock); + return; + } + + g_hash_table_remove(sdp_fragments, &k); + // we own the queue now + mutex_unlock(&sdp_fragments_lock); + + struct sdp_fragment *frag; + while ((frag = g_queue_pop_head(frags))) { + if (timeval_diff(&rtpe_now, &frag->received) > MAX_FRAG_AGE) + goto next; + + ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, + STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); + + monologue_offer_answer(monologue, &frag->streams, &frag->flags); + +next: + fragment_free(frag); + } + + g_queue_free(frags); +} +static gboolean fragment_check_cleanup(void *k, void *v, void *p) { + int all = GPOINTER_TO_INT(p); + struct fragment_key *key = k; + GQueue *frags = v; + if (!key || !frags) + return TRUE; + while (frags->length) { + struct sdp_fragment *frag = frags->head->data; + if (!all && timeval_diff(&rtpe_now, &frag->received) <= MAX_FRAG_AGE) + break; + g_queue_pop_head(frags); + fragment_free(frag); + } + if (!frags->length) { + g_queue_free(frags); + return TRUE; + } + return FALSE; +} +static void fragments_cleanup(int all) { + mutex_lock(&sdp_fragments_lock); + g_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all)); + mutex_unlock(&sdp_fragments_lock); +} + static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t *input, bencode_item_t *output, enum call_opmode opmode, const char* addr, const endpoint_t *sin) @@ -1227,18 +1333,16 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ call = call_get(&flags.call_id); - /* Failover scenario because of timeout on offer response: siprouter tries - * to establish session with another rtpengine2 even though rtpengine1 - * might have persisted part of the session. rtpengine2 deletes previous - * call in memory and recreates an OWN call in redis */ // SDP fragments for trickle ICE must always operate on an existing call - if (opmode == OP_OFFER && !flags.fragment) { - if (!call) { - /* call == NULL, should create call */ - call = call_get_or_create(&flags.call_id, 0); - } + if (!call && opmode == OP_OFFER && flags.fragment) { + queue_sdp_fragment(ngbuf, &streams, &flags); + errstr = NULL; + goto out; } + if (opmode == OP_OFFER && !call) + call = call_get_or_create(&flags.call_id, 0); + errstr = "Unknown call-id"; if (!call) goto out; @@ -1289,12 +1393,21 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t call->drop_traffic = 0; } + int do_dequeue = 1; + ret = monologue_offer_answer(monologue, &streams, &flags); if (!ret) { // SDP fragments for trickle ICE are consumed with no replacement returned if (!flags.fragment) ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags); } + else if (ret == ERROR_NO_ICE_AGENT && flags.fragment) { + queue_sdp_fragment(ngbuf, &streams, &flags); + ret = 0; + do_dequeue = 0; + } + + // streams and flags are invalid after here struct recording *recording = call->recording; if (recording != NULL) { @@ -1305,6 +1418,9 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t recording_response(recording, output); } + if (do_dequeue) + dequeue_sdp_fragments(monologue); + rwlock_unlock_w(&call->master_lock); if (!flags.no_redis_update) { @@ -2252,6 +2368,26 @@ void call_interfaces_free() { pcre_free_study(streams_ree); streams_ree = NULL; } + + fragments_cleanup(1); + g_hash_table_destroy(sdp_fragments); + sdp_fragments = NULL; + mutex_destroy(&sdp_fragments_lock); +} + +void call_interfaces_timer() { + fragments_cleanup(0); +} + +unsigned static int frag_key_hash(const void *A) { + const struct fragment_key *a = A; + return str_hash(&a->call_id) ^ str_hash(&a->from_tag); +} +static int frag_key_eq(const void *A, const void *B) { + const struct fragment_key *a = A; + const struct fragment_key *b = B; + return str_equal(&a->call_id, &b->call_id) + && str_equal(&a->from_tag, &b->from_tag); } int call_interfaces_init() { @@ -2268,5 +2404,8 @@ int call_interfaces_init() { return -1; streams_ree = pcre_study(streams_re, 0, &errptr); + sdp_fragments = g_hash_table_new_full(frag_key_hash, frag_key_eq, fragment_key_free, NULL); + mutex_init(&sdp_fragments_lock); + return 0; } diff --git a/include/call.h b/include/call.h index 41ab8e2b1..fe5e8580c 100644 --- a/include/call.h +++ b/include/call.h @@ -62,6 +62,7 @@ enum call_stream_state { #define ERROR_NO_FREE_PORTS -100 #define ERROR_NO_FREE_LOGS -101 +#define ERROR_NO_ICE_AGENT -102 #define MAX_RTP_PACKET_SIZE 8192 #define RTP_BUFFER_HEAD_ROOM 128 diff --git a/include/call_interfaces.h b/include/call_interfaces.h index b49363c87..fbdb5d85b 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -160,6 +160,7 @@ void ng_call_stats(struct call *call, const str *fromtag, const str *totag, benc int call_interfaces_init(void); void call_interfaces_free(void); +void call_interfaces_timer(void); #endif