diff --git a/README.md b/README.md index 841fa92fe..c9851a961 100644 --- a/README.md +++ b/README.md @@ -866,6 +866,15 @@ Optionally included keys are: Identical to setting `generate RTCP = on`. + - `RTCP mirror` + + Useful only for `subscribe request` message. Instructs + *rtpengine* to not only create a one-way subscription for both + RTP and RTCP from the source to the sink, but also create a + reverse subscription for RTCP only from the sink back to the + source. This makes it possible for the media source to receive + feedback from all media receivers (sinks). + - `debug` or `debugging` Enabled full debug logging for this call, regardless of global log level settings. diff --git a/daemon/call.c b/daemon/call.c index 352a23d99..744b950de 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1353,10 +1353,11 @@ void free_sink_handler(void *p) { struct sink_handler *sh = p; g_slice_free1(sizeof(*sh), sh); } -void __add_sink_handler(GQueue *q, struct packet_stream *sink) { +void __add_sink_handler(GQueue *q, struct packet_stream *sink, bool rtcp_only) { struct sink_handler *sh = g_slice_alloc0(sizeof(*sh)); sh->sink = sink; sh->kernel_output_idx = -1; + sh->rtcp_only = rtcp_only ? 1 : 0; g_queue_push_tail(q, sh); } @@ -1372,7 +1373,7 @@ static void __reset_streams(struct call_media *media) { // B can be NULL // XXX this function seems to do two things - stream init (with B NULL) and sink init - split up? static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp, - const struct sdp_ng_flags *flags) { + const struct sdp_ng_flags *flags, bool rtcp_only) { GList *la, *lb; struct packet_stream *a, *ax, *b; unsigned int port_off = 0; @@ -1394,9 +1395,9 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru // we get SSRC flip-flops on the opposite side // XXX still necessary for blackhole? if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) - __add_sink_handler(&a->rtp_sinks, a); + __add_sink_handler(&a->rtp_sinks, a, rtcp_only); else if (b) - __add_sink_handler(&a->rtp_sinks, b); + __add_sink_handler(&a->rtp_sinks, b, rtcp_only); PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */ __rtp_stats_update(a->rtp_stats, &A->codecs); @@ -1433,7 +1434,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) { /* RTCP sink handler added below */ } else if (b) - __add_sink_handler(&a->rtcp_sinks, b); + __add_sink_handler(&a->rtcp_sinks, b, rtcp_only); PS_SET(a, RTCP); PS_CLEAR(a, IMPLICIT_RTCP); } @@ -1447,12 +1448,12 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru a = la->data; if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) { - __add_sink_handler(&a->rtcp_sinks, a); + __add_sink_handler(&a->rtcp_sinks, a, rtcp_only); if (MEDIA_ISSET(A, RTCP_MUX)) - __add_sink_handler(&ax->rtcp_sinks, a); + __add_sink_handler(&ax->rtcp_sinks, a, rtcp_only); } else if (b) - __add_sink_handler(&a->rtcp_sinks, b); + __add_sink_handler(&a->rtcp_sinks, b, rtcp_only); PS_CLEAR(a, RTP); PS_SET(a, RTCP); a->rtcp_sibling = NULL; @@ -2484,6 +2485,7 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams // create media iterators for all subscribers GList *sub_medias[ml->subscribers.length]; + bool subs_rtcp_only[ml->subscribers.length]; unsigned int num_subs = 0; for (GList *l = ml->subscribers.head; l; l = l->next) { struct call_subscription *cs = l->data; @@ -2492,6 +2494,7 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams // skip into correct media section for multi-ml subscriptions for (unsigned int offset = cs->media_offset; offset && sub_medias[num_subs]; offset--) sub_medias[num_subs] = sub_medias[num_subs]->next; + subs_rtcp_only[num_subs] = cs->rtcp_only ? true : false; num_subs++; } // keep num_subs as shortcut to ml->subscribers.length @@ -2517,8 +2520,9 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams struct call_media *sub_media = sub_medias[i]->data; sub_medias[i] = sub_medias[i]->next; + bool rtcp_only = subs_rtcp_only[i]; - if (__init_streams(media, sub_media, sp, flags)) + if (__init_streams(media, sub_media, sp, flags, rtcp_only)) ilog(LOG_WARN, "Error initialising streams"); } @@ -2939,7 +2943,7 @@ static void __unsubscribe_from_all(struct call_monologue *ml) { } } void __add_subscription(struct call_monologue *which, struct call_monologue *to, bool offer_answer, - unsigned int offset) + unsigned int offset, bool rtcp_only) { if (g_hash_table_lookup(which->subscriptions_ht, to)) { ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is already subscribed to '" STR_FORMAT_M "'", @@ -2956,6 +2960,8 @@ void __add_subscription(struct call_monologue *which, struct call_monologue *to, to_rev_cs->monologue = which; which_cs->media_offset = offset; to_rev_cs->media_offset = offset; + which_cs->rtcp_only = rtcp_only ? 1 : 0; + to_rev_cs->rtcp_only = rtcp_only ? 1 : 0; // keep offer-answer subscriptions first in the list if (!offer_answer) { g_queue_push_tail(&which->subscriptions, which_cs); @@ -2977,8 +2983,8 @@ void __add_subscription(struct call_monologue *which, struct call_monologue *to, static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) { __unsubscribe_all_offer_answer_subscribers(a); __unsubscribe_all_offer_answer_subscribers(b); - __add_subscription(a, b, true, 0); - __add_subscription(b, a, true, 0); + __add_subscription(a, b, true, 0, false); + __add_subscription(b, a, true, 0, false); } @@ -3037,7 +3043,7 @@ int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_ __assign_stream_fds(media, &em->intf_sfds); // XXX this should be covered by __update_init_subscribers ? - if (__init_streams(media, NULL, sp, flags)) + if (__init_streams(media, NULL, sp, flags, false)) return -1; __ice_start(media); ice_update(media->ice_agent, sp, false); @@ -3050,7 +3056,7 @@ int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct call_monologue *dst_ml, struct sdp_ng_flags *flags, GList **src_media_it, GList **dst_media_it, unsigned int *index) { - unsigned int idx_diff = 0; + unsigned int idx_diff = 0, rev_idx_diff = 0; for (GList *l = src_ml->last_in_sdp_streams.head; l; l = l->next) { struct stream_params *sp = l->data; @@ -3061,6 +3067,8 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca // track media index difference if one ml is subscribed to multiple other mls if (idx_diff == 0 && dst_media->index > src_media->index) idx_diff = dst_media->index - src_media->index; + if (rev_idx_diff == 0 && src_media->index > dst_media->index) + rev_idx_diff = src_media->index - dst_media->index; if (__media_init_from_flags(src_media, dst_media, sp, flags) == 1) continue; @@ -3100,11 +3108,13 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca __num_media_streams(dst_media, num_ports); __assign_stream_fds(dst_media, &em->intf_sfds); - if (__init_streams(dst_media, NULL, NULL, flags)) + if (__init_streams(dst_media, NULL, NULL, flags, false)) return -1; } - __add_subscription(dst_ml, src_ml, false, idx_diff); + __add_subscription(dst_ml, src_ml, false, idx_diff, false); + if (flags->rtcp_mirror) + __add_subscription(src_ml, dst_ml, false, rev_idx_diff, true); __update_init_subscribers(src_ml, NULL, NULL, flags->opmode); __update_init_subscribers(dst_ml, NULL, NULL, flags->opmode); @@ -3180,7 +3190,7 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, struct sdp_ng_flag __dtls_logic(flags, dst_media, sp); - if (__init_streams(dst_media, NULL, sp, flags)) + if (__init_streams(dst_media, NULL, sp, flags, false)) return -1; MEDIA_CLEAR(dst_media, RECV); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 4ddea33b9..40a34202c 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -904,6 +904,12 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { case CSH_LOOKUP("nat-wait"): out->nat_wait = 1; break; + case CSH_LOOKUP("RTCP-mirror"): + case CSH_LOOKUP("mirror-RTCP"): + case CSH_LOOKUP("rtcp-mirror"): + case CSH_LOOKUP("mirror-rtcp"): + out->rtcp_mirror = 1; + break; default: // handle values aliases from other dictionaries if (call_ng_flags_prefix(out, s, "from-tags-", call_ng_flags_esc_str_list, diff --git a/daemon/janus.c b/daemon/janus.c index 367b117c9..dc7421dee 100644 --- a/daemon/janus.c +++ b/daemon/janus.c @@ -430,6 +430,7 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan flags.rtcp_mux_require = 1; flags.no_rtcp_attr = 1; flags.sdes_off = 1; + flags.rtcp_mirror = 1; AUTO_CLEANUP(GQueue srcs, call_subscriptions_clear) = G_QUEUE_INIT; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 857c9c95b..487e4fe2f 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1294,6 +1294,8 @@ output: handler->out->kernel(&redi->output.encrypt, sink); + redi->output.rtcp_only = sink_handler ? (sink_handler->rtcp_only ? 1 : 0) : 0; + mutex_unlock(&sink->out_lock); if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) { @@ -2426,6 +2428,10 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (phc->rtcp_discard) goto next; } + else { + if (sh->rtcp_only) + goto next; + } if (PS_ISSET(sh->sink, NAT_WAIT) && !PS_ISSET(sh->sink, RECEIVED)) { ilog(LOG_DEBUG | LOG_FLAG_LIMIT, diff --git a/daemon/redis.c b/daemon/redis.c index dd803e509..6c80c2ff9 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1586,11 +1586,14 @@ static int rbl_subs_cb(str *s, GQueue *q, struct redis_list *list, void *ptr) { unsigned int media_offset = 0; bool offer_answer = false; + bool rtcp_only = false; if (!str_token_sep(&token, s, '/')) { media_offset = str_to_i(&token, 0); if (!str_token_sep(&token, s, '/')) { offer_answer = str_to_i(&token, 0) ? true : false; + if (!str_token_sep(&token, s, '/')) + rtcp_only = str_to_i(&token, 0) ? true : false; } } @@ -1599,7 +1602,7 @@ static int rbl_subs_cb(str *s, GQueue *q, struct redis_list *list, void *ptr) { if (!other_ml) return -1; - __add_subscription(ml, other_ml, offer_answer, media_offset); + __add_subscription(ml, other_ml, offer_answer, media_offset, rtcp_only); return 0; } @@ -1620,7 +1623,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ other_ml = l->data; if (!other_ml) return -1; - __add_subscription(ml, other_ml, true, 0); + __add_subscription(ml, other_ml, true, 0, false); } g_queue_clear(&q); @@ -1630,7 +1633,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ other_ml = l->data; if (!other_ml) return -1; - __add_subscription(ml, other_ml, false, 0); + __add_subscription(ml, other_ml, false, 0, false); } g_queue_clear(&q); } @@ -1644,7 +1647,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ if (!ml->subscriptions.length) { other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active"); if (other_ml) - __add_subscription(ml, other_ml, true, 0); + __add_subscription(ml, other_ml, true, 0, false); } if (json_build_list(&q, c, "other_tags", i, tags, root_reader)) @@ -1674,6 +1677,20 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ return 0; } +static struct call_subscription *__find_subscriber(struct call_monologue *ml, struct packet_stream *sink) { + if (!ml || !sink || !sink->media) + return NULL; + struct call_monologue *find_ml = sink->media->monologue; + + for (GList *l = ml->subscribers.head; l; l = l->next) { + struct call_subscription *cs = l->data; + struct call_monologue *sub_ml = cs->monologue; + if (find_ml == sub_ml) + return cs; + } + return NULL; +} + static int json_link_streams(struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias, JsonReader *root_reader) { @@ -1684,6 +1701,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, for (i = 0; i < streams->len; i++) { ps = streams->ptrs[i]; + struct call_monologue *ps_ml = ps->media ? ps->media->monologue : NULL; ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); @@ -1698,7 +1716,8 @@ static int json_link_streams(struct call *c, struct redis_list *streams, struct packet_stream *sink = l->data; if (!sink) return -1; - __add_sink_handler(&ps->rtp_sinks, sink); + struct call_subscription *cs = __find_subscriber(ps_ml, sink); + __add_sink_handler(&ps->rtp_sinks, sink, cs ? cs->rtcp_only : false); } g_queue_clear(&q); @@ -1706,7 +1725,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, if (!ps->rtp_sinks.length) { struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); if (sink) - __add_sink_handler(&ps->rtp_sinks, sink); + __add_sink_handler(&ps->rtp_sinks, sink, false); } if (json_build_list(&q, c, "rtcp_sinks", i, streams, root_reader)) @@ -1715,7 +1734,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, struct packet_stream *sink = l->data; if (!sink) return -1; - __add_sink_handler(&ps->rtcp_sinks, sink); + __add_sink_handler(&ps->rtcp_sinks, sink, false); } g_queue_clear(&q); @@ -1723,7 +1742,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, if (!ps->rtcp_sinks.length) { struct packet_stream *sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); if (sink) - __add_sink_handler(&ps->rtcp_sinks, sink); + __add_sink_handler(&ps->rtcp_sinks, sink, false); } if (ps->media) @@ -2464,10 +2483,11 @@ char* redis_encode_json(struct call *c) { json_builder_begin_array(builder); for (k = ml->subscriptions.head; k; k = k->next) { struct call_subscription *cs = k->data; - JSON_ADD_STRING("%u/%u/%u", + JSON_ADD_STRING("%u/%u/%u/%u", cs->monologue->unique_id, cs->media_offset, - cs->offer_answer); + cs->offer_answer, + cs->rtcp_only); } json_builder_end_array(builder); } diff --git a/include/call.h b/include/call.h index abccb910c..2d6b715a8 100644 --- a/include/call.h +++ b/include/call.h @@ -429,6 +429,7 @@ struct call_subscription { GList *link; // link into the corresponding opposite list unsigned int media_offset; // 0 if media indexes match up unsigned int offer_answer:1; // bidirectional, exclusive + unsigned int rtcp_only:1; }; /* half a dialogue */ @@ -641,9 +642,9 @@ void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); void __add_subscription(struct call_monologue *ml, struct call_monologue *other, bool offer_answer, - unsigned int media_offset); + unsigned int media_offset, bool rtcp_only); void free_sink_handler(void *); -void __add_sink_handler(GQueue *, struct packet_stream *); +void __add_sink_handler(GQueue *, struct packet_stream *, bool rtcp_only); void call_subscription_free(void *); void call_subscriptions_clear(GQueue *q); diff --git a/include/call_interfaces.h b/include/call_interfaces.h index ca4d5dec1..e789fde7e 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -107,6 +107,7 @@ struct sdp_ng_flags { rtcp_mux_accept:1, rtcp_mux_reject:1, ice_reject:1, + rtcp_mirror:1, trickle_ice:1, no_rtcp_attr:1, full_rtcp_attr:1, diff --git a/include/media_socket.h b/include/media_socket.h index dd27f0046..7d811685e 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -131,6 +131,7 @@ struct sink_handler { struct packet_stream *sink; const struct streamhandler *handler; int kernel_output_idx; + unsigned int rtcp_only:1; }; struct media_packet { str raw; diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 18ca6520b..c3fef1b1c 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -1699,6 +1699,8 @@ static int proc_list_show(struct seq_file *f, void *v) { (unsigned long) ntohl(o->output.ssrc_out[j])); } seq_printf(f, "\n"); + if (o->output.rtcp_only) + seq_printf(f, " option: RTCP only\n"); proc_list_crypto_print(f, &o->encrypt, &o->output.encrypt, "encryption"); } diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 59306e33b..30b46aed4 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -133,6 +133,7 @@ struct rtpengine_output_info { uint32_t ssrc_out[RTPE_NUM_SSRC_TRACKING]; // Rewrite SSRC unsigned char tos; + unsigned int rtcp_only:1; }; struct rtpengine_destination_info { diff --git a/t/auto-daemon-tests-pubsub.pl b/t/auto-daemon-tests-pubsub.pl index cd25c23da..7751bfdea 100755 --- a/t/auto-daemon-tests-pubsub.pl +++ b/t/auto-daemon-tests-pubsub.pl @@ -16,7 +16,7 @@ autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 my ($sock_a, $sock_b, $sock_c, $sock_d, $port_a, $port_b, $port_c, $ssrc_a, $ssrc_b, $resp, - $sock_ax, $sock_bx, $port_ax, $port_bx, $port_d, $sock_e, $port_e, + $sock_ax, $sock_bx, $port_ax, $port_bx, $port_d, $sock_e, $port_e, $sock_cx, $port_cx, $srtp_ctx_a, $srtp_ctx_b, $srtp_ctx_a_rev, $srtp_ctx_b_rev, $ufrag_a, $ufrag_b, @ret1, @ret2, @ret3, @ret4, $srtp_key_a, $srtp_key_b, $ts, $seq, $tag_medias, $media_labels, $ftr, $ttr, $fts, $ttr2); @@ -26,6 +26,260 @@ my ($sock_a, $sock_b, $sock_c, $sock_d, $port_a, $port_b, $port_c, $ssrc_a, $ssr use_json(1); + +($sock_a, $sock_ax, $sock_b, $sock_bx, $sock_c, $sock_cx) = + new_call( + [qw(198.51.100.17 6000)], + [qw(198.51.100.17 6001)], + [qw(198.51.100.17 6002)], + [qw(198.51.100.17 6003)], + [qw(198.51.100.17 6004)], + [qw(198.51.100.17 6005)], + ); + +($port_a, $port_ax) = offer('simple sub, no RTCP mirror', + { }, < ft() }, < $ttr }, < ft(), flags => ['mirror RTCP'] }, < $ttr }, <