Browse Source

TT#145450 add support for tracking multiple SSRCs per stream

Change-Id: I885fbf7973133af8d7c9184eb0e75f5401309c8e
pull/1388/head
Richard Fuchs 4 years ago
parent
commit
24f607752c
13 changed files with 310 additions and 188 deletions
  1. +46
    -26
      daemon/call.c
  2. +3
    -2
      daemon/call_interfaces.c
  3. +1
    -1
      daemon/cli.c
  4. +20
    -10
      daemon/codec.c
  5. +1
    -1
      daemon/dtmf.c
  6. +2
    -4
      daemon/kernel.c
  7. +126
    -94
      daemon/media_socket.c
  8. +4
    -4
      daemon/mqtt.c
  9. +5
    -2
      include/call.h
  10. +2
    -1
      include/kernel.h
  11. +5
    -0
      include/media_socket.h
  12. +89
    -38
      kernel-module/xt_RTPENGINE.c
  13. +6
    -5
      kernel-module/xt_RTPENGINE.h

+ 46
- 26
daemon/call.c View File

@ -547,7 +547,7 @@ void call_timer(void *ptr) {
GList *i, *l;
struct rtpengine_list_entry *ke;
struct packet_stream *ps;
int j, update;
int j;
struct stream_fd *sfd;
struct rtp_stats *rs;
unsigned int pt;
@ -639,7 +639,7 @@ void call_timer(void *ptr) {
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
update = 0;
bool update = false;
if (diff_packets)
sfd->call->foreign_media = 0;
@ -656,37 +656,49 @@ void call_timer(void *ptr) {
struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx];
mutex_lock(&sink->out_lock);
if (sink->crypto.params.crypto_suite && sink->ssrc_out
&& ntohl(ke->target.ssrc) == sink->ssrc_out->parent->h.ssrc
&& o->encrypt.last_index - sink->ssrc_out->srtp_index > 0x4000)
{
sink->ssrc_out->srtp_index = o->encrypt.last_index;
update = 1;
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]),
sink->ssrc_out, 0);
if (!ctx)
continue;
if (sink->crypto.params.crypto_suite
&& o->encrypt.last_index[u] - ctx->srtp_index > 0x4000)
{
ctx->srtp_index = o->encrypt.last_index[u];
update = true;
}
}
mutex_unlock(&sink->out_lock);
}
mutex_lock(&ps->in_lock);
if (ps->ssrc_in && ntohl(ke->target.ssrc) == ps->ssrc_in->parent->h.ssrc) {
atomic64_add(&ps->ssrc_in->octets, diff_bytes);
atomic64_add(&ps->ssrc_in->packets, diff_packets);
atomic64_set(&ps->ssrc_in->last_seq, ke->target.decrypt.last_index);
ps->ssrc_in->srtp_index = ke->target.decrypt.last_index;
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]),
ps->ssrc_in, 0);
if (!ctx)
continue;
atomic64_add(&ctx->octets, diff_bytes);
atomic64_add(&ctx->packets, diff_packets);
atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]);
ctx->srtp_index = ke->target.decrypt.last_index[u];
if (sfd->crypto.params.crypto_suite
&& ke->target.decrypt.last_index
- ps->ssrc_in->srtp_index > 0x4000)
update = 1;
&& ke->target.decrypt.last_index[u]
- ctx->srtp_index > 0x4000)
update = true;
}
mutex_unlock(&ps->in_lock);
}
rwlock_unlock_r(&sfd->call->master_lock);
if (update) {
redis_update_onekey(ps->call, rtpe_redis_write);
}
if (update)
redis_update_onekey(ps->call, rtpe_redis_write);
next:
g_hash_table_remove(hlp.addr_sfd, &ep);
@ -1062,13 +1074,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) {
crypto_reset(&ps->crypto);
mutex_lock(&ps->in_lock);
if (ps->ssrc_in)
ps->ssrc_in->srtp_index = 0;
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list
break;
ps->ssrc_in[u]->srtp_index = 0;
}
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
if (ps->ssrc_out)
ps->ssrc_out->srtp_index = 0;
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ps->ssrc_out[u]->srtp_index = 0;
}
mutex_unlock(&ps->out_lock);
}
@ -3215,7 +3233,7 @@ void call_destroy(struct call *c) {
(unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
FMT_M(addr, ps->endpoint.port),
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
FMT_M(ps->ssrc_in ? ps->ssrc_in->parent->h.ssrc : 0),
FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0),
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes),
atomic64_get(&ps->stats.errors),
@ -3361,8 +3379,10 @@ static void __call_free(void *p) {
crypto_cleanup(&ps->crypto);
g_queue_clear(&ps->sfds);
g_hash_table_destroy(ps->rtp_stats);
ssrc_ctx_put(&ps->ssrc_in);
ssrc_ctx_put(&ps->ssrc_out);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++)
ssrc_ctx_put(&ps->ssrc_in[u]);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++)
ssrc_ctx_put(&ps->ssrc_out[u]);
g_slice_free1(sizeof(*ps), ps);
}


+ 3
- 2
daemon/call_interfaces.c View File

@ -1672,8 +1672,9 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
BF_PS("media handover", MEDIA_HANDOVER);
BF_PS("ICE", ICE);
if (ps->ssrc_in)
bencode_dictionary_add_integer(dict, "SSRC", ps->ssrc_in->parent->h.ssrc);
// XXX convert to list output?
if (ps->ssrc_in[0])
bencode_dictionary_add_integer(dict, "SSRC", ps->ssrc_in[0]->parent->h.ssrc);
stats:
if (totals->last_packet < atomic64_get(&ps->last_packet))


+ 1
- 1
daemon/cli.c View File

@ -618,7 +618,7 @@ static void cli_incoming_list_callid(str *instr, struct cli_writer *cw) {
sockaddr_print_buf(&ps->endpoint.address),
ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
ps->ssrc_in ? ps->ssrc_in->parent->h.ssrc : 0,
ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0,
atomic64_get(&ps->stats.packets),
atomic64_get(&ps->stats.bytes), atomic64_get(&ps->stats.errors),
atomic64_get(&ps->last_packet));


+ 20
- 10
daemon/codec.c View File

@ -807,23 +807,33 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
rwlock_lock_r(&rt->call->master_lock);
struct ssrc_ctx *ssrc_out = NULL;
// copy out references to SSRCs for lock-free handling
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
ssrc_out = ps->ssrc_out;
if (ssrc_out)
obj_hold(&ssrc_out->parent->h);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
}
if (ssrc_out)
rtcp_send_report(media, ssrc_out);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ssrc_out[u]) // end of list
break;
rtcp_send_report(media, ssrc_out[u]);
}
rwlock_unlock_r(&rt->call->master_lock);
if (ssrc_out)
obj_put(&ssrc_out->parent->h);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ssrc_out[u]) // end of list
break;
ssrc_ctx_put(&ssrc_out[u]);
}
out:
log_info_clear();
@ -2174,8 +2184,8 @@ static void __dtx_send_later(struct codec_timer *ct) {
struct codec_ssrc_handler *input_ch = (dtxp && dtxp->input_handler) ? obj_get(&dtxp->input_handler->h) : NULL;
struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL;
if (!call || !ch || !ps || !ps->ssrc_in
|| dtxb->ssrc != ps->ssrc_in->parent->h.ssrc
if (!call || !ch || !ps || !ps->ssrc_in[0]
|| dtxb->ssrc != ps->ssrc_in[0]->parent->h.ssrc
|| dtxb->ct.next.tv_sec == 0) {
// shut down or SSRC change
ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc);


+ 1
- 1
daemon/dtmf.c View File

@ -314,7 +314,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura
if (!media->streams.head)
return "Media doesn't have an RTP stream";
struct packet_stream *ps = media->streams.head->data;
struct ssrc_ctx *ssrc_in = ps->ssrc_in;
struct ssrc_ctx *ssrc_in = ps->ssrc_in[0];
if (!ssrc_in)
return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream


+ 2
- 4
daemon/kernel.c View File

@ -262,7 +262,7 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id)
return msg.u.stream.stream_idx;
}
int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out) {
int kernel_update_stats(const struct re_address *a, struct rtpengine_stats_info *out) {
struct rtpengine_message msg;
int ret;
@ -278,10 +278,8 @@ int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpeng
ilog(LOG_ERROR, "Failed to get stream stats from kernel: %s", strerror(errno));
return -1;
}
if (msg.u.stats.ssrc != ssrc)
return -1;
*out = msg.u.stats.ssrc_stats;
*out = msg.u.stats;
return 0;
}

+ 126
- 94
daemon/media_socket.c View File

@ -1066,7 +1066,9 @@ static int __k_null(struct rtpengine_srtp *s, struct packet_stream *stream) {
*s = __res_null;
return 0;
}
static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c,
struct ssrc_ctx *ssrc_ctx[RTPE_NUM_SSRC_TRACKING])
{
if (!c->params.crypto_suite)
return -1;
@ -1074,9 +1076,10 @@ static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c, st
.cipher = c->params.crypto_suite->kernel_cipher,
.hmac = c->params.crypto_suite->kernel_hmac,
.mki_len = c->params.mki_len,
.last_index = ssrc_ctx ? ssrc_ctx->srtp_index : 0,
.auth_tag_len = c->params.crypto_suite->srtp_auth_tag,
};
for (unsigned int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++)
s->last_index[i] = ssrc_ctx[i] ? ssrc_ctx[i]->srtp_index : 0;
if (c->params.mki_len)
memcpy(s->mki, c->params.mki, c->params.mki_len);
memcpy(s->master_key, c->params.master_key, c->params.crypto_suite->master_key_len);
@ -1202,11 +1205,12 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
if (!reti->decrypt.cipher || !reti->decrypt.hmac)
return "decryption cipher or HMAC not supported by kernel module";
if (stream->ssrc_in) {
reti->ssrc = htonl(stream->ssrc_in->parent->h.ssrc);
if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO))
reti->transcoding = 1;
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (stream->ssrc_in[u])
reti->ssrc[u] = htonl(stream->ssrc_in[u]->parent->h.ssrc);
}
if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO))
reti->transcoding = 1;
ZERO(stream->kernel_stats);
@ -1279,8 +1283,12 @@ output:
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
if (stream->ssrc_in && reti->transcoding)
redi->output.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out);
if (reti->transcoding) {
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (stream->ssrc_in[u])
redi->output.ssrc_out[u] = htonl(stream->ssrc_in[u]->ssrc_map_out);
}
}
handler->out->kernel(&redi->output.encrypt, sink);
@ -1376,6 +1384,31 @@ no_kernel:
PS_SET(stream, NO_KERNEL_SUPPORT);
}
// must be called with appropriate locks (master lock and/or in/out_lock)
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx)
{
for (unsigned int v = 0; v < RTPE_NUM_SSRC_TRACKING; v++) {
// starting point is the same offset as `u`
unsigned int idx = (start_idx + v) % RTPE_NUM_SSRC_TRACKING;
if (!list[idx])
continue;
if (list[idx]->parent->h.ssrc != ssrc)
continue;
return idx;
}
return -1;
}
// must be called with appropriate locks (master lock and/or in/out_lock)
struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx)
{
int idx = __hunt_ssrc_ctx_idx(ssrc, list, start_idx);
if (idx == -1)
return NULL;
return list[idx];
}
// must be called with appropriate locks (master lock and/or in_lock)
static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) {
struct re_address local;
@ -1383,57 +1416,52 @@ static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) {
if (!have_in_lock)
mutex_lock(&ps->in_lock);
struct ssrc_ctx *ssrc_ctx = ps->ssrc_in;
if (!ssrc_ctx) {
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
return;
}
struct ssrc_entry_call *parent = ssrc_ctx->parent;
__re_address_translate_ep(&local, &ps->selected_sfd->socket.local);
struct rtpengine_ssrc_stats stats;
if (kernel_update_stats(&local, htonl(parent->h.ssrc), &stats)) {
struct rtpengine_stats_info stats_info;
if (kernel_update_stats(&local, &stats_info)) {
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
return;
}
if (!stats.basic_stats.packets) {
// no change
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
return;
}
for (unsigned int u = 0; u < G_N_ELEMENTS(stats_info.ssrc); u++) {
// check for the right SSRC association
if (!stats_info.ssrc[u]) // end of list
break;
struct ssrc_ctx *ssrc_ctx = __hunt_ssrc_ctx(ntohl(stats_info.ssrc[u]),
ps->ssrc_in, u);
if (!ssrc_ctx)
continue;
struct ssrc_entry_call *parent = ssrc_ctx->parent;
atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes);
parent->packets_lost += stats.total_lost; // XXX should be atomic?
atomic64_set(&ssrc_ctx->last_seq, stats.ext_seq);
atomic64_set(&ssrc_ctx->last_ts, stats.timestamp);
parent->jitter = stats.jitter;
if (!stats_info.ssrc_stats[u].basic_stats.packets) // no change
continue;
uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out;
atomic64_add(&ssrc_ctx->packets, stats_info.ssrc_stats[u].basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats_info.ssrc_stats[u].basic_stats.bytes);
parent->packets_lost += stats_info.ssrc_stats[u].total_lost; // XXX should be atomic?
atomic64_set(&ssrc_ctx->last_seq, stats_info.ssrc_stats[u].ext_seq);
atomic64_set(&ssrc_ctx->last_ts, stats_info.ssrc_stats[u].timestamp);
parent->jitter = stats_info.ssrc_stats[u].jitter;
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out;
// update opposite outgoing SSRC
if (!have_in_lock)
mutex_lock(&ps->out_lock);
else {
// update opposite outgoing SSRC
if (mutex_trylock(&ps->out_lock))
return; // will have to skip this
}
ssrc_ctx = ps->ssrc_out;
if (ssrc_ctx) {
parent = ssrc_ctx->parent;
if (parent->h.ssrc == ssrc_map_out) {
atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes);
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, ps->ssrc_out, u);
if (ssrc_ctx) {
parent = ssrc_ctx->parent;
atomic64_add(&ssrc_ctx->packets, stats_info.ssrc_stats[u].basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats_info.ssrc_stats[u].basic_stats.bytes);
}
mutex_unlock(&ps->out_lock);
}
mutex_unlock(&ps->out_lock);
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
}
@ -1611,68 +1639,72 @@ noop:
}
// check and update input SSRC pointers
static bool __stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
static bool __stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock,
struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p,
uint32_t output_ssrc,
struct ssrc_ctx **output, struct ssrc_hash *ssrc_hash, enum ssrc_dir dir, const char *label)
{
uint32_t in_ssrc = ntohl(ssrc_bs);
int changed = false;
mutex_lock(&in_srtp->in_lock);
(*ssrc_in_p) = in_srtp->ssrc_in;
ssrc_ctx_hold(*ssrc_in_p);
if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) {
// SSRC mismatch - get the new entry
ssrc_ctx_put(ssrc_in_p);
ssrc_ctx_put(&in_srtp->ssrc_in);
(*ssrc_in_p) = in_srtp->ssrc_in =
get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT, in_srtp->media->monologue);
ssrc_ctx_hold(in_srtp->ssrc_in);
mutex_lock(lock);
int ctx_idx = __hunt_ssrc_ctx_idx(ssrc, list, 0);
if (ctx_idx == -1) {
// SSRC mismatch - get the new entry:
// move to next slot
ctx_idx = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING;
*ctx_idx_p = ctx_idx;
// eject old entry if present
if (list[ctx_idx])
ssrc_ctx_put(&list[ctx_idx]);
// get new entry
list[ctx_idx] =
get_ssrc_ctx(ssrc, ssrc_hash, dir, ps->media->monologue);
changed = true;
ilog(LOG_DEBUG, "Ingress SSRC changed for: %s%s:%d new: %x%s",
FMT_M(sockaddr_print_buf(&in_srtp->endpoint.address), in_srtp->endpoint.port, in_ssrc));
ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label,
FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc));
}
if (ctx_idx != 0) {
// move most recent entry to front of the list
struct ssrc_ctx *tmp = list[0];
list[0] = list[ctx_idx];
list[ctx_idx] = tmp;
ctx_idx = 0;
}
// extract and hold entry (ctx_idx == 0)
*output = list[0];
ssrc_ctx_hold(*output);
// make sure we reset the output SSRC if we're not transcoding
if (!MEDIA_ISSET(in_srtp->media, TRANSCODE) && !MEDIA_ISSET(in_srtp->media, ECHO))
(*ssrc_in_p)->ssrc_map_out = in_ssrc;
// reverse SSRC mapping
if (!output_ssrc) {
// make sure we reset the output SSRC if we're not transcoding
if (!MEDIA_ISSET(ps->media, TRANSCODE) && !MEDIA_ISSET(ps->media, ECHO))
(*output)->ssrc_map_out = ssrc;
}
else {
(*output)->ssrc_map_out = output_ssrc;
}
mutex_unlock(&in_srtp->in_lock);
mutex_unlock(lock);
return changed;
}
// check and update input SSRC pointers
static bool __stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
}
// check and update output SSRC pointers
static bool __stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ssrc_bs,
struct ssrc_ctx *ssrc_in, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash)
{
uint32_t in_ssrc = ntohl(ssrc_bs);
uint32_t out_ssrc;
bool changed = false;
out_ssrc = ssrc_in->ssrc_map_out;
mutex_lock(&out_srtp->out_lock);
(*ssrc_out_p) = out_srtp->ssrc_out;
ssrc_ctx_hold(*ssrc_out_p);
if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) {
// SSRC mismatch - get the new entry
ssrc_ctx_put(ssrc_out_p);
ssrc_ctx_put(&out_srtp->ssrc_out);
(*ssrc_out_p) = out_srtp->ssrc_out =
get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT, out_srtp->media->monologue);
ssrc_ctx_hold(out_srtp->ssrc_out);
changed = 1;
ilog(LOG_DEBUG, "Egress SSRC changed for %s%s:%d new: %x%s",
FMT_M(sockaddr_print_buf(&out_srtp->endpoint.address), out_srtp->endpoint.port, out_ssrc));
}
// reverse SSRC mapping
(*ssrc_out_p)->ssrc_map_out = in_ssrc;
mutex_unlock(&out_srtp->out_lock);
return changed;
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress");
}


+ 4
- 4
daemon/mqtt.c View File

@ -284,10 +284,10 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_add_int_value(json, sfd->socket.local.port);
}
if (ps->ssrc_in) {
if (ps->ssrc_in[0]) {
json_builder_set_member_name(json, "ingress");
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_in, json, ps->media);
mqtt_ssrc_stats(ps->ssrc_in[0], json, ps->media);
json_builder_end_object(json);
}
@ -295,10 +295,10 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->out_lock);
if (ps->ssrc_out) {
if (ps->ssrc_out[0]) {
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_out, json, ps->media);
mqtt_ssrc_stats(ps->ssrc_out[0], json, ps->media);
json_builder_end_object(json);
}


+ 5
- 2
include/call.h View File

@ -28,6 +28,7 @@
#include "statistics.h"
#include "codeclib.h"
#include "t38.h"
#include "xt_RTPENGINE.h"
#define UNDEFINED ((unsigned int) -1)
@ -315,8 +316,10 @@ struct packet_stream {
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */
*ssrc_out; /* LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, // LOCK: in_lock
ssrc_out_idx; // LOCK: out_lock
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */


+ 2
- 1
include/kernel.h View File

@ -6,6 +6,7 @@
#include <sys/types.h>
#include <glib.h>
#include <netinet/in.h>
#include "xt_RTPENGINE.h"
@ -38,7 +39,7 @@ int kernel_add_stream(struct rtpengine_target_info *);
int kernel_add_destination(struct rtpengine_destination_info *);
int kernel_del_stream(const struct re_address *);
GList *kernel_list(void);
int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out);
int kernel_update_stats(const struct re_address *a, struct rtpengine_stats_info *out);
unsigned int kernel_add_call(const char *id);
int kernel_del_call(unsigned int);


+ 5
- 0
include/media_socket.h View File

@ -11,6 +11,7 @@
#include "dtls.h"
#include "crypto.h"
#include "socket.h"
#include "xt_RTPENGINE.h"
@ -185,6 +186,10 @@ void __stream_unconfirm(struct packet_stream *);
void __reset_sink_handlers(struct packet_stream *);
void media_update_stats(struct call_media *m);
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
void media_packet_copy(struct media_packet *, const struct media_packet *);
void media_packet_release(struct media_packet *);


+ 89
- 38
kernel-module/xt_RTPENGINE.c View File

@ -270,7 +270,7 @@ struct re_crypto_context {
unsigned char session_key[32];
unsigned char session_salt[14];
unsigned char session_auth_key[20];
uint32_t roc;
uint32_t roc[RTPE_NUM_SSRC_TRACKING];
struct crypto_cipher *tfm[2];
struct crypto_shash *shash;
struct crypto_aead *aead;
@ -304,7 +304,7 @@ struct rtpengine_target {
struct rtpengine_stats_a stats;
struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES];
spinlock_t ssrc_stats_lock;
struct rtpengine_ssrc_stats ssrc_stats;
struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING];
struct re_crypto_context decrypt;
@ -1427,7 +1427,8 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
}
spin_lock_irqsave(&g->decrypt.lock, flags);
opp->target.decrypt.last_index = g->target.decrypt.last_index;
for (i = 0; i < ARRAY_SIZE(opp->target.decrypt.last_index); i++)
opp->target.decrypt.last_index[i] = g->target.decrypt.last_index[i];
spin_unlock_irqrestore(&g->decrypt.lock, flags);
_r_lock(&g->outputs_lock, flags);
@ -1588,7 +1589,14 @@ static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context
seq_printf(f, "%02x", c->session_auth_key[i]);
seq_printf(f, "\n");
seq_printf(f, " ROC: %u\n", (unsigned int) c->roc);
seq_printf(f, " ROC:");
for (i = 0; i < ARRAY_SIZE(c->roc); i++) {
if (i == 0)
seq_printf(f, " %u", (unsigned int) c->roc[i]);
else
seq_printf(f, ", %u", (unsigned int) c->roc[i]);
}
seq_printf(f, "\n");
if (s->mki_len)
seq_printf(f, " MKI: length %u, %02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x...\n",
@ -1639,8 +1647,18 @@ static int proc_list_show(struct seq_file *f, void *v) {
seq_printf(f, " %u bytes replacement payload\n",
g->target.payload_types[i].replace_pattern_len);
}
if (g->target.ssrc)
seq_printf(f, " SSRC in: %lx\n", (unsigned long) ntohl(g->target.ssrc));
seq_printf(f, " SSRC in:");
for (i = 0; i < ARRAY_SIZE(g->target.ssrc); i++) {
if (!g->target.ssrc[i])
break;
if (i == 0)
seq_printf(f, " %lx", (unsigned long) ntohl(g->target.ssrc[i]));
else
seq_printf(f, ", %lx", (unsigned long) ntohl(g->target.ssrc[i]));
}
seq_printf(f, "\n");
proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption");
if (g->target.rtcp_mux)
seq_printf(f, " option: rtcp-mux\n");
@ -1662,8 +1680,18 @@ static int proc_list_show(struct seq_file *f, void *v) {
seq_printf(f, " output #%u\n", i);
proc_list_addr_print(f, "src", &o->output.src_addr);
proc_list_addr_print(f, "dst", &o->output.dst_addr);
if (o->output.ssrc_out)
seq_printf(f, " SSRC out: %lx\n", (unsigned long) ntohl(o->output.ssrc_out));
seq_printf(f, " SSRC out:");
for (i = 0; i < ARRAY_SIZE(o->output.ssrc_out); i++) {
if (!o->output.ssrc_out[i])
break;
if (i == 0)
seq_printf(f, " %lx", (unsigned long) ntohl(o->output.ssrc_out[i]));
else
seq_printf(f, ", %lx", (unsigned long) ntohl(o->output.ssrc_out[i]));
}
seq_printf(f, "\n");
proc_list_crypto_print(f, &o->encrypt, &o->output.encrypt, "encryption");
}
@ -1753,19 +1781,23 @@ static struct re_dest_addr *find_dest_addr(const struct re_dest_addr_hash *h, co
static int table_get_target_stats(struct rtpengine_table *t, struct rtpengine_stats_info *i, int reset) {
struct rtpengine_target *g;
unsigned int u;
g = get_target(t, &i->local);
if (!g)
return -ENOENT;
i->ssrc = g->target.ssrc;
spin_lock(&g->ssrc_stats_lock);
i->ssrc_stats = g->ssrc_stats;
if (reset) {
g->ssrc_stats.basic_stats.packets = 0;
g->ssrc_stats.basic_stats.bytes = 0;
g->ssrc_stats.total_lost = 0;
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
i->ssrc[u] = g->target.ssrc[u];
i->ssrc_stats[u] = g->ssrc_stats[u];
if (reset) {
g->ssrc_stats[u].basic_stats.packets = 0;
g->ssrc_stats[u].basic_stats.bytes = 0;
g->ssrc_stats[u].total_lost = 0;
}
}
spin_unlock(&g->ssrc_stats_lock);
@ -2243,6 +2275,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
struct rtpengine_target *og = NULL;
int err;
unsigned long flags;
unsigned int u;
/* validation */
@ -2276,7 +2309,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
memcpy(&g->target, i, sizeof(*i));
crypto_context_init(&g->decrypt, &g->target.decrypt);
spin_lock_init(&g->ssrc_stats_lock);
g->ssrc_stats.lost_bits = -1;
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++)
g->ssrc_stats[u].lost_bits = -1;
rwlock_init(&g->outputs_lock);
if (i->num_destinations) {
@ -3777,7 +3811,7 @@ error:
/* XXX shared code */
static uint64_t packet_index(struct re_crypto_context *c,
struct rtpengine_srtp *s, struct rtp_header *rtp)
struct rtpengine_srtp *s, struct rtp_header *rtp, int ssrc_idx)
{
uint16_t seq;
uint64_t index;
@ -3786,17 +3820,20 @@ static uint64_t packet_index(struct re_crypto_context *c,
uint32_t roc;
uint32_t v;
if (ssrc_idx == -1)
ssrc_idx = 0;
seq = ntohs(rtp->seq_num);
spin_lock_irqsave(&c->lock, flags);
/* rfc 3711 section 3.3.1 */
if (unlikely(!s->last_index))
s->last_index = seq;
if (unlikely(!s->last_index[ssrc_idx]))
s->last_index[ssrc_idx] = seq;
/* rfc 3711 appendix A, modified, and sections 3.3 and 3.3.1 */
s_l = (s->last_index & 0x00000000ffffULL);
roc = (s->last_index & 0xffffffff0000ULL) >> 16;
s_l = (s->last_index[ssrc_idx] & 0x00000000ffffULL);
roc = (s->last_index[ssrc_idx] & 0xffffffff0000ULL) >> 16;
v = 0;
if (s_l < 0x8000) {
@ -3812,8 +3849,8 @@ static uint64_t packet_index(struct re_crypto_context *c,
}
index = (v << 16) | seq;
s->last_index = index;
c->roc = v;
s->last_index[ssrc_idx] = index;
c->roc[ssrc_idx] = v;
spin_unlock_irqrestore(&c->lock, flags);
@ -3821,13 +3858,16 @@ static uint64_t packet_index(struct re_crypto_context *c,
}
static void update_packet_index(struct re_crypto_context *c,
struct rtpengine_srtp *s, uint64_t idx)
struct rtpengine_srtp *s, uint64_t idx, int ssrc_idx)
{
unsigned long flags;
if (ssrc_idx == -1)
ssrc_idx = 0;
spin_lock_irqsave(&c->lock, flags);
s->last_index = idx;
c->roc = (idx >> 16);
s->last_index[ssrc_idx] = idx;
c->roc[ssrc_idx] = (idx >> 16);
spin_unlock_irqrestore(&c->lock, flags);
}
@ -3919,7 +3959,7 @@ static int srtp_authenticate(struct re_crypto_context *c,
static int srtp_auth_validate(struct re_crypto_context *c,
struct rtpengine_srtp *s, struct rtp_parsed *r,
uint64_t *pkt_idx_p)
uint64_t *pkt_idx_p, int ssrc_idx)
{
unsigned char *auth_tag;
unsigned char hmac[20];
@ -3981,7 +4021,7 @@ static int srtp_auth_validate(struct re_crypto_context *c,
ok_update:
*pkt_idx_p = pkt_idx;
update_packet_index(c, s, pkt_idx);
update_packet_index(c, s, pkt_idx, ssrc_idx);
ok:
return 0;
}
@ -4257,9 +4297,11 @@ static struct sk_buff *intercept_skb_copy(struct sk_buff *oskb, const struct re_
static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 arrival_time, int pt_idx) {
static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 arrival_time, int pt_idx,
int ssrc_idx)
{
unsigned long flags;
struct rtpengine_ssrc_stats *s = &g->ssrc_stats;
struct rtpengine_ssrc_stats *s = &g->ssrc_stats[ssrc_idx];
uint16_t old_seq_trunc;
uint32_t last_seq;
uint16_t seq_diff;
@ -4351,6 +4393,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
int err;
int error_nf_action = XT_CONTINUE;
int rtp_pt_idx = -2;
int ssrc_idx = -1;
unsigned int datalen, pllen;
uint32_t *u32;
struct rtp_parsed rtp, rtp2;
@ -4453,13 +4496,21 @@ src_check_ok:
rtp_pt_idx = rtp_payload_type(rtp.header, &g->target, &g->last_pt);
// Pass to userspace if SSRC has changed.
errstr = "SSRC mismatch";
if (unlikely((g->target.ssrc) && (g->target.ssrc != rtp.header->ssrc)))
// Look for matching SSRC index if any SSRC were given
if (likely(g->target.ssrc[0])) {
errstr = "SSRC mismatch";
for (ssrc_idx = 0; ssrc_idx < RTPE_NUM_SSRC_TRACKING; ssrc_idx++) {
if (g->target.ssrc[ssrc_idx] == rtp.header->ssrc)
goto found_ssrc;
}
ssrc_idx = -1;
goto skip_error;
found_ssrc:;
}
pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header);
pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header, ssrc_idx);
errstr = "SRTP authentication tag mismatch";
if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx))
if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx, ssrc_idx))
goto skip_error;
// if RTP, only forward packets of known/passthrough payload types
@ -4472,8 +4523,8 @@ src_check_ok:
skb_trim(skb, rtp.header_len + rtp.payload_len);
if (g->target.rtp_stats)
rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx);
if (g->target.rtp_stats && ssrc_idx != -1)
rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx, ssrc_idx);
DBG("packet payload decrypted as %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x...\n",
rtp.payload[0], rtp.payload[1], rtp.payload[2], rtp.payload[3],
@ -4540,10 +4591,10 @@ no_intercept:
if (rtp2.ok) {
// SSRC substitution
if (g->target.transcoding && o->output.ssrc_out)
rtp2.header->ssrc = o->output.ssrc_out;
if (g->target.transcoding && o->output.ssrc_out && ssrc_idx != -1)
rtp2.header->ssrc = o->output.ssrc_out[ssrc_idx];
pkt_idx = packet_index(&o->encrypt, &o->output.encrypt, rtp2.header);
pkt_idx = packet_index(&o->encrypt, &o->output.encrypt, rtp2.header, ssrc_idx);
pllen = rtp2.payload_len;
srtp_encrypt(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx);
srtp_authenticate(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx);


+ 6
- 5
kernel-module/xt_RTPENGINE.h View File

@ -5,6 +5,7 @@
#define RTPE_NUM_PAYLOAD_TYPES 32
#define RTPE_MAX_FORWARD_DESTINATIONS 32
#define RTPE_NUM_SSRC_TRACKING 4
@ -80,7 +81,7 @@ struct rtpengine_srtp {
unsigned int session_key_len;
unsigned int session_salt_len;
unsigned char mki[256]; /* XXX uses too much memory? */
uint64_t last_index;
uint64_t last_index[RTPE_NUM_SSRC_TRACKING];
unsigned int auth_tag_len; /* in bytes */
unsigned int mki_len;
};
@ -107,7 +108,7 @@ struct rtpengine_target_info {
unsigned int intercept_stream_idx;
struct rtpengine_srtp decrypt;
uint32_t ssrc; // Expose the SSRC to userspace when we resync.
uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync.
struct rtpengine_payload_type payload_types[RTPE_NUM_PAYLOAD_TYPES]; /* must be sorted */
unsigned int num_payload_types;
@ -129,7 +130,7 @@ struct rtpengine_output_info {
struct re_address dst_addr;
struct rtpengine_srtp encrypt;
uint32_t ssrc_out; // Rewrite SSRC
uint32_t ssrc_out[RTPE_NUM_SSRC_TRACKING]; // Rewrite SSRC
unsigned char tos;
};
@ -159,8 +160,8 @@ struct rtpengine_packet_info {
struct rtpengine_stats_info {
struct re_address local; // input
uint32_t ssrc; // output
struct rtpengine_ssrc_stats ssrc_stats; // output
uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // output
struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING]; // output
};
struct rtpengine_noop_info {


Loading…
Cancel
Save