diff --git a/daemon/call.c b/daemon/call.c index d1cf256ff..57bdc5386 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2012,9 +2012,6 @@ static void __call_free(void *p) { __C_DBG("freeing call struct"); - call_buffer_free(&c->buffer); - mutex_destroy(&c->buffer_lock); - rwlock_destroy(&c->master_lock); obj_put(c->dtls_cert); while (c->monologues.head) { @@ -2052,9 +2049,17 @@ static void __call_free(void *p) { crypto_cleanup(&ps->crypto); g_queue_clear(&ps->sfds); g_hash_table_destroy(ps->rtp_stats); + if (ps->ssrc_in) + obj_put(&ps->ssrc_in->parent->h); + if (ps->ssrc_out) + obj_put(&ps->ssrc_out->parent->h); g_slice_free1(sizeof(*ps), ps); } + call_buffer_free(&c->buffer); + mutex_destroy(&c->buffer_lock); + rwlock_destroy(&c->master_lock); + assert(c->stream_fds.head == NULL); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 949e69427..ba1059d11 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1114,10 +1114,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o mutex_lock(&in_srtp->in_lock); (*ssrc_in_p) = in_srtp->ssrc_in; + if (*ssrc_in_p) + obj_hold(&(*ssrc_in_p)->parent->h); if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != ssrc)) { // SSRC mismatch - get the new entry + if (*ssrc_in_p) + obj_put(&(*ssrc_in_p)->parent->h); + if (in_srtp->ssrc_in) + obj_put(&in_srtp->ssrc_in->parent->h); (*ssrc_in_p) = in_srtp->ssrc_in = get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_INPUT); + obj_hold(&in_srtp->ssrc_in->parent->h); } mutex_unlock(&in_srtp->in_lock); @@ -1126,10 +1133,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o mutex_lock(&out_srtp->out_lock); (*ssrc_out_p) = out_srtp->ssrc_out; + if (*ssrc_out_p) + obj_hold(&(*ssrc_out_p)->parent->h); if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != ssrc)) { // SSRC mismatch - get the new entry + if (*ssrc_out_p) + obj_put(&(*ssrc_out_p)->parent->h); + if (out_srtp->ssrc_out) + obj_put(&out_srtp->ssrc_out->parent->h); (*ssrc_out_p) = out_srtp->ssrc_out = get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_OUTPUT); + obj_hold(&out_srtp->ssrc_out->parent->h); } mutex_unlock(&out_srtp->out_lock); @@ -1513,6 +1527,15 @@ done: unlock_out: rwlock_unlock_r(&call->master_lock); + if (ssrc_in) { + obj_put(&ssrc_in->parent->h); + ssrc_in = NULL; + } + if (ssrc_out) { + obj_put(&ssrc_out->parent->h); + ssrc_out = NULL; + } + return ret; } diff --git a/daemon/redis.c b/daemon/redis.c index f319a0dd3..b30c68923 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1430,6 +1430,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) { se->payload_type = json_reader_get_ll(root_reader, "payload_type"); json_reader_end_element(root_reader); + obj_put(&se->h); } json_reader_end_member (root_reader); return 0; diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 8d5aa29bd..41df2088a 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -6,6 +6,9 @@ +static void __free_ssrc_entry_call(void *e); + + static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { c->parent = parent; } @@ -15,8 +18,7 @@ static void init_ssrc_entry(struct ssrc_entry *ent, u_int32_t ssrc) { } static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { struct ssrc_entry_call *ent; - ent = g_slice_alloc0(sizeof(*ent)); - ent->payload_type = -1; + ent = obj_alloc0("ssrc_entry_call", sizeof(*ent), __free_ssrc_entry_call); init_ssrc_ctx(&ent->input_ctx, ent); init_ssrc_ctx(&ent->output_ctx, ent); return &ent->h; @@ -24,7 +26,9 @@ static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { static void add_ssrc_entry(u_int32_t ssrc, struct ssrc_entry *ent, struct ssrc_hash *ht) { init_ssrc_entry(ent, ssrc); g_hash_table_replace(ht->ht, &ent->ssrc, ent); + obj_hold(ent); // HT entry g_queue_push_tail(&ht->q, ent); + obj_hold(ent); // queue entry } static void free_sender_report(struct ssrc_sender_report_item *i) { g_slice_free1(sizeof(*i), i); @@ -35,11 +39,16 @@ static void free_rr_time(struct ssrc_rr_time_item *i) { static void free_stats_block(struct ssrc_stats_block *ssb) { g_slice_free1(sizeof(*ssb), ssb); } -static void free_ssrc_entry_call(struct ssrc_entry_call *e) { +static void __free_ssrc_entry_call(void *ep) { + struct ssrc_entry_call *e = ep; + ilog(LOG_DEBUG, "__free_ssrc_entry_call"); g_queue_clear_full(&e->sender_reports, (GDestroyNotify) free_sender_report); g_queue_clear_full(&e->rr_time_reports, (GDestroyNotify) free_rr_time); g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block); - g_slice_free1(sizeof(*e), e); +} +static void ssrc_entry_put(void *ep) { + struct ssrc_entry_call *e = ep; + obj_put(&e->h); } // returned as mos * 10 (i.e. 10 - 50 for 1.0 to 5.0) @@ -66,12 +75,18 @@ static struct ssrc_entry *find_ssrc(u_int32_t ssrc, struct ssrc_hash *ht) { struct ssrc_entry *ret = g_atomic_pointer_get(&ht->cache); if (!ret || ret->ssrc != ssrc) { ret = g_hash_table_lookup(ht->ht, &ssrc); - if (ret) + if (ret) { + obj_hold(ret); + // cache shares the reference from ht g_atomic_pointer_set(&ht->cache, ret); + } } + else + obj_hold(ret); rwlock_unlock_r(&ht->lock); return ret; } +// returns a new reference void *get_ssrc(u_int32_t ssrc, struct ssrc_hash *ht /* , int *created */) { struct ssrc_entry *ent; @@ -103,8 +118,9 @@ restart: struct ssrc_entry *old_ent = g_queue_pop_head(&ht->q); ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %u) - deleting SSRC %u", ssrc, old_ent->ssrc); - g_hash_table_remove(ht->ht, &old_ent->ssrc); g_atomic_pointer_set(&ht->cache, NULL); + g_hash_table_remove(ht->ht, &old_ent->ssrc); // does obj_put + obj_put(old_ent); // for the queue entry } if (g_hash_table_lookup(ht->ht, &ssrc)) { @@ -112,7 +128,7 @@ restart: rwlock_unlock_w(&ht->lock); // return created entry if slot is still empty if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, ent)) - ht->destroy_func(ent); + obj_put(ent); goto restart; } add_ssrc_entry(ssrc, ent, ht); @@ -125,7 +141,7 @@ restart: if (g_atomic_pointer_get(&ht->precreat) == NULL) { struct ssrc_entry *nextent = ht->create_func(ht->uptr); if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, nextent)) - ht->destroy_func(nextent); + obj_put(nextent); } return ent; @@ -134,27 +150,26 @@ void free_ssrc_hash(struct ssrc_hash **ht) { if (!*ht) return; g_hash_table_destroy((*ht)->ht); - g_queue_clear(&(*ht)->q); + g_queue_clear_full(&(*ht)->q, ssrc_entry_put); if ((*ht)->precreat) - (*ht)->destroy_func((void *) (*ht)->precreat); + obj_put((struct ssrc_entry *) (*ht)->precreat); g_slice_free1(sizeof(**ht), *ht); *ht = NULL; } -struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, ssrc_free_func_t ffunc, void *uptr) { +struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, void *uptr) { struct ssrc_hash *ret; ret = g_slice_alloc0(sizeof(*ret)); - ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, (GDestroyNotify) ffunc); + ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, ssrc_entry_put); rwlock_init(&ret->lock); ret->create_func = cfunc; - ret->destroy_func = ffunc; ret->uptr = uptr; ret->precreat = cfunc(uptr); // because object creation might be slow return ret; } struct ssrc_hash *create_ssrc_hash_call(void) { - return create_ssrc_hash_full(create_ssrc_entry_call, (ssrc_free_func_t) free_ssrc_entry_call, NULL); + return create_ssrc_hash_full(create_ssrc_entry_call, NULL); } struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) { @@ -226,6 +241,7 @@ static long long __calc_rtt(struct call *c, u_int32_t ssrc, u_int32_t ntp_middle // not found mutex_unlock(&e->h.lock); + obj_put(&e->h); return 0; found:; @@ -244,6 +260,7 @@ found:; e->last_rtt = rtt; + obj_put(&e->h); return rtt; } @@ -264,6 +281,7 @@ void ssrc_sender_report(struct call_media *m, const struct ssrc_sender_report *s sr->ntp_msw, sr->ntp_lsw, seri->time_item.ntp_ts); mutex_unlock(&e->lock); + obj_put(e); } void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_report *rr, const struct timeval *tv) @@ -342,6 +360,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor out_ul_oe: mutex_unlock(&other_e->h.lock); + obj_put(&other_e->h); goto out_nl; out_nl: ; @@ -362,6 +381,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r rr->ntp_msw, rr->ntp_lsw, srti->time_item.ntp_ts); mutex_unlock(&e->lock); + obj_put(e); } void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, @@ -393,4 +413,5 @@ void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics * if (!e) return; e->last_rtt = vm->rnd_trip_delay; + obj_put(&e->h); } diff --git a/daemon/ssrc.h b/daemon/ssrc.h index 26111c9c6..4cea6b937 100644 --- a/daemon/ssrc.h +++ b/daemon/ssrc.h @@ -6,6 +6,7 @@ #include #include "compat.h" #include "aux.h" +#include "obj.h" @@ -21,7 +22,6 @@ enum ssrc_dir; typedef struct ssrc_entry *(*ssrc_create_func_t)(void *uptr); -typedef void (*ssrc_free_func_t)(struct ssrc_entry *); struct ssrc_hash { @@ -30,7 +30,6 @@ struct ssrc_hash { rwlock_t lock; ssrc_create_func_t create_func; void *uptr; - ssrc_free_func_t destroy_func; volatile struct ssrc_entry *cache; // last used entry volatile struct ssrc_entry *precreat; // next used entry }; @@ -51,6 +50,7 @@ struct ssrc_stats_block { }; struct ssrc_entry { + struct obj obj; mutex_t lock; u_int32_t ssrc; }; @@ -151,7 +151,7 @@ struct ssrc_xr_voip_metrics { void free_ssrc_hash(struct ssrc_hash **); -struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, ssrc_free_func_t, void *uptr); +struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, void *uptr); struct ssrc_hash *create_ssrc_hash_call(void);