diff --git a/daemon/call.c b/daemon/call.c index 7647d06f2..f5b571b51 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -547,7 +547,7 @@ void call_timer(void *ptr) { GList *i, *l; struct rtpengine_list_entry *ke; struct packet_stream *ps; - int j, update; + int j; struct stream_fd *sfd; struct rtp_stats *rs; unsigned int pt; @@ -639,7 +639,7 @@ void call_timer(void *ptr) { atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); } - update = 0; + bool update = false; if (diff_packets) sfd->call->foreign_media = 0; @@ -656,37 +656,49 @@ void call_timer(void *ptr) { struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx]; mutex_lock(&sink->out_lock); - if (sink->crypto.params.crypto_suite && sink->ssrc_out - && ntohl(ke->target.ssrc) == sink->ssrc_out->parent->h.ssrc - && o->encrypt.last_index - sink->ssrc_out->srtp_index > 0x4000) - { - sink->ssrc_out->srtp_index = o->encrypt.last_index; - update = 1; + for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { + if (!ke->target.ssrc[u]) // end of list + break; + struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]), + sink->ssrc_out, 0); + if (!ctx) + continue; + if (sink->crypto.params.crypto_suite + && o->encrypt.last_index[u] - ctx->srtp_index > 0x4000) + { + ctx->srtp_index = o->encrypt.last_index[u]; + update = true; + } } mutex_unlock(&sink->out_lock); } mutex_lock(&ps->in_lock); - if (ps->ssrc_in && ntohl(ke->target.ssrc) == ps->ssrc_in->parent->h.ssrc) { - atomic64_add(&ps->ssrc_in->octets, diff_bytes); - atomic64_add(&ps->ssrc_in->packets, diff_packets); - atomic64_set(&ps->ssrc_in->last_seq, ke->target.decrypt.last_index); - ps->ssrc_in->srtp_index = ke->target.decrypt.last_index; + for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) { + if (!ke->target.ssrc[u]) // end of list + break; + struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]), + ps->ssrc_in, 0); + if (!ctx) + continue; + atomic64_add(&ctx->octets, diff_bytes); + atomic64_add(&ctx->packets, diff_packets); + atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]); + ctx->srtp_index = ke->target.decrypt.last_index[u]; if (sfd->crypto.params.crypto_suite - && ke->target.decrypt.last_index - - ps->ssrc_in->srtp_index > 0x4000) - update = 1; + && ke->target.decrypt.last_index[u] + - ctx->srtp_index > 0x4000) + update = true; } mutex_unlock(&ps->in_lock); } rwlock_unlock_r(&sfd->call->master_lock); - if (update) { - redis_update_onekey(ps->call, rtpe_redis_write); - } + if (update) + redis_update_onekey(ps->call, rtpe_redis_write); next: g_hash_table_remove(hlp.addr_sfd, &ep); @@ -1062,13 +1074,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) { crypto_reset(&ps->crypto); mutex_lock(&ps->in_lock); - if (ps->ssrc_in) - ps->ssrc_in->srtp_index = 0; + for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) { + if (!ps->ssrc_in[u]) // end of list + break; + ps->ssrc_in[u]->srtp_index = 0; + } mutex_unlock(&ps->in_lock); mutex_lock(&ps->out_lock); - if (ps->ssrc_out) - ps->ssrc_out->srtp_index = 0; + for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) { + if (!ps->ssrc_out[u]) // end of list + break; + ps->ssrc_out[u]->srtp_index = 0; + } mutex_unlock(&ps->out_lock); } @@ -3215,7 +3233,7 @@ void call_destroy(struct call *c) { (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), FMT_M(addr, ps->endpoint.port), (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - FMT_M(ps->ssrc_in ? ps->ssrc_in->parent->h.ssrc : 0), + FMT_M(ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0), atomic64_get(&ps->stats.packets), atomic64_get(&ps->stats.bytes), atomic64_get(&ps->stats.errors), @@ -3361,8 +3379,10 @@ static void __call_free(void *p) { crypto_cleanup(&ps->crypto); g_queue_clear(&ps->sfds); g_hash_table_destroy(ps->rtp_stats); - ssrc_ctx_put(&ps->ssrc_in); - ssrc_ctx_put(&ps->ssrc_out); + for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) + ssrc_ctx_put(&ps->ssrc_in[u]); + for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) + ssrc_ctx_put(&ps->ssrc_out[u]); g_slice_free1(sizeof(*ps), ps); } diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index a42866711..c48e90f38 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -1672,8 +1672,9 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps BF_PS("media handover", MEDIA_HANDOVER); BF_PS("ICE", ICE); - if (ps->ssrc_in) - bencode_dictionary_add_integer(dict, "SSRC", ps->ssrc_in->parent->h.ssrc); + // XXX convert to list output? + if (ps->ssrc_in[0]) + bencode_dictionary_add_integer(dict, "SSRC", ps->ssrc_in[0]->parent->h.ssrc); stats: if (totals->last_packet < atomic64_get(&ps->last_packet)) diff --git a/daemon/cli.c b/daemon/cli.c index 13568a52d..8185071cd 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -618,7 +618,7 @@ static void cli_incoming_list_callid(str *instr, struct cli_writer *cw) { sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", - ps->ssrc_in ? ps->ssrc_in->parent->h.ssrc : 0, + ps->ssrc_in[0] ? ps->ssrc_in[0]->parent->h.ssrc : 0, atomic64_get(&ps->stats.packets), atomic64_get(&ps->stats.bytes), atomic64_get(&ps->stats.errors), atomic64_get(&ps->last_packet)); diff --git a/daemon/codec.c b/daemon/codec.c index e94ba22dd..628d56c4e 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -807,23 +807,33 @@ static void __rtcp_timer_run(struct codec_timer *ct) { rwlock_lock_r(&rt->call->master_lock); - struct ssrc_ctx *ssrc_out = NULL; + // copy out references to SSRCs for lock-free handling + struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,}; if (media->streams.head) { struct packet_stream *ps = media->streams.head->data; mutex_lock(&ps->out_lock); - ssrc_out = ps->ssrc_out; - if (ssrc_out) - obj_hold(&ssrc_out->parent->h); + for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { + if (!ps->ssrc_out[u]) // end of list + break; + ssrc_out[u] = ps->ssrc_out[u]; + ssrc_ctx_hold(ssrc_out[u]); + } mutex_unlock(&ps->out_lock); } - if (ssrc_out) - rtcp_send_report(media, ssrc_out); + for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { + if (!ssrc_out[u]) // end of list + break; + rtcp_send_report(media, ssrc_out[u]); + } rwlock_unlock_r(&rt->call->master_lock); - if (ssrc_out) - obj_put(&ssrc_out->parent->h); + for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { + if (!ssrc_out[u]) // end of list + break; + ssrc_ctx_put(&ssrc_out[u]); + } out: log_info_clear(); @@ -2174,8 +2184,8 @@ static void __dtx_send_later(struct codec_timer *ct) { struct codec_ssrc_handler *input_ch = (dtxp && dtxp->input_handler) ? obj_get(&dtxp->input_handler->h) : NULL; struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL; - if (!call || !ch || !ps || !ps->ssrc_in - || dtxb->ssrc != ps->ssrc_in->parent->h.ssrc + if (!call || !ch || !ps || !ps->ssrc_in[0] + || dtxb->ssrc != ps->ssrc_in[0]->parent->h.ssrc || dtxb->ct.next.tv_sec == 0) { // shut down or SSRC change ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc); diff --git a/daemon/dtmf.c b/daemon/dtmf.c index 6cad1edcf..7ef326fb4 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -314,7 +314,7 @@ const char *dtmf_inject(struct call_media *media, int code, int volume, int dura if (!media->streams.head) return "Media doesn't have an RTP stream"; struct packet_stream *ps = media->streams.head->data; - struct ssrc_ctx *ssrc_in = ps->ssrc_in; + struct ssrc_ctx *ssrc_in = ps->ssrc_in[0]; if (!ssrc_in) return "No SSRC context present for DTMF injection"; // XXX fall back to generating stream diff --git a/daemon/kernel.c b/daemon/kernel.c index 12846e0e6..07c4f56c8 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -262,7 +262,7 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id) return msg.u.stream.stream_idx; } -int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out) { +int kernel_update_stats(const struct re_address *a, struct rtpengine_stats_info *out) { struct rtpengine_message msg; int ret; @@ -278,10 +278,8 @@ int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpeng ilog(LOG_ERROR, "Failed to get stream stats from kernel: %s", strerror(errno)); return -1; } - if (msg.u.stats.ssrc != ssrc) - return -1; - *out = msg.u.stats.ssrc_stats; + *out = msg.u.stats; return 0; } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index d13e87b98..72dcd2d7f 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1066,7 +1066,9 @@ static int __k_null(struct rtpengine_srtp *s, struct packet_stream *stream) { *s = __res_null; return 0; } -static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) { +static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c, + struct ssrc_ctx *ssrc_ctx[RTPE_NUM_SSRC_TRACKING]) +{ if (!c->params.crypto_suite) return -1; @@ -1074,9 +1076,10 @@ static int __k_srtp_crypt(struct rtpengine_srtp *s, struct crypto_context *c, st .cipher = c->params.crypto_suite->kernel_cipher, .hmac = c->params.crypto_suite->kernel_hmac, .mki_len = c->params.mki_len, - .last_index = ssrc_ctx ? ssrc_ctx->srtp_index : 0, .auth_tag_len = c->params.crypto_suite->srtp_auth_tag, }; + for (unsigned int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) + s->last_index[i] = ssrc_ctx[i] ? ssrc_ctx[i]->srtp_index : 0; if (c->params.mki_len) memcpy(s->mki, c->params.mki, c->params.mki_len); memcpy(s->master_key, c->params.master_key, c->params.crypto_suite->master_key_len); @@ -1202,11 +1205,12 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out if (!reti->decrypt.cipher || !reti->decrypt.hmac) return "decryption cipher or HMAC not supported by kernel module"; - if (stream->ssrc_in) { - reti->ssrc = htonl(stream->ssrc_in->parent->h.ssrc); - if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO)) - reti->transcoding = 1; + for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { + if (stream->ssrc_in[u]) + reti->ssrc[u] = htonl(stream->ssrc_in[u]->parent->h.ssrc); } + if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO)) + reti->transcoding = 1; ZERO(stream->kernel_stats); @@ -1279,8 +1283,12 @@ output: __re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint); __re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local); - if (stream->ssrc_in && reti->transcoding) - redi->output.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out); + if (reti->transcoding) { + for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) { + if (stream->ssrc_in[u]) + redi->output.ssrc_out[u] = htonl(stream->ssrc_in[u]->ssrc_map_out); + } + } handler->out->kernel(&redi->output.encrypt, sink); @@ -1376,6 +1384,31 @@ no_kernel: PS_SET(stream, NO_KERNEL_SUPPORT); } +// must be called with appropriate locks (master lock and/or in/out_lock) +int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], + unsigned int start_idx) +{ + for (unsigned int v = 0; v < RTPE_NUM_SSRC_TRACKING; v++) { + // starting point is the same offset as `u` + unsigned int idx = (start_idx + v) % RTPE_NUM_SSRC_TRACKING; + if (!list[idx]) + continue; + if (list[idx]->parent->h.ssrc != ssrc) + continue; + return idx; + } + return -1; +} +// must be called with appropriate locks (master lock and/or in/out_lock) +struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], + unsigned int start_idx) +{ + int idx = __hunt_ssrc_ctx_idx(ssrc, list, start_idx); + if (idx == -1) + return NULL; + return list[idx]; +} + // must be called with appropriate locks (master lock and/or in_lock) static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) { struct re_address local; @@ -1383,57 +1416,52 @@ static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) { if (!have_in_lock) mutex_lock(&ps->in_lock); - struct ssrc_ctx *ssrc_ctx = ps->ssrc_in; - if (!ssrc_ctx) { - if (!have_in_lock) - mutex_unlock(&ps->in_lock); - return; - } - struct ssrc_entry_call *parent = ssrc_ctx->parent; - __re_address_translate_ep(&local, &ps->selected_sfd->socket.local); - struct rtpengine_ssrc_stats stats; - if (kernel_update_stats(&local, htonl(parent->h.ssrc), &stats)) { + struct rtpengine_stats_info stats_info; + if (kernel_update_stats(&local, &stats_info)) { if (!have_in_lock) mutex_unlock(&ps->in_lock); return; } - if (!stats.basic_stats.packets) { - // no change - if (!have_in_lock) - mutex_unlock(&ps->in_lock); - return; - } + for (unsigned int u = 0; u < G_N_ELEMENTS(stats_info.ssrc); u++) { + // check for the right SSRC association + if (!stats_info.ssrc[u]) // end of list + break; + struct ssrc_ctx *ssrc_ctx = __hunt_ssrc_ctx(ntohl(stats_info.ssrc[u]), + ps->ssrc_in, u); + if (!ssrc_ctx) + continue; + struct ssrc_entry_call *parent = ssrc_ctx->parent; - atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets); - atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes); - parent->packets_lost += stats.total_lost; // XXX should be atomic? - atomic64_set(&ssrc_ctx->last_seq, stats.ext_seq); - atomic64_set(&ssrc_ctx->last_ts, stats.timestamp); - parent->jitter = stats.jitter; + if (!stats_info.ssrc_stats[u].basic_stats.packets) // no change + continue; - uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out; + atomic64_add(&ssrc_ctx->packets, stats_info.ssrc_stats[u].basic_stats.packets); + atomic64_add(&ssrc_ctx->octets, stats_info.ssrc_stats[u].basic_stats.bytes); + parent->packets_lost += stats_info.ssrc_stats[u].total_lost; // XXX should be atomic? + atomic64_set(&ssrc_ctx->last_seq, stats_info.ssrc_stats[u].ext_seq); + atomic64_set(&ssrc_ctx->last_ts, stats_info.ssrc_stats[u].timestamp); + parent->jitter = stats_info.ssrc_stats[u].jitter; - if (!have_in_lock) - mutex_unlock(&ps->in_lock); + uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out; - // update opposite outgoing SSRC - if (!have_in_lock) - mutex_lock(&ps->out_lock); - else { + // update opposite outgoing SSRC if (mutex_trylock(&ps->out_lock)) - return; // will have to skip this - } - ssrc_ctx = ps->ssrc_out; - if (ssrc_ctx) { - parent = ssrc_ctx->parent; - if (parent->h.ssrc == ssrc_map_out) { - atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets); - atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes); + continue; // will have to skip this + + ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, ps->ssrc_out, u); + + if (ssrc_ctx) { + parent = ssrc_ctx->parent; + atomic64_add(&ssrc_ctx->packets, stats_info.ssrc_stats[u].basic_stats.packets); + atomic64_add(&ssrc_ctx->octets, stats_info.ssrc_stats[u].basic_stats.bytes); } + mutex_unlock(&ps->out_lock); } - mutex_unlock(&ps->out_lock); + + if (!have_in_lock) + mutex_unlock(&ps->in_lock); } @@ -1611,68 +1639,72 @@ noop: } -// check and update input SSRC pointers -static bool __stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, - struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) +static bool __stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc, mutex_t *lock, + struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], unsigned int *ctx_idx_p, + uint32_t output_ssrc, + struct ssrc_ctx **output, struct ssrc_hash *ssrc_hash, enum ssrc_dir dir, const char *label) { - uint32_t in_ssrc = ntohl(ssrc_bs); int changed = false; - mutex_lock(&in_srtp->in_lock); - - (*ssrc_in_p) = in_srtp->ssrc_in; - ssrc_ctx_hold(*ssrc_in_p); - if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) { - // SSRC mismatch - get the new entry - ssrc_ctx_put(ssrc_in_p); - ssrc_ctx_put(&in_srtp->ssrc_in); - (*ssrc_in_p) = in_srtp->ssrc_in = - get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT, in_srtp->media->monologue); - ssrc_ctx_hold(in_srtp->ssrc_in); + mutex_lock(lock); + + int ctx_idx = __hunt_ssrc_ctx_idx(ssrc, list, 0); + if (ctx_idx == -1) { + // SSRC mismatch - get the new entry: + // move to next slot + ctx_idx = (*ctx_idx_p + 1) % RTPE_NUM_SSRC_TRACKING; + *ctx_idx_p = ctx_idx; + // eject old entry if present + if (list[ctx_idx]) + ssrc_ctx_put(&list[ctx_idx]); + // get new entry + list[ctx_idx] = + get_ssrc_ctx(ssrc, ssrc_hash, dir, ps->media->monologue); changed = true; - ilog(LOG_DEBUG, "Ingress SSRC changed for: %s%s:%d new: %x%s", - FMT_M(sockaddr_print_buf(&in_srtp->endpoint.address), in_srtp->endpoint.port, in_ssrc)); + ilog(LOG_DEBUG, "New %s SSRC for: %s%s:%d SSRC: %x%s", label, + FMT_M(sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port, ssrc)); } + if (ctx_idx != 0) { + // move most recent entry to front of the list + struct ssrc_ctx *tmp = list[0]; + list[0] = list[ctx_idx]; + list[ctx_idx] = tmp; + ctx_idx = 0; + } + + // extract and hold entry (ctx_idx == 0) + *output = list[0]; + ssrc_ctx_hold(*output); - // make sure we reset the output SSRC if we're not transcoding - if (!MEDIA_ISSET(in_srtp->media, TRANSCODE) && !MEDIA_ISSET(in_srtp->media, ECHO)) - (*ssrc_in_p)->ssrc_map_out = in_ssrc; + // reverse SSRC mapping + if (!output_ssrc) { + // make sure we reset the output SSRC if we're not transcoding + if (!MEDIA_ISSET(ps->media, TRANSCODE) && !MEDIA_ISSET(ps->media, ECHO)) + (*output)->ssrc_map_out = ssrc; + } + else { + (*output)->ssrc_map_out = output_ssrc; + } - mutex_unlock(&in_srtp->in_lock); + mutex_unlock(lock); return changed; } +// check and update input SSRC pointers +static bool __stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs, + struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash) +{ + return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in, + &in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress"); +} // check and update output SSRC pointers static bool __stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ssrc_bs, struct ssrc_ctx *ssrc_in, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash) { - uint32_t in_ssrc = ntohl(ssrc_bs); - uint32_t out_ssrc; - bool changed = false; - - out_ssrc = ssrc_in->ssrc_map_out; - mutex_lock(&out_srtp->out_lock); - - (*ssrc_out_p) = out_srtp->ssrc_out; - ssrc_ctx_hold(*ssrc_out_p); - if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) { - // SSRC mismatch - get the new entry - ssrc_ctx_put(ssrc_out_p); - ssrc_ctx_put(&out_srtp->ssrc_out); - (*ssrc_out_p) = out_srtp->ssrc_out = - get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT, out_srtp->media->monologue); - ssrc_ctx_hold(out_srtp->ssrc_out); - - changed = 1; - ilog(LOG_DEBUG, "Egress SSRC changed for %s%s:%d new: %x%s", - FMT_M(sockaddr_print_buf(&out_srtp->endpoint.address), out_srtp->endpoint.port, out_ssrc)); - } - - // reverse SSRC mapping - (*ssrc_out_p)->ssrc_map_out = in_ssrc; - - mutex_unlock(&out_srtp->out_lock); - return changed; + return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock, + out_srtp->ssrc_out, + &out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT, + "egress"); } diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 3f3cc50e4..0d4c32666 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -284,10 +284,10 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_add_int_value(json, sfd->socket.local.port); } - if (ps->ssrc_in) { + if (ps->ssrc_in[0]) { json_builder_set_member_name(json, "ingress"); json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_in, json, ps->media); + mqtt_ssrc_stats(ps->ssrc_in[0], json, ps->media); json_builder_end_object(json); } @@ -295,10 +295,10 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { mutex_lock(&ps->out_lock); - if (ps->ssrc_out) { + if (ps->ssrc_out[0]) { json_builder_set_member_name(json, "egress"); json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_out, json, ps->media); + mqtt_ssrc_stats(ps->ssrc_out[0], json, ps->media); json_builder_end_object(json); } diff --git a/include/call.h b/include/call.h index 04358dca0..6748003fa 100644 --- a/include/call.h +++ b/include/call.h @@ -28,6 +28,7 @@ #include "statistics.h" #include "codeclib.h" #include "t38.h" +#include "xt_RTPENGINE.h" #define UNDEFINED ((unsigned int) -1) @@ -315,8 +316,10 @@ struct packet_stream { struct endpoint advertised_endpoint; /* RO */ struct endpoint learned_endpoint; /* LOCK: out_lock */ struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ - struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ - *ssrc_out; /* LOCK: out_lock */ + struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */ + *ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */ + unsigned int ssrc_in_idx, // LOCK: in_lock + ssrc_out_idx; // LOCK: out_lock struct send_timer *send_timer; /* RO */ struct jitter_buffer *jb; /* RO */ diff --git a/include/kernel.h b/include/kernel.h index e662d12ae..dbaf3c919 100644 --- a/include/kernel.h +++ b/include/kernel.h @@ -6,6 +6,7 @@ #include #include #include +#include "xt_RTPENGINE.h" @@ -38,7 +39,7 @@ int kernel_add_stream(struct rtpengine_target_info *); int kernel_add_destination(struct rtpengine_destination_info *); int kernel_del_stream(const struct re_address *); GList *kernel_list(void); -int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out); +int kernel_update_stats(const struct re_address *a, struct rtpengine_stats_info *out); unsigned int kernel_add_call(const char *id); int kernel_del_call(unsigned int); diff --git a/include/media_socket.h b/include/media_socket.h index 9a0723969..4648b6c82 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -11,6 +11,7 @@ #include "dtls.h" #include "crypto.h" #include "socket.h" +#include "xt_RTPENGINE.h" @@ -185,6 +186,10 @@ void __stream_unconfirm(struct packet_stream *); void __reset_sink_handlers(struct packet_stream *); void media_update_stats(struct call_media *m); +int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], + unsigned int start_idx); +struct ssrc_ctx *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_ctx *list[RTPE_NUM_SSRC_TRACKING], + unsigned int start_idx); void media_packet_copy(struct media_packet *, const struct media_packet *); void media_packet_release(struct media_packet *); diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index e7d819b3f..f81d6d8b1 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -270,7 +270,7 @@ struct re_crypto_context { unsigned char session_key[32]; unsigned char session_salt[14]; unsigned char session_auth_key[20]; - uint32_t roc; + uint32_t roc[RTPE_NUM_SSRC_TRACKING]; struct crypto_cipher *tfm[2]; struct crypto_shash *shash; struct crypto_aead *aead; @@ -304,7 +304,7 @@ struct rtpengine_target { struct rtpengine_stats_a stats; struct rtpengine_rtp_stats_a rtp_stats[RTPE_NUM_PAYLOAD_TYPES]; spinlock_t ssrc_stats_lock; - struct rtpengine_ssrc_stats ssrc_stats; + struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING]; struct re_crypto_context decrypt; @@ -1427,7 +1427,8 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t } spin_lock_irqsave(&g->decrypt.lock, flags); - opp->target.decrypt.last_index = g->target.decrypt.last_index; + for (i = 0; i < ARRAY_SIZE(opp->target.decrypt.last_index); i++) + opp->target.decrypt.last_index[i] = g->target.decrypt.last_index[i]; spin_unlock_irqrestore(&g->decrypt.lock, flags); _r_lock(&g->outputs_lock, flags); @@ -1588,7 +1589,14 @@ static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context seq_printf(f, "%02x", c->session_auth_key[i]); seq_printf(f, "\n"); - seq_printf(f, " ROC: %u\n", (unsigned int) c->roc); + seq_printf(f, " ROC:"); + for (i = 0; i < ARRAY_SIZE(c->roc); i++) { + if (i == 0) + seq_printf(f, " %u", (unsigned int) c->roc[i]); + else + seq_printf(f, ", %u", (unsigned int) c->roc[i]); + } + seq_printf(f, "\n"); if (s->mki_len) seq_printf(f, " MKI: length %u, %02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x...\n", @@ -1639,8 +1647,18 @@ static int proc_list_show(struct seq_file *f, void *v) { seq_printf(f, " %u bytes replacement payload\n", g->target.payload_types[i].replace_pattern_len); } - if (g->target.ssrc) - seq_printf(f, " SSRC in: %lx\n", (unsigned long) ntohl(g->target.ssrc)); + + seq_printf(f, " SSRC in:"); + for (i = 0; i < ARRAY_SIZE(g->target.ssrc); i++) { + if (!g->target.ssrc[i]) + break; + if (i == 0) + seq_printf(f, " %lx", (unsigned long) ntohl(g->target.ssrc[i])); + else + seq_printf(f, ", %lx", (unsigned long) ntohl(g->target.ssrc[i])); + } + seq_printf(f, "\n"); + proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption"); if (g->target.rtcp_mux) seq_printf(f, " option: rtcp-mux\n"); @@ -1662,8 +1680,18 @@ static int proc_list_show(struct seq_file *f, void *v) { seq_printf(f, " output #%u\n", i); proc_list_addr_print(f, "src", &o->output.src_addr); proc_list_addr_print(f, "dst", &o->output.dst_addr); - if (o->output.ssrc_out) - seq_printf(f, " SSRC out: %lx\n", (unsigned long) ntohl(o->output.ssrc_out)); + + seq_printf(f, " SSRC out:"); + for (i = 0; i < ARRAY_SIZE(o->output.ssrc_out); i++) { + if (!o->output.ssrc_out[i]) + break; + if (i == 0) + seq_printf(f, " %lx", (unsigned long) ntohl(o->output.ssrc_out[i])); + else + seq_printf(f, ", %lx", (unsigned long) ntohl(o->output.ssrc_out[i])); + } + seq_printf(f, "\n"); + proc_list_crypto_print(f, &o->encrypt, &o->output.encrypt, "encryption"); } @@ -1753,19 +1781,23 @@ static struct re_dest_addr *find_dest_addr(const struct re_dest_addr_hash *h, co static int table_get_target_stats(struct rtpengine_table *t, struct rtpengine_stats_info *i, int reset) { struct rtpengine_target *g; + unsigned int u; g = get_target(t, &i->local); if (!g) return -ENOENT; - i->ssrc = g->target.ssrc; spin_lock(&g->ssrc_stats_lock); - i->ssrc_stats = g->ssrc_stats; - if (reset) { - g->ssrc_stats.basic_stats.packets = 0; - g->ssrc_stats.basic_stats.bytes = 0; - g->ssrc_stats.total_lost = 0; + for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) { + i->ssrc[u] = g->target.ssrc[u]; + i->ssrc_stats[u] = g->ssrc_stats[u]; + + if (reset) { + g->ssrc_stats[u].basic_stats.packets = 0; + g->ssrc_stats[u].basic_stats.bytes = 0; + g->ssrc_stats[u].total_lost = 0; + } } spin_unlock(&g->ssrc_stats_lock); @@ -2243,6 +2275,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i struct rtpengine_target *og = NULL; int err; unsigned long flags; + unsigned int u; /* validation */ @@ -2276,7 +2309,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i memcpy(&g->target, i, sizeof(*i)); crypto_context_init(&g->decrypt, &g->target.decrypt); spin_lock_init(&g->ssrc_stats_lock); - g->ssrc_stats.lost_bits = -1; + for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) + g->ssrc_stats[u].lost_bits = -1; rwlock_init(&g->outputs_lock); if (i->num_destinations) { @@ -3777,7 +3811,7 @@ error: /* XXX shared code */ static uint64_t packet_index(struct re_crypto_context *c, - struct rtpengine_srtp *s, struct rtp_header *rtp) + struct rtpengine_srtp *s, struct rtp_header *rtp, int ssrc_idx) { uint16_t seq; uint64_t index; @@ -3786,17 +3820,20 @@ static uint64_t packet_index(struct re_crypto_context *c, uint32_t roc; uint32_t v; + if (ssrc_idx == -1) + ssrc_idx = 0; + seq = ntohs(rtp->seq_num); spin_lock_irqsave(&c->lock, flags); /* rfc 3711 section 3.3.1 */ - if (unlikely(!s->last_index)) - s->last_index = seq; + if (unlikely(!s->last_index[ssrc_idx])) + s->last_index[ssrc_idx] = seq; /* rfc 3711 appendix A, modified, and sections 3.3 and 3.3.1 */ - s_l = (s->last_index & 0x00000000ffffULL); - roc = (s->last_index & 0xffffffff0000ULL) >> 16; + s_l = (s->last_index[ssrc_idx] & 0x00000000ffffULL); + roc = (s->last_index[ssrc_idx] & 0xffffffff0000ULL) >> 16; v = 0; if (s_l < 0x8000) { @@ -3812,8 +3849,8 @@ static uint64_t packet_index(struct re_crypto_context *c, } index = (v << 16) | seq; - s->last_index = index; - c->roc = v; + s->last_index[ssrc_idx] = index; + c->roc[ssrc_idx] = v; spin_unlock_irqrestore(&c->lock, flags); @@ -3821,13 +3858,16 @@ static uint64_t packet_index(struct re_crypto_context *c, } static void update_packet_index(struct re_crypto_context *c, - struct rtpengine_srtp *s, uint64_t idx) + struct rtpengine_srtp *s, uint64_t idx, int ssrc_idx) { unsigned long flags; + if (ssrc_idx == -1) + ssrc_idx = 0; + spin_lock_irqsave(&c->lock, flags); - s->last_index = idx; - c->roc = (idx >> 16); + s->last_index[ssrc_idx] = idx; + c->roc[ssrc_idx] = (idx >> 16); spin_unlock_irqrestore(&c->lock, flags); } @@ -3919,7 +3959,7 @@ static int srtp_authenticate(struct re_crypto_context *c, static int srtp_auth_validate(struct re_crypto_context *c, struct rtpengine_srtp *s, struct rtp_parsed *r, - uint64_t *pkt_idx_p) + uint64_t *pkt_idx_p, int ssrc_idx) { unsigned char *auth_tag; unsigned char hmac[20]; @@ -3981,7 +4021,7 @@ static int srtp_auth_validate(struct re_crypto_context *c, ok_update: *pkt_idx_p = pkt_idx; - update_packet_index(c, s, pkt_idx); + update_packet_index(c, s, pkt_idx, ssrc_idx); ok: return 0; } @@ -4257,9 +4297,11 @@ static struct sk_buff *intercept_skb_copy(struct sk_buff *oskb, const struct re_ -static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 arrival_time, int pt_idx) { +static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 arrival_time, int pt_idx, + int ssrc_idx) +{ unsigned long flags; - struct rtpengine_ssrc_stats *s = &g->ssrc_stats; + struct rtpengine_ssrc_stats *s = &g->ssrc_stats[ssrc_idx]; uint16_t old_seq_trunc; uint32_t last_seq; uint16_t seq_diff; @@ -4351,6 +4393,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, int err; int error_nf_action = XT_CONTINUE; int rtp_pt_idx = -2; + int ssrc_idx = -1; unsigned int datalen, pllen; uint32_t *u32; struct rtp_parsed rtp, rtp2; @@ -4453,13 +4496,21 @@ src_check_ok: rtp_pt_idx = rtp_payload_type(rtp.header, &g->target, &g->last_pt); // Pass to userspace if SSRC has changed. - errstr = "SSRC mismatch"; - if (unlikely((g->target.ssrc) && (g->target.ssrc != rtp.header->ssrc))) + // Look for matching SSRC index if any SSRC were given + if (likely(g->target.ssrc[0])) { + errstr = "SSRC mismatch"; + for (ssrc_idx = 0; ssrc_idx < RTPE_NUM_SSRC_TRACKING; ssrc_idx++) { + if (g->target.ssrc[ssrc_idx] == rtp.header->ssrc) + goto found_ssrc; + } + ssrc_idx = -1; goto skip_error; +found_ssrc:; + } - pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header); + pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header, ssrc_idx); errstr = "SRTP authentication tag mismatch"; - if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx)) + if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx, ssrc_idx)) goto skip_error; // if RTP, only forward packets of known/passthrough payload types @@ -4472,8 +4523,8 @@ src_check_ok: skb_trim(skb, rtp.header_len + rtp.payload_len); - if (g->target.rtp_stats) - rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx); + if (g->target.rtp_stats && ssrc_idx != -1) + rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx, ssrc_idx); DBG("packet payload decrypted as %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x...\n", rtp.payload[0], rtp.payload[1], rtp.payload[2], rtp.payload[3], @@ -4540,10 +4591,10 @@ no_intercept: if (rtp2.ok) { // SSRC substitution - if (g->target.transcoding && o->output.ssrc_out) - rtp2.header->ssrc = o->output.ssrc_out; + if (g->target.transcoding && o->output.ssrc_out && ssrc_idx != -1) + rtp2.header->ssrc = o->output.ssrc_out[ssrc_idx]; - pkt_idx = packet_index(&o->encrypt, &o->output.encrypt, rtp2.header); + pkt_idx = packet_index(&o->encrypt, &o->output.encrypt, rtp2.header, ssrc_idx); pllen = rtp2.payload_len; srtp_encrypt(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx); srtp_authenticate(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 534e7b0ab..59306e33b 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -5,6 +5,7 @@ #define RTPE_NUM_PAYLOAD_TYPES 32 #define RTPE_MAX_FORWARD_DESTINATIONS 32 +#define RTPE_NUM_SSRC_TRACKING 4 @@ -80,7 +81,7 @@ struct rtpengine_srtp { unsigned int session_key_len; unsigned int session_salt_len; unsigned char mki[256]; /* XXX uses too much memory? */ - uint64_t last_index; + uint64_t last_index[RTPE_NUM_SSRC_TRACKING]; unsigned int auth_tag_len; /* in bytes */ unsigned int mki_len; }; @@ -107,7 +108,7 @@ struct rtpengine_target_info { unsigned int intercept_stream_idx; struct rtpengine_srtp decrypt; - uint32_t ssrc; // Expose the SSRC to userspace when we resync. + uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // Expose the SSRC to userspace when we resync. struct rtpengine_payload_type payload_types[RTPE_NUM_PAYLOAD_TYPES]; /* must be sorted */ unsigned int num_payload_types; @@ -129,7 +130,7 @@ struct rtpengine_output_info { struct re_address dst_addr; struct rtpengine_srtp encrypt; - uint32_t ssrc_out; // Rewrite SSRC + uint32_t ssrc_out[RTPE_NUM_SSRC_TRACKING]; // Rewrite SSRC unsigned char tos; }; @@ -159,8 +160,8 @@ struct rtpengine_packet_info { struct rtpengine_stats_info { struct re_address local; // input - uint32_t ssrc; // output - struct rtpengine_ssrc_stats ssrc_stats; // output + uint32_t ssrc[RTPE_NUM_SSRC_TRACKING]; // output + struct rtpengine_ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING]; // output }; struct rtpengine_noop_info {