From 14100b0b8a7cc0a33d9a9e0c2f286bb2d905de66 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 12 Mar 2018 14:10:07 -0400 Subject: [PATCH] use reference counting on SSRC objects possibly fixes #488 and #482 Change-Id: Ib68e456b9322836b153c9fd0e59c3556378557ae --- daemon/call.c | 11 +++++--- daemon/codec.c | 15 ++++++----- daemon/media_socket.c | 23 ++++++++++++++++ daemon/redis.c | 1 + daemon/rtcp.c | 8 +++++- daemon/ssrc.c | 63 ++++++++++++++++++++++++++++++++++--------- daemon/ssrc.h | 7 ++--- 7 files changed, 101 insertions(+), 27 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 62c867c6a..f1a79a4fe 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1981,9 +1981,6 @@ static void __call_free(void *p) { __C_DBG("freeing call struct"); - call_buffer_free(&c->buffer); - mutex_destroy(&c->buffer_lock); - rwlock_destroy(&c->master_lock); obj_put(c->dtls_cert); while (c->monologues.head) { @@ -2027,9 +2024,17 @@ static void __call_free(void *p) { crypto_cleanup(&ps->crypto); g_queue_clear(&ps->sfds); g_hash_table_destroy(ps->rtp_stats); + if (ps->ssrc_in) + obj_put(&ps->ssrc_in->parent->h); + if (ps->ssrc_out) + obj_put(&ps->ssrc_out->parent->h); g_slice_free1(sizeof(*ps), ps); } + call_buffer_free(&c->buffer); + mutex_destroy(&c->buffer_lock); + rwlock_destroy(&c->master_lock); + assert(c->stream_fds.head == NULL); } diff --git a/daemon/codec.c b/daemon/codec.c index 8ddb27057..a6e5d90e3 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -57,7 +57,7 @@ static codec_handler_func handler_func_passthrough_ssrc; static codec_handler_func handler_func_transcode; static struct ssrc_entry *__ssrc_handler_new(void *p); -static void __ssrc_handler_free(struct codec_ssrc_handler *p); +static void __free_ssrc_handler(void *); static void __transcode_packet_free(struct transcode_packet *); @@ -127,8 +127,7 @@ reset: handler->dest_pt = *dest; handler->func = handler_func_transcode; - handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, (ssrc_free_func_t) __ssrc_handler_free, - handler); + handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, handler); ilog(LOG_DEBUG, "Created transcode context for " STR_FORMAT " -> " STR_FORMAT "", STR_FMT(&source->encoding_with_params), @@ -567,7 +566,7 @@ static struct ssrc_entry *__ssrc_handler_new(void *p) { h->dest_pt.codec_def->rtpname, h->dest_pt.clock_rate, h->dest_pt.channels); - struct codec_ssrc_handler *ch = g_slice_alloc0(sizeof(*ch)); + struct codec_ssrc_handler *ch = obj_alloc0("codec_ssrc_handler", sizeof(*ch), __free_ssrc_handler); ch->handler = h; mutex_init(&ch->lock); packet_sequencer_init(&ch->sequencer, (GDestroyNotify) __transcode_packet_free); @@ -607,7 +606,7 @@ static struct ssrc_entry *__ssrc_handler_new(void *p) { return &ch->h; err: - __ssrc_handler_free(ch); + obj_put(&ch->h); return NULL; } static int __encoder_flush(encoder_t *enc, void *u1, void *u2) { @@ -615,7 +614,9 @@ static int __encoder_flush(encoder_t *enc, void *u1, void *u2) { *going = 1; return 0; } -static void __ssrc_handler_free(struct codec_ssrc_handler *ch) { +static void __free_ssrc_handler(void *chp) { + struct codec_ssrc_handler *ch = chp; + ilog(LOG_DEBUG, "__free_ssrc_handler"); packet_sequencer_destroy(&ch->sequencer); if (ch->decoder) decoder_close(ch->decoder); @@ -629,7 +630,6 @@ static void __ssrc_handler_free(struct codec_ssrc_handler *ch) { encoder_free(ch->encoder); } g_string_free(ch->sample_buffer, TRUE); - g_slice_free1(sizeof(*ch), ch); } static int __packet_encoded(encoder_t *enc, void *u1, void *u2) { @@ -767,6 +767,7 @@ static int handler_func_transcode(struct codec_handler *h, struct call_media *me } mutex_unlock(&ch->lock); + obj_put(&ch->h); return 0; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 92617f39a..2122981ae 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1162,10 +1162,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o mutex_lock(&in_srtp->in_lock); (*ssrc_in_p) = in_srtp->ssrc_in; + if (*ssrc_in_p) + obj_hold(&(*ssrc_in_p)->parent->h); if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) { // SSRC mismatch - get the new entry + if (*ssrc_in_p) + obj_put(&(*ssrc_in_p)->parent->h); + if (in_srtp->ssrc_in) + obj_put(&in_srtp->ssrc_in->parent->h); (*ssrc_in_p) = in_srtp->ssrc_in = get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT); + obj_hold(&in_srtp->ssrc_in->parent->h); // might have created a new entry, which would have a new random // ssrc_map_out. we don't need this if we're not transcoding @@ -1180,10 +1187,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o mutex_lock(&out_srtp->out_lock); (*ssrc_out_p) = out_srtp->ssrc_out; + if (*ssrc_out_p) + obj_hold(&(*ssrc_out_p)->parent->h); if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) { // SSRC mismatch - get the new entry + if (*ssrc_out_p) + obj_put(&(*ssrc_out_p)->parent->h); + if (out_srtp->ssrc_out) + obj_put(&out_srtp->ssrc_out->parent->h); (*ssrc_out_p) = out_srtp->ssrc_out = get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT); + obj_hold(&out_srtp->ssrc_out->parent->h); // reverse SSRC mapping (*ssrc_out_p)->ssrc_map_out = in_ssrc; @@ -1699,6 +1713,15 @@ out: g_queue_clear_full(&phc->mp.packets_out, codec_packet_free); + if (phc->mp.ssrc_in) { + obj_put(&phc->mp.ssrc_in->parent->h); + phc->mp.ssrc_in = NULL; + } + if (phc->mp.ssrc_out) { + obj_put(&phc->mp.ssrc_out->parent->h); + phc->mp.ssrc_out = NULL; + } + return ret; } diff --git a/daemon/redis.c b/daemon/redis.c index f16a0de5b..20837c337 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1481,6 +1481,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) { se->output_ctx.payload_type = json_reader_get_ll(root_reader, "out_payload_type"); json_reader_end_element(root_reader); + obj_put(&se->h); } json_reader_end_member (root_reader); return 0; diff --git a/daemon/rtcp.c b/daemon/rtcp.c index c2cf496ba..53b049c44 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1259,7 +1259,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) // we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc). // just leave the values in place. if (!packets) - return; + goto out; unsigned int lost = atomic64_get(&input_ctx->packets_lost); unsigned int dupes = atomic64_get(&input_ctx->duplicates); @@ -1286,6 +1286,12 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) rr->high_seq_received = htonl(atomic64_get(&input_ctx->last_seq)); // XXX jitter, last SR + +out: + if (input_ctx) + obj_put(&input_ctx->parent->h); + if (map_ctx) + obj_put(&map_ctx->parent->h); } static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { assert(ctx->scratch.sr.ssrc == ctx->mp->ssrc_in->parent->h.ssrc); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 693d101e4..c5e21aa7f 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -6,6 +6,9 @@ +static void __free_ssrc_entry_call(void *e); + + static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { c->parent = parent; c->payload_type = -1; @@ -14,11 +17,12 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { } static void init_ssrc_entry(struct ssrc_entry *ent, u_int32_t ssrc) { ent->ssrc = ssrc; + ent->last_used = rtpe_now.tv_sec; mutex_init(&ent->lock); } static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { struct ssrc_entry_call *ent; - ent = g_slice_alloc0(sizeof(*ent)); + ent = obj_alloc0("ssrc_entry_call", sizeof(*ent), __free_ssrc_entry_call); init_ssrc_ctx(&ent->input_ctx, ent); init_ssrc_ctx(&ent->output_ctx, ent); return &ent->h; @@ -26,7 +30,9 @@ static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { static void add_ssrc_entry(u_int32_t ssrc, struct ssrc_entry *ent, struct ssrc_hash *ht) { init_ssrc_entry(ent, ssrc); g_hash_table_replace(ht->ht, &ent->ssrc, ent); + obj_hold(ent); // HT entry g_queue_push_tail(&ht->q, ent); + obj_hold(ent); // queue entry } static void free_sender_report(struct ssrc_sender_report_item *i) { g_slice_free1(sizeof(*i), i); @@ -37,11 +43,16 @@ static void free_rr_time(struct ssrc_rr_time_item *i) { static void free_stats_block(struct ssrc_stats_block *ssb) { g_slice_free1(sizeof(*ssb), ssb); } -static void free_ssrc_entry_call(struct ssrc_entry_call *e) { +static void __free_ssrc_entry_call(void *ep) { + struct ssrc_entry_call *e = ep; + ilog(LOG_DEBUG, "__free_ssrc_entry_call"); g_queue_clear_full(&e->sender_reports, (GDestroyNotify) free_sender_report); g_queue_clear_full(&e->rr_time_reports, (GDestroyNotify) free_rr_time); g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block); - g_slice_free1(sizeof(*e), e); +} +static void ssrc_entry_put(void *ep) { + struct ssrc_entry_call *e = ep; + obj_put(&e->h); } // returned as mos * 10 (i.e. 10 - 50 for 1.0 to 5.0) @@ -68,12 +79,31 @@ static struct ssrc_entry *find_ssrc(u_int32_t ssrc, struct ssrc_hash *ht) { struct ssrc_entry *ret = g_atomic_pointer_get(&ht->cache); if (!ret || ret->ssrc != ssrc) { ret = g_hash_table_lookup(ht->ht, &ssrc); - if (ret) + if (ret) { + obj_hold(ret); + // cache shares the reference from ht g_atomic_pointer_set(&ht->cache, ret); + ret->last_used = rtpe_now.tv_sec; + } + } + else { + obj_hold(ret); + ret->last_used = rtpe_now.tv_sec; } rwlock_unlock_r(&ht->lock); return ret; } + +static int ssrc_time_cmp(const void *aa, const void *bb, void *pp) { + const struct ssrc_entry *a = aa, *b = bb; + if (a->last_used < b->last_used) + return -1; + if (a->last_used > b->last_used) + return 1; + return 0; +} + +// returns a new reference void *get_ssrc(u_int32_t ssrc, struct ssrc_hash *ht /* , int *created */) { struct ssrc_entry *ent; @@ -102,11 +132,13 @@ restart: rwlock_lock_w(&ht->lock); while (G_UNLIKELY(ht->q.length > 20)) { // arbitrary limit + g_queue_sort(&ht->q, ssrc_time_cmp, NULL); struct ssrc_entry *old_ent = g_queue_pop_head(&ht->q); ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %x) - deleting SSRC %x", ssrc, old_ent->ssrc); - g_hash_table_remove(ht->ht, &old_ent->ssrc); g_atomic_pointer_set(&ht->cache, NULL); + g_hash_table_remove(ht->ht, &old_ent->ssrc); // does obj_put + obj_put(old_ent); // for the queue entry } if (g_hash_table_lookup(ht->ht, &ssrc)) { @@ -114,7 +146,7 @@ restart: rwlock_unlock_w(&ht->lock); // return created entry if slot is still empty if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, ent)) - ht->destroy_func(ent); + obj_put(ent); goto restart; } add_ssrc_entry(ssrc, ent, ht); @@ -127,7 +159,7 @@ restart: if (g_atomic_pointer_get(&ht->precreat) == NULL) { struct ssrc_entry *nextent = ht->create_func(ht->uptr); if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, nextent)) - ht->destroy_func(nextent); + obj_put(nextent); } return ent; @@ -136,27 +168,26 @@ void free_ssrc_hash(struct ssrc_hash **ht) { if (!*ht) return; g_hash_table_destroy((*ht)->ht); - g_queue_clear(&(*ht)->q); + g_queue_clear_full(&(*ht)->q, ssrc_entry_put); if ((*ht)->precreat) - (*ht)->destroy_func((void *) (*ht)->precreat); + obj_put((struct ssrc_entry *) (*ht)->precreat); g_slice_free1(sizeof(**ht), *ht); *ht = NULL; } -struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, ssrc_free_func_t ffunc, void *uptr) { +struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, void *uptr) { struct ssrc_hash *ret; ret = g_slice_alloc0(sizeof(*ret)); - ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, (GDestroyNotify) ffunc); + ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, ssrc_entry_put); rwlock_init(&ret->lock); ret->create_func = cfunc; - ret->destroy_func = ffunc; ret->uptr = uptr; ret->precreat = cfunc(uptr); // because object creation might be slow return ret; } struct ssrc_hash *create_ssrc_hash_call(void) { - return create_ssrc_hash_full(create_ssrc_entry_call, (ssrc_free_func_t) free_ssrc_entry_call, NULL); + return create_ssrc_hash_full(create_ssrc_entry_call, NULL); } struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) { @@ -228,6 +259,7 @@ static long long __calc_rtt(struct call *c, u_int32_t ssrc, u_int32_t ntp_middle // not found mutex_unlock(&e->h.lock); + obj_put(&e->h); return 0; found:; @@ -246,6 +278,7 @@ found:; e->last_rtt = rtt; + obj_put(&e->h); return rtt; } @@ -266,6 +299,7 @@ void ssrc_sender_report(struct call_media *m, const struct ssrc_sender_report *s sr->ntp_msw, sr->ntp_lsw, seri->time_item.ntp_ts); mutex_unlock(&e->lock); + obj_put(e); } void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_report *rr, const struct timeval *tv) @@ -341,6 +375,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor out_ul_oe: mutex_unlock(&other_e->h.lock); + obj_put(&other_e->h); goto out_nl; out_nl: ; @@ -361,6 +396,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r rr->ntp_msw, rr->ntp_lsw, srti->time_item.ntp_ts); mutex_unlock(&e->lock); + obj_put(e); } void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, @@ -392,4 +428,5 @@ void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics * if (!e) return; e->last_rtt = vm->rnd_trip_delay; + obj_put(&e->h); } diff --git a/daemon/ssrc.h b/daemon/ssrc.h index 27e4f983d..4f5dff7c7 100644 --- a/daemon/ssrc.h +++ b/daemon/ssrc.h @@ -6,6 +6,7 @@ #include #include "compat.h" #include "aux.h" +#include "obj.h" @@ -21,7 +22,6 @@ enum ssrc_dir; typedef struct ssrc_entry *(*ssrc_create_func_t)(void *uptr); -typedef void (*ssrc_free_func_t)(struct ssrc_entry *); struct ssrc_hash { @@ -30,7 +30,6 @@ struct ssrc_hash { rwlock_t lock; ssrc_create_func_t create_func; void *uptr; - ssrc_free_func_t destroy_func; volatile struct ssrc_entry *cache; // last used entry volatile struct ssrc_entry *precreat; // next used entry }; @@ -63,8 +62,10 @@ struct ssrc_stats_block { }; struct ssrc_entry { + struct obj obj; mutex_t lock; u_int32_t ssrc; + time_t last_used; }; struct ssrc_entry_call { @@ -162,7 +163,7 @@ struct ssrc_xr_voip_metrics { void free_ssrc_hash(struct ssrc_hash **); -struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, ssrc_free_func_t, void *uptr); +struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, void *uptr); struct ssrc_hash *create_ssrc_hash_call(void);