diff --git a/daemon/call.c b/daemon/call.c index 1ce0b5a56..e369f51cb 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -213,7 +213,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { CALL_CLEAR(sfd->call, FOREIGN_MEDIA); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { - struct ssrc_ctx *ctx = ps->ssrc_in[u]; + struct ssrc_entry_call *ctx = ps->ssrc_in[u]; if (!ctx) break; @@ -222,7 +222,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { atomic_get_na(&ctx->stats->last_pt)); } for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { - struct ssrc_ctx *ctx = ps->ssrc_out[u]; + struct ssrc_entry_call *ctx = ps->ssrc_out[u]; if (!ctx) break; @@ -4008,7 +4008,7 @@ static void __call_cleanup(call_t *c) { media_player_put(&ml->rec_player); if (ml->tone_freqs) g_array_free(ml->tone_freqs, true); - obj_release(ml->janus_session); + obj_release_o(ml->janus_session); } while (c->stream_fds.head) { @@ -4171,7 +4171,7 @@ void call_destroy(call_t *c) { (unsigned int) local_endpoint->port, FMT_M(addr, ps->endpoint.port), (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0), + FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0), atomic64_get_na(&ps->stats_in->packets), atomic64_get_na(&ps->stats_in->bytes), atomic64_get_na(&ps->stats_in->errors), @@ -4361,9 +4361,9 @@ static void __call_free(call_t *c) { t_queue_clear(&ps->sfds); t_hash_table_destroy(ps->rtp_stats); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) - ssrc_ctx_put(&ps->ssrc_in[u]); + ssrc_entry_release(ps->ssrc_in[u]); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) - ssrc_ctx_put(&ps->ssrc_out[u]); + ssrc_entry_release(ps->ssrc_out[u]); bufferpool_unref(ps->stats_in); bufferpool_unref(ps->stats_out); g_free(ps); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index ac08b6b84..9f7bf9820 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -2749,19 +2749,19 @@ static void ng_stats_endpoint(const ng_parser_t *parser, parser_arg dict, const } static void ng_stats_stream_ssrc(const ng_parser_t *parser, parser_arg dict, - struct ssrc_ctx *const ssrcs[RTPE_NUM_SSRC_TRACKING], + struct ssrc_entry_call *const ssrcs[RTPE_NUM_SSRC_TRACKING], const char *label) { parser_arg list = parser->dict_add_list(dict, label); for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - struct ssrc_ctx *c = ssrcs[i]; + struct ssrc_entry_call *c = ssrcs[i]; if (!c) break; parser_arg ssrc = parser->list_add_dict(list); - parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->parent->h.ssrc); + parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->h.ssrc); parser->dict_add_int(ssrc, "bytes", atomic64_get_na(&c->stats->bytes)); parser->dict_add_int(ssrc, "packets", atomic64_get_na(&c->stats->packets)); parser->dict_add_int(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp)); diff --git a/daemon/cli.c b/daemon/cli.c index f1bad6794..c4b69b021 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -808,7 +808,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml) sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0, + ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0, atomic64_get_na(&ps->stats_in->packets), atomic64_get_na(&ps->stats_in->bytes), atomic64_get_na(&ps->stats_in->errors), diff --git a/daemon/codec.c b/daemon/codec.c index ebdd14a14..e5e7a55ab 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -58,7 +58,7 @@ static void codec_store_add_raw_order(struct codec_store *cs, rtp_payload_type * static rtp_payload_type *codec_store_find_compatible(struct codec_store *cs, const rtp_payload_type *pt); static void __rtp_payload_type_add_name(codec_names_ht, rtp_payload_type *pt); -static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq); +static void codec_calc_lost(struct ssrc_entry_call *ssrc, uint16_t seq); static void __codec_options_set(call_t *call, rtp_payload_type *pt, str_case_value_ht codec_set); @@ -422,7 +422,7 @@ static void __handler_shutdown(struct codec_handler *handler) { delay_buffer_stop(&handler->delay_buffer); } - obj_release(handler->ssrc_handler); + ssrc_entry_release(handler->ssrc_handler); handler->kernelize = false; handler->transcoder = false; handler->output_handler = handler; // reset to default @@ -1315,7 +1315,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) { rwlock_lock_r(&rt->call->master_lock); // copy out references to SSRCs for lock-free handling - struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; + struct ssrc_entry_call *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; if (media->streams.head) { struct packet_stream *ps = media->streams.head->data; mutex_lock(&ps->out_lock); @@ -1323,7 +1323,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) { if (!ps->ssrc_out[u]) // end of list break; ssrc_out[u] = ps->ssrc_out[u]; - ssrc_ctx_hold(ssrc_out[u]); + ssrc_entry_hold(ssrc_out[u]); } mutex_unlock(&ps->out_lock); } @@ -1340,7 +1340,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) { for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { if (!ssrc_out[u]) // end of list break; - ssrc_ctx_put(&ssrc_out[u]); + ssrc_entry_release(ssrc_out[u]); } out: @@ -2011,7 +2011,7 @@ void mqtt_timer_start(struct mqtt_timer **mqtp, call_t *call, struct call_media static void codec_timer_stop(struct codec_timer **ctp) { if (!ctp) return; - obj_release(*ctp); + obj_release_o(*ctp); } // master lock held in W void rtcp_timer_stop(struct rtcp_timer **rtp) { @@ -2064,7 +2064,7 @@ static void codec_add_raw_packet_common(struct media_packet *mp, unsigned int cl { p->clockrate = clockrate; if (mp->rtp && mp->ssrc_out) { - ssrc_ctx_hold(mp->ssrc_out); + ssrc_entry_hold(mp->ssrc_out); p->ssrc_out = mp->ssrc_out; if (!p->rtp) p->rtp = mp->rtp; @@ -2139,36 +2139,19 @@ static int handler_func_passthrough(struct codec_handler *h, struct media_packet #ifdef WITH_TRANSCODING static void __ssrc_lock_both(struct media_packet *mp) { - struct ssrc_ctx *ssrc_in = mp->ssrc_in; - struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent; - struct ssrc_ctx *ssrc_out = mp->ssrc_out; - struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent; + struct ssrc_entry_call *ssrc_in = mp->ssrc_in; + struct ssrc_entry_call *ssrc_out = mp->ssrc_out; - // we need a nested lock here - both input and output SSRC needs to be locked. - // we don't know the lock order, so try both, and keep trying until we succeed. - while (1) { - mutex_lock(&ssrc_in_p->h.lock); - if (ssrc_in_p == ssrc_out_p) - break; - if (!mutex_trylock(&ssrc_out_p->h.lock)) - break; - mutex_unlock(&ssrc_in_p->h.lock); - - mutex_lock(&ssrc_out_p->h.lock); - if (!mutex_trylock(&ssrc_in_p->h.lock)) - break; - mutex_unlock(&ssrc_out_p->h.lock); - } + // nested lock: in first, out second + mutex_lock(&ssrc_in->h.lock); + mutex_lock(&ssrc_out->h.lock); } static void __ssrc_unlock_both(struct media_packet *mp) { - struct ssrc_ctx *ssrc_in = mp->ssrc_in; - struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent; - struct ssrc_ctx *ssrc_out = mp->ssrc_out; - struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent; + struct ssrc_entry_call *ssrc_in = mp->ssrc_in; + struct ssrc_entry_call *ssrc_out = mp->ssrc_out; - mutex_unlock(&ssrc_in_p->h.lock); - if (ssrc_in_p != ssrc_out_p) - mutex_unlock(&ssrc_out_p->h.lock); + mutex_unlock(&ssrc_in->h.lock); + mutex_unlock(&ssrc_out->h.lock); } static void __seq_free(void *p) { @@ -2181,12 +2164,10 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa { struct codec_handler *h = packet->handler; - struct ssrc_ctx *ssrc_in = mp->ssrc_in; - struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent; - struct ssrc_ctx *ssrc_out = mp->ssrc_out; - struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent; + struct ssrc_entry_call *ssrc_in = mp->ssrc_in; + struct ssrc_entry_call *ssrc_out = mp->ssrc_out; - struct codec_ssrc_handler *ch = get_ssrc(ssrc_in_p->h.ssrc, &h->ssrc_hash); + struct codec_ssrc_handler *ch = get_ssrc(ssrc_in->h.ssrc, &h->ssrc_hash); if (G_UNLIKELY(!ch)) { __transcode_packet_free(packet); return 0; @@ -2206,7 +2187,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa atomic64_inc_na(&mp->sfd->local_intf->stats->in.packets); atomic64_add_na(&mp->sfd->local_intf->stats->in.bytes, mp->payload.len); - struct codec_ssrc_handler *input_ch = get_ssrc(ssrc_in_p->h.ssrc, &h->input_handler->ssrc_hash); + struct codec_ssrc_handler *input_ch = get_ssrc(ssrc_in->h.ssrc, &h->input_handler->ssrc_hash); if (packet->bypass_seq) { // bypass sequencer @@ -2225,13 +2206,13 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa __ssrc_lock_both(mp); // get sequencer appropriate for our output - if (!ssrc_in_p->sequencers) - ssrc_in_p->sequencers = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __seq_free); - packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in_p->sequencers, mp->media_out); + if (!ssrc_in->sequencers) + ssrc_in->sequencers = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __seq_free); + packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->sequencers, mp->media_out); if (!seq) { seq = g_new0(__typeof(*seq), 1); packet_sequencer_init(seq, (GDestroyNotify) __transcode_packet_free); - g_hash_table_insert(ssrc_in_p->sequencers, mp->media_out, seq); + g_hash_table_insert(ssrc_in->sequencers, mp->media_out, seq); // this is a quick fix to restore sequencer values until upper layer behavior will be fixed unsigned int stats_ext_seq = atomic_get_na(&ssrc_in->stats->ext_seq); if(stats_ext_seq) { @@ -2253,7 +2234,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa ilogs(transcoding, LOG_DEBUG, "Ignoring duplicate RTP packet"); if (func_ret != 1) __transcode_packet_free(packet); - ssrc_in_p->duplicates++; + ssrc_in->duplicates++; atomic64_inc_na(&mp->sfd->local_intf->stats->s.duplicates); RTPE_STATS_INC(rtp_duplicates); goto out; @@ -2317,18 +2298,18 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa // new packet might have different handlers h = packet->handler; - obj_release(ch); - obj_release(input_ch); - ch = get_ssrc(ssrc_in_p->h.ssrc, &h->ssrc_hash); + ssrc_entry_release(ch); + ssrc_entry_release(input_ch); + ch = get_ssrc(ssrc_in->h.ssrc, &h->ssrc_hash); if (G_UNLIKELY(!ch)) goto next; - input_ch = get_ssrc(ssrc_in_p->h.ssrc, &h->input_handler->ssrc_hash); + input_ch = get_ssrc(ssrc_in->h.ssrc, &h->input_handler->ssrc_hash); if (G_UNLIKELY(!input_ch)) { - obj_release(ch); + ssrc_entry_release(ch); goto next; } - ssrc_in_p->packets_lost = seq->lost_count; + ssrc_in->packets_lost = seq->lost_count; atomic_set_na(&ssrc_in->stats->ext_seq, seq->ext_seq); ilogs(transcoding, LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu", @@ -2336,7 +2317,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa if (seq_ret == 1) { // seq reset - update output seq. we keep our output seq clean - ssrc_out_p->seq_diff -= packet->p.seq - seq_ori; + ssrc_out->seq_diff -= packet->p.seq - seq_ori; seq_ret = 0; } @@ -2353,9 +2334,9 @@ next: out: __ssrc_unlock_both(mp); - obj_release(input_ch); + ssrc_entry_release(input_ch); out_ch: - obj_release(ch); + ssrc_entry_release(ch); mp->rtp = orig_rtp; @@ -2371,8 +2352,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, unsigned long ts_delay) { struct rtp_header *rh = (void *) buf; - struct ssrc_ctx *ssrc_out = mp->ssrc_out; - struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent; + struct ssrc_entry_call *ssrc_out = mp->ssrc_out; // reconstruct RTP header unsigned long ts = payload_ts; ZERO(*rh); @@ -2383,9 +2363,9 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, if (seq != -1) rh->seq_num = htons(seq); else - rh->seq_num = htons(ntohs(mp->rtp->seq_num) + (ssrc_out_p->seq_diff += seq_inc)); + rh->seq_num = htons(ntohs(mp->rtp->seq_num) + (ssrc_out->seq_diff += seq_inc)); rh->timestamp = htonl(ts); - rh->ssrc = htonl(ssrc_out_p->h.ssrc); + rh->ssrc = htonl(ssrc_out->h.ssrc); // add to output queue struct codec_packet *p = g_new0(__typeof(*p), 1); @@ -2398,7 +2378,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, p->rtp = rh; p->ts = ts; p->clockrate = handler->dest_pt.clock_rate; - ssrc_ctx_hold(ssrc_out); + ssrc_entry_hold(ssrc_out); p->ssrc_out = ssrc_out; int64_t ts_diff_us = 0; @@ -2480,7 +2460,7 @@ static struct codec_ssrc_handler *__output_ssrc_handler(struct codec_ssrc_handle // our encoder is in a different codec handler ilogs(transcoding, LOG_DEBUG, "Switching context from decoder to encoder"); handler = handler->output_handler; - struct codec_ssrc_handler *new_ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &handler->ssrc_hash); + struct codec_ssrc_handler *new_ch = get_ssrc(mp->ssrc_in->h.ssrc, &handler->ssrc_hash); if (G_UNLIKELY(!new_ch)) { ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Switched from input to output codec context, but no codec handler present"); @@ -2559,7 +2539,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr payload_type = h->real_dtmf_payload_type; skip: - obj_release(output_ch); + ssrc_entry_release(output_ch); char *buf = bufferpool_alloc(media_bufferpool, packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM); memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len); @@ -2569,7 +2549,7 @@ skip: else // use our own sequencing input_ch->codec_output_rtp_seq(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts, packet->marker, payload_type, ts_delay); - mp->ssrc_out->parent->seq_diff++; + mp->ssrc_out->seq_diff++; return 0; } @@ -2694,12 +2674,12 @@ static tc_code packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_hand ret = TCC_CONSUMED; else ret = packet_dtmf_fwd(ch, input_ch, dup, mp); - mp->ssrc_out->parent->seq_diff++; + mp->ssrc_out->seq_diff++; if (ret != TCC_CONSUMED) __transcode_packet_free(dup); } - mp->ssrc_out->parent->seq_diff--; + mp->ssrc_out->seq_diff--; // discard the received event do_blocking = true; @@ -2803,7 +2783,7 @@ void codec_packet_free(struct codec_packet *p) { p->free_func(p->s.s); if (p->plain_free_func && p->plain.s) p->plain_free_func(p->plain.s); - ssrc_ctx_put(&p->ssrc_out); + ssrc_entry_release(p->ssrc_out); g_free(p); } bool codec_packet_copy(struct codec_packet *p) { @@ -2819,7 +2799,7 @@ struct codec_packet *codec_packet_dup(struct codec_packet *p) { dup->link.data = dup; // XXX obsolete this codec_packet_copy(dup); if (dup->ssrc_out) - ssrc_ctx_hold(dup->ssrc_out); + ssrc_entry_hold(dup->ssrc_out); if (dup->rtp) dup->rtp = (void *) dup->s.s; return dup; @@ -2977,7 +2957,7 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p // check for DTMF injection if (h->dtmf_payload_type != -1) { - struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &h->ssrc_hash); + struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->h.ssrc, &h->ssrc_hash); if (ch) { uint64_t ts64 = ntohl(mp->rtp->timestamp); @@ -3005,18 +2985,18 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p else if (!ch->dtmf_events.length) ML_CLEAR(mp->media->monologue, DTMF_INJECTION_ACTIVE); - obj_release(ch); + ssrc_entry_release(ch); } } // substitute out SSRC etc - mp->rtp->ssrc = htonl(mp->ssrc_out->parent->h.ssrc); + mp->rtp->ssrc = htonl(mp->ssrc_out->h.ssrc); // to track our seq unsigned short seq = ntohs(mp->rtp->seq_num); while (true) { - mp->rtp->seq_num = htons(seq + mp->ssrc_out->parent->seq_diff); + mp->rtp->seq_num = htons(seq + mp->ssrc_out->seq_diff); // keep track of other stats here? @@ -3025,7 +3005,7 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p if (duplicates == 0) break; duplicates--; - mp->ssrc_out->parent->seq_diff++; + mp->ssrc_out->seq_diff++; } // restore original in case it was mangled @@ -3084,7 +3064,7 @@ uint64_t codec_last_dtmf_event(struct codec_ssrc_handler *ch) { return ev->ts; } -uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *ssrc_in) { +uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_entry_call *ssrc_in) { if (!ch || !ch->encoder) { if (!ssrc_in) return 0; @@ -3262,7 +3242,7 @@ static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *m return; if (__buffer_delay_do_direct(dbuf)) { - mp->ssrc_out->parent->seq_diff += seq_adj; + mp->ssrc_out->seq_diff += seq_adj; return; } @@ -3271,7 +3251,7 @@ static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *m // peg the adjustment to the most recent frame if any struct delay_frame *dframe = t_queue_peek_head(&dbuf->frames); if (!dframe) { - mp->ssrc_out->parent->seq_diff += seq_adj; + mp->ssrc_out->seq_diff += seq_adj; return; } @@ -3331,7 +3311,7 @@ static bool __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *dec // schedule timer if not running yet if (!dtxb->ct.next) { if (!dtxb->ssrc) - dtxb->ssrc = mp->ssrc_in->parent->h.ssrc; + dtxb->ssrc = mp->ssrc_in->h.ssrc; dtxb->ct.next = mp->tv; dtxb->ct.next += rtpe_config.dtx_delay_us; timerthread_obj_schedule_abs(&dtxb->ct.tt_obj, dtxb->ct.next); @@ -3368,8 +3348,8 @@ static void delay_frame_free(struct delay_frame *dframe) { av_frame_free(&dframe->frame); g_free(dframe->mp.raw.s); media_packet_release(&dframe->mp); - obj_release(dframe->ch); - obj_release(dframe->input_ch); + ssrc_entry_release(dframe->ch); + ssrc_entry_release(dframe->input_ch); if (dframe->packet) __transcode_packet_free(dframe->packet); g_free(dframe); @@ -3387,8 +3367,8 @@ static void dtx_packet_free(struct dtx_packet *dtxp) { if (dtxp->packet) __transcode_packet_free(dtxp->packet); media_packet_release(&dtxp->mp); - obj_release(dtxp->decoder_handler); - obj_release(dtxp->input_handler); + ssrc_entry_release(dtxp->decoder_handler); + ssrc_entry_release(dtxp->input_handler); g_free(dtxp); } static void delay_buffer_stop(struct delay_buffer **pcmbp) { @@ -3569,7 +3549,7 @@ static void __delay_frame_process(struct delay_buffer *dbuf, struct delay_frame } if (dframe->seq_adj) - dframe->mp.ssrc_out->parent->seq_diff += dframe->seq_adj; + dframe->mp.ssrc_out->seq_diff += dframe->seq_adj; } static void __delay_send_later(struct codec_timer *ct) { struct delay_buffer *dbuf = (void *) ct; @@ -3809,7 +3789,7 @@ static void __dtx_send_later(struct codec_timer *ct) { shutdown = true; else if (!ps->ssrc_in[0]) shutdown = true; - else if (dtxb->ssrc != ps->ssrc_in[0]->parent->h.ssrc) + else if (dtxb->ssrc != ps->ssrc_in[0]->h.ssrc) shutdown = true; else if (dtxb->ct.next == 0) shutdown = true; @@ -3886,13 +3866,13 @@ static void __dtx_send_later(struct codec_timer *ct) { // packet consumed - track seq rwlock_lock_r(&call->master_lock); __ssrc_lock_both(&mp_copy); - mp_copy.ssrc_out->parent->seq_diff--; + mp_copy.ssrc_out->seq_diff--; __ssrc_unlock_both(&mp_copy); rwlock_unlock_r(&call->master_lock); } obj_release(call); - obj_release(ch); - obj_release(input_ch); + ssrc_entry_release(ch); + ssrc_entry_release(input_ch); if (dtxp) dtx_packet_free(dtxp); media_packet_release(&mp_copy); @@ -3978,8 +3958,8 @@ static void __dtx_send_later(struct codec_timer *ct) { out: obj_release(call); - obj_release(ch); - obj_release(input_ch); + ssrc_entry_release(ch); + ssrc_entry_release(input_ch); if (dtxp) dtx_packet_free(dtxp); media_packet_release(&mp_copy); @@ -3997,7 +3977,7 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) { ch->encoder->mux_dts = 0; } - obj_release(dtxb->csh); + ssrc_entry_release(dtxb->csh); } obj_release(dtxb->call); t_queue_clear_full(&dtxb->packets, dtx_packet_free); @@ -4358,7 +4338,7 @@ static struct ssrc_entry *__ssrc_handler_transcode_new(void *p) { return &ch->h; err: - obj_release(ch); + ssrc_entry_release(ch); return NULL; } static struct ssrc_entry *__ssrc_handler_decode_new(void *p) { @@ -4384,7 +4364,7 @@ static struct ssrc_entry *__ssrc_handler_decode_new(void *p) { return &ch->h; err: - obj_release(ch); + ssrc_entry_release(ch); return NULL; } static int __encoder_flush(encoder_t *enc, void *u1, void *u2) { @@ -4545,7 +4525,7 @@ static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, stru + fraction_divl(pkt->pts, cr_fact), ch->rtp_mark ? 1 : 0, payload_type, ts_delay); - mp->ssrc_out->parent->seq_diff++; + mp->ssrc_out->seq_diff++; ch->rtp_mark = 0; if (!repeats) break; @@ -4662,7 +4642,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v discard: av_frame_free(&frame); - obj_release(new_ch); + ssrc_entry_release(new_ch); return 0; } @@ -4836,33 +4816,29 @@ void codec_update_all_source_handlers(struct call_monologue *ml, const sdp_ng_fl } -void codec_calc_jitter(struct ssrc_ctx *ssrc, unsigned long ts, unsigned int clockrate, int64_t tv) { +void codec_calc_jitter(struct ssrc_entry_call *ssrc, unsigned long ts, unsigned int clockrate, int64_t tv) { if (!ssrc || !clockrate) return; - struct ssrc_entry_call *sec = ssrc->parent; // RFC 3550 A.8 uint32_t transit = (((tv / 1000) * clockrate) / 1000) - ts; - mutex_lock(&sec->h.lock); + LOCK(&ssrc->h.lock); int32_t d = 0; - if (sec->transit) - d = transit - sec->transit; - sec->transit = transit; + if (ssrc->transit) + d = transit - ssrc->transit; + ssrc->transit = transit; if (d < 0) d = -d; // ignore implausibly large values if (d < 100000) - sec->jitter += d - ((sec->jitter + 8) >> 4); - mutex_unlock(&sec->h.lock); + ssrc->jitter += d - ((ssrc->jitter + 8) >> 4); } -static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) { - struct ssrc_entry_call *s = ssrc->parent; - - LOCK(&s->h.lock); +static void codec_calc_lost(struct ssrc_entry_call *ssrc, uint16_t seq) { + LOCK(&ssrc->h.lock); // XXX shared code from kernel module - uint32_t last_seq = s->last_seq_tracked; + uint32_t last_seq = ssrc->last_seq_tracked; uint32_t new_seq = last_seq; // old seq or seq reset? @@ -4873,8 +4849,8 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) { else if (seq_diff > 0x100) { // reset seq and loss tracker new_seq = seq; - s->last_seq_tracked = seq; - s->lost_bits = -1; + ssrc->last_seq_tracked = seq; + ssrc->lost_bits = -1; } else { // seq wrap? @@ -4885,20 +4861,20 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) { break; } seq_diff = new_seq - last_seq; - s->last_seq_tracked = new_seq; + ssrc->last_seq_tracked = new_seq; // shift loss tracker bit field and count losses - if (seq_diff >= (sizeof(s->lost_bits) * 8)) { + if (seq_diff >= (sizeof(ssrc->lost_bits) * 8)) { // complete loss - s->packets_lost += sizeof(s->lost_bits) * 8; - s->lost_bits = -1; + ssrc->packets_lost += sizeof(ssrc->lost_bits) * 8; + ssrc->lost_bits = -1; } else { while (seq_diff) { // shift out one bit and see if we lost it - if ((s->lost_bits & 0x80000000) == 0) - s->packets_lost++; - s->lost_bits <<= 1; + if ((ssrc->lost_bits & 0x80000000) == 0) + ssrc->packets_lost++; + ssrc->lost_bits <<= 1; seq_diff--; } } @@ -4906,8 +4882,8 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) { // track this frame as being seen seq_diff = (new_seq & 0xffff) - seq; - if (seq_diff < (sizeof(s->lost_bits) * 8)) - s->lost_bits |= (1 << seq_diff); + if (seq_diff < (sizeof(ssrc->lost_bits) * 8)) + ssrc->lost_bits |= (1 << seq_diff); } @@ -4977,12 +4953,12 @@ static int handler_func_inject_dtmf(struct codec_handler *h, struct media_packet h->input_handler = __input_handler(h, mp); h->output_handler = h->input_handler; - struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &h->ssrc_hash); + struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->h.ssrc, &h->ssrc_hash); if (!ch) return 0; decoder_input_data(ch->decoder, &mp->payload, mp->rtp->timestamp, h->packet_decoded, ch, mp); - obj_release(ch); + ssrc_entry_release(ch); return 0; } @@ -6267,8 +6243,8 @@ static void codec_timers_run(void *p) { #ifdef WITH_TRANSCODING static void transcode_job_free(struct transcode_job *j) { media_packet_release(&j->mp); - obj_release(j->ch); - obj_release(j->input_ch); + ssrc_entry_release(j->ch); + ssrc_entry_release(j->input_ch); if (j->packet) __transcode_packet_free(j->packet); g_free(j); diff --git a/daemon/dtmf.c b/daemon/dtmf.c index 9bc18a140..eddd894f4 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -715,7 +715,7 @@ int dtmf_code_from_char(char c) { // takes over the csh reference static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *sink, struct call_monologue *monologue, - struct packet_stream *ps, struct ssrc_ctx *ssrc_in, struct codec_handler *ch, + struct packet_stream *ps, struct ssrc_entry_call *ssrc_in, struct codec_handler *ch, struct codec_ssrc_handler *csh, int code, int volume, int duration, int pause) { @@ -725,13 +725,13 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * struct sink_handler *sh = l->data; struct packet_stream *sink_ps = sh->sink; __auto_type sink_media = sink_ps->media; - packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->parent->sequencers, sink_ps->media); + packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->sequencers, sink_ps->media); if (!seq) continue; - struct ssrc_ctx *ssrc_out = get_ssrc_ctx(sh->attrs.transcoding ? - ssrc_in->ssrc_map_out : ssrc_in->parent->h.ssrc, - &sink_media->ssrc_hash_out, SSRC_DIR_OUTPUT); + struct ssrc_entry_call *ssrc_out = get_ssrc(sh->attrs.transcoding ? + ssrc_in->ssrc_map_out : ssrc_in->h.ssrc, + &sink_media->ssrc_hash_out); if (!ssrc_out) return "No output SSRC context present"; // XXX generate stream @@ -750,7 +750,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * .m_pt = 0xff, .timestamp = 0, .seq_num = htons(seq->seq), - .ssrc = htonl(ssrc_in->parent->h.ssrc), + .ssrc = htonl(ssrc_in->h.ssrc), }; struct media_packet packet = { .tv = rtpe_now, @@ -788,7 +788,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * media_socket_dequeue(&packet, sink_ps); obj_put_o((struct obj *) csh); - ssrc_ctx_put(&ssrc_out); + ssrc_entry_release(ssrc_out); } return 0; @@ -802,7 +802,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura if (!media->streams.head) return "Media doesn't have an RTP stream"; struct packet_stream *ps = media->streams.head->data; - struct ssrc_ctx *ssrc_in = ps->ssrc_in[0]; + struct ssrc_entry_call *ssrc_in = ps->ssrc_in[0]; if (!ssrc_in) return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream @@ -834,9 +834,9 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura ch->source_pt.payload_type, ch->dest_pt.payload_type, ch_pt, - ssrc_in->parent->h.ssrc); + ssrc_in->h.ssrc); - csh = get_ssrc(ssrc_in->parent->h.ssrc, &ch->ssrc_hash); + csh = get_ssrc(ssrc_in->h.ssrc, &ch->ssrc_hash); if (!csh) continue; break; @@ -857,7 +857,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura ilog(LOG_DEBUG, "Injecting RFC DTMF event #%i for %i ms (vol %i) from '" STR_FORMAT "' (media #%u) " "into RTP PT %i, SSRC %" PRIx32, code, duration, volume, STR_FMT(&monologue->tag), media->index, pt, - ssrc_in->parent->h.ssrc); + ssrc_in->h.ssrc); // synthesise start and stop events // the num_samples needs to be based on the the previous packet timestamp so we need to diff --git a/daemon/main.c b/daemon/main.c index df844e79b..69ea9b02e 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -607,7 +607,7 @@ static void create_listeners(const GQueue *endpoints_in, GQueue *objects_out, static void release_listeners(GQueue *q) { while (q->length) { struct obj *o = g_queue_pop_head(q); - obj_release(o); + obj_release_o(o); } } diff --git a/daemon/media_player.c b/daemon/media_player.c index 96c1ca7c8..341a748ac 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -180,7 +180,7 @@ static void media_player_shutdown(struct media_player *mp) { unsigned int num = send_timer_flush(mp->sink->send_timer, mp->coder.handler); ilog(LOG_DEBUG, "%u packets removed from send queue", num); // roll back seq numbers already used - mp->ssrc_out->parent->seq_diff -= num; + mp->ssrc_out->seq_diff -= num; } if (mp->opts.block_egress && mp->media) @@ -224,7 +224,7 @@ long long media_player_stop(struct media_player *mp) { #ifdef WITH_TRANSCODING static void __media_player_free(struct media_player *mp) { media_player_shutdown(mp); - ssrc_ctx_put(&mp->ssrc_out); + ssrc_entry_release(mp->ssrc_out); mutex_destroy(&mp->lock); obj_put(mp->call); av_packet_free(&mp->coder.pkt); @@ -235,7 +235,7 @@ static void __media_player_free(struct media_player *mp) { // call->master_lock held in W -void media_player_new(struct media_player **mpp, struct call_monologue *ml, struct ssrc_ctx *prev_ssrc, +void media_player_new(struct media_player **mpp, struct call_monologue *ml, struct ssrc_entry_call *prev_ssrc, media_player_opts_t *opts) { #ifdef WITH_TRANSCODING @@ -305,7 +305,7 @@ struct send_timer *send_timer_new(struct packet_stream *ps) { } // call is locked in R -static void send_timer_rtcp(struct send_timer *st, struct ssrc_ctx *ssrc_out) { +static void send_timer_rtcp(struct send_timer *st, struct ssrc_entry_call *ssrc_out) { struct call_media *media = st->sink ? st->sink->media : NULL; if (!media) return; @@ -403,11 +403,11 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet } // do we send RTCP? - struct ssrc_ctx *ssrc_out = cp->ssrc_out; + struct ssrc_entry_call *ssrc_out = cp->ssrc_out; if (ssrc_out && ssrc_out->next_rtcp) { - mutex_lock(&ssrc_out->parent->h.lock); + mutex_lock(&ssrc_out->h.lock); int64_t diff = ssrc_out->next_rtcp - rtpe_now; - mutex_unlock(&ssrc_out->parent->h.lock); + mutex_unlock(&ssrc_out->h.lock); if (diff < 0) send_timer_rtcp(st, ssrc_out); } @@ -597,7 +597,7 @@ static void media_player_kernel_player_start_now(struct media_player *mp) { .pt = dst_pt->payload_type, .seq = mp->seq, .ts = mp->buffer_ts, - .ssrc = mp->ssrc_out->parent->h.ssrc, + .ssrc = mp->ssrc_out->h.ssrc, .repeat = mp->opts.repeat, .stats = mp->sink->stats_out, .iface_stats = mp->sink->selected_sfd->local_intf->stats, @@ -1142,8 +1142,8 @@ void media_player_set_media(struct media_player *mp, struct call_media *media) { mp->sink = media->streams.head->data; mp->crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], media, true); } - if (!mp->ssrc_out || mp->ssrc_out->parent->h.ssrc != mp->ssrc) { - struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(mp->ssrc, &media->ssrc_hash_out, SSRC_DIR_OUTPUT); + if (!mp->ssrc_out || mp->ssrc_out->h.ssrc != mp->ssrc) { + struct ssrc_entry_call *ssrc_ctx = get_ssrc(mp->ssrc, &media->ssrc_hash_out); ssrc_ctx->next_rtcp = rtpe_now; mp->ssrc_out = ssrc_ctx; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 4f8e42aea..9baac3b2e 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -107,10 +107,10 @@ static int __k_null(struct rtpengine_srtp *s, struct packet_stream *); static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *); static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *); -static int call_avp2savp_rtp(str *s, struct packet_stream *, struct ssrc_ctx *); -static int call_savp2avp_rtp(str *s, struct packet_stream *, struct ssrc_ctx *); -static int call_avp2savp_rtcp(str *s, struct packet_stream *, struct ssrc_ctx *); -static int call_savp2avp_rtcp(str *s, struct packet_stream *, struct ssrc_ctx *); +static int call_avp2savp_rtp(str *s, struct packet_stream *, struct ssrc_entry_call *); +static int call_savp2avp_rtp(str *s, struct packet_stream *, struct ssrc_entry_call *); +static int call_avp2savp_rtcp(str *s, struct packet_stream *, struct ssrc_entry_call *); +static int call_savp2avp_rtcp(str *s, struct packet_stream *, struct ssrc_entry_call *); static struct logical_intf *__get_logical_interface(const str *name, sockfamily_t *fam); @@ -1438,19 +1438,19 @@ static void stream_fd_closed(int fd, void *p) { -static int call_avp2savp_rtp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx) +static int call_avp2savp_rtp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx) { return rtp_avp2savp(s, &stream->crypto, ssrc_ctx); } -static int call_avp2savp_rtcp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx) +static int call_avp2savp_rtcp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx) { return rtcp_avp2savp(s, &stream->crypto, ssrc_ctx); } -static int call_savp2avp_rtp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx) +static int call_savp2avp_rtp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx) { return rtp_savp2avp(s, &stream->selected_sfd->crypto, ssrc_ctx); } -static int call_savp2avp_rtcp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx) +static int call_savp2avp_rtcp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx) { return rtcp_savp2avp(s, &stream->selected_sfd->crypto, ssrc_ctx); } @@ -1606,7 +1606,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st reti->track_ssrc = 1; for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { if (stream->ssrc_in[u]) { - reti->ssrc[u] = htonl(stream->ssrc_in[u]->parent->h.ssrc); + reti->ssrc[u] = htonl(stream->ssrc_in[u]->h.ssrc); reti->ssrc_stats[u] = stream->ssrc_in[u]->stats; } } @@ -1769,7 +1769,7 @@ static const char *kernelize_one(kernelize_state *s, for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { if (sink->ssrc_out[u]) { // XXX order can be different from ingress? - redi->output.seq_offset[u] = sink->ssrc_out[u]->parent->seq_diff; + redi->output.seq_offset[u] = sink->ssrc_out[u]->seq_diff; redi->output.ssrc_stats[u] = sink->ssrc_out[u]->stats; } @@ -1891,7 +1891,7 @@ no_kernel: } // must be called with appropriate locks (master lock and/or in/out_lock) -int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], +int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int start_idx) { for (unsigned int v = 0; v < RTPE_NUM_SSRC_TRACKING; v++) { @@ -1899,14 +1899,14 @@ int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACK unsigned int idx = (start_idx + v) % RTPE_NUM_SSRC_TRACKING; if (!list[idx]) continue; - if (list[idx]->parent->h.ssrc != ssrc) + if (list[idx]->h.ssrc != ssrc) continue; return idx; } return -1; } // must be called with appropriate locks (master lock and/or in/out_lock) -struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], +struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int start_idx) { int idx = __hunt_ssrc_ctx_idx(ssrc, list, start_idx); @@ -2070,9 +2070,9 @@ noop: // returns non-null with reason string if stream should be removed from kernel static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock, - struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p, + struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p, uint32_t output_ssrc, - struct ssrc_ctx **output, struct ssrc_hash *ssrc_hash, enum ssrc_dir dir, const char *label) + struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash, const char *label) { const char *ret = NULL; @@ -2086,10 +2086,10 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, *ctx_idx_p = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING; // eject old entry if present if (list[ctx_idx]) - ssrc_ctx_put(&list[ctx_idx]); + ssrc_entry_release(list[ctx_idx]); // get new entry list[ctx_idx] = - get_ssrc_ctx(ssrc, ssrc_hash, dir); + get_ssrc(ssrc, ssrc_hash); ret = "SSRC changed"; ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label, @@ -2097,17 +2097,16 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, } if (ctx_idx != 0) { // move most recent entry to front of the list - struct ssrc_ctx *tmp = list[0]; + struct ssrc_entry_call *tmp = list[0]; list[0] = list[ctx_idx]; list[ctx_idx] = tmp; ctx_idx = 0; } // extract and hold entry - if (*output) - ssrc_ctx_put(output); + ssrc_entry_release(*output); *output = list[ctx_idx]; - ssrc_ctx_hold(*output); + ssrc_entry_hold(*output); // reverse SSRC mapping if (!output_ssrc) @@ -2121,26 +2120,27 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, // check and update input SSRC pointers // returns non-null with reason string if stream should be removed from kernel static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, - struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) + struct ssrc_entry_call **ssrc_in_p, struct ssrc_hash *ssrc_hash) { return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in, - &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress"); + &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, "ingress"); } // check and update output SSRC pointers // returns non-null with reason string if stream should be removed from kernel static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ssrc_bs, - struct ssrc_ctx *ssrc_in, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash, + struct ssrc_entry_call *ssrc_in, struct ssrc_entry_call **ssrc_out_p, + struct ssrc_hash *ssrc_hash, bool ssrc_change) { if (ssrc_change) return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock, out_srtp->ssrc_out, - &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, + &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, "egress (mapped)"); return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock, out_srtp->ssrc_out, - &out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, + &out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, "egress (direct)"); } @@ -2674,9 +2674,9 @@ void media_packet_copy(struct media_packet *dst, const struct media_packet *src) if (dst->sfd) obj_hold(dst->sfd); if (dst->ssrc_in) - obj_hold(&dst->ssrc_in->parent->h); + ssrc_entry_hold(dst->ssrc_in); if (dst->ssrc_out) - obj_hold(&dst->ssrc_out->parent->h); + ssrc_entry_hold(dst->ssrc_out); dst->rtp = __g_memdup(src->rtp, sizeof(*src->rtp)); dst->rtcp = __g_memdup(src->rtcp, sizeof(*src->rtcp)); dst->payload = STR_NULL; @@ -2685,10 +2685,8 @@ void media_packet_copy(struct media_packet *dst, const struct media_packet *src) void media_packet_release(struct media_packet *mp) { if (mp->sfd) obj_put(mp->sfd); - if (mp->ssrc_in) - obj_put(&mp->ssrc_in->parent->h); - if (mp->ssrc_out) - obj_put(&mp->ssrc_out->parent->h); + ssrc_entry_release(mp->ssrc_in); + ssrc_entry_release(mp->ssrc_out); media_socket_dequeue(mp, NULL); g_free(mp->rtp); g_free(mp->rtcp); @@ -2981,7 +2979,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { next_mirror: media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left - ssrc_ctx_put(&mirror_phc.mp.ssrc_out); + ssrc_entry_release(mirror_phc.mp.ssrc_out); } } @@ -3020,7 +3018,7 @@ err_next: next: media_socket_dequeue(&phc->mp, NULL); // just free if anything left - ssrc_ctx_put(&phc->mp.ssrc_out); + ssrc_entry_release(phc->mp.ssrc_out); } ///////////////// INGRESS POST-PROCESSING HANDLING @@ -3072,9 +3070,9 @@ out: rwlock_unlock_r(&phc->mp.call->master_lock); media_socket_dequeue(&phc->mp, NULL); // just free - ssrc_ctx_put(&phc->mp.ssrc_out); + ssrc_entry_release(phc->mp.ssrc_out); - ssrc_ctx_put(&phc->mp.ssrc_in); + ssrc_entry_release(phc->mp.ssrc_in); rtcp_list_free(&phc->rtcp_list); g_queue_clear_full(&free_list, bufferpool_unref); diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 44e64e014..8e245daaa 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -26,7 +26,7 @@ static bool is_connected = false; static struct interface_sampled_rate_stats interface_rate_stats; -static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media); +static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media); @@ -202,14 +202,12 @@ static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) { } -static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media) { +static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media) { if (!ssrc || !media) return; - struct ssrc_entry_call *sc = ssrc->parent; - json_builder_set_member_name(json, "SSRC"); - json_builder_add_int_value(json, sc->h.ssrc); + json_builder_add_int_value(json, ssrc->h.ssrc); unsigned char prim_pt = 255; mutex_lock(&ssrc->tracker.lock); @@ -245,8 +243,8 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal int64_t packets, octets, packets_lost, duplicates; packets = atomic64_get_na(&ssrc->stats->packets); octets = atomic64_get_na(&ssrc->stats->bytes); - packets_lost = sc->packets_lost; - duplicates = sc->duplicates; + packets_lost = ssrc->packets_lost; + duplicates = ssrc->duplicates; // process per-second stats int64_t cur_ts = rtpe_now; @@ -296,16 +294,16 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal json_builder_add_double_value(json, (double) duplicates * 1000000.0 / usecs_diff); } - mutex_lock(&sc->h.lock); - uint32_t jitter = sc->jitter; + mutex_lock(&ssrc->h.lock); + uint32_t jitter = ssrc->jitter; int64_t mos = -1, rtt = -1, rtt_leg = -1; - if (sc->stats_blocks.length) { - struct ssrc_stats_block *sb = sc->stats_blocks.tail->data; + if (ssrc->stats_blocks.length) { + struct ssrc_stats_block *sb = ssrc->stats_blocks.tail->data; mos = sb->mos; rtt = sb->rtt; rtt_leg = sb->rtt_leg; } - mutex_unlock(&sc->h.lock); + mutex_unlock(&ssrc->h.lock); if (clockrate) { json_builder_set_member_name(json, "jitter"); diff --git a/daemon/redis.c b/daemon/redis.c index c9e646f89..0297d6632 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1981,15 +1981,15 @@ static void json_build_ssrc_iter(const ng_parser_t *parser, parser_arg dict, hel struct ssrc_entry_call *se_out = get_ssrc(ssrc, &md->ssrc_hash_out); if (se_in) { - atomic_set_na(&se_in->input_ctx.stats->ext_seq, parser_get_ll(dict, "in_srtp_index")); - atomic_set_na(&se_in->input_ctx.stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index")); - payload_tracker_add(&se_in->input_ctx.tracker, parser_get_ll(dict, "in_payload_type")); + atomic_set_na(&se_in->stats->ext_seq, parser_get_ll(dict, "in_srtp_index")); + atomic_set_na(&se_in->stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index")); + payload_tracker_add(&se_in->tracker, parser_get_ll(dict, "in_payload_type")); obj_put(&se_in->h); } if (se_out) { - atomic_set_na(&se_out->output_ctx.stats->ext_seq, parser_get_ll(dict, "out_srtp_index")); - atomic_set_na(&se_out->output_ctx.stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index")); - payload_tracker_add(&se_out->output_ctx.tracker, parser_get_ll(dict, "out_payload_type")); + atomic_set_na(&se_out->stats->ext_seq, parser_get_ll(dict, "out_srtp_index")); + atomic_set_na(&se_out->stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index")); + payload_tracker_add(&se_out->tracker, parser_get_ll(dict, "out_payload_type")); obj_put(&se_out->h); } } @@ -2728,12 +2728,12 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) { JSON_SET_SIMPLE("ssrc", "%" PRIu32, se->h.ssrc); // XXX use function for in/out - JSON_SET_SIMPLE("in_srtp_index", "%u", atomic_get_na(&se->input_ctx.stats->ext_seq)); - JSON_SET_SIMPLE("in_srtcp_index", "%u", atomic_get_na(&se->input_ctx.stats->rtcp_seq)); - JSON_SET_SIMPLE("in_payload_type", "%i", se->input_ctx.tracker.most[0]); - JSON_SET_SIMPLE("out_srtp_index", "%u", atomic_get_na(&se->output_ctx.stats->ext_seq)); - JSON_SET_SIMPLE("out_srtcp_index", "%u", atomic_get_na(&se->output_ctx.stats->rtcp_seq)); - JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]); + JSON_SET_SIMPLE("in_srtp_index", "%u", atomic_get_na(&se->stats->ext_seq)); + JSON_SET_SIMPLE("in_srtcp_index", "%u", atomic_get_na(&se->stats->rtcp_seq)); + JSON_SET_SIMPLE("in_payload_type", "%i", se->tracker.most[0]); + //JSON_SET_SIMPLE("out_srtp_index", "%u", atomic_get_na(&se->output_ctx.stats->ext_seq)); + //JSON_SET_SIMPLE("out_srtcp_index", "%u", atomic_get_na(&se->output_ctx.stats->rtcp_seq)); + //JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]); // XXX add rest of info } } // --- for medias.head diff --git a/daemon/rtcp.c b/daemon/rtcp.c index b4be671a8..7a873310e 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -282,6 +282,9 @@ struct rtcp_handlers { *homer; }; +TYPED_GQUEUE(ssrc, struct ssrc_entry_call) +TYPED_GQUEUE(ssrc_rr, struct ssrc_receiver_report) + // log handler function prototypes // scratch area (prepare/parse packet) @@ -840,7 +843,7 @@ error: } /* rfc 3711 section 3.4 */ -int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { +int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) { struct rtcp_packet *rtcp; unsigned int i; uint32_t *idx; @@ -888,7 +891,7 @@ int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { /* rfc 3711 section 3.4 */ -int rtcp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { +int rtcp_savp2avp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) { struct rtcp_packet *rtcp; str payload, to_auth, to_decrypt, auth_tag; uint32_t idx; @@ -1287,7 +1290,7 @@ static void mos_xr_voip_metrics(struct rtcp_process_ctx *ctx, const struct xr_rb static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) { if (!ctx->mp->ssrc_in) return; - if (ctx->scratch_common_ssrc != ctx->mp->ssrc_in->parent->h.ssrc) + if (ctx->scratch_common_ssrc != ctx->mp->ssrc_in->h.ssrc) return; // forward SSRC mapping common->ssrc = htonl(ctx->mp->ssrc_in->ssrc_map_out); @@ -1297,23 +1300,21 @@ static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *c static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) { if (!ctx->mp->ssrc_in) return; - if (ctx->scratch.rr.from != ctx->mp->ssrc_in->parent->h.ssrc) + if (ctx->scratch.rr.from != ctx->mp->ssrc_in->h.ssrc) return; if (!ctx->mp->media) return; // reverse SSRC mapping - struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, &ctx->mp->media->ssrc_hash_out, - SSRC_DIR_OUTPUT); + struct ssrc_entry_call *map_ctx = get_ssrc(ctx->scratch.rr.ssrc, &ctx->mp->media->ssrc_hash_out); rr->ssrc = htonl(map_ctx->ssrc_map_out); if (!ctx->mp->media_out) return; // for reception stats - struct ssrc_ctx *input_ctx = get_ssrc_ctx(map_ctx->ssrc_map_out, - &ctx->mp->media_out->ssrc_hash_in, - SSRC_DIR_INPUT); + struct ssrc_entry_call *input_ctx = get_ssrc(map_ctx->ssrc_map_out, + &ctx->mp->media_out->ssrc_hash_in); if (!input_ctx) return; @@ -1326,8 +1327,8 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) if (!packets) goto out; - unsigned int lost = input_ctx->parent->packets_lost; - unsigned int dupes = input_ctx->parent->duplicates; + unsigned int lost = input_ctx->packets_lost; + unsigned int dupes = input_ctx->duplicates; unsigned int tot_lost = lost - dupes; // can be negative/rollover ilogs(rtcp, LOG_DEBUG, "Substituting RTCP RR SSRC from %s%x%s to %x: %u packets, %u lost, %u duplicates", @@ -1353,14 +1354,13 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) // XXX jitter, last SR out: - if (input_ctx) - obj_put(&input_ctx->parent->h); - obj_put(&map_ctx->parent->h); + ssrc_entry_release(input_ctx); + ssrc_entry_release(map_ctx); } static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) { if (!ctx->mp->ssrc_in) return; - if (ctx->scratch.sr.ssrc != ctx->mp->ssrc_in->parent->h.ssrc) + if (ctx->scratch.sr.ssrc != ctx->mp->ssrc_in->h.ssrc) return; if (!ctx->mp->ssrc_out) return; @@ -1410,8 +1410,8 @@ void rtcp_init(void) { static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, - uint32_t ssrc, uint32_t ssrc_out, uint32_t ts, uint32_t packets, uint32_t octets, GQueue *rrs, - GQueue *srrs) + uint32_t ssrc, uint32_t ssrc_out, uint32_t ts, uint32_t packets, uint32_t octets, ssrc_q *rrs, + ssrc_rr_q *srrs) { GString *ret = g_string_sized_new(128); g_string_set_size(ret, sizeof(struct sender_report_packet)); @@ -1444,7 +1444,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, // receiver reports int i = 0, n = 0; while (rrs->length) { - struct ssrc_ctx *s = g_queue_pop_head(rrs); + struct ssrc_entry_call *s = t_queue_pop_head(rrs); if (i < 30) { g_string_set_size(ret, ret->len + sizeof(struct report_block)); struct report_block *rr = (void *) ret->str + ret->len - sizeof(struct report_block); @@ -1452,23 +1452,22 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, // XXX unify with transcode_rr // last received SR? - struct ssrc_entry_call *se = s->parent; int64_t tv_diff = 0; uint32_t ntp_middle_bits = 0; - mutex_lock(&se->h.lock); - if (se->sender_reports.length) { - struct ssrc_time_item *si = se->sender_reports.tail->data; + mutex_lock(&s->h.lock); + if (s->sender_reports.length) { + struct ssrc_time_item *si = s->sender_reports.tail->data; tv_diff = rtpe_now - si->received; ntp_middle_bits = si->ntp_middle_bits; } - uint32_t jitter = se->jitter; - mutex_unlock(&se->h.lock); + uint32_t jitter = s->jitter; + mutex_unlock(&s->h.lock); - uint64_t lost = se->packets_lost; + uint64_t lost = s->packets_lost; uint64_t tot = atomic64_get(&s->stats->packets); *rr = (struct report_block) { - .ssrc = htonl(s->parent->h.ssrc), + .ssrc = htonl(s->h.ssrc), .fraction_lost = lost * 256 / (tot + lost), .number_lost[0] = (lost >> 16) & 0xff, .number_lost[1] = (lost >> 8) & 0xff, @@ -1483,7 +1482,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, struct ssrc_receiver_report *srr = g_new(__typeof(*srr), 1); *srr = (struct ssrc_receiver_report) { .from = ssrc_out, - .ssrc = s->parent->h.ssrc, + .ssrc = s->h.ssrc, .fraction_lost = lost * 256 / (tot + lost), .packets_lost = lost, .high_seq_received = atomic_get_na(&s->stats->ext_seq), @@ -1491,12 +1490,12 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, .dlsr = tv_diff * 65536 / 1000000, .jitter = jitter >> 4, }; - g_queue_push_tail(srrs, srr); + t_queue_push_tail(srrs, srr); } n++; } - ssrc_ctx_put(&s); + ssrc_entry_release(s); i++; } @@ -1537,22 +1536,21 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, return ret; } -static void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash) { +static void rtcp_receiver_reports(ssrc_q *out, struct ssrc_hash *hash) { LOCK(&hash->lock); for (GList *l = hash->nq.head; l; l = l->next) { - struct ssrc_entry_call *e = l->data; - struct ssrc_ctx *i = &e->input_ctx; + struct ssrc_entry_call *i = l->data; if (!atomic64_get_na(&i->stats->packets)) continue; - ssrc_ctx_hold(i); - g_queue_push_tail(out, i); + ssrc_entry_hold(i); + t_queue_push_tail(out, i); } } // call must be locked in R -void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { +void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out) { // figure out where to send it struct packet_stream *ps = media->streams.head->data; // crypto context is held separately @@ -1574,17 +1572,17 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { log_info_stream_fd(ps->selected_sfd); - GQueue rrs = G_QUEUE_INIT; + ssrc_q rrs = TYPED_GQUEUE_INIT; rtcp_receiver_reports(&rrs, &media->ssrc_hash_in); ilogs(rtcp, LOG_DEBUG, "Generating and sending RTCP SR for %x and up to %i source(s)", - ssrc_out->parent->h.ssrc, rrs.length); + ssrc_out->h.ssrc, rrs.length); struct ssrc_sender_report ssr; - GQueue srrs = G_QUEUE_INIT; + ssrc_rr_q srrs = TYPED_GQUEUE_INIT; - GString *sr = rtcp_sender_report(&ssr, ssrc_out->parent->h.ssrc, - ssrc_out->ssrc_map_out ? : ssrc_out->parent->h.ssrc, + GString *sr = rtcp_sender_report(&ssr, ssrc_out->h.ssrc, + ssrc_out->ssrc_map_out ? : ssrc_out->h.ssrc, atomic_get_na(&ssrc_out->stats->timestamp), atomic64_get_na(&ssrc_out->stats->packets), atomic64_get(&ssrc_out->stats->bytes), @@ -1613,13 +1611,13 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { struct call_media *other_media = sink->media; ssrc_sender_report(other_media, &ssr, rtpe_now); - for (GList *k = srrs.head; k; k = k->next) { + for (__auto_type k = srrs.head; k; k = k->next) { struct ssrc_receiver_report *srr = k->data; ssrc_receiver_report(other_media, sink->selected_sfd, srr, rtpe_now); } } while (srrs.length) { - struct ssrc_receiver_report *srr = g_queue_pop_head(&srrs); + struct ssrc_receiver_report *srr = t_queue_pop_head(&srrs); g_free(srr); } } diff --git a/daemon/rtp.c b/daemon/rtp.c index c71fff5e6..e12181b9f 100644 --- a/daemon/rtp.c +++ b/daemon/rtp.c @@ -43,13 +43,15 @@ error: return -1; } -static unsigned int packet_index(struct ssrc_ctx *ssrc_ctx, struct rtp_header *rtp, crypto_debug_string **cds) { +static unsigned int packet_index(struct ssrc_entry_call *ssrc_ctx, struct rtp_header *rtp, + crypto_debug_string **cds) +{ uint16_t seq; seq = ntohs(rtp->seq_num); - *cds = crypto_debug_init((seq & 0x1ff) == (ssrc_ctx->parent->h.ssrc & 0x1ff)); - crypto_debug_printf(*cds, "SSRC %" PRIx32 ", seq %" PRIu16, ssrc_ctx->parent->h.ssrc, seq); + *cds = crypto_debug_init((seq & 0x1ff) == (ssrc_ctx->h.ssrc & 0x1ff)); + crypto_debug_printf(*cds, "SSRC %" PRIx32 ", seq %" PRIu16, ssrc_ctx->h.ssrc, seq); /* rfc 3711 section 3.3.1 */ unsigned int srtp_index = atomic_get_na(&ssrc_ctx->stats->ext_seq); @@ -100,7 +102,7 @@ void rtp_append_mki(str *s, struct crypto_context *c, crypto_debug_string *cds) } /* rfc 3711, section 3.3 */ -int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { +int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) { struct rtp_header *rtp; str payload, to_auth; unsigned int index; @@ -142,7 +144,7 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { } // just updates the ext_seq in ssrc -int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_ctx *ssrc) { +int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_entry_call *ssrc) { struct rtp_header *rtp; if (G_UNLIKELY(!ssrc)) @@ -155,7 +157,7 @@ int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_ctx *ssrc) { } /* rfc 3711, section 3.3 */ -int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { +int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) { struct rtp_header *rtp; unsigned int index; str payload, to_auth, to_decrypt, auth_tag; diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 86249ce57..0fbd230b4 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -26,15 +26,6 @@ static mos_calc_fn *mos_calcs[__MOS_TYPES] = { static void __free_ssrc_entry_call(struct ssrc_entry_call *e); -static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { - c->parent = parent; - payload_tracker_init(&c->tracker); - while (!c->ssrc_map_out) - c->ssrc_map_out = ssl_random(); - c->seq_out = ssl_random(); - atomic64_set_na(&c->last_sample, rtpe_now); - c->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*c->stats)); -} static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) { ent->ssrc = ssrc; mutex_init(&ent->lock); @@ -43,8 +34,12 @@ static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) { static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { struct ssrc_entry_call *ent; ent = obj_alloc0(struct ssrc_entry_call, __free_ssrc_entry_call); - init_ssrc_ctx(&ent->input_ctx, ent); - init_ssrc_ctx(&ent->output_ctx, ent); + payload_tracker_init(&ent->tracker); + while (!ent->ssrc_map_out) + ent->ssrc_map_out = ssl_random(); + ent->seq_out = ssl_random(); + atomic64_set_na(&ent->last_sample, rtpe_now); + ent->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*ent->stats)); //ent->seq_out = ssl_random(); //ent->ts_out = ssl_random(); ent->lost_bits = -1; @@ -70,8 +65,7 @@ static void __free_ssrc_entry_call(struct ssrc_entry_call *e) { g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block); if (e->sequencers) g_hash_table_destroy(e->sequencers); - bufferpool_unref(e->input_ctx.stats); - bufferpool_unref(e->output_ctx.stats); + bufferpool_unref(e->stats); } static void ssrc_entry_put(void *ep) { struct ssrc_entry_call *e = ep; @@ -297,14 +291,6 @@ void ssrc_hash_call_init(struct ssrc_hash *sh) { ssrc_hash_full_init(sh, create_ssrc_entry_call, NULL); } -struct ssrc_ctx *get_ssrc_ctx(uint32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) { - struct ssrc_entry *s = get_ssrc(ssrc, ht /* , NULL */); - if (G_UNLIKELY(!s)) - return NULL; - struct ssrc_ctx *ret = ((void *) s) + dir; - return ret; -} - static void *__do_time_report_item(struct call_media *m, size_t struct_size, size_t reports_queue_offset, @@ -370,10 +356,10 @@ static int64_t __calc_rtt(struct call_media *m, struct crtt_args a) return 0; if (a.pt_p) - *a.pt_p = e->output_ctx.tracker.most[0] == 255 ? -1 : e->output_ctx.tracker.most[0]; + *a.pt_p = e->tracker.most[0] == 255 ? -1 : e->tracker.most[0]; // grab the opposite side SSRC for the time reports - uint32_t map_ssrc = e->output_ctx.ssrc_map_out; + uint32_t map_ssrc = e->ssrc_map_out; if (!map_ssrc) map_ssrc = e->h.ssrc; obj_put(&e->h); @@ -448,7 +434,7 @@ void ssrc_receiver_report(struct call_media *m, stream_fd *sfd, const struct ssr int pt; int64_t rtt = calc_rtt(m, - .ht = &m->ssrc_hash_in, + .ht = &m->ssrc_hash_out, .tv = tv, .pt_p = &pt, .ssrc = rr->ssrc, @@ -580,7 +566,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr, dlrr->lrr, dlrr->dlrr); calc_rtt(m, - .ht = &m->ssrc_hash_in, + .ht = &m->ssrc_hash_out, .tv = tv, .pt_p = NULL, .ssrc = dlrr->ssrc, @@ -722,22 +708,21 @@ void ssrc_collect_metrics(struct call_media *media) { return; struct packet_stream *ps = media->streams.head->data; for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { - struct ssrc_ctx *s = ps->ssrc_in[i]; + struct ssrc_entry_call *s = ps->ssrc_in[i]; if (!s) break; // end of list - struct ssrc_entry_call *e = s->parent; // exclude zero values - technically possible but unlikely and probably just unset - if (!e->jitter) + if (!s->jitter) continue; - if (e->input_ctx.tracker.most_len > 0 && e->input_ctx.tracker.most[0] != 255) { - const rtp_payload_type *rpt = get_rtp_payload_type(e->input_ctx.tracker.most[0], + if (s->tracker.most_len > 0 && s->tracker.most[0] != 255) { + const rtp_payload_type *rpt = get_rtp_payload_type(s->tracker.most[0], &ps->media->codecs); if (rpt && rpt->clock_rate) - e->jitter = e->jitter * 1000 / rpt->clock_rate; + s->jitter = s->jitter * 1000 / rpt->clock_rate; } - RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd); + RTPE_SAMPLE_SFD(jitter_measured, s->jitter, ps->selected_sfd); } } diff --git a/include/call.h b/include/call.h index a41eae154..1f24be7e4 100644 --- a/include/call.h +++ b/include/call.h @@ -439,7 +439,7 @@ struct packet_stream { struct endpoint advertised_endpoint; /* RO */ struct endpoint learned_endpoint; /* LOCK: out_lock */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ + struct ssrc_entry_call *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */ unsigned int ssrc_in_idx, /* LOCK: in_lock */ ssrc_out_idx; /* LOCK: out_lock */ diff --git a/include/codec.h b/include/codec.h index c6c9bd80b..4b0340848 100644 --- a/include/codec.h +++ b/include/codec.h @@ -95,7 +95,7 @@ struct codec_packet { struct rtp_header *rtp; unsigned long ts; unsigned int clockrate; - struct ssrc_ctx *ssrc_out; + struct ssrc_entry_call *ssrc_out; void (*free_func)(void *); void (*plain_free_func)(void *); }; @@ -151,7 +151,7 @@ struct codec_handler *codec_handler_make_media_player(const rtp_payload_type *sr str_case_value_ht codec_set); struct codec_handler *codec_handler_make_dummy(const rtp_payload_type *dst_pt, struct call_media *media, str_case_value_ht codec_set); -void codec_calc_jitter(struct ssrc_ctx *, unsigned long ts, unsigned int clockrate, int64_t); +void codec_calc_jitter(struct ssrc_entry_call *, unsigned long ts, unsigned int clockrate, int64_t); void codec_update_all_handlers(struct call_monologue *ml); void codec_update_all_source_handlers(struct call_monologue *ml, const sdp_ng_flags *flags); @@ -239,7 +239,7 @@ __attribute__((nonnull(1, 2))) void __codec_handlers_update(struct call_media *receiver, struct call_media *sink, struct chu_args); void codec_add_dtmf_event(struct codec_ssrc_handler *ch, int code, int level, uint64_t ts, bool injected); uint64_t codec_last_dtmf_event(struct codec_ssrc_handler *ch); -uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *); +uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_entry_call *); void codec_decoder_skip_pts(struct codec_ssrc_handler *ch, uint64_t); uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch); void codec_tracker_update(struct codec_store *, struct codec_store *); diff --git a/include/media_player.h b/include/media_player.h index 5d85a02c9..91cc74f85 100644 --- a/include/media_player.h +++ b/include/media_player.h @@ -12,7 +12,7 @@ struct call_media; struct call_monologue; struct codec_handler; -struct ssrc_ctx; +struct ssrc_entry_call; struct packet_stream; struct codec_packet; struct media_player; @@ -82,7 +82,7 @@ struct media_player { struct media_player_media_file *media_file; uint32_t ssrc; - struct ssrc_ctx *ssrc_out; + struct ssrc_entry_call *ssrc_out; unsigned long seq; unsigned long buffer_ts; unsigned long sync_ts; @@ -122,7 +122,7 @@ struct send_timer { #define MPO(...) (media_player_opts_t){__VA_ARGS__} -void media_player_new(struct media_player **, struct call_monologue *, struct ssrc_ctx *prev_ssrc, media_player_opts_t *); +void media_player_new(struct media_player **, struct call_monologue *, struct ssrc_entry_call *prev_ssrc, media_player_opts_t *); bool media_player_add(struct media_player *mp, media_player_opts_t opts); bool media_player_start(struct media_player *); long long media_player_stop(struct media_player *); diff --git a/include/media_socket.h b/include/media_socket.h index a38a1fef3..c063f8935 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -19,7 +19,7 @@ struct media_packet; struct transport_protocol; -struct ssrc_ctx; +struct ssrc_entry_call; struct rtpengine_srtp; struct jb_packet; struct poller; @@ -29,7 +29,7 @@ TYPED_GQUEUE(stream_fd, stream_fd) typedef int rtcp_filter_func(struct media_packet *, GQueue *); -typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_ctx *); +typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_entry_call *); enum transport_protocol_index { @@ -273,7 +273,7 @@ struct media_packet { struct rtp_header *rtp; struct rtcp_packet *rtcp; - struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp + struct ssrc_entry_call *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp str payload; codec_packet_q packets_out; @@ -316,9 +316,9 @@ void unkernelize(struct packet_stream *, const char *); void __stream_unconfirm(struct packet_stream *, const char *); void __reset_sink_handlers(struct packet_stream *); -int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], +int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int start_idx); -struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], +struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int start_idx); void media_packet_copy(struct media_packet *, const struct media_packet *); diff --git a/include/rtcp.h b/include/rtcp.h index 169371d03..aa6a0995f 100644 --- a/include/rtcp.h +++ b/include/rtcp.h @@ -9,7 +9,6 @@ struct crypto_context; struct rtcp_packet; -struct ssrc_ctx; struct rtcp_handler; struct call_monologue; @@ -18,8 +17,8 @@ extern struct rtcp_handler *rtcp_transcode_handler; extern struct rtcp_handler *rtcp_sink_handler; -int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *); -int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *); +int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_entry_call *); +int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_entry_call *); int rtcp_payload(struct rtcp_packet **out, str *p, const str *s); @@ -31,6 +30,6 @@ rtcp_filter_func rtcp_avpf2avp_filter; void rtcp_init(void); -void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out); +void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out); #endif diff --git a/include/rtp.h b/include/rtp.h index 4c263ff86..c015dd1cb 100644 --- a/include/rtp.h +++ b/include/rtp.h @@ -9,18 +9,17 @@ struct crypto_context; struct rtp_header; struct ssrc_hash; -enum ssrc_dir; -struct ssrc_ctx; +struct ssrc_entry_call; struct codec_store; typedef GString crypto_debug_string; const rtp_payload_type *get_rtp_payload_type(unsigned int, struct codec_store *); -int rtp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *); -int rtp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *); +int rtp_avp2savp(str *, struct crypto_context *, struct ssrc_entry_call *); +int rtp_savp2avp(str *, struct crypto_context *, struct ssrc_entry_call *); -int rtp_update_index(str *, struct packet_stream *, struct ssrc_ctx *); +int rtp_update_index(str *, struct packet_stream *, struct ssrc_entry_call *); void rtp_append_mki(str *s, struct crypto_context *c, crypto_debug_string *); int srtp_payloads(str *to_auth, str *to_decrypt, str *auth_tag, str *mki, diff --git a/include/ssrc.h b/include/ssrc.h index ee54ce6e3..3f783af23 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -39,8 +39,26 @@ struct payload_tracker { unsigned char last_pts[16]; int last_pt_idx; }; -struct ssrc_ctx { - struct ssrc_entry_call *parent; + +struct ssrc_stats_block { + int64_t reported; + uint64_t jitter; // ms + uint64_t rtt; // us - combined from both sides + uint32_t rtt_leg; // RTT only for the leg receiving the RTCP report + uint64_t packetloss; // percent + uint64_t mos; // nominal range of 10 - 50 for MOS values 1.0 to 5.0 +}; + +struct ssrc_entry { + struct obj obj; + GList link; + mutex_t lock; + uint32_t ssrc; +}; + +struct ssrc_entry_call { + struct ssrc_entry h; // must be first + struct payload_tracker tracker; // XXX move entire crypto context in here? @@ -61,28 +79,7 @@ struct ssrc_ctx { sample_duplicates; int64_t next_rtcp; // for self-generated RTCP reports -}; -struct ssrc_stats_block { - int64_t reported; - uint64_t jitter; // ms - uint64_t rtt; // us - combined from both sides - uint32_t rtt_leg; // RTT only for the leg receiving the RTCP report - uint64_t packetloss; // percent - uint64_t mos; // nominal range of 10 - 50 for MOS values 1.0 to 5.0 -}; - -struct ssrc_entry { - struct obj obj; - GList link; - mutex_t lock; - uint32_t ssrc; -}; - -struct ssrc_entry_call { - struct ssrc_entry h; // must be first - struct ssrc_ctx input_ctx, - output_ctx; GQueue sender_reports; // as received via RTCP GQueue rr_time_reports; // as received via RTCP GQueue stats_blocks; // calculated @@ -106,10 +103,6 @@ struct ssrc_entry_call { // output only uint16_t seq_diff; }; -enum ssrc_dir { // these values must not be used externally - SSRC_DIR_INPUT = G_STRUCT_OFFSET(struct ssrc_entry_call, input_ctx), - SSRC_DIR_OUTPUT = G_STRUCT_OFFSET(struct ssrc_entry_call, output_ctx), -}; struct ssrc_time_item { int64_t received; @@ -209,9 +202,6 @@ INLINE void *get_ssrc(uint32_t ssrc, struct ssrc_hash *ht) { return get_ssrc_full(ssrc, ht, NULL); } -struct ssrc_ctx *get_ssrc_ctx(uint32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found - - void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, int64_t); void ssrc_receiver_report(struct call_media *, stream_fd *, const struct ssrc_receiver_report *, int64_t); void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *rr, int64_t); @@ -226,20 +216,13 @@ void payload_tracker_init(struct payload_tracker *t); void payload_tracker_add(struct payload_tracker *, int); -#define ssrc_ctx_put(c) \ - do { \ - struct ssrc_ctx **__cc = (c); \ - if ((__cc) && *(__cc)) { \ - obj_put(&(*__cc)->parent->h); \ - *(__cc) = NULL; \ - } \ - } while (0) -#define ssrc_ctx_hold(c) \ - do { \ - if (c) \ - obj_hold(&(c)->parent->h); \ - } while (0) - +#define ssrc_entry_release(c) do { \ + if (c) { \ + obj_put(&(c)->h); \ + c = NULL; \ + } \ +} while (0) +#define ssrc_entry_hold(c) obj_hold(&(c)->h) #endif diff --git a/lib/obj.h b/lib/obj.h index b8bf590cd..9c510d4e2 100644 --- a/lib/obj.h +++ b/lib/obj.h @@ -114,7 +114,8 @@ INLINE void __obj_put(struct obj *o); #endif -#define obj_release(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0) +#define obj_release_o(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0) +#define obj_release(op) do { if (op) obj_put(op); op = NULL; } while (0) diff --git a/t/aead-decrypt.c b/t/aead-decrypt.c index 363ceb522..e928330bb 100644 --- a/t/aead-decrypt.c +++ b/t/aead-decrypt.c @@ -58,14 +58,11 @@ int main(int argc, char **argv) { }; struct ssrc_entry_call se = { - .input_ctx = { - .parent = &se, - .stats = &stats, - }, + .stats = &stats, }; - int ret = rtp_savp2avp(&s, &cc, &se.input_ctx); + int ret = rtp_savp2avp(&s, &cc, &se); assert(ret == 0); - printf("idx %d ROC %d\n", se.input_ctx.stats->ext_seq, se.input_ctx.stats->ext_seq >> 16); + printf("idx %d ROC %d\n", se.stats->ext_seq, se.stats->ext_seq >> 16); return 0; } diff --git a/t/test-transcode.c b/t/test-transcode.c index 50b01918b..054984d99 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -261,13 +261,13 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media .call = &call, .media = media, .media_out = other_media, - .ssrc_in = get_ssrc_ctx(ssrc, &media->ssrc_hash_in, SSRC_DIR_INPUT), + .ssrc_in = get_ssrc(ssrc, &media->ssrc_hash_in), .sfd = &sfd, }; // from __stream_ssrc() if (!MEDIA_ISSET(media, TRANSCODING)) mp.ssrc_in->ssrc_map_out = ntohl(ssrc); - mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, &other_media->ssrc_hash_out, SSRC_DIR_OUTPUT); + mp.ssrc_out = get_ssrc(mp.ssrc_in->ssrc_map_out, &other_media->ssrc_hash_out); payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f); int packet_len = sizeof(struct rtp_header) + pl.len; @@ -358,8 +358,8 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media } printf("test ok: %s:%i\n\n", file, line); free(packet); - ssrc_ctx_put(&mp.ssrc_in); - ssrc_ctx_put(&mp.ssrc_out); + ssrc_entry_release(mp.ssrc_in); + ssrc_entry_release(mp.ssrc_out); } #define packet(side, pt_in, pload, pt_out, pload_exp) \