Browse Source

MT#55283 obsolete SSRC tracking

Change-Id: Ic0a3c7826180e1e1f4a783dcc6d775c92fe38d1f
pull/1938/head
Richard Fuchs 8 months ago
parent
commit
c8dd521e33
12 changed files with 215 additions and 227 deletions
  1. +32
    -38
      daemon/call.c
  2. +56
    -58
      daemon/call_interfaces.c
  3. +2
    -1
      daemon/cli.c
  4. +21
    -27
      daemon/codec.c
  5. +1
    -1
      daemon/dtmf.c
  6. +44
    -49
      daemon/media_socket.c
  7. +41
    -22
      daemon/mqtt.c
  8. +5
    -7
      daemon/ssrc.c
  9. +1
    -1
      daemon/t38.c
  10. +0
    -5
      include/call.h
  11. +4
    -0
      include/ssrc.h
  12. +8
    -18
      t/auto-daemon-tests.pl

+ 32
- 38
daemon/call.c View File

@ -74,6 +74,7 @@ static void media_stop(struct call_media *m);
__attribute__((nonnull(1, 2, 4)))
static struct media_subscription *__subscribe_medias_both_ways(struct call_media * a, struct call_media * b,
bool is_offer, medias_q *);
static void call_stream_crypto_reset(struct packet_stream *ps);
/* called with call->master_lock held in R */
static int call_timer_delete_monologues(call_t *c) {
@ -212,26 +213,6 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
if (active_media)
CALL_CLEAR(sfd->call, FOREIGN_MEDIA);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
struct ssrc_entry_call *ctx = ps->ssrc_in[u];
if (!ctx)
break;
if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ctx->stats->last_pt));
}
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
struct ssrc_entry_call *ctx = ps->ssrc_out[u];
if (!ctx)
break;
if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ctx->stats->last_pt));
}
no_sfd:
if (good)
goto next;
@ -263,6 +244,22 @@ next:
ssrc_collect_metrics(media);
if (MEDIA_ISSET(media, TRANSCODING))
hlp->transcoded_media++;
for (__auto_type l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *ctx = l->data;
if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ctx->stats->last_pt));
}
for (__auto_type l = media->ssrc_hash_out.nq.head; l; l = l->next) {
struct ssrc_entry_call *ctx = l->data;
if (rtpe_now - atomic64_get_na(&ctx->stats->last_packet_us) < 2000000L)
payload_tracker_add(&ctx->tracker,
atomic_get_na(&ctx->stats->last_pt));
}
}
if (good) {
@ -1078,27 +1075,27 @@ static void __fill_stream(struct packet_stream *ps, const struct endpoint *epp,
PS_SET(ps, NAT_WAIT);
}
void call_stream_crypto_reset(struct packet_stream *ps) {
static void call_stream_crypto_reset(struct packet_stream *ps) {
ilog(LOG_DEBUG, "Resetting crypto context");
crypto_reset(&ps->crypto);
struct call_media *media = ps->media;
if (PS_ISSET(ps, RTP)) {
mutex_lock(&ps->in_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list
break;
atomic_set_na(&ps->ssrc_in[u]->stats->ext_seq, 0);
mutex_lock(&media->ssrc_hash_in.lock);
for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
atomic_set_na(&se->stats->ext_seq, 0);
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&media->ssrc_hash_in.lock);
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list
break;
atomic_set_na(&ps->ssrc_out[u]->stats->ext_seq, 0);
mutex_lock(&media->ssrc_hash_out.lock);
for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
atomic_set_na(&se->stats->ext_seq, 0);
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&media->ssrc_hash_out.lock);
}
}
@ -4163,6 +4160,7 @@ void call_destroy(call_t *c) {
char *addr = sockaddr_print_buf(&ps->endpoint.address);
endpoint_t *local_endpoint = packet_stream_local_addr(ps);
char *local_addr = sockaddr_print_buf(&local_endpoint->address);
struct ssrc_entry_call *se = call_get_first_ssrc(&ps->media->ssrc_hash_in);
ilog(LOG_INFO, "--------- Port %15s:%-5u <> %s%15s:%-5u%s%s, SSRC %s%" PRIx32 "%s, in "
"%" PRIu64 " p, %" PRIu64 " b, %" PRIu64 " e, %" PRIu64 " ts, "
@ -4171,7 +4169,7 @@ void call_destroy(call_t *c) {
(unsigned int) local_endpoint->port,
FMT_M(addr, ps->endpoint.port),
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0),
FMT_M(se ? se->h.ssrc : 0),
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
@ -4360,10 +4358,6 @@ static void __call_free(call_t *c) {
crypto_cleanup(&ps->crypto);
t_queue_clear(&ps->sfds);
t_hash_table_destroy(ps->rtp_stats);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++)
ssrc_entry_release(ps->ssrc_in[u]);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++)
ssrc_entry_release(ps->ssrc_out[u]);
bufferpool_unref(ps->stats_in);
bufferpool_unref(ps->stats_out);
g_free(ps);


+ 56
- 58
daemon/call_interfaces.c View File

@ -64,7 +64,8 @@ static void call_ng_flags_list(const ng_parser_t *, parser_arg list,
void (*item_callback)(const ng_parser_t *, parser_arg, helper_arg),
helper_arg);
static void call_ng_flags_esc_str_list(str *s, unsigned int, helper_arg);
static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, const struct ssrc_hash *ht);
static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, parser_arg list,
const struct ssrc_hash *ht);
static str *str_dup_escape(const str *s);
static void call_set_dtmf_block(call_t *call, struct call_monologue *monologue, sdp_ng_flags *flags);
@ -2748,27 +2749,6 @@ static void ng_stats_endpoint(const ng_parser_t *parser, parser_arg dict, const
parser->dict_add_int(dict, "port", ep->port);
}
static void ng_stats_stream_ssrc(const ng_parser_t *parser, parser_arg dict,
struct ssrc_entry_call *const ssrcs[RTPE_NUM_SSRC_TRACKING],
const char *label)
{
parser_arg list = parser->dict_add_list(dict, label);
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
struct ssrc_entry_call *c = ssrcs[i];
if (!c)
break;
parser_arg ssrc = parser->list_add_dict(list);
parser->dict_add_int(ssrc, "SSRC", ssrcs[i]->h.ssrc);
parser->dict_add_int(ssrc, "bytes", atomic64_get_na(&c->stats->bytes));
parser->dict_add_int(ssrc, "packets", atomic64_get_na(&c->stats->packets));
parser->dict_add_int(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp));
parser->dict_add_int(ssrc, "last RTP seq", atomic_get_na(&c->stats->ext_seq));
}
}
#define BF_PS(k, f) if (PS_ISSET(ps, f)) parser->list_add_string(flags, k)
static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct packet_stream *ps,
@ -2800,6 +2780,10 @@ static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct
parser->dict_add_int(dict, "last kernel packet", atomic64_get_na(&ps->stats_in->last_packet_us) / 1000000L);
parser->dict_add_int(dict, "last user packet", atomic64_get_na(&ps->last_packet_us) / 1000000L);
__auto_type se = call_get_first_ssrc(&ps->media->ssrc_hash_in);
if (se)
parser->dict_add_int(dict, "SSRC", se->h.ssrc);
flags = parser->dict_add_list(dict, "flags");
BF_PS("RTP", RTP);
@ -2814,9 +2798,6 @@ static void ng_stats_stream(ng_command_ctx_t *ctx, parser_arg list, const struct
BF_PS("media handover", MEDIA_HANDOVER);
BF_PS("ICE", ICE);
ng_stats_stream_ssrc(parser, dict, ps->ssrc_in, "ingress SSRCs");
ng_stats_stream_ssrc(parser, dict, ps->ssrc_out, "egress SSRCs");
stats:
if (totals->last_packet_us < packet_stream_last_packet(ps))
totals->last_packet_us = packet_stream_last_packet(ps);
@ -2888,7 +2869,8 @@ static void ng_stats_media(ng_command_ctx_t *ctx, parser_arg list, const struct
BF_M("transcoding", TRANSCODING);
BF_M("block egress", BLOCK_EGRESS);
ng_stats_ssrc(parser, ssrc, &m->ssrc_hash_in); // XXX out
ng_stats_ssrc(parser, ssrc, parser->dict_add_list(dict, "ingress SSRCs"), &m->ssrc_hash_in);
ng_stats_ssrc(parser, NULL, parser->dict_add_list(dict, "egress SSRCs"), &m->ssrc_hash_out);
stats:
for (auto_iter(l, m->streams.head); l; l = l->next) {
@ -3030,45 +3012,61 @@ static void ng_stats_ssrc_mos_entry_dict_avg(const ng_parser_t *parser, parser_a
parser->dict_add_int(subent, "samples", div);
}
static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, const struct ssrc_hash *ht) {
static void ng_stats_ssrc_1(const ng_parser_t *parser, parser_arg ent, struct ssrc_entry_call *se) {
parser->dict_add_int(ent, "bytes", atomic64_get_na(&se->stats->bytes));
parser->dict_add_int(ent, "packets", atomic64_get_na(&se->stats->packets));
parser->dict_add_int(ent, "last RTP timestamp", atomic_get_na(&se->stats->timestamp));
parser->dict_add_int(ent, "last RTP seq", atomic_get_na(&se->stats->ext_seq));
parser->dict_add_int(ent, "cumulative loss", se->packets_lost);
int mos_samples = se->stats_blocks.length - se->no_mos_count;
if (mos_samples < 1) mos_samples = 1;
ng_stats_ssrc_mos_entry_dict_avg(parser, ent, "average MOS", &se->average_mos, mos_samples);
ng_stats_ssrc_mos_entry_dict(parser, ent, "lowest MOS", se->lowest_mos);
ng_stats_ssrc_mos_entry_dict(parser, ent, "highest MOS", se->highest_mos);
parser_arg progdict = parser->dict_add_dict(ent, "MOS progression");
// aim for about 10 entries to the list
GList *listent = se->stats_blocks.head;
struct ssrc_stats_block *sb = listent->data;
int64_t interval
= ((struct ssrc_stats_block *) se->stats_blocks.tail->data)->reported
- sb->reported;
interval /= 10;
parser->dict_add_int(progdict, "interval", interval / 1000000L);
int64_t next_step = sb->reported;
parser_arg entlist = parser->dict_add_list(progdict, "entries");
for (; listent; listent = listent->next) {
sb = listent->data;
if (sb->reported < next_step)
continue;
next_step += interval;
parser_arg cent = parser->list_add_dict(entlist);
ng_stats_ssrc_mos_entry(parser, cent, sb);
}
}
static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, parser_arg list,
const struct ssrc_hash *ht)
{
for (GList *l = ht->nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
char tmp[12];
snprintf(tmp, sizeof(tmp), "%" PRIu32, se->h.ssrc);
if (parser->dict_contains(dict, tmp))
continue;
if (!se->stats_blocks.length || !se->lowest_mos || !se->highest_mos)
continue;
parser_arg ent = parser->dict_add_dict_dup(dict, tmp);
parser->dict_add_int(ent, "cumulative loss", se->packets_lost);
int mos_samples = se->stats_blocks.length - se->no_mos_count;
if (mos_samples < 1) mos_samples = 1;
ng_stats_ssrc_mos_entry_dict_avg(parser, ent, "average MOS", &se->average_mos, mos_samples);
ng_stats_ssrc_mos_entry_dict(parser, ent, "lowest MOS", se->lowest_mos);
ng_stats_ssrc_mos_entry_dict(parser, ent, "highest MOS", se->highest_mos);
parser_arg progdict = parser->dict_add_dict(ent, "MOS progression");
// aim for about 10 entries to the list
GList *listent = se->stats_blocks.head;
struct ssrc_stats_block *sb = listent->data;
int64_t interval
= ((struct ssrc_stats_block *) se->stats_blocks.tail->data)->reported
- sb->reported;
interval /= 10;
parser->dict_add_int(progdict, "interval", interval / 1000000L);
int64_t next_step = sb->reported;
parser_arg entlist = parser->dict_add_list(progdict, "entries");
for (; listent; listent = listent->next) {
sb = listent->data;
if (sb->reported < next_step)
continue;
next_step += interval;
parser_arg cent = parser->list_add_dict(entlist);
ng_stats_ssrc_mos_entry(parser, cent, sb);
parser_arg ent = parser->list_add_dict(list);
parser->dict_add_int(ent, "SSRC", se->h.ssrc);
ng_stats_ssrc_1(parser, ent, se);
if (dict.gen && !parser->dict_contains(dict, tmp)) {
ent = parser->dict_add_dict_dup(dict, tmp);
ng_stats_ssrc_1(parser, ent, se);
}
}
}


+ 2
- 1
daemon/cli.c View File

@ -799,6 +799,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
endpoint_t *local_endpoint = packet_stream_local_addr(ps);
local_addr = sockaddr_print_buf(&local_endpoint->address);
struct ssrc_entry_call *se = call_get_first_ssrc(&md->ssrc_hash_in);
cw->cw_printf(cw, "-------- Port %15s:%-5u <> %15s:%-5u%s, SSRC %" PRIx32 ", "
"%" PRIu64 " p, %" PRIu64 " b, %" PRIu64 " e, %" PRIu64 " uts "
@ -808,7 +809,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
sockaddr_print_buf(&ps->endpoint.address),
ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
ps->ssrc_in[0] ? ps->ssrc_in[0]->h.ssrc : 0,
se ? se->h.ssrc : 0,
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),


+ 21
- 27
daemon/codec.c View File

@ -1315,32 +1315,24 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
rwlock_lock_r(&rt->call->master_lock);
// copy out references to SSRCs for lock-free handling
struct ssrc_entry_call *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_entry_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
GQueue ssrc_out = G_QUEUE_INIT;
mutex_lock(&media->ssrc_hash_out.lock);
for (GList *l = media->ssrc_hash_out.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
g_queue_push_tail(&ssrc_out, ssrc_entry_hold(se));
}
mutex_unlock(&media->ssrc_hash_out.lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ssrc_out[u]) // end of list
break;
// coverity[use : FALSE]
rtcp_send_report(media, ssrc_out[u]);
for (GList *l = ssrc_out.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
rtcp_send_report(media, se);
}
rwlock_unlock_r(&rt->call->master_lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ssrc_out[u]) // end of list
break;
ssrc_entry_release(ssrc_out[u]);
while (ssrc_out.length) {
struct ssrc_entry_call *se = g_queue_pop_head(&ssrc_out);
ssrc_entry_release(se);
}
out:
@ -3769,6 +3761,8 @@ static void __dtx_send_later(struct codec_timer *ct) {
ts = dtxb->head_ts;
}
ps = mp_copy.stream;
struct call_media *media = ps->media;
struct ssrc_entry_call *se = call_get_first_ssrc(&media->ssrc_hash_in);
log_info_stream_fd(mp_copy.sfd);
// copy out other fields so we can unlock
@ -3787,20 +3781,20 @@ static void __dtx_send_later(struct codec_timer *ct) {
shutdown = true;
else if (!ps)
shutdown = true;
else if (!ps->ssrc_in[0])
else if (!se)
shutdown = true;
else if (dtxb->ssrc != ps->ssrc_in[0]->h.ssrc)
else if (dtxb->ssrc != se->h.ssrc)
shutdown = true;
else if (dtxb->ct.next == 0)
shutdown = true;
else {
shutdown = true; // default if no last used PTs are known
for (int i = 0; i < G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts); i++) {
int pt_idx = ps->ssrc_in[0]->tracker.last_pt_idx - i;
pt_idx += G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts);
pt_idx %= G_N_ELEMENTS(ps->ssrc_in[0]->tracker.last_pts);
int last_pt = ps->ssrc_in[0]->tracker.last_pts[pt_idx];
for (int i = 0; i < G_N_ELEMENTS(se->tracker.last_pts); i++) {
int pt_idx = se->tracker.last_pt_idx - i;
pt_idx += G_N_ELEMENTS(se->tracker.last_pts);
pt_idx %= G_N_ELEMENTS(se->tracker.last_pts);
int last_pt = se->tracker.last_pts[pt_idx];
if (last_pt == 255)
break;


+ 1
- 1
daemon/dtmf.c View File

@ -802,7 +802,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura
if (!media->streams.head)
return "Media doesn't have an RTP stream";
struct packet_stream *ps = media->streams.head->data;
struct ssrc_entry_call *ssrc_in = ps->ssrc_in[0];
struct ssrc_entry_call *ssrc_in = call_get_first_ssrc(&media->ssrc_hash_in);
if (!ssrc_in)
return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream


+ 44
- 49
daemon/media_socket.c View File

@ -1512,6 +1512,7 @@ TYPED_GQUEUE(kernel_output, struct rtpengine_destination_info)
typedef struct {
struct rtpengine_target_info reti;
struct ssrc_entry_call *ssrc[RTPE_NUM_SSRC_TRACKING];
kernel_output_q outputs;
rtp_stats_arr *payload_types;
bool blackhole;
@ -1604,11 +1605,15 @@ static const char *kernelize_target(kernelize_state *s, struct packet_stream *st
s->manipulate_pt = s->silenced || ML_ISSET(media->monologue, BLOCK_SHORT);
reti->track_ssrc = 1;
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (stream->ssrc_in[u]) {
reti->ssrc[u] = htonl(stream->ssrc_in[u]->h.ssrc);
reti->ssrc_stats[u] = stream->ssrc_in[u]->stats;
}
unsigned int u = 0;
for (GList *l = stream->media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
if (u >= G_N_ELEMENTS(reti->ssrc))
break;
s->ssrc[u] = se; // no reference needed
reti->ssrc[u] = htonl(se->h.ssrc);
reti->ssrc_stats[u] = se->stats;
u++;
}
recording_stream_kernel_info(stream, reti);
@ -1766,15 +1771,20 @@ static const char *kernelize_one(kernelize_state *s,
redi->output.stats = sink->stats_out;
if (reti->track_ssrc) {
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (sink->ssrc_out[u]) {
// XXX order can be different from ingress?
redi->output.seq_offset[u] = sink->ssrc_out[u]->seq_diff;
redi->output.ssrc_stats[u] = sink->ssrc_out[u]->stats;
}
unsigned int u = 0;
for (GList *l = sink->media->ssrc_hash_out.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
if (u >= G_N_ELEMENTS(redi->output.ssrc_out))
break;
if (redi->output.ssrc_subst && stream->ssrc_in[u])
redi->output.ssrc_out[u] = htonl(stream->ssrc_in[u]->ssrc_map_out);
redi->output.seq_offset[u] = se->seq_diff;
redi->output.ssrc_stats[u] = se->stats;
if (redi->output.ssrc_subst && s->ssrc[u])
redi->output.ssrc_out[u] = htonl(s->ssrc[u]->ssrc_map_out);
u++;
}
}
@ -2069,44 +2079,32 @@ noop:
// returns non-null with reason string if stream should be removed from kernel
static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock,
struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p,
static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
uint32_t output_ssrc,
struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash, const char *label)
struct ssrc_entry_call **output, struct ssrc_hash *ssrc_hash,
const char *label)
{
const char *ret = NULL;
mutex_lock(lock);
int ctx_idx = __hunt_ssrc_ctx_idx(ssrc, list, 0);
if (ctx_idx == -1) {
// SSRC mismatch - get the new entry:
ctx_idx = *ctx_idx_p;
// move to next slot
*ctx_idx_p = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING;
// eject old entry if present
if (list[ctx_idx])
ssrc_entry_release(list[ctx_idx]);
// get new entry
list[ctx_idx] =
get_ssrc(ssrc, ssrc_hash);
mutex_lock(&ssrc_hash->lock);
struct ssrc_entry_call *first = call_get_first_ssrc(ssrc_hash);
if (first && first->h.ssrc == ssrc)
ssrc_entry_hold(first);
else
first = NULL;
mutex_unlock(&ssrc_hash->lock);
struct ssrc_entry_call *se = first ?: get_ssrc(ssrc, ssrc_hash);
if (se != first) {
ret = "SSRC changed";
ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label,
FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc));
}
if (ctx_idx != 0) {
// move most recent entry to front of the list
struct ssrc_entry_call *tmp = list[0];
list[0] = list[ctx_idx];
list[ctx_idx] = tmp;
ctx_idx = 0;
FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc));
}
// extract and hold entry
ssrc_entry_release(*output);
*output = list[ctx_idx];
ssrc_entry_hold(*output);
*output = se;
// reverse SSRC mapping
if (!output_ssrc)
@ -2114,7 +2112,6 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
else
(*output)->ssrc_map_out = output_ssrc;
mutex_unlock(lock);
return ret;
}
// check and update input SSRC pointers
@ -2122,8 +2119,8 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_entry_call **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, "ingress");
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs),
0, ssrc_in_p, ssrc_hash, "ingress");
}
// check and update output SSRC pointers
// returns non-null with reason string if stream should be removed from kernel
@ -2133,14 +2130,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss
bool ssrc_change)
{
if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash,
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out,
ntohl(ssrc_bs), ssrc_out_p, ssrc_hash,
"egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash,
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs),
0, ssrc_out_p, ssrc_hash,
"egress (direct)");
}


+ 41
- 22
daemon/mqtt.c View File

@ -367,17 +367,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_begin_object(json);
mqtt_stream_stats_dir(ps->stats_in, json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
if (!ps->ssrc_in[i])
break;
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_in[i], json, ps->media);
json_builder_end_object(json);
}
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
@ -388,17 +377,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_begin_object(json);
mqtt_stream_stats_dir(ps->stats_out, json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
if (!ps->ssrc_out[i])
break;
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_out[i], json, ps->media);
json_builder_end_object(json);
}
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
@ -434,6 +412,47 @@ static void mqtt_media_stats(struct call_media *media, JsonBuilder *json) {
json_builder_add_string_value(json, "inactive");
}
mutex_lock(&media->ssrc_hash_in.lock);
json_builder_set_member_name(json, "ingress");
json_builder_begin_object(json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);
for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
json_builder_begin_object(json);
mqtt_ssrc_stats(se, json, media);
json_builder_end_object(json);
}
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&media->ssrc_hash_in.lock);
mutex_lock(&media->ssrc_hash_out.lock);
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
json_builder_set_member_name(json, "SSRC");
json_builder_begin_array(json);
for (GList *l = media->ssrc_hash_out.nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
json_builder_begin_object(json);
mqtt_ssrc_stats(se, json, media);
json_builder_end_object(json);
}
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&media->ssrc_hash_out.lock);
struct packet_stream *ps = media->streams.head ? media->streams.head->data : NULL;
if (ps)
mqtt_stream_stats(ps, json);


+ 5
- 7
daemon/ssrc.c View File

@ -704,11 +704,8 @@ out:
// call master lock held in R
void ssrc_collect_metrics(struct call_media *media) {
if (!media->streams.head)
return;
struct packet_stream *ps = media->streams.head->data;
for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) {
struct ssrc_entry_call *s = ps->ssrc_in[i];
for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
struct ssrc_entry_call *s = l->data;
if (!s)
break; // end of list
@ -718,11 +715,12 @@ void ssrc_collect_metrics(struct call_media *media) {
if (s->tracker.most_len > 0 && s->tracker.most[0] != 255) {
const rtp_payload_type *rpt = get_rtp_payload_type(s->tracker.most[0],
&ps->media->codecs);
&media->codecs);
if (rpt && rpt->clock_rate)
s->jitter = s->jitter * 1000 / rpt->clock_rate;
}
RTPE_SAMPLE_SFD(jitter_measured, s->jitter, ps->selected_sfd);
if (media->streams.head)
RTPE_SAMPLE_SFD(jitter_measured, s->jitter, media->streams.head->data->selected_sfd);
}
}

+ 1
- 1
daemon/t38.c View File

@ -416,7 +416,7 @@ int t38_gateway_pair(struct call_media *t38_media, struct call_media *pcm_media,
goto err;
media_player_new(&tg->pcm_player, pcm_media->monologue,
(pcm_media->streams.length ? pcm_media->streams.head->data->ssrc_out[0] : NULL),
call_get_first_ssrc(&pcm_media->ssrc_hash_out),
NULL);
// even though we call media_player_set_media() here, we need to call it again in
// t38_gateway_start because our sink might not have any streams added here yet,


+ 0
- 5
include/call.h View File

@ -439,10 +439,6 @@ struct packet_stream {
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_entry_call *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
int64_t kernel_time_us;
@ -898,7 +894,6 @@ enum thread_looper_action call_timer(void);
void __rtp_stats_update(rtp_stats_ht dst, struct codec_store *);
bool __init_stream(struct packet_stream *ps);
void call_stream_crypto_reset(struct packet_stream *ps);
const rtp_payload_type *__rtp_stats_codec(struct call_media *m);


+ 4
- 0
include/ssrc.h View File

@ -202,6 +202,10 @@ INLINE void *get_ssrc(uint32_t ssrc, struct ssrc_hash *ht) {
return get_ssrc_full(ssrc, ht, NULL);
}
INLINE struct ssrc_entry_call *call_get_first_ssrc(struct ssrc_hash *ht) {
return ht->nq.head ? ht->nq.head->data : NULL;
}
void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, int64_t);
void ssrc_receiver_report(struct call_media *, stream_fd *, const struct ssrc_receiver_report *, int64_t);
void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *rr, int64_t);


+ 8
- 18
t/auto-daemon-tests.pl View File

@ -2021,7 +2021,6 @@ $resp = rtpe_req('query', 'unsolicited to-tag w/ via-branch', { });
Test2::Tools::Compare::like($resp, {
'result' => 'ok',
'last redis update' => '0',
'SSRC' => {},
'last signal' => qr//,
'tags' => {
ft() => {
@ -2034,17 +2033,17 @@ Test2::Tools::Compare::like($resp, {
],
'medias' => [
{
'ingress SSRCs' => [],
'egress SSRCs' => [],
'index' => '1',
'streams' => [
{
'last user packet' => qr//,
'egress SSRCs' => [],
'advertised endpoint' => {
'address' => '198.51.100.1',
'family' => 'IPv4',
'port' => '3000'
},
'ingress SSRCs' => [],
'flags' => [
'RTP',
'filled'
@ -2071,13 +2070,11 @@ Test2::Tools::Compare::like($resp, {
'last packet' => qr//
},
{
'egress SSRCs' => [],
'advertised endpoint' => {
'address' => '198.51.100.1',
'family' => 'IPv4',
'port' => '3001'
},
'ingress SSRCs' => [],
'flags' => [
'RTCP',
'filled'
@ -2127,6 +2124,8 @@ Test2::Tools::Compare::like($resp, {
'tag' => tt(),
'medias' => [
{
'ingress SSRCs' => [],
'egress SSRCs' => [],
'streams' => [
{
'stats_out' => {
@ -2134,8 +2133,6 @@ Test2::Tools::Compare::like($resp, {
'packets' => '0',
'errors' => '0'
},
'egress SSRCs' => [],
'ingress SSRCs' => [],
'advertised endpoint' => {
'port' => '4000',
'family' => 'IPv4',
@ -2182,13 +2179,11 @@ Test2::Tools::Compare::like($resp, {
'RTCP',
'filled'
],
'ingress SSRCs' => [],
'advertised endpoint' => {
'port' => '4001',
'address' => '198.51.100.1',
'family' => 'IPv4'
},
'egress SSRCs' => [],
'last user packet' => qr//,
'stats_out' => {
'bytes' => '0',
@ -2311,7 +2306,6 @@ $resp = rtpe_req('query', 'unsolicited to-tag w/ via-branch', { });
Test2::Tools::Compare::like($resp, {
'result' => 'ok',
'last redis update' => '0',
'SSRC' => {},
'last signal' => qr//,
'tags' => {
ft() => {
@ -2324,17 +2318,17 @@ Test2::Tools::Compare::like($resp, {
],
'medias' => [
{
'ingress SSRCs' => [],
'egress SSRCs' => [],
'index' => '1',
'streams' => [
{
'last user packet' => qr//,
'egress SSRCs' => [],
'advertised endpoint' => {
'address' => '198.51.100.1',
'family' => 'IPv4',
'port' => '3000'
},
'ingress SSRCs' => [],
'flags' => [
'RTP',
'filled'
@ -2361,13 +2355,11 @@ Test2::Tools::Compare::like($resp, {
'last packet' => qr//
},
{
'egress SSRCs' => [],
'advertised endpoint' => {
'address' => '198.51.100.1',
'family' => 'IPv4',
'port' => '3001'
},
'ingress SSRCs' => [],
'flags' => [
'RTCP',
'filled'
@ -2417,6 +2409,8 @@ Test2::Tools::Compare::like($resp, {
'tag' => tt(),
'medias' => [
{
'ingress SSRCs' => [],
'egress SSRCs' => [],
'streams' => [
{
'stats_out' => {
@ -2424,8 +2418,6 @@ Test2::Tools::Compare::like($resp, {
'packets' => '0',
'errors' => '0'
},
'egress SSRCs' => [],
'ingress SSRCs' => [],
'advertised endpoint' => {
'port' => '4000',
'family' => 'IPv4',
@ -2472,13 +2464,11 @@ Test2::Tools::Compare::like($resp, {
'RTCP',
'filled'
],
'ingress SSRCs' => [],
'advertised endpoint' => {
'port' => '4001',
'address' => '198.51.100.1',
'family' => 'IPv4'
},
'egress SSRCs' => [],
'last user packet' => qr//,
'stats_out' => {
'bytes' => '0',


Loading…
Cancel
Save