Browse Source

MT#55283 eliminate input/output combination SSRC

Change-Id: I94636b7f6fb6fb31cd3bf332fffe0203e17f6c16
pull/1938/head
Richard Fuchs 8 months ago
parent
commit
cf22d82932
23 changed files with 302 additions and 366 deletions
  1. +6
    -6
      daemon/call.c
  2. +3
    -3
      daemon/call_interfaces.c
  3. +1
    -1
      daemon/cli.c
  4. +94
    -118
      daemon/codec.c
  5. +11
    -11
      daemon/dtmf.c
  6. +1
    -1
      daemon/main.c
  7. +10
    -10
      daemon/media_player.c
  8. +34
    -36
      daemon/media_socket.c
  9. +10
    -12
      daemon/mqtt.c
  10. +12
    -12
      daemon/redis.c
  11. +40
    -42
      daemon/rtcp.c
  12. +8
    -6
      daemon/rtp.c
  13. +17
    -32
      daemon/ssrc.c
  14. +1
    -1
      include/call.h
  15. +3
    -3
      include/codec.h
  16. +3
    -3
      include/media_player.h
  17. +5
    -5
      include/media_socket.h
  18. +3
    -4
      include/rtcp.h
  19. +4
    -5
      include/rtp.h
  20. +27
    -44
      include/ssrc.h
  21. +2
    -1
      lib/obj.h
  22. +3
    -6
      t/aead-decrypt.c
  23. +4
    -4
      t/test-transcode.c

+ 6
- 6
daemon/call.c View File

@ -213,7 +213,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
CALL_CLEAR(sfd->call, FOREIGN_MEDIA);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
struct ssrc_ctx *ctx = ps->ssrc_in[u];
struct ssrc_entry_call *ctx = ps->ssrc_in[u];
if (!ctx)
break;
@ -222,7 +222,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
atomic_get_na(&ctx->stats->last_pt));
}
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
struct ssrc_ctx *ctx = ps->ssrc_out[u];
struct ssrc_entry_call *ctx = ps->ssrc_out[u];
if (!ctx)
break;
@ -4008,7 +4008,7 @@ static void __call_cleanup(call_t *c) {
media_player_put(&ml->rec_player);
if (ml->tone_freqs)
g_array_free(ml->tone_freqs, true);
obj_release(ml->janus_session);
obj_release_o(ml->janus_session);
}
while (c->stream_fds.head) {
@ -4171,7 +4171,7 @@ void call_destroy(call_t *c) {
(unsigned int) local_endpoint->port,
FMT_M(addr, ps->endpoint.port),
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0),
FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0),
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
@ -4361,9 +4361,9 @@ static void __call_free(call_t *c) {
t_queue_clear(&ps->sfds);
t_hash_table_destroy(ps->rtp_stats);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++)
ssrc_ctx_put(&ps->ssrc_in[u]);
ssrc_entry_release(ps->ssrc_in[u]);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++)
ssrc_ctx_put(&ps->ssrc_out[u]);
ssrc_entry_release(ps->ssrc_out[u]);
bufferpool_unref(ps->stats_in);
bufferpool_unref(ps->stats_out);
g_free(ps);


+ 3
- 3
daemon/call_interfaces.c View File

@ -2749,19 +2749,19 @@ static void ng_stats_endpoint(const ng_parser_t *parser, parser_arg dict, const
}
static void ng_stats_stream_ssrc(const ng_parser_t *parser, parser_arg dict,
struct ssrc_ctx *const ssrcs[RTPE_NUM_SSRC_TRACKING],
struct ssrc_entry_call *const ssrcs[RTPE_NUM_SSRC_TRACKING],
const char *label)
{
parser_arg list = parser->dict_add_list(dict, label);
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
struct ssrc_ctx *c = ssrcs[i];
struct ssrc_entry_call *c = ssrcs[i];
if (!c)
break;
parser_arg ssrc = parser->list_add_dict(list);
parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->parent->h.ssrc);
parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->h.ssrc);
parser->dict_add_int(ssrc, "bytes", atomic64_get_na(&c->stats->bytes));
parser->dict_add_int(ssrc, "packets", atomic64_get_na(&c->stats->packets));
parser->dict_add_int(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp));


+ 1
- 1
daemon/cli.c View File

@ -808,7 +808,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
sockaddr_print_buf(&ps->endpoint.address),
ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0,
ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0,
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),


+ 94
- 118
daemon/codec.c View File

@ -58,7 +58,7 @@ static void codec_store_add_raw_order(struct codec_store *cs, rtp_payload_type *
static rtp_payload_type *codec_store_find_compatible(struct codec_store *cs,
const rtp_payload_type *pt);
static void __rtp_payload_type_add_name(codec_names_ht, rtp_payload_type *pt);
static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq);
static void codec_calc_lost(struct ssrc_entry_call *ssrc, uint16_t seq);
static void __codec_options_set(call_t *call, rtp_payload_type *pt, str_case_value_ht codec_set);
@ -422,7 +422,7 @@ static void __handler_shutdown(struct codec_handler *handler) {
delay_buffer_stop(&handler->delay_buffer);
}
obj_release(handler->ssrc_handler);
ssrc_entry_release(handler->ssrc_handler);
handler->kernelize = false;
handler->transcoder = false;
handler->output_handler = handler; // reset to default
@ -1315,7 +1315,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
rwlock_lock_r(&rt->call->master_lock);
// copy out references to SSRCs for lock-free handling
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
struct ssrc_entry_call *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
@ -1323,7 +1323,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]);
ssrc_entry_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
}
@ -1340,7 +1340,7 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ssrc_out[u]) // end of list
break;
ssrc_ctx_put(&ssrc_out[u]);
ssrc_entry_release(ssrc_out[u]);
}
out:
@ -2011,7 +2011,7 @@ void mqtt_timer_start(struct mqtt_timer **mqtp, call_t *call, struct call_media
static void codec_timer_stop(struct codec_timer **ctp) {
if (!ctp)
return;
obj_release(*ctp);
obj_release_o(*ctp);
}
// master lock held in W
void rtcp_timer_stop(struct rtcp_timer **rtp) {
@ -2064,7 +2064,7 @@ static void codec_add_raw_packet_common(struct media_packet *mp, unsigned int cl
{
p->clockrate = clockrate;
if (mp->rtp && mp->ssrc_out) {
ssrc_ctx_hold(mp->ssrc_out);
ssrc_entry_hold(mp->ssrc_out);
p->ssrc_out = mp->ssrc_out;
if (!p->rtp)
p->rtp = mp->rtp;
@ -2139,36 +2139,19 @@ static int handler_func_passthrough(struct codec_handler *h, struct media_packet
#ifdef WITH_TRANSCODING
static void __ssrc_lock_both(struct media_packet *mp) {
struct ssrc_ctx *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent;
struct ssrc_ctx *ssrc_out = mp->ssrc_out;
struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent;
struct ssrc_entry_call *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_out = mp->ssrc_out;
// we need a nested lock here - both input and output SSRC needs to be locked.
// we don't know the lock order, so try both, and keep trying until we succeed.
while (1) {
mutex_lock(&ssrc_in_p->h.lock);
if (ssrc_in_p == ssrc_out_p)
break;
if (!mutex_trylock(&ssrc_out_p->h.lock))
break;
mutex_unlock(&ssrc_in_p->h.lock);
mutex_lock(&ssrc_out_p->h.lock);
if (!mutex_trylock(&ssrc_in_p->h.lock))
break;
mutex_unlock(&ssrc_out_p->h.lock);
}
// nested lock: in first, out second
mutex_lock(&ssrc_in->h.lock);
mutex_lock(&ssrc_out->h.lock);
}
static void __ssrc_unlock_both(struct media_packet *mp) {
struct ssrc_ctx *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent;
struct ssrc_ctx *ssrc_out = mp->ssrc_out;
struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent;
struct ssrc_entry_call *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_out = mp->ssrc_out;
mutex_unlock(&ssrc_in_p->h.lock);
if (ssrc_in_p != ssrc_out_p)
mutex_unlock(&ssrc_out_p->h.lock);
mutex_unlock(&ssrc_in->h.lock);
mutex_unlock(&ssrc_out->h.lock);
}
static void __seq_free(void *p) {
@ -2181,12 +2164,10 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
{
struct codec_handler *h = packet->handler;
struct ssrc_ctx *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_in_p = ssrc_in->parent;
struct ssrc_ctx *ssrc_out = mp->ssrc_out;
struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent;
struct ssrc_entry_call *ssrc_in = mp->ssrc_in;
struct ssrc_entry_call *ssrc_out = mp->ssrc_out;
struct codec_ssrc_handler *ch = get_ssrc(ssrc_in_p->h.ssrc, &h->ssrc_hash);
struct codec_ssrc_handler *ch = get_ssrc(ssrc_in->h.ssrc, &h->ssrc_hash);
if (G_UNLIKELY(!ch)) {
__transcode_packet_free(packet);
return 0;
@ -2206,7 +2187,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
atomic64_inc_na(&mp->sfd->local_intf->stats->in.packets);
atomic64_add_na(&mp->sfd->local_intf->stats->in.bytes, mp->payload.len);
struct codec_ssrc_handler *input_ch = get_ssrc(ssrc_in_p->h.ssrc, &h->input_handler->ssrc_hash);
struct codec_ssrc_handler *input_ch = get_ssrc(ssrc_in->h.ssrc, &h->input_handler->ssrc_hash);
if (packet->bypass_seq) {
// bypass sequencer
@ -2225,13 +2206,13 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
__ssrc_lock_both(mp);
// get sequencer appropriate for our output
if (!ssrc_in_p->sequencers)
ssrc_in_p->sequencers = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __seq_free);
packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in_p->sequencers, mp->media_out);
if (!ssrc_in->sequencers)
ssrc_in->sequencers = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __seq_free);
packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->sequencers, mp->media_out);
if (!seq) {
seq = g_new0(__typeof(*seq), 1);
packet_sequencer_init(seq, (GDestroyNotify) __transcode_packet_free);
g_hash_table_insert(ssrc_in_p->sequencers, mp->media_out, seq);
g_hash_table_insert(ssrc_in->sequencers, mp->media_out, seq);
// this is a quick fix to restore sequencer values until upper layer behavior will be fixed
unsigned int stats_ext_seq = atomic_get_na(&ssrc_in->stats->ext_seq);
if(stats_ext_seq) {
@ -2253,7 +2234,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
ilogs(transcoding, LOG_DEBUG, "Ignoring duplicate RTP packet");
if (func_ret != 1)
__transcode_packet_free(packet);
ssrc_in_p->duplicates++;
ssrc_in->duplicates++;
atomic64_inc_na(&mp->sfd->local_intf->stats->s.duplicates);
RTPE_STATS_INC(rtp_duplicates);
goto out;
@ -2317,18 +2298,18 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
// new packet might have different handlers
h = packet->handler;
obj_release(ch);
obj_release(input_ch);
ch = get_ssrc(ssrc_in_p->h.ssrc, &h->ssrc_hash);
ssrc_entry_release(ch);
ssrc_entry_release(input_ch);
ch = get_ssrc(ssrc_in->h.ssrc, &h->ssrc_hash);
if (G_UNLIKELY(!ch))
goto next;
input_ch = get_ssrc(ssrc_in_p->h.ssrc, &h->input_handler->ssrc_hash);
input_ch = get_ssrc(ssrc_in->h.ssrc, &h->input_handler->ssrc_hash);
if (G_UNLIKELY(!input_ch)) {
obj_release(ch);
ssrc_entry_release(ch);
goto next;
}
ssrc_in_p->packets_lost = seq->lost_count;
ssrc_in->packets_lost = seq->lost_count;
atomic_set_na(&ssrc_in->stats->ext_seq, seq->ext_seq);
ilogs(transcoding, LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu",
@ -2336,7 +2317,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
if (seq_ret == 1) {
// seq reset - update output seq. we keep our output seq clean
ssrc_out_p->seq_diff -= packet->p.seq - seq_ori;
ssrc_out->seq_diff -= packet->p.seq - seq_ori;
seq_ret = 0;
}
@ -2353,9 +2334,9 @@ next:
out:
__ssrc_unlock_both(mp);
obj_release(input_ch);
ssrc_entry_release(input_ch);
out_ch:
obj_release(ch);
ssrc_entry_release(ch);
mp->rtp = orig_rtp;
@ -2371,8 +2352,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
unsigned long ts_delay)
{
struct rtp_header *rh = (void *) buf;
struct ssrc_ctx *ssrc_out = mp->ssrc_out;
struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent;
struct ssrc_entry_call *ssrc_out = mp->ssrc_out;
// reconstruct RTP header
unsigned long ts = payload_ts;
ZERO(*rh);
@ -2383,9 +2363,9 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
if (seq != -1)
rh->seq_num = htons(seq);
else
rh->seq_num = htons(ntohs(mp->rtp->seq_num) + (ssrc_out_p->seq_diff += seq_inc));
rh->seq_num = htons(ntohs(mp->rtp->seq_num) + (ssrc_out->seq_diff += seq_inc));
rh->timestamp = htonl(ts);
rh->ssrc = htonl(ssrc_out_p->h.ssrc);
rh->ssrc = htonl(ssrc_out->h.ssrc);
// add to output queue
struct codec_packet *p = g_new0(__typeof(*p), 1);
@ -2398,7 +2378,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
p->rtp = rh;
p->ts = ts;
p->clockrate = handler->dest_pt.clock_rate;
ssrc_ctx_hold(ssrc_out);
ssrc_entry_hold(ssrc_out);
p->ssrc_out = ssrc_out;
int64_t ts_diff_us = 0;
@ -2480,7 +2460,7 @@ static struct codec_ssrc_handler *__output_ssrc_handler(struct codec_ssrc_handle
// our encoder is in a different codec handler
ilogs(transcoding, LOG_DEBUG, "Switching context from decoder to encoder");
handler = handler->output_handler;
struct codec_ssrc_handler *new_ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &handler->ssrc_hash);
struct codec_ssrc_handler *new_ch = get_ssrc(mp->ssrc_in->h.ssrc, &handler->ssrc_hash);
if (G_UNLIKELY(!new_ch)) {
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT,
"Switched from input to output codec context, but no codec handler present");
@ -2559,7 +2539,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr
payload_type = h->real_dtmf_payload_type;
skip:
obj_release(output_ch);
ssrc_entry_release(output_ch);
char *buf = bufferpool_alloc(media_bufferpool,
packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM);
memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len);
@ -2569,7 +2549,7 @@ skip:
else // use our own sequencing
input_ch->codec_output_rtp_seq(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts,
packet->marker, payload_type, ts_delay);
mp->ssrc_out->parent->seq_diff++;
mp->ssrc_out->seq_diff++;
return 0;
}
@ -2694,12 +2674,12 @@ static tc_code packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_hand
ret = TCC_CONSUMED;
else
ret = packet_dtmf_fwd(ch, input_ch, dup, mp);
mp->ssrc_out->parent->seq_diff++;
mp->ssrc_out->seq_diff++;
if (ret != TCC_CONSUMED)
__transcode_packet_free(dup);
}
mp->ssrc_out->parent->seq_diff--;
mp->ssrc_out->seq_diff--;
// discard the received event
do_blocking = true;
@ -2803,7 +2783,7 @@ void codec_packet_free(struct codec_packet *p) {
p->free_func(p->s.s);
if (p->plain_free_func && p->plain.s)
p->plain_free_func(p->plain.s);
ssrc_ctx_put(&p->ssrc_out);
ssrc_entry_release(p->ssrc_out);
g_free(p);
}
bool codec_packet_copy(struct codec_packet *p) {
@ -2819,7 +2799,7 @@ struct codec_packet *codec_packet_dup(struct codec_packet *p) {
dup->link.data = dup; // XXX obsolete this
codec_packet_copy(dup);
if (dup->ssrc_out)
ssrc_ctx_hold(dup->ssrc_out);
ssrc_entry_hold(dup->ssrc_out);
if (dup->rtp)
dup->rtp = (void *) dup->s.s;
return dup;
@ -2977,7 +2957,7 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p
// check for DTMF injection
if (h->dtmf_payload_type != -1) {
struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &h->ssrc_hash);
struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->h.ssrc, &h->ssrc_hash);
if (ch) {
uint64_t ts64 = ntohl(mp->rtp->timestamp);
@ -3005,18 +2985,18 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p
else if (!ch->dtmf_events.length)
ML_CLEAR(mp->media->monologue, DTMF_INJECTION_ACTIVE);
obj_release(ch);
ssrc_entry_release(ch);
}
}
// substitute out SSRC etc
mp->rtp->ssrc = htonl(mp->ssrc_out->parent->h.ssrc);
mp->rtp->ssrc = htonl(mp->ssrc_out->h.ssrc);
// to track our seq
unsigned short seq = ntohs(mp->rtp->seq_num);
while (true) {
mp->rtp->seq_num = htons(seq + mp->ssrc_out->parent->seq_diff);
mp->rtp->seq_num = htons(seq + mp->ssrc_out->seq_diff);
// keep track of other stats here?
@ -3025,7 +3005,7 @@ static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_p
if (duplicates == 0)
break;
duplicates--;
mp->ssrc_out->parent->seq_diff++;
mp->ssrc_out->seq_diff++;
}
// restore original in case it was mangled
@ -3084,7 +3064,7 @@ uint64_t codec_last_dtmf_event(struct codec_ssrc_handler *ch) {
return ev->ts;
}
uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *ssrc_in) {
uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_entry_call *ssrc_in) {
if (!ch || !ch->encoder) {
if (!ssrc_in)
return 0;
@ -3262,7 +3242,7 @@ static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *m
return;
if (__buffer_delay_do_direct(dbuf)) {
mp->ssrc_out->parent->seq_diff += seq_adj;
mp->ssrc_out->seq_diff += seq_adj;
return;
}
@ -3271,7 +3251,7 @@ static void __buffer_delay_seq(struct delay_buffer *dbuf, struct media_packet *m
// peg the adjustment to the most recent frame if any
struct delay_frame *dframe = t_queue_peek_head(&dbuf->frames);
if (!dframe) {
mp->ssrc_out->parent->seq_diff += seq_adj;
mp->ssrc_out->seq_diff += seq_adj;
return;
}
@ -3331,7 +3311,7 @@ static bool __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *dec
// schedule timer if not running yet
if (!dtxb->ct.next) {
if (!dtxb->ssrc)
dtxb->ssrc = mp->ssrc_in->parent->h.ssrc;
dtxb->ssrc = mp->ssrc_in->h.ssrc;
dtxb->ct.next = mp->tv;
dtxb->ct.next += rtpe_config.dtx_delay_us;
timerthread_obj_schedule_abs(&dtxb->ct.tt_obj, dtxb->ct.next);
@ -3368,8 +3348,8 @@ static void delay_frame_free(struct delay_frame *dframe) {
av_frame_free(&dframe->frame);
g_free(dframe->mp.raw.s);
media_packet_release(&dframe->mp);
obj_release(dframe->ch);
obj_release(dframe->input_ch);
ssrc_entry_release(dframe->ch);
ssrc_entry_release(dframe->input_ch);
if (dframe->packet)
__transcode_packet_free(dframe->packet);
g_free(dframe);
@ -3387,8 +3367,8 @@ static void dtx_packet_free(struct dtx_packet *dtxp) {
if (dtxp->packet)
__transcode_packet_free(dtxp->packet);
media_packet_release(&dtxp->mp);
obj_release(dtxp->decoder_handler);
obj_release(dtxp->input_handler);
ssrc_entry_release(dtxp->decoder_handler);
ssrc_entry_release(dtxp->input_handler);
g_free(dtxp);
}
static void delay_buffer_stop(struct delay_buffer **pcmbp) {
@ -3569,7 +3549,7 @@ static void __delay_frame_process(struct delay_buffer *dbuf, struct delay_frame
}
if (dframe->seq_adj)
dframe->mp.ssrc_out->parent->seq_diff += dframe->seq_adj;
dframe->mp.ssrc_out->seq_diff += dframe->seq_adj;
}
static void __delay_send_later(struct codec_timer *ct) {
struct delay_buffer *dbuf = (void *) ct;
@ -3809,7 +3789,7 @@ static void __dtx_send_later(struct codec_timer *ct) {
shutdown = true;
else if (!ps->ssrc_in[0])
shutdown = true;
else if (dtxb->ssrc != ps->ssrc_in[0]->parent->h.ssrc)
else if (dtxb->ssrc != ps->ssrc_in[0]->h.ssrc)
shutdown = true;
else if (dtxb->ct.next == 0)
shutdown = true;
@ -3886,13 +3866,13 @@ static void __dtx_send_later(struct codec_timer *ct) {
// packet consumed - track seq
rwlock_lock_r(&call->master_lock);
__ssrc_lock_both(&mp_copy);
mp_copy.ssrc_out->parent->seq_diff--;
mp_copy.ssrc_out->seq_diff--;
__ssrc_unlock_both(&mp_copy);
rwlock_unlock_r(&call->master_lock);
}
obj_release(call);
obj_release(ch);
obj_release(input_ch);
ssrc_entry_release(ch);
ssrc_entry_release(input_ch);
if (dtxp)
dtx_packet_free(dtxp);
media_packet_release(&mp_copy);
@ -3978,8 +3958,8 @@ static void __dtx_send_later(struct codec_timer *ct) {
out:
obj_release(call);
obj_release(ch);
obj_release(input_ch);
ssrc_entry_release(ch);
ssrc_entry_release(input_ch);
if (dtxp)
dtx_packet_free(dtxp);
media_packet_release(&mp_copy);
@ -3997,7 +3977,7 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) {
ch->encoder->mux_dts = 0;
}
obj_release(dtxb->csh);
ssrc_entry_release(dtxb->csh);
}
obj_release(dtxb->call);
t_queue_clear_full(&dtxb->packets, dtx_packet_free);
@ -4358,7 +4338,7 @@ static struct ssrc_entry *__ssrc_handler_transcode_new(void *p) {
return &ch->h;
err:
obj_release(ch);
ssrc_entry_release(ch);
return NULL;
}
static struct ssrc_entry *__ssrc_handler_decode_new(void *p) {
@ -4384,7 +4364,7 @@ static struct ssrc_entry *__ssrc_handler_decode_new(void *p) {
return &ch->h;
err:
obj_release(ch);
ssrc_entry_release(ch);
return NULL;
}
static int __encoder_flush(encoder_t *enc, void *u1, void *u2) {
@ -4545,7 +4525,7 @@ static void packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, stru
+ fraction_divl(pkt->pts, cr_fact),
ch->rtp_mark ? 1 : 0,
payload_type, ts_delay);
mp->ssrc_out->parent->seq_diff++;
mp->ssrc_out->seq_diff++;
ch->rtp_mark = 0;
if (!repeats)
break;
@ -4662,7 +4642,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
discard:
av_frame_free(&frame);
obj_release(new_ch);
ssrc_entry_release(new_ch);
return 0;
}
@ -4836,33 +4816,29 @@ void codec_update_all_source_handlers(struct call_monologue *ml, const sdp_ng_fl
}
void codec_calc_jitter(struct ssrc_ctx *ssrc, unsigned long ts, unsigned int clockrate, int64_t tv) {
void codec_calc_jitter(struct ssrc_entry_call *ssrc, unsigned long ts, unsigned int clockrate, int64_t tv) {
if (!ssrc || !clockrate)
return;
struct ssrc_entry_call *sec = ssrc->parent;
// RFC 3550 A.8
uint32_t transit = (((tv / 1000) * clockrate) / 1000) - ts;
mutex_lock(&sec->h.lock);
LOCK(&ssrc->h.lock);
int32_t d = 0;
if (sec->transit)
d = transit - sec->transit;
sec->transit = transit;
if (ssrc->transit)
d = transit - ssrc->transit;
ssrc->transit = transit;
if (d < 0)
d = -d;
// ignore implausibly large values
if (d < 100000)
sec->jitter += d - ((sec->jitter + 8) >> 4);
mutex_unlock(&sec->h.lock);
ssrc->jitter += d - ((ssrc->jitter + 8) >> 4);
}
static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) {
struct ssrc_entry_call *s = ssrc->parent;
LOCK(&s->h.lock);
static void codec_calc_lost(struct ssrc_entry_call *ssrc, uint16_t seq) {
LOCK(&ssrc->h.lock);
// XXX shared code from kernel module
uint32_t last_seq = s->last_seq_tracked;
uint32_t last_seq = ssrc->last_seq_tracked;
uint32_t new_seq = last_seq;
// old seq or seq reset?
@ -4873,8 +4849,8 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) {
else if (seq_diff > 0x100) {
// reset seq and loss tracker
new_seq = seq;
s->last_seq_tracked = seq;
s->lost_bits = -1;
ssrc->last_seq_tracked = seq;
ssrc->lost_bits = -1;
}
else {
// seq wrap?
@ -4885,20 +4861,20 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) {
break;
}
seq_diff = new_seq - last_seq;
s->last_seq_tracked = new_seq;
ssrc->last_seq_tracked = new_seq;
// shift loss tracker bit field and count losses
if (seq_diff >= (sizeof(s->lost_bits) * 8)) {
if (seq_diff >= (sizeof(ssrc->lost_bits) * 8)) {
// complete loss
s->packets_lost += sizeof(s->lost_bits) * 8;
s->lost_bits = -1;
ssrc->packets_lost += sizeof(ssrc->lost_bits) * 8;
ssrc->lost_bits = -1;
}
else {
while (seq_diff) {
// shift out one bit and see if we lost it
if ((s->lost_bits & 0x80000000) == 0)
s->packets_lost++;
s->lost_bits <<= 1;
if ((ssrc->lost_bits & 0x80000000) == 0)
ssrc->packets_lost++;
ssrc->lost_bits <<= 1;
seq_diff--;
}
}
@ -4906,8 +4882,8 @@ static void codec_calc_lost(struct ssrc_ctx *ssrc, uint16_t seq) {
// track this frame as being seen
seq_diff = (new_seq & 0xffff) - seq;
if (seq_diff < (sizeof(s->lost_bits) * 8))
s->lost_bits |= (1 << seq_diff);
if (seq_diff < (sizeof(ssrc->lost_bits) * 8))
ssrc->lost_bits |= (1 << seq_diff);
}
@ -4977,12 +4953,12 @@ static int handler_func_inject_dtmf(struct codec_handler *h, struct media_packet
h->input_handler = __input_handler(h, mp);
h->output_handler = h->input_handler;
struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, &h->ssrc_hash);
struct codec_ssrc_handler *ch = get_ssrc(mp->ssrc_in->h.ssrc, &h->ssrc_hash);
if (!ch)
return 0;
decoder_input_data(ch->decoder, &mp->payload, mp->rtp->timestamp,
h->packet_decoded, ch, mp);
obj_release(ch);
ssrc_entry_release(ch);
return 0;
}
@ -6267,8 +6243,8 @@ static void codec_timers_run(void *p) {
#ifdef WITH_TRANSCODING
static void transcode_job_free(struct transcode_job *j) {
media_packet_release(&j->mp);
obj_release(j->ch);
obj_release(j->input_ch);
ssrc_entry_release(j->ch);
ssrc_entry_release(j->input_ch);
if (j->packet)
__transcode_packet_free(j->packet);
g_free(j);


+ 11
- 11
daemon/dtmf.c View File

@ -715,7 +715,7 @@ int dtmf_code_from_char(char c) {
// takes over the csh reference
static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *sink,
struct call_monologue *monologue,
struct packet_stream *ps, struct ssrc_ctx *ssrc_in, struct codec_handler *ch,
struct packet_stream *ps, struct ssrc_entry_call *ssrc_in, struct codec_handler *ch,
struct codec_ssrc_handler *csh,
int code, int volume, int duration, int pause)
{
@ -725,13 +725,13 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *
struct sink_handler *sh = l->data;
struct packet_stream *sink_ps = sh->sink;
__auto_type sink_media = sink_ps->media;
packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->parent->sequencers, sink_ps->media);
packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->sequencers, sink_ps->media);
if (!seq)
continue;
struct ssrc_ctx *ssrc_out = get_ssrc_ctx(sh->attrs.transcoding ?
ssrc_in->ssrc_map_out : ssrc_in->parent->h.ssrc,
&sink_media->ssrc_hash_out, SSRC_DIR_OUTPUT);
struct ssrc_entry_call *ssrc_out = get_ssrc(sh->attrs.transcoding ?
ssrc_in->ssrc_map_out : ssrc_in->h.ssrc,
&sink_media->ssrc_hash_out);
if (!ssrc_out)
return "No output SSRC context present"; // XXX generate stream
@ -750,7 +750,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *
.m_pt = 0xff,
.timestamp = 0,
.seq_num = htons(seq->seq),
.ssrc = htonl(ssrc_in->parent->h.ssrc),
.ssrc = htonl(ssrc_in->h.ssrc),
};
struct media_packet packet = {
.tv = rtpe_now,
@ -788,7 +788,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *
media_socket_dequeue(&packet, sink_ps);
obj_put_o((struct obj *) csh);
ssrc_ctx_put(&ssrc_out);
ssrc_entry_release(ssrc_out);
}
return 0;
@ -802,7 +802,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura
if (!media->streams.head)
return "Media doesn't have an RTP stream";
struct packet_stream *ps = media->streams.head->data;
struct ssrc_ctx *ssrc_in = ps->ssrc_in[0];
struct ssrc_entry_call *ssrc_in = ps->ssrc_in[0];
if (!ssrc_in)
return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream
@ -834,9 +834,9 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura
ch->source_pt.payload_type,
ch->dest_pt.payload_type,
ch_pt,
ssrc_in->parent->h.ssrc);
ssrc_in->h.ssrc);
csh = get_ssrc(ssrc_in->parent->h.ssrc, &ch->ssrc_hash);
csh = get_ssrc(ssrc_in->h.ssrc, &ch->ssrc_hash);
if (!csh)
continue;
break;
@ -857,7 +857,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura
ilog(LOG_DEBUG, "Injecting RFC DTMF event #%i for %i ms (vol %i) from '" STR_FORMAT "' (media #%u) "
"into RTP PT %i, SSRC %" PRIx32,
code, duration, volume, STR_FMT(&monologue->tag), media->index, pt,
ssrc_in->parent->h.ssrc);
ssrc_in->h.ssrc);
// synthesise start and stop events
// the num_samples needs to be based on the the previous packet timestamp so we need to


+ 1
- 1
daemon/main.c View File

@ -607,7 +607,7 @@ static void create_listeners(const GQueue *endpoints_in, GQueue *objects_out,
static void release_listeners(GQueue *q) {
while (q->length) {
struct obj *o = g_queue_pop_head(q);
obj_release(o);
obj_release_o(o);
}
}


+ 10
- 10
daemon/media_player.c View File

@ -180,7 +180,7 @@ static void media_player_shutdown(struct media_player *mp) {
unsigned int num = send_timer_flush(mp->sink->send_timer, mp->coder.handler);
ilog(LOG_DEBUG, "%u packets removed from send queue", num);
// roll back seq numbers already used
mp->ssrc_out->parent->seq_diff -= num;
mp->ssrc_out->seq_diff -= num;
}
if (mp->opts.block_egress && mp->media)
@ -224,7 +224,7 @@ long long media_player_stop(struct media_player *mp) {
#ifdef WITH_TRANSCODING
static void __media_player_free(struct media_player *mp) {
media_player_shutdown(mp);
ssrc_ctx_put(&mp->ssrc_out);
ssrc_entry_release(mp->ssrc_out);
mutex_destroy(&mp->lock);
obj_put(mp->call);
av_packet_free(&mp->coder.pkt);
@ -235,7 +235,7 @@ static void __media_player_free(struct media_player *mp) {
// call->master_lock held in W
void media_player_new(struct media_player **mpp, struct call_monologue *ml, struct ssrc_ctx *prev_ssrc,
void media_player_new(struct media_player **mpp, struct call_monologue *ml, struct ssrc_entry_call *prev_ssrc,
media_player_opts_t *opts)
{
#ifdef WITH_TRANSCODING
@ -305,7 +305,7 @@ struct send_timer *send_timer_new(struct packet_stream *ps) {
}
// call is locked in R
static void send_timer_rtcp(struct send_timer *st, struct ssrc_ctx *ssrc_out) {
static void send_timer_rtcp(struct send_timer *st, struct ssrc_entry_call *ssrc_out) {
struct call_media *media = st->sink ? st->sink->media : NULL;
if (!media)
return;
@ -403,11 +403,11 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet
}
// do we send RTCP?
struct ssrc_ctx *ssrc_out = cp->ssrc_out;
struct ssrc_entry_call *ssrc_out = cp->ssrc_out;
if (ssrc_out && ssrc_out->next_rtcp) {
mutex_lock(&ssrc_out->parent->h.lock);
mutex_lock(&ssrc_out->h.lock);
int64_t diff = ssrc_out->next_rtcp - rtpe_now;
mutex_unlock(&ssrc_out->parent->h.lock);
mutex_unlock(&ssrc_out->h.lock);
if (diff < 0)
send_timer_rtcp(st, ssrc_out);
}
@ -597,7 +597,7 @@ static void media_player_kernel_player_start_now(struct media_player *mp) {
.pt = dst_pt->payload_type,
.seq = mp->seq,
.ts = mp->buffer_ts,
.ssrc = mp->ssrc_out->parent->h.ssrc,
.ssrc = mp->ssrc_out->h.ssrc,
.repeat = mp->opts.repeat,
.stats = mp->sink->stats_out,
.iface_stats = mp->sink->selected_sfd->local_intf->stats,
@ -1142,8 +1142,8 @@ void media_player_set_media(struct media_player *mp, struct call_media *media) {
mp->sink = media->streams.head->data;
mp->crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP], media, true);
}
if (!mp->ssrc_out || mp->ssrc_out->parent->h.ssrc != mp->ssrc) {
struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(mp->ssrc, &media->ssrc_hash_out, SSRC_DIR_OUTPUT);
if (!mp->ssrc_out || mp->ssrc_out->h.ssrc != mp->ssrc) {
struct ssrc_entry_call *ssrc_ctx = get_ssrc(mp->ssrc, &media->ssrc_hash_out);
ssrc_ctx->next_rtcp = rtpe_now;
mp->ssrc_out = ssrc_ctx;
}


+ 34
- 36
daemon/media_socket.c View File

@ -107,10 +107,10 @@ static int __k_null(struct rtpengine_srtp *s, struct packet_stream *);
static int __k_srtp_encrypt(struct rtpengine_srtp *s, struct packet_stream *);
static int __k_srtp_decrypt(struct rtpengine_srtp *s, struct packet_stream *);
static int call_avp2savp_rtp(str *s, struct packet_stream *, struct ssrc_ctx *);
static int call_savp2avp_rtp(str *s, struct packet_stream *, struct ssrc_ctx *);
static int call_avp2savp_rtcp(str *s, struct packet_stream *, struct ssrc_ctx *);
static int call_savp2avp_rtcp(str *s, struct packet_stream *, struct ssrc_ctx *);
static int call_avp2savp_rtp(str *s, struct packet_stream *, struct ssrc_entry_call *);
static int call_savp2avp_rtp(str *s, struct packet_stream *, struct ssrc_entry_call *);
static int call_avp2savp_rtcp(str *s, struct packet_stream *, struct ssrc_entry_call *);
static int call_savp2avp_rtcp(str *s, struct packet_stream *, struct ssrc_entry_call *);
static struct logical_intf *__get_logical_interface(const str *name, sockfamily_t *fam);
@ -1438,19 +1438,19 @@ static void stream_fd_closed(int fd, void *p) {
static int call_avp2savp_rtp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx)
static int call_avp2savp_rtp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx)
{
return rtp_avp2savp(s, &stream->crypto, ssrc_ctx);
}
static int call_avp2savp_rtcp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx)
static int call_avp2savp_rtcp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx)
{
return rtcp_avp2savp(s, &stream->crypto, ssrc_ctx);
}
static int call_savp2avp_rtp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx)
static int call_savp2avp_rtp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx)
{
return rtp_savp2avp(s, &stream->selected_sfd->crypto, ssrc_ctx);
}
static int call_savp2avp_rtcp(str *s, struct packet_stream *stream, struct ssrc_ctx *ssrc_ctx)
static int call_savp2avp_rtcp(str *s, struct packet_stream *stream, struct ssrc_entry_call *ssrc_ctx)
{
return rtcp_savp2avp(s, &stream->selected_sfd->crypto, ssrc_ctx);
}
@ -1606,7 +1606,7 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
reti->track_ssrc = 1;
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (stream->ssrc_in[u]) {
reti->ssrc[u] = htonl(stream->ssrc_in[u]->parent->h.ssrc);
reti->ssrc[u] = htonl(stream->ssrc_in[u]->h.ssrc);
reti->ssrc_stats[u] = stream->ssrc_in[u]->stats;
}
}
@ -1769,7 +1769,7 @@ static const char *kernelize_one(kernelize_state *s,
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (sink->ssrc_out[u]) {
// XXX order can be different from ingress?
redi->output.seq_offset[u] = sink->ssrc_out[u]->parent->seq_diff;
redi->output.seq_offset[u] = sink->ssrc_out[u]->seq_diff;
redi->output.ssrc_stats[u] = sink->ssrc_out[u]->stats;
}
@ -1891,7 +1891,7 @@ no_kernel:
}
// must be called with appropriate locks (master lock and/or in/out_lock)
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx)
{
for (unsigned int v = 0; v < RTPE_NUM_SSRC_TRACKING; v++) {
@ -1899,14 +1899,14 @@ int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACK
unsigned int idx = (start_idx + v) % RTPE_NUM_SSRC_TRACKING;
if (!list[idx])
continue;
if (list[idx]->parent->h.ssrc != ssrc)
if (list[idx]->h.ssrc != ssrc)
continue;
return idx;
}
return -1;
}
// must be called with appropriate locks (master lock and/or in/out_lock)
struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx)
{
int idx = __hunt_ssrc_ctx_idx(ssrc, list, start_idx);
@ -2070,9 +2070,9 @@ noop:
// returns non-null with reason string if stream should be removed from kernel
static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock,
struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p,
struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p,
uint32_t output_ssrc,
struct ssrc_ctx **output, struct ssrc_hash *ssrc_hash, enum ssrc_dir dir, const char *label)
struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash, const char *label)
{
const char *ret = NULL;
@ -2086,10 +2086,10 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
*ctx_idx_p = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING;
// eject old entry if present
if (list[ctx_idx])
ssrc_ctx_put(&list[ctx_idx]);
ssrc_entry_release(list[ctx_idx]);
// get new entry
list[ctx_idx] =
get_ssrc_ctx(ssrc, ssrc_hash, dir);
get_ssrc(ssrc, ssrc_hash);
ret = "SSRC changed";
ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label,
@ -2097,17 +2097,16 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
}
if (ctx_idx != 0) {
// move most recent entry to front of the list
struct ssrc_ctx *tmp = list[0];
struct ssrc_entry_call *tmp = list[0];
list[0] = list[ctx_idx];
list[ctx_idx] = tmp;
ctx_idx = 0;
}
// extract and hold entry
if (*output)
ssrc_ctx_put(output);
ssrc_entry_release(*output);
*output = list[ctx_idx];
ssrc_ctx_hold(*output);
ssrc_entry_hold(*output);
// reverse SSRC mapping
if (!output_ssrc)
@ -2121,26 +2120,27 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
// check and update input SSRC pointers
// returns non-null with reason string if stream should be removed from kernel
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
struct ssrc_entry_call **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, "ingress");
}
// check and update output SSRC pointers
// returns non-null with reason string if stream should be removed from kernel
static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ssrc_bs,
struct ssrc_ctx *ssrc_in, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash,
struct ssrc_entry_call *ssrc_in, struct ssrc_entry_call **ssrc_out_p,
struct ssrc_hash *ssrc_hash,
bool ssrc_change)
{
if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash,
"egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash,
"egress (direct)");
}
@ -2674,9 +2674,9 @@ void media_packet_copy(struct media_packet *dst, const struct media_packet *src)
if (dst->sfd)
obj_hold(dst->sfd);
if (dst->ssrc_in)
obj_hold(&dst->ssrc_in->parent->h);
ssrc_entry_hold(dst->ssrc_in);
if (dst->ssrc_out)
obj_hold(&dst->ssrc_out->parent->h);
ssrc_entry_hold(dst->ssrc_out);
dst->rtp = __g_memdup(src->rtp, sizeof(*src->rtp));
dst->rtcp = __g_memdup(src->rtcp, sizeof(*src->rtcp));
dst->payload = STR_NULL;
@ -2685,10 +2685,8 @@ void media_packet_copy(struct media_packet *dst, const struct media_packet *src)
void media_packet_release(struct media_packet *mp) {
if (mp->sfd)
obj_put(mp->sfd);
if (mp->ssrc_in)
obj_put(&mp->ssrc_in->parent->h);
if (mp->ssrc_out)
obj_put(&mp->ssrc_out->parent->h);
ssrc_entry_release(mp->ssrc_in);
ssrc_entry_release(mp->ssrc_out);
media_socket_dequeue(mp, NULL);
g_free(mp->rtp);
g_free(mp->rtcp);
@ -2981,7 +2979,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
ssrc_ctx_put(&mirror_phc.mp.ssrc_out);
ssrc_entry_release(mirror_phc.mp.ssrc_out);
}
}
@ -3020,7 +3018,7 @@ err_next:
next:
media_socket_dequeue(&phc->mp, NULL); // just free if anything left
ssrc_ctx_put(&phc->mp.ssrc_out);
ssrc_entry_release(phc->mp.ssrc_out);
}
///////////////// INGRESS POST-PROCESSING HANDLING
@ -3072,9 +3070,9 @@ out:
rwlock_unlock_r(&phc->mp.call->master_lock);
media_socket_dequeue(&phc->mp, NULL); // just free
ssrc_ctx_put(&phc->mp.ssrc_out);
ssrc_entry_release(phc->mp.ssrc_out);
ssrc_ctx_put(&phc->mp.ssrc_in);
ssrc_entry_release(phc->mp.ssrc_in);
rtcp_list_free(&phc->rtcp_list);
g_queue_clear_full(&free_list, bufferpool_unref);


+ 10
- 12
daemon/mqtt.c View File

@ -26,7 +26,7 @@ static bool is_connected = false;
static struct interface_sampled_rate_stats interface_rate_stats;
static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media);
static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media);
@ -202,14 +202,12 @@ static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) {
}
static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media) {
static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media) {
if (!ssrc || !media)
return;
struct ssrc_entry_call *sc = ssrc->parent;
json_builder_set_member_name(json, "SSRC");
json_builder_add_int_value(json, sc->h.ssrc);
json_builder_add_int_value(json, ssrc->h.ssrc);
unsigned char prim_pt = 255;
mutex_lock(&ssrc->tracker.lock);
@ -245,8 +243,8 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal
int64_t packets, octets, packets_lost, duplicates;
packets = atomic64_get_na(&ssrc->stats->packets);
octets = atomic64_get_na(&ssrc->stats->bytes);
packets_lost = sc->packets_lost;
duplicates = sc->duplicates;
packets_lost = ssrc->packets_lost;
duplicates = ssrc->duplicates;
// process per-second stats
int64_t cur_ts = rtpe_now;
@ -296,16 +294,16 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal
json_builder_add_double_value(json, (double) duplicates * 1000000.0 / usecs_diff);
}
mutex_lock(&sc->h.lock);
uint32_t jitter = sc->jitter;
mutex_lock(&ssrc->h.lock);
uint32_t jitter = ssrc->jitter;
int64_t mos = -1, rtt = -1, rtt_leg = -1;
if (sc->stats_blocks.length) {
struct ssrc_stats_block *sb = sc->stats_blocks.tail->data;
if (ssrc->stats_blocks.length) {
struct ssrc_stats_block *sb = ssrc->stats_blocks.tail->data;
mos = sb->mos;
rtt = sb->rtt;
rtt_leg = sb->rtt_leg;
}
mutex_unlock(&sc->h.lock);
mutex_unlock(&ssrc->h.lock);
if (clockrate) {
json_builder_set_member_name(json, "jitter");


+ 12
- 12
daemon/redis.c View File

@ -1981,15 +1981,15 @@ static void json_build_ssrc_iter(const ng_parser_t *parser, parser_arg dict, hel
struct ssrc_entry_call *se_out = get_ssrc(ssrc, &md->ssrc_hash_out);
if (se_in) {
atomic_set_na(&se_in->input_ctx.stats->ext_seq, parser_get_ll(dict, "in_srtp_index"));
atomic_set_na(&se_in->input_ctx.stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index"));
payload_tracker_add(&se_in->input_ctx.tracker, parser_get_ll(dict, "in_payload_type"));
atomic_set_na(&se_in->stats->ext_seq, parser_get_ll(dict, "in_srtp_index"));
atomic_set_na(&se_in->stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index"));
payload_tracker_add(&se_in->tracker, parser_get_ll(dict, "in_payload_type"));
obj_put(&se_in->h);
}
if (se_out) {
atomic_set_na(&se_out->output_ctx.stats->ext_seq, parser_get_ll(dict, "out_srtp_index"));
atomic_set_na(&se_out->output_ctx.stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index"));
payload_tracker_add(&se_out->output_ctx.tracker, parser_get_ll(dict, "out_payload_type"));
atomic_set_na(&se_out->stats->ext_seq, parser_get_ll(dict, "out_srtp_index"));
atomic_set_na(&se_out->stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index"));
payload_tracker_add(&se_out->tracker, parser_get_ll(dict, "out_payload_type"));
obj_put(&se_out->h);
}
}
@ -2728,12 +2728,12 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) {
JSON_SET_SIMPLE("ssrc", "%" PRIu32, se->h.ssrc);
// XXX use function for in/out
JSON_SET_SIMPLE("in_srtp_index", "%u", atomic_get_na(&se->input_ctx.stats->ext_seq));
JSON_SET_SIMPLE("in_srtcp_index", "%u", atomic_get_na(&se->input_ctx.stats->rtcp_seq));
JSON_SET_SIMPLE("in_payload_type", "%i", se->input_ctx.tracker.most[0]);
JSON_SET_SIMPLE("out_srtp_index", "%u", atomic_get_na(&se->output_ctx.stats->ext_seq));
JSON_SET_SIMPLE("out_srtcp_index", "%u", atomic_get_na(&se->output_ctx.stats->rtcp_seq));
JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]);
JSON_SET_SIMPLE("in_srtp_index", "%u", atomic_get_na(&se->stats->ext_seq));
JSON_SET_SIMPLE("in_srtcp_index", "%u", atomic_get_na(&se->stats->rtcp_seq));
JSON_SET_SIMPLE("in_payload_type", "%i", se->tracker.most[0]);
//JSON_SET_SIMPLE("out_srtp_index", "%u", atomic_get_na(&se->output_ctx.stats->ext_seq));
//JSON_SET_SIMPLE("out_srtcp_index", "%u", atomic_get_na(&se->output_ctx.stats->rtcp_seq));
//JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]);
// XXX add rest of info
}
} // --- for medias.head


+ 40
- 42
daemon/rtcp.c View File

@ -282,6 +282,9 @@ struct rtcp_handlers {
*homer;
};
TYPED_GQUEUE(ssrc, struct ssrc_entry_call)
TYPED_GQUEUE(ssrc_rr, struct ssrc_receiver_report)
// log handler function prototypes
// scratch area (prepare/parse packet)
@ -840,7 +843,7 @@ error:
}
/* rfc 3711 section 3.4 */
int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) {
struct rtcp_packet *rtcp;
unsigned int i;
uint32_t *idx;
@ -888,7 +891,7 @@ int rtcp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
/* rfc 3711 section 3.4 */
int rtcp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
int rtcp_savp2avp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) {
struct rtcp_packet *rtcp;
str payload, to_auth, to_decrypt, auth_tag;
uint32_t idx;
@ -1287,7 +1290,7 @@ static void mos_xr_voip_metrics(struct rtcp_process_ctx *ctx, const struct xr_rb
static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) {
if (!ctx->mp->ssrc_in)
return;
if (ctx->scratch_common_ssrc != ctx->mp->ssrc_in->parent->h.ssrc)
if (ctx->scratch_common_ssrc != ctx->mp->ssrc_in->h.ssrc)
return;
// forward SSRC mapping
common->ssrc = htonl(ctx->mp->ssrc_in->ssrc_map_out);
@ -1297,23 +1300,21 @@ static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *c
static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
if (!ctx->mp->ssrc_in)
return;
if (ctx->scratch.rr.from != ctx->mp->ssrc_in->parent->h.ssrc)
if (ctx->scratch.rr.from != ctx->mp->ssrc_in->h.ssrc)
return;
if (!ctx->mp->media)
return;
// reverse SSRC mapping
struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, &ctx->mp->media->ssrc_hash_out,
SSRC_DIR_OUTPUT);
struct ssrc_entry_call *map_ctx = get_ssrc(ctx->scratch.rr.ssrc, &ctx->mp->media->ssrc_hash_out);
rr->ssrc = htonl(map_ctx->ssrc_map_out);
if (!ctx->mp->media_out)
return;
// for reception stats
struct ssrc_ctx *input_ctx = get_ssrc_ctx(map_ctx->ssrc_map_out,
&ctx->mp->media_out->ssrc_hash_in,
SSRC_DIR_INPUT);
struct ssrc_entry_call *input_ctx = get_ssrc(map_ctx->ssrc_map_out,
&ctx->mp->media_out->ssrc_hash_in);
if (!input_ctx)
return;
@ -1326,8 +1327,8 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
if (!packets)
goto out;
unsigned int lost = input_ctx->parent->packets_lost;
unsigned int dupes = input_ctx->parent->duplicates;
unsigned int lost = input_ctx->packets_lost;
unsigned int dupes = input_ctx->duplicates;
unsigned int tot_lost = lost - dupes; // can be negative/rollover
ilogs(rtcp, LOG_DEBUG, "Substituting RTCP RR SSRC from %s%x%s to %x: %u packets, %u lost, %u duplicates",
@ -1353,14 +1354,13 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
// XXX jitter, last SR
out:
if (input_ctx)
obj_put(&input_ctx->parent->h);
obj_put(&map_ctx->parent->h);
ssrc_entry_release(input_ctx);
ssrc_entry_release(map_ctx);
}
static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
if (!ctx->mp->ssrc_in)
return;
if (ctx->scratch.sr.ssrc != ctx->mp->ssrc_in->parent->h.ssrc)
if (ctx->scratch.sr.ssrc != ctx->mp->ssrc_in->h.ssrc)
return;
if (!ctx->mp->ssrc_out)
return;
@ -1410,8 +1410,8 @@ void rtcp_init(void) {
static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
uint32_t ssrc, uint32_t ssrc_out, uint32_t ts, uint32_t packets, uint32_t octets, GQueue *rrs,
GQueue *srrs)
uint32_t ssrc, uint32_t ssrc_out, uint32_t ts, uint32_t packets, uint32_t octets, ssrc_q *rrs,
ssrc_rr_q *srrs)
{
GString *ret = g_string_sized_new(128);
g_string_set_size(ret, sizeof(struct sender_report_packet));
@ -1444,7 +1444,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
// receiver reports
int i = 0, n = 0;
while (rrs->length) {
struct ssrc_ctx *s = g_queue_pop_head(rrs);
struct ssrc_entry_call *s = t_queue_pop_head(rrs);
if (i < 30) {
g_string_set_size(ret, ret->len + sizeof(struct report_block));
struct report_block *rr = (void *) ret->str + ret->len - sizeof(struct report_block);
@ -1452,23 +1452,22 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
// XXX unify with transcode_rr
// last received SR?
struct ssrc_entry_call *se = s->parent;
int64_t tv_diff = 0;
uint32_t ntp_middle_bits = 0;
mutex_lock(&se->h.lock);
if (se->sender_reports.length) {
struct ssrc_time_item *si = se->sender_reports.tail->data;
mutex_lock(&s->h.lock);
if (s->sender_reports.length) {
struct ssrc_time_item *si = s->sender_reports.tail->data;
tv_diff = rtpe_now - si->received;
ntp_middle_bits = si->ntp_middle_bits;
}
uint32_t jitter = se->jitter;
mutex_unlock(&se->h.lock);
uint32_t jitter = s->jitter;
mutex_unlock(&s->h.lock);
uint64_t lost = se->packets_lost;
uint64_t lost = s->packets_lost;
uint64_t tot = atomic64_get(&s->stats->packets);
*rr = (struct report_block) {
.ssrc = htonl(s->parent->h.ssrc),
.ssrc = htonl(s->h.ssrc),
.fraction_lost = lost * 256 / (tot + lost),
.number_lost[0] = (lost >> 16) & 0xff,
.number_lost[1] = (lost >> 8) & 0xff,
@ -1483,7 +1482,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
struct ssrc_receiver_report *srr = g_new(__typeof(*srr), 1);
*srr = (struct ssrc_receiver_report) {
.from = ssrc_out,
.ssrc = s->parent->h.ssrc,
.ssrc = s->h.ssrc,
.fraction_lost = lost * 256 / (tot + lost),
.packets_lost = lost,
.high_seq_received = atomic_get_na(&s->stats->ext_seq),
@ -1491,12 +1490,12 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
.dlsr = tv_diff * 65536 / 1000000,
.jitter = jitter >> 4,
};
g_queue_push_tail(srrs, srr);
t_queue_push_tail(srrs, srr);
}
n++;
}
ssrc_ctx_put(&s);
ssrc_entry_release(s);
i++;
}
@ -1537,22 +1536,21 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
return ret;
}
static void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash) {
static void rtcp_receiver_reports(ssrc_q *out, struct ssrc_hash *hash) {
LOCK(&hash->lock);
for (GList *l = hash->nq.head; l; l = l->next) {
struct ssrc_entry_call *e = l->data;
struct ssrc_ctx *i = &e->input_ctx;
struct ssrc_entry_call *i = l->data;
if (!atomic64_get_na(&i->stats->packets))
continue;
ssrc_ctx_hold(i);
g_queue_push_tail(out, i);
ssrc_entry_hold(i);
t_queue_push_tail(out, i);
}
}
// call must be locked in R
void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out) {
// figure out where to send it
struct packet_stream *ps = media->streams.head->data;
// crypto context is held separately
@ -1574,17 +1572,17 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
log_info_stream_fd(ps->selected_sfd);
GQueue rrs = G_QUEUE_INIT;
ssrc_q rrs = TYPED_GQUEUE_INIT;
rtcp_receiver_reports(&rrs, &media->ssrc_hash_in);
ilogs(rtcp, LOG_DEBUG, "Generating and sending RTCP SR for %x and up to %i source(s)",
ssrc_out->parent->h.ssrc, rrs.length);
ssrc_out->h.ssrc, rrs.length);
struct ssrc_sender_report ssr;
GQueue srrs = G_QUEUE_INIT;
ssrc_rr_q srrs = TYPED_GQUEUE_INIT;
GString *sr = rtcp_sender_report(&ssr, ssrc_out->parent->h.ssrc,
ssrc_out->ssrc_map_out ? : ssrc_out->parent->h.ssrc,
GString *sr = rtcp_sender_report(&ssr, ssrc_out->h.ssrc,
ssrc_out->ssrc_map_out ? : ssrc_out->h.ssrc,
atomic_get_na(&ssrc_out->stats->timestamp),
atomic64_get_na(&ssrc_out->stats->packets),
atomic64_get(&ssrc_out->stats->bytes),
@ -1613,13 +1611,13 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
struct call_media *other_media = sink->media;
ssrc_sender_report(other_media, &ssr, rtpe_now);
for (GList *k = srrs.head; k; k = k->next) {
for (__auto_type k = srrs.head; k; k = k->next) {
struct ssrc_receiver_report *srr = k->data;
ssrc_receiver_report(other_media, sink->selected_sfd, srr, rtpe_now);
}
}
while (srrs.length) {
struct ssrc_receiver_report *srr = g_queue_pop_head(&srrs);
struct ssrc_receiver_report *srr = t_queue_pop_head(&srrs);
g_free(srr);
}
}


+ 8
- 6
daemon/rtp.c View File

@ -43,13 +43,15 @@ error:
return -1;
}
static unsigned int packet_index(struct ssrc_ctx *ssrc_ctx, struct rtp_header *rtp, crypto_debug_string **cds) {
static unsigned int packet_index(struct ssrc_entry_call *ssrc_ctx, struct rtp_header *rtp,
crypto_debug_string **cds)
{
uint16_t seq;
seq = ntohs(rtp->seq_num);
*cds = crypto_debug_init((seq & 0x1ff) == (ssrc_ctx->parent->h.ssrc & 0x1ff));
crypto_debug_printf(*cds, "SSRC %" PRIx32 ", seq %" PRIu16, ssrc_ctx->parent->h.ssrc, seq);
*cds = crypto_debug_init((seq & 0x1ff) == (ssrc_ctx->h.ssrc & 0x1ff));
crypto_debug_printf(*cds, "SSRC %" PRIx32 ", seq %" PRIu16, ssrc_ctx->h.ssrc, seq);
/* rfc 3711 section 3.3.1 */
unsigned int srtp_index = atomic_get_na(&ssrc_ctx->stats->ext_seq);
@ -100,7 +102,7 @@ void rtp_append_mki(str *s, struct crypto_context *c, crypto_debug_string *cds)
}
/* rfc 3711, section 3.3 */
int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) {
struct rtp_header *rtp;
str payload, to_auth;
unsigned int index;
@ -142,7 +144,7 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
}
// just updates the ext_seq in ssrc
int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_ctx *ssrc) {
int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_entry_call *ssrc) {
struct rtp_header *rtp;
if (G_UNLIKELY(!ssrc))
@ -155,7 +157,7 @@ int rtp_update_index(str *s, struct packet_stream *ps, struct ssrc_ctx *ssrc) {
}
/* rfc 3711, section 3.3 */
int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_entry_call *ssrc_ctx) {
struct rtp_header *rtp;
unsigned int index;
str payload, to_auth, to_decrypt, auth_tag;


+ 17
- 32
daemon/ssrc.c View File

@ -26,15 +26,6 @@ static mos_calc_fn *mos_calcs[__MOS_TYPES] = {
static void __free_ssrc_entry_call(struct ssrc_entry_call *e);
static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
c->parent = parent;
payload_tracker_init(&c->tracker);
while (!c->ssrc_map_out)
c->ssrc_map_out = ssl_random();
c->seq_out = ssl_random();
atomic64_set_na(&c->last_sample, rtpe_now);
c->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*c->stats));
}
static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) {
ent->ssrc = ssrc;
mutex_init(&ent->lock);
@ -43,8 +34,12 @@ static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) {
static struct ssrc_entry *create_ssrc_entry_call(void *uptr) {
struct ssrc_entry_call *ent;
ent = obj_alloc0(struct ssrc_entry_call, __free_ssrc_entry_call);
init_ssrc_ctx(&ent->input_ctx, ent);
init_ssrc_ctx(&ent->output_ctx, ent);
payload_tracker_init(&ent->tracker);
while (!ent->ssrc_map_out)
ent->ssrc_map_out = ssl_random();
ent->seq_out = ssl_random();
atomic64_set_na(&ent->last_sample, rtpe_now);
ent->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*ent->stats));
//ent->seq_out = ssl_random();
//ent->ts_out = ssl_random();
ent->lost_bits = -1;
@ -70,8 +65,7 @@ static void __free_ssrc_entry_call(struct ssrc_entry_call *e) {
g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block);
if (e->sequencers)
g_hash_table_destroy(e->sequencers);
bufferpool_unref(e->input_ctx.stats);
bufferpool_unref(e->output_ctx.stats);
bufferpool_unref(e->stats);
}
static void ssrc_entry_put(void *ep) {
struct ssrc_entry_call *e = ep;
@ -297,14 +291,6 @@ void ssrc_hash_call_init(struct ssrc_hash *sh) {
ssrc_hash_full_init(sh, create_ssrc_entry_call, NULL);
}
struct ssrc_ctx *get_ssrc_ctx(uint32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) {
struct ssrc_entry *s = get_ssrc(ssrc, ht /* , NULL */);
if (G_UNLIKELY(!s))
return NULL;
struct ssrc_ctx *ret = ((void *) s) + dir;
return ret;
}
static void *__do_time_report_item(struct call_media *m, size_t struct_size, size_t reports_queue_offset,
@ -370,10 +356,10 @@ static int64_t __calc_rtt(struct call_media *m, struct crtt_args a)
return 0;
if (a.pt_p)
*a.pt_p = e->output_ctx.tracker.most[0] == 255 ? -1 : e->output_ctx.tracker.most[0];
*a.pt_p = e->tracker.most[0] == 255 ? -1 : e->tracker.most[0];
// grab the opposite side SSRC for the time reports
uint32_t map_ssrc = e->output_ctx.ssrc_map_out;
uint32_t map_ssrc = e->ssrc_map_out;
if (!map_ssrc)
map_ssrc = e->h.ssrc;
obj_put(&e->h);
@ -448,7 +434,7 @@ void ssrc_receiver_report(struct call_media *m, stream_fd *sfd, const struct ssr
int pt;
int64_t rtt = calc_rtt(m,
.ht = &m->ssrc_hash_in,
.ht = &m->ssrc_hash_out,
.tv = tv,
.pt_p = &pt,
.ssrc = rr->ssrc,
@ -580,7 +566,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr,
dlrr->lrr, dlrr->dlrr);
calc_rtt(m,
.ht = &m->ssrc_hash_in,
.ht = &m->ssrc_hash_out,
.tv = tv,
.pt_p = NULL,
.ssrc = dlrr->ssrc,
@ -722,22 +708,21 @@ void ssrc_collect_metrics(struct call_media *media) {
return;
struct packet_stream *ps = media->streams.head->data;
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
struct ssrc_ctx *s = ps->ssrc_in[i];
struct ssrc_entry_call *s = ps->ssrc_in[i];
if (!s)
break; // end of list
struct ssrc_entry_call *e = s->parent;
// exclude zero values - technically possible but unlikely and probably just unset
if (!e->jitter)
if (!s->jitter)
continue;
if (e->input_ctx.tracker.most_len > 0 && e->input_ctx.tracker.most[0] != 255) {
const rtp_payload_type *rpt = get_rtp_payload_type(e->input_ctx.tracker.most[0],
if (s->tracker.most_len > 0 && s->tracker.most[0] != 255) {
const rtp_payload_type *rpt = get_rtp_payload_type(s->tracker.most[0],
&ps->media->codecs);
if (rpt && rpt->clock_rate)
e->jitter = e->jitter * 1000 / rpt->clock_rate;
s->jitter = s->jitter * 1000 / rpt->clock_rate;
}
RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd);
RTPE_SAMPLE_SFD(jitter_measured, s->jitter, ps->selected_sfd);
}
}

+ 1
- 1
include/call.h View File

@ -439,7 +439,7 @@ struct packet_stream {
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
struct ssrc_entry_call *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */


+ 3
- 3
include/codec.h View File

@ -95,7 +95,7 @@ struct codec_packet {
struct rtp_header *rtp;
unsigned long ts;
unsigned int clockrate;
struct ssrc_ctx *ssrc_out;
struct ssrc_entry_call *ssrc_out;
void (*free_func)(void *);
void (*plain_free_func)(void *);
};
@ -151,7 +151,7 @@ struct codec_handler *codec_handler_make_media_player(const rtp_payload_type *sr
str_case_value_ht codec_set);
struct codec_handler *codec_handler_make_dummy(const rtp_payload_type *dst_pt, struct call_media *media,
str_case_value_ht codec_set);
void codec_calc_jitter(struct ssrc_ctx *, unsigned long ts, unsigned int clockrate, int64_t);
void codec_calc_jitter(struct ssrc_entry_call *, unsigned long ts, unsigned int clockrate, int64_t);
void codec_update_all_handlers(struct call_monologue *ml);
void codec_update_all_source_handlers(struct call_monologue *ml, const sdp_ng_flags *flags);
@ -239,7 +239,7 @@ __attribute__((nonnull(1, 2)))
void __codec_handlers_update(struct call_media *receiver, struct call_media *sink, struct chu_args);
void codec_add_dtmf_event(struct codec_ssrc_handler *ch, int code, int level, uint64_t ts, bool injected);
uint64_t codec_last_dtmf_event(struct codec_ssrc_handler *ch);
uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *);
uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_entry_call *);
void codec_decoder_skip_pts(struct codec_ssrc_handler *ch, uint64_t);
uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch);
void codec_tracker_update(struct codec_store *, struct codec_store *);


+ 3
- 3
include/media_player.h View File

@ -12,7 +12,7 @@
struct call_media;
struct call_monologue;
struct codec_handler;
struct ssrc_ctx;
struct ssrc_entry_call;
struct packet_stream;
struct codec_packet;
struct media_player;
@ -82,7 +82,7 @@ struct media_player {
struct media_player_media_file *media_file;
uint32_t ssrc;
struct ssrc_ctx *ssrc_out;
struct ssrc_entry_call *ssrc_out;
unsigned long seq;
unsigned long buffer_ts;
unsigned long sync_ts;
@ -122,7 +122,7 @@ struct send_timer {
#define MPO(...) (media_player_opts_t){__VA_ARGS__}
void media_player_new(struct media_player **, struct call_monologue *, struct ssrc_ctx *prev_ssrc, media_player_opts_t *);
void media_player_new(struct media_player **, struct call_monologue *, struct ssrc_entry_call *prev_ssrc, media_player_opts_t *);
bool media_player_add(struct media_player *mp, media_player_opts_t opts);
bool media_player_start(struct media_player *);
long long media_player_stop(struct media_player *);


+ 5
- 5
include/media_socket.h View File

@ -19,7 +19,7 @@
struct media_packet;
struct transport_protocol;
struct ssrc_ctx;
struct ssrc_entry_call;
struct rtpengine_srtp;
struct jb_packet;
struct poller;
@ -29,7 +29,7 @@ TYPED_GQUEUE(stream_fd, stream_fd)
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_ctx *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_entry_call *);
enum transport_protocol_index {
@ -273,7 +273,7 @@ struct media_packet {
struct rtp_header *rtp;
struct rtcp_packet *rtcp;
struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp
struct ssrc_entry_call *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp
str payload;
codec_packet_q packets_out;
@ -316,9 +316,9 @@ void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);
void __reset_sink_handlers(struct packet_stream *);
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
void media_packet_copy(struct media_packet *, const struct media_packet *);


+ 3
- 4
include/rtcp.h View File

@ -9,7 +9,6 @@
struct crypto_context;
struct rtcp_packet;
struct ssrc_ctx;
struct rtcp_handler;
struct call_monologue;
@ -18,8 +17,8 @@ extern struct rtcp_handler *rtcp_transcode_handler;
extern struct rtcp_handler *rtcp_sink_handler;
int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_entry_call *);
int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_entry_call *);
int rtcp_payload(struct rtcp_packet **out, str *p, const str *s);
@ -31,6 +30,6 @@ rtcp_filter_func rtcp_avpf2avp_filter;
void rtcp_init(void);
void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out);
void rtcp_send_report(struct call_media *media, struct ssrc_entry_call *ssrc_out);
#endif

+ 4
- 5
include/rtp.h View File

@ -9,18 +9,17 @@
struct crypto_context;
struct rtp_header;
struct ssrc_hash;
enum ssrc_dir;
struct ssrc_ctx;
struct ssrc_entry_call;
struct codec_store;
typedef GString crypto_debug_string;
const rtp_payload_type *get_rtp_payload_type(unsigned int, struct codec_store *);
int rtp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtp_avp2savp(str *, struct crypto_context *, struct ssrc_entry_call *);
int rtp_savp2avp(str *, struct crypto_context *, struct ssrc_entry_call *);
int rtp_update_index(str *, struct packet_stream *, struct ssrc_ctx *);
int rtp_update_index(str *, struct packet_stream *, struct ssrc_entry_call *);
void rtp_append_mki(str *s, struct crypto_context *c, crypto_debug_string *);
int srtp_payloads(str *to_auth, str *to_decrypt, str *auth_tag, str *mki,


+ 27
- 44
include/ssrc.h View File

@ -39,8 +39,26 @@ struct payload_tracker {
unsigned char last_pts[16];
int last_pt_idx;
};
struct ssrc_ctx {
struct ssrc_entry_call *parent;
struct ssrc_stats_block {
int64_t reported;
uint64_t jitter; // ms
uint64_t rtt; // us - combined from both sides
uint32_t rtt_leg; // RTT only for the leg receiving the RTCP report
uint64_t packetloss; // percent
uint64_t mos; // nominal range of 10 - 50 for MOS values 1.0 to 5.0
};
struct ssrc_entry {
struct obj obj;
GList link;
mutex_t lock;
uint32_t ssrc;
};
struct ssrc_entry_call {
struct ssrc_entry h; // must be first
struct payload_tracker tracker;
// XXX move entire crypto context in here?
@ -61,28 +79,7 @@ struct ssrc_ctx {
sample_duplicates;
int64_t next_rtcp; // for self-generated RTCP reports
};
struct ssrc_stats_block {
int64_t reported;
uint64_t jitter; // ms
uint64_t rtt; // us - combined from both sides
uint32_t rtt_leg; // RTT only for the leg receiving the RTCP report
uint64_t packetloss; // percent
uint64_t mos; // nominal range of 10 - 50 for MOS values 1.0 to 5.0
};
struct ssrc_entry {
struct obj obj;
GList link;
mutex_t lock;
uint32_t ssrc;
};
struct ssrc_entry_call {
struct ssrc_entry h; // must be first
struct ssrc_ctx input_ctx,
output_ctx;
GQueue sender_reports; // as received via RTCP
GQueue rr_time_reports; // as received via RTCP
GQueue stats_blocks; // calculated
@ -106,10 +103,6 @@ struct ssrc_entry_call {
// output only
uint16_t seq_diff;
};
enum ssrc_dir { // these values must not be used externally
SSRC_DIR_INPUT = G_STRUCT_OFFSET(struct ssrc_entry_call, input_ctx),
SSRC_DIR_OUTPUT = G_STRUCT_OFFSET(struct ssrc_entry_call, output_ctx),
};
struct ssrc_time_item {
int64_t received;
@ -209,9 +202,6 @@ INLINE void *get_ssrc(uint32_t ssrc, struct ssrc_hash *ht) {
return get_ssrc_full(ssrc, ht, NULL);
}
struct ssrc_ctx *get_ssrc_ctx(uint32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found
void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, int64_t);
void ssrc_receiver_report(struct call_media *, stream_fd *, const struct ssrc_receiver_report *, int64_t);
void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *rr, int64_t);
@ -226,20 +216,13 @@ void payload_tracker_init(struct payload_tracker *t);
void payload_tracker_add(struct payload_tracker *, int);
#define ssrc_ctx_put(c) \
do { \
struct ssrc_ctx **__cc = (c); \
if ((__cc) && *(__cc)) { \
obj_put(&(*__cc)->parent->h); \
*(__cc) = NULL; \
} \
} while (0)
#define ssrc_ctx_hold(c) \
do { \
if (c) \
obj_hold(&(c)->parent->h); \
} while (0)
#define ssrc_entry_release(c) do { \
if (c) { \
obj_put(&(c)->h); \
c = NULL; \
} \
} while (0)
#define ssrc_entry_hold(c) obj_hold(&(c)->h)
#endif

+ 2
- 1
lib/obj.h View File

@ -114,7 +114,8 @@ INLINE void __obj_put(struct obj *o);
#endif
#define obj_release(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0)
#define obj_release_o(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0)
#define obj_release(op) do { if (op) obj_put(op); op = NULL; } while (0)


+ 3
- 6
t/aead-decrypt.c View File

@ -58,14 +58,11 @@ int main(int argc, char **argv) {
};
struct ssrc_entry_call se = {
.input_ctx = {
.parent = &se,
.stats = &stats,
},
.stats = &stats,
};
int ret = rtp_savp2avp(&s, &cc, &se.input_ctx);
int ret = rtp_savp2avp(&s, &cc, &se);
assert(ret == 0);
printf("idx %d ROC %d\n", se.input_ctx.stats->ext_seq, se.input_ctx.stats->ext_seq >> 16);
printf("idx %d ROC %d\n", se.stats->ext_seq, se.stats->ext_seq >> 16);
return 0;
}

+ 4
- 4
t/test-transcode.c View File

@ -261,13 +261,13 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media
.call = &call,
.media = media,
.media_out = other_media,
.ssrc_in = get_ssrc_ctx(ssrc, &media->ssrc_hash_in, SSRC_DIR_INPUT),
.ssrc_in = get_ssrc(ssrc, &media->ssrc_hash_in),
.sfd = &sfd,
};
// from __stream_ssrc()
if (!MEDIA_ISSET(media, TRANSCODING))
mp.ssrc_in->ssrc_map_out = ntohl(ssrc);
mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, &other_media->ssrc_hash_out, SSRC_DIR_OUTPUT);
mp.ssrc_out = get_ssrc(mp.ssrc_in->ssrc_map_out, &other_media->ssrc_hash_out);
payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f);
int packet_len = sizeof(struct rtp_header) + pl.len;
@ -358,8 +358,8 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media
}
printf("test ok: %s:%i\n\n", file, line);
free(packet);
ssrc_ctx_put(&mp.ssrc_in);
ssrc_ctx_put(&mp.ssrc_out);
ssrc_entry_release(mp.ssrc_in);
ssrc_entry_release(mp.ssrc_out);
}
#define packet(side, pt_in, pload, pt_out, pload_exp) \


Loading…
Cancel
Save