Browse Source

use reference counting on SSRC objects

possibly fixes #488 and #482

Change-Id: Ib68e456b9322836b153c9fd0e59c3556378557ae
changes/88/19588/3
Richard Fuchs 8 years ago
parent
commit
14100b0b8a
7 changed files with 101 additions and 27 deletions
  1. +8
    -3
      daemon/call.c
  2. +8
    -7
      daemon/codec.c
  3. +23
    -0
      daemon/media_socket.c
  4. +1
    -0
      daemon/redis.c
  5. +7
    -1
      daemon/rtcp.c
  6. +50
    -13
      daemon/ssrc.c
  7. +4
    -3
      daemon/ssrc.h

+ 8
- 3
daemon/call.c View File

@ -1981,9 +1981,6 @@ static void __call_free(void *p) {
__C_DBG("freeing call struct");
call_buffer_free(&c->buffer);
mutex_destroy(&c->buffer_lock);
rwlock_destroy(&c->master_lock);
obj_put(c->dtls_cert);
while (c->monologues.head) {
@ -2027,9 +2024,17 @@ static void __call_free(void *p) {
crypto_cleanup(&ps->crypto);
g_queue_clear(&ps->sfds);
g_hash_table_destroy(ps->rtp_stats);
if (ps->ssrc_in)
obj_put(&ps->ssrc_in->parent->h);
if (ps->ssrc_out)
obj_put(&ps->ssrc_out->parent->h);
g_slice_free1(sizeof(*ps), ps);
}
call_buffer_free(&c->buffer);
mutex_destroy(&c->buffer_lock);
rwlock_destroy(&c->master_lock);
assert(c->stream_fds.head == NULL);
}


+ 8
- 7
daemon/codec.c View File

@ -57,7 +57,7 @@ static codec_handler_func handler_func_passthrough_ssrc;
static codec_handler_func handler_func_transcode;
static struct ssrc_entry *__ssrc_handler_new(void *p);
static void __ssrc_handler_free(struct codec_ssrc_handler *p);
static void __free_ssrc_handler(void *);
static void __transcode_packet_free(struct transcode_packet *);
@ -127,8 +127,7 @@ reset:
handler->dest_pt = *dest;
handler->func = handler_func_transcode;
handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, (ssrc_free_func_t) __ssrc_handler_free,
handler);
handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, handler);
ilog(LOG_DEBUG, "Created transcode context for " STR_FORMAT " -> " STR_FORMAT "",
STR_FMT(&source->encoding_with_params),
@ -567,7 +566,7 @@ static struct ssrc_entry *__ssrc_handler_new(void *p) {
h->dest_pt.codec_def->rtpname, h->dest_pt.clock_rate,
h->dest_pt.channels);
struct codec_ssrc_handler *ch = g_slice_alloc0(sizeof(*ch));
struct codec_ssrc_handler *ch = obj_alloc0("codec_ssrc_handler", sizeof(*ch), __free_ssrc_handler);
ch->handler = h;
mutex_init(&ch->lock);
packet_sequencer_init(&ch->sequencer, (GDestroyNotify) __transcode_packet_free);
@ -607,7 +606,7 @@ static struct ssrc_entry *__ssrc_handler_new(void *p) {
return &ch->h;
err:
__ssrc_handler_free(ch);
obj_put(&ch->h);
return NULL;
}
static int __encoder_flush(encoder_t *enc, void *u1, void *u2) {
@ -615,7 +614,9 @@ static int __encoder_flush(encoder_t *enc, void *u1, void *u2) {
*going = 1;
return 0;
}
static void __ssrc_handler_free(struct codec_ssrc_handler *ch) {
static void __free_ssrc_handler(void *chp) {
struct codec_ssrc_handler *ch = chp;
ilog(LOG_DEBUG, "__free_ssrc_handler");
packet_sequencer_destroy(&ch->sequencer);
if (ch->decoder)
decoder_close(ch->decoder);
@ -629,7 +630,6 @@ static void __ssrc_handler_free(struct codec_ssrc_handler *ch) {
encoder_free(ch->encoder);
}
g_string_free(ch->sample_buffer, TRUE);
g_slice_free1(sizeof(*ch), ch);
}
static int __packet_encoded(encoder_t *enc, void *u1, void *u2) {
@ -767,6 +767,7 @@ static int handler_func_transcode(struct codec_handler *h, struct call_media *me
}
mutex_unlock(&ch->lock);
obj_put(&ch->h);
return 0;
}


+ 23
- 0
daemon/media_socket.c View File

@ -1162,10 +1162,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
mutex_lock(&in_srtp->in_lock);
(*ssrc_in_p) = in_srtp->ssrc_in;
if (*ssrc_in_p)
obj_hold(&(*ssrc_in_p)->parent->h);
if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) {
// SSRC mismatch - get the new entry
if (*ssrc_in_p)
obj_put(&(*ssrc_in_p)->parent->h);
if (in_srtp->ssrc_in)
obj_put(&in_srtp->ssrc_in->parent->h);
(*ssrc_in_p) = in_srtp->ssrc_in =
get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT);
obj_hold(&in_srtp->ssrc_in->parent->h);
// might have created a new entry, which would have a new random
// ssrc_map_out. we don't need this if we're not transcoding
@ -1180,10 +1187,17 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
mutex_lock(&out_srtp->out_lock);
(*ssrc_out_p) = out_srtp->ssrc_out;
if (*ssrc_out_p)
obj_hold(&(*ssrc_out_p)->parent->h);
if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) {
// SSRC mismatch - get the new entry
if (*ssrc_out_p)
obj_put(&(*ssrc_out_p)->parent->h);
if (out_srtp->ssrc_out)
obj_put(&out_srtp->ssrc_out->parent->h);
(*ssrc_out_p) = out_srtp->ssrc_out =
get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT);
obj_hold(&out_srtp->ssrc_out->parent->h);
// reverse SSRC mapping
(*ssrc_out_p)->ssrc_map_out = in_ssrc;
@ -1699,6 +1713,15 @@ out:
g_queue_clear_full(&phc->mp.packets_out, codec_packet_free);
if (phc->mp.ssrc_in) {
obj_put(&phc->mp.ssrc_in->parent->h);
phc->mp.ssrc_in = NULL;
}
if (phc->mp.ssrc_out) {
obj_put(&phc->mp.ssrc_out->parent->h);
phc->mp.ssrc_out = NULL;
}
return ret;
}


+ 1
- 0
daemon/redis.c View File

@ -1481,6 +1481,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) {
se->output_ctx.payload_type = json_reader_get_ll(root_reader, "out_payload_type");
json_reader_end_element(root_reader);
obj_put(&se->h);
}
json_reader_end_member (root_reader);
return 0;


+ 7
- 1
daemon/rtcp.c View File

@ -1259,7 +1259,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
// we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc).
// just leave the values in place.
if (!packets)
return;
goto out;
unsigned int lost = atomic64_get(&input_ctx->packets_lost);
unsigned int dupes = atomic64_get(&input_ctx->duplicates);
@ -1286,6 +1286,12 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
rr->high_seq_received = htonl(atomic64_get(&input_ctx->last_seq));
// XXX jitter, last SR
out:
if (input_ctx)
obj_put(&input_ctx->parent->h);
if (map_ctx)
obj_put(&map_ctx->parent->h);
}
static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
assert(ctx->scratch.sr.ssrc == ctx->mp->ssrc_in->parent->h.ssrc);


+ 50
- 13
daemon/ssrc.c View File

@ -6,6 +6,9 @@
static void __free_ssrc_entry_call(void *e);
static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
c->parent = parent;
c->payload_type = -1;
@ -14,11 +17,12 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
}
static void init_ssrc_entry(struct ssrc_entry *ent, u_int32_t ssrc) {
ent->ssrc = ssrc;
ent->last_used = rtpe_now.tv_sec;
mutex_init(&ent->lock);
}
static struct ssrc_entry *create_ssrc_entry_call(void *uptr) {
struct ssrc_entry_call *ent;
ent = g_slice_alloc0(sizeof(*ent));
ent = obj_alloc0("ssrc_entry_call", sizeof(*ent), __free_ssrc_entry_call);
init_ssrc_ctx(&ent->input_ctx, ent);
init_ssrc_ctx(&ent->output_ctx, ent);
return &ent->h;
@ -26,7 +30,9 @@ static struct ssrc_entry *create_ssrc_entry_call(void *uptr) {
static void add_ssrc_entry(u_int32_t ssrc, struct ssrc_entry *ent, struct ssrc_hash *ht) {
init_ssrc_entry(ent, ssrc);
g_hash_table_replace(ht->ht, &ent->ssrc, ent);
obj_hold(ent); // HT entry
g_queue_push_tail(&ht->q, ent);
obj_hold(ent); // queue entry
}
static void free_sender_report(struct ssrc_sender_report_item *i) {
g_slice_free1(sizeof(*i), i);
@ -37,11 +43,16 @@ static void free_rr_time(struct ssrc_rr_time_item *i) {
static void free_stats_block(struct ssrc_stats_block *ssb) {
g_slice_free1(sizeof(*ssb), ssb);
}
static void free_ssrc_entry_call(struct ssrc_entry_call *e) {
static void __free_ssrc_entry_call(void *ep) {
struct ssrc_entry_call *e = ep;
ilog(LOG_DEBUG, "__free_ssrc_entry_call");
g_queue_clear_full(&e->sender_reports, (GDestroyNotify) free_sender_report);
g_queue_clear_full(&e->rr_time_reports, (GDestroyNotify) free_rr_time);
g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block);
g_slice_free1(sizeof(*e), e);
}
static void ssrc_entry_put(void *ep) {
struct ssrc_entry_call *e = ep;
obj_put(&e->h);
}
// returned as mos * 10 (i.e. 10 - 50 for 1.0 to 5.0)
@ -68,12 +79,31 @@ static struct ssrc_entry *find_ssrc(u_int32_t ssrc, struct ssrc_hash *ht) {
struct ssrc_entry *ret = g_atomic_pointer_get(&ht->cache);
if (!ret || ret->ssrc != ssrc) {
ret = g_hash_table_lookup(ht->ht, &ssrc);
if (ret)
if (ret) {
obj_hold(ret);
// cache shares the reference from ht
g_atomic_pointer_set(&ht->cache, ret);
ret->last_used = rtpe_now.tv_sec;
}
}
else {
obj_hold(ret);
ret->last_used = rtpe_now.tv_sec;
}
rwlock_unlock_r(&ht->lock);
return ret;
}
static int ssrc_time_cmp(const void *aa, const void *bb, void *pp) {
const struct ssrc_entry *a = aa, *b = bb;
if (a->last_used < b->last_used)
return -1;
if (a->last_used > b->last_used)
return 1;
return 0;
}
// returns a new reference
void *get_ssrc(u_int32_t ssrc, struct ssrc_hash *ht /* , int *created */) {
struct ssrc_entry *ent;
@ -102,11 +132,13 @@ restart:
rwlock_lock_w(&ht->lock);
while (G_UNLIKELY(ht->q.length > 20)) { // arbitrary limit
g_queue_sort(&ht->q, ssrc_time_cmp, NULL);
struct ssrc_entry *old_ent = g_queue_pop_head(&ht->q);
ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %x) - deleting SSRC %x",
ssrc, old_ent->ssrc);
g_hash_table_remove(ht->ht, &old_ent->ssrc);
g_atomic_pointer_set(&ht->cache, NULL);
g_hash_table_remove(ht->ht, &old_ent->ssrc); // does obj_put
obj_put(old_ent); // for the queue entry
}
if (g_hash_table_lookup(ht->ht, &ssrc)) {
@ -114,7 +146,7 @@ restart:
rwlock_unlock_w(&ht->lock);
// return created entry if slot is still empty
if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, ent))
ht->destroy_func(ent);
obj_put(ent);
goto restart;
}
add_ssrc_entry(ssrc, ent, ht);
@ -127,7 +159,7 @@ restart:
if (g_atomic_pointer_get(&ht->precreat) == NULL) {
struct ssrc_entry *nextent = ht->create_func(ht->uptr);
if (!g_atomic_pointer_compare_and_exchange(&ht->precreat, NULL, nextent))
ht->destroy_func(nextent);
obj_put(nextent);
}
return ent;
@ -136,27 +168,26 @@ void free_ssrc_hash(struct ssrc_hash **ht) {
if (!*ht)
return;
g_hash_table_destroy((*ht)->ht);
g_queue_clear(&(*ht)->q);
g_queue_clear_full(&(*ht)->q, ssrc_entry_put);
if ((*ht)->precreat)
(*ht)->destroy_func((void *) (*ht)->precreat);
obj_put((struct ssrc_entry *) (*ht)->precreat);
g_slice_free1(sizeof(**ht), *ht);
*ht = NULL;
}
struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, ssrc_free_func_t ffunc, void *uptr) {
struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t cfunc, void *uptr) {
struct ssrc_hash *ret;
ret = g_slice_alloc0(sizeof(*ret));
ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, (GDestroyNotify) ffunc);
ret->ht = g_hash_table_new_full(uint32_hash, uint32_eq, NULL, ssrc_entry_put);
rwlock_init(&ret->lock);
ret->create_func = cfunc;
ret->destroy_func = ffunc;
ret->uptr = uptr;
ret->precreat = cfunc(uptr); // because object creation might be slow
return ret;
}
struct ssrc_hash *create_ssrc_hash_call(void) {
return create_ssrc_hash_full(create_ssrc_entry_call, (ssrc_free_func_t) free_ssrc_entry_call, NULL);
return create_ssrc_hash_full(create_ssrc_entry_call, NULL);
}
struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) {
@ -228,6 +259,7 @@ static long long __calc_rtt(struct call *c, u_int32_t ssrc, u_int32_t ntp_middle
// not found
mutex_unlock(&e->h.lock);
obj_put(&e->h);
return 0;
found:;
@ -246,6 +278,7 @@ found:;
e->last_rtt = rtt;
obj_put(&e->h);
return rtt;
}
@ -266,6 +299,7 @@ void ssrc_sender_report(struct call_media *m, const struct ssrc_sender_report *s
sr->ntp_msw, sr->ntp_lsw, seri->time_item.ntp_ts);
mutex_unlock(&e->lock);
obj_put(e);
}
void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_report *rr,
const struct timeval *tv)
@ -341,6 +375,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor
out_ul_oe:
mutex_unlock(&other_e->h.lock);
obj_put(&other_e->h);
goto out_nl;
out_nl:
;
@ -361,6 +396,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r
rr->ntp_msw, rr->ntp_lsw, srti->time_item.ntp_ts);
mutex_unlock(&e->lock);
obj_put(e);
}
void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr,
@ -392,4 +428,5 @@ void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics *
if (!e)
return;
e->last_rtt = vm->rnd_trip_delay;
obj_put(&e->h);
}

+ 4
- 3
daemon/ssrc.h View File

@ -6,6 +6,7 @@
#include <glib.h>
#include "compat.h"
#include "aux.h"
#include "obj.h"
@ -21,7 +22,6 @@ enum ssrc_dir;
typedef struct ssrc_entry *(*ssrc_create_func_t)(void *uptr);
typedef void (*ssrc_free_func_t)(struct ssrc_entry *);
struct ssrc_hash {
@ -30,7 +30,6 @@ struct ssrc_hash {
rwlock_t lock;
ssrc_create_func_t create_func;
void *uptr;
ssrc_free_func_t destroy_func;
volatile struct ssrc_entry *cache; // last used entry
volatile struct ssrc_entry *precreat; // next used entry
};
@ -63,8 +62,10 @@ struct ssrc_stats_block {
};
struct ssrc_entry {
struct obj obj;
mutex_t lock;
u_int32_t ssrc;
time_t last_used;
};
struct ssrc_entry_call {
@ -162,7 +163,7 @@ struct ssrc_xr_voip_metrics {
void free_ssrc_hash(struct ssrc_hash **);
struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, ssrc_free_func_t, void *uptr);
struct ssrc_hash *create_ssrc_hash_full(ssrc_create_func_t, void *uptr);
struct ssrc_hash *create_ssrc_hash_call(void);


Loading…
Cancel
Save