diff --git a/daemon/call.c b/daemon/call.c index 8b6097ad2..4840511ee 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1368,13 +1368,14 @@ static void __reset_streams(struct call_media *media) { struct packet_stream *ps = l->data; g_queue_clear_full(&ps->rtp_sinks, free_sink_handler); g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler); + g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler); } } // called once on media A for each sink media B // B can be NULL // XXX this function seems to do two things - stream init (with B NULL) and sink init - split up? static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp, - const struct sdp_ng_flags *flags, bool rtcp_only, bool transcoding) { + const struct sdp_ng_flags *flags, bool rtcp_only, bool transcoding, bool egress) { GList *la, *lb; struct packet_stream *a, *ax, *b; unsigned int port_off = 0; @@ -1395,7 +1396,9 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru // reflect media - pretend reflection also for blackhole, as otherwise // we get SSRC flip-flops on the opposite side // XXX still necessary for blackhole? - if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) + if (egress && b) + __add_sink_handler(&a->rtp_mirrors, b, rtcp_only, transcoding); + else if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) __add_sink_handler(&a->rtp_sinks, a, rtcp_only, transcoding); else if (b) __add_sink_handler(&a->rtp_sinks, b, rtcp_only, transcoding); @@ -1448,6 +1451,9 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru assert(la != NULL); a = la->data; + if (egress) + goto no_rtcp; + if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) { __add_sink_handler(&a->rtcp_sinks, a, rtcp_only, transcoding); if (MEDIA_ISSET(A, RTCP_MUX)) @@ -1488,6 +1494,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru if (__init_stream(a)) return -1; +no_rtcp: recording_setup_stream(ax); // RTP recording_setup_stream(a); // RTCP @@ -2507,6 +2514,7 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams GList *sub_medias[ml->subscribers.length]; bool subs_rtcp_only[ml->subscribers.length]; bool subs_tc[ml->subscribers.length]; + bool subs_egress[ml->subscribers.length]; unsigned int num_subs = 0; for (GList *l = ml->subscribers.head; l; l = l->next) { struct call_subscription *cs = l->data; @@ -2517,6 +2525,7 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams sub_medias[num_subs] = sub_medias[num_subs]->next; subs_rtcp_only[num_subs] = cs->rtcp_only ? true : false; subs_tc[num_subs] = cs->transcoding ? true : false; + subs_egress[num_subs] = cs->egress ? true : false; num_subs++; } // keep num_subs as shortcut to ml->subscribers.length @@ -2544,8 +2553,9 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams sub_medias[i] = sub_medias[i]->next; bool rtcp_only = subs_rtcp_only[i]; bool tc = subs_tc[i]; + bool egress = subs_egress[i]; - if (__init_streams(media, sub_media, sp, flags, rtcp_only, tc)) + if (__init_streams(media, sub_media, sp, flags, rtcp_only, tc, egress)) ilog(LOG_WARN, "Error initialising streams"); } @@ -2968,7 +2978,7 @@ static void __unsubscribe_from_all(struct call_monologue *ml) { } } void __add_subscription(struct call_monologue *which, struct call_monologue *to, bool offer_answer, - unsigned int offset, bool rtcp_only) + unsigned int offset, bool rtcp_only, bool egress) { if (g_hash_table_lookup(which->subscriptions_ht, to)) { ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is already subscribed to '" STR_FORMAT_M "'", @@ -3002,14 +3012,16 @@ void __add_subscription(struct call_monologue *which, struct call_monologue *to, } which_cs->offer_answer = offer_answer ? 1 : 0; to_rev_cs->offer_answer = which_cs->offer_answer; + which_cs->egress = egress ? 1 : 0; + to_rev_cs->egress = which_cs->egress; g_hash_table_insert(which->subscriptions_ht, to, to_rev_cs->link); g_hash_table_insert(to->subscribers_ht, which, which_cs->link); } static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) { __unsubscribe_all_offer_answer_subscribers(a); __unsubscribe_all_offer_answer_subscribers(b); - __add_subscription(a, b, true, 0, false); - __add_subscription(b, a, true, 0, false); + __add_subscription(a, b, true, 0, false, false); + __add_subscription(b, a, true, 0, false, false); } @@ -3068,7 +3080,7 @@ int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_ __assign_stream_fds(media, &em->intf_sfds); // XXX this should be covered by __update_init_subscribers ? - if (__init_streams(media, NULL, sp, flags, false, false)) + if (__init_streams(media, NULL, sp, flags, false, false, false)) return -1; __ice_start(media); ice_update(media->ice_agent, sp, false); @@ -3133,13 +3145,13 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca __num_media_streams(dst_media, num_ports); __assign_stream_fds(dst_media, &em->intf_sfds); - if (__init_streams(dst_media, NULL, NULL, flags, false, false)) + if (__init_streams(dst_media, NULL, NULL, flags, false, false, false)) return -1; } - __add_subscription(dst_ml, src_ml, false, idx_diff, false); + __add_subscription(dst_ml, src_ml, false, idx_diff, false, flags->egress ? true : false); if (flags->rtcp_mirror) - __add_subscription(src_ml, dst_ml, false, rev_idx_diff, true); + __add_subscription(src_ml, dst_ml, false, rev_idx_diff, true, flags->egress ? true : false); __update_init_subscribers(src_ml, NULL, NULL, flags->opmode); __update_init_subscribers(dst_ml, NULL, NULL, flags->opmode); @@ -3218,7 +3230,7 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, struct sdp_ng_flag __dtls_logic(flags, dst_media, sp); - if (__init_streams(dst_media, NULL, sp, flags, false, false)) + if (__init_streams(dst_media, NULL, sp, flags, false, false, false)) return -1; MEDIA_CLEAR(dst_media, RECV); @@ -3351,6 +3363,7 @@ static void __call_cleanup(struct call *c) { g_queue_clear_full(&ps->rtp_sinks, free_sink_handler); g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler); + g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler); } for (GList *l = c->medias.head; l; l = l->next) { diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 40a34202c..868845888 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -795,6 +795,9 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { case CSH_LOOKUP("all"): out->all = 1; break; + case CSH_LOOKUP("egress"): + out->egress = 1; + break; case CSH_LOOKUP("SIPREC"): case CSH_LOOKUP("siprec"): out->siprec = 1; diff --git a/daemon/codec.c b/daemon/codec.c index de4f73164..604b99887 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -2086,6 +2086,25 @@ void codec_packet_free(void *pp) { ssrc_ctx_put(&p->ssrc_out); g_slice_free1(sizeof(*p), p); } +bool codec_packet_copy(struct codec_packet *p) { + char *buf = malloc(p->s.len + RTP_BUFFER_TAIL_ROOM); + if (!buf) + return false; + memcpy(buf, p->s.s, p->s.len); + p->s.s = buf; + p->free_func = free; + return true; +} +struct codec_packet *codec_packet_dup(struct codec_packet *p) { + struct codec_packet *dup = g_slice_alloc0(sizeof(*p)); + *dup = *p; + codec_packet_copy(dup); + if (dup->ssrc_out) + ssrc_ctx_hold(dup->ssrc_out); + if (dup->rtp) + dup->rtp = (void *) dup->s.s; + return dup; +} diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 77e8c5aee..e409549b0 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2239,12 +2239,8 @@ static int media_packet_queue_dup(GQueue *q) { struct codec_packet *p = l->data; if (p->free_func) // nothing to do, already private continue; - char *buf = malloc(p->s.len + RTP_BUFFER_TAIL_ROOM); - if (!buf) + if (!codec_packet_copy(p)) return -1; - memcpy(buf, p->s.s, p->s.len); - p->s.s = buf; - p->free_func = free; } return 0; } @@ -2486,6 +2482,52 @@ static int stream_packet(struct packet_handler_ctx *phc) { goto err_next; } + // egress mirroring + + if (!phc->rtcp) { + for (GList *mirror_link = phc->mp.stream->rtp_mirrors.head; mirror_link; + mirror_link = mirror_link->next) + { + struct packet_handler_ctx mirror_phc = *phc; + mirror_phc.mp.ssrc_out = NULL; + g_queue_init(&mirror_phc.mp.packets_out); + + struct sink_handler *mirror_sh = mirror_link->data; + struct packet_stream *mirror_sink = mirror_sh->sink; + + media_packet_rtcp_mux(&mirror_phc, mirror_sh); + media_packet_rtp_out(&mirror_phc, mirror_sh); + media_packet_set_encrypt(&mirror_phc, mirror_sh); + + for (GList *pack = phc->mp.packets_out.head; pack; pack = pack->next) { + struct codec_packet *p = pack->data; + g_queue_push_tail(&mirror_phc.mp.packets_out, codec_packet_dup(p)); + } + + ret = __media_packet_encrypt(&mirror_phc); + if (ret) + goto next_mirror; + + mutex_lock(&mirror_sink->out_lock); + + if (!mirror_sink->advertised_endpoint.port + || (is_addr_unspecified(&mirror_sink->advertised_endpoint.address) + && !is_trickle_ice_address(&mirror_sink->advertised_endpoint))) + { + mutex_unlock(&mirror_sink->out_lock); + goto next; + } + + ret = media_socket_dequeue(&mirror_phc.mp, mirror_sink); + + mutex_unlock(&mirror_sink->out_lock); + +next_mirror: + media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left + ssrc_ctx_put(&mirror_phc.mp.ssrc_out); + } + } + ret = __media_packet_encrypt(phc); errno = ENOTTY; if (ret) diff --git a/daemon/redis.c b/daemon/redis.c index e655b0c5e..583eeb6ea 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1587,13 +1587,17 @@ static int rbl_subs_cb(str *s, GQueue *q, struct redis_list *list, void *ptr) { unsigned int media_offset = 0; bool offer_answer = false; bool rtcp_only = false; + bool egress = false; if (!str_token_sep(&token, s, '/')) { media_offset = str_to_i(&token, 0); if (!str_token_sep(&token, s, '/')) { offer_answer = str_to_i(&token, 0) ? true : false; - if (!str_token_sep(&token, s, '/')) + if (!str_token_sep(&token, s, '/')) { rtcp_only = str_to_i(&token, 0) ? true : false; + if (!str_token_sep(&token, s, '/')) + egress = str_to_i(&token, 0) ? true : false; + } } } @@ -1602,7 +1606,7 @@ static int rbl_subs_cb(str *s, GQueue *q, struct redis_list *list, void *ptr) { if (!other_ml) return -1; - __add_subscription(ml, other_ml, offer_answer, media_offset, rtcp_only); + __add_subscription(ml, other_ml, offer_answer, media_offset, rtcp_only, egress); return 0; } @@ -1623,7 +1627,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ other_ml = l->data; if (!other_ml) return -1; - __add_subscription(ml, other_ml, true, 0, false); + __add_subscription(ml, other_ml, true, 0, false, false); } g_queue_clear(&q); @@ -1633,7 +1637,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ other_ml = l->data; if (!other_ml) return -1; - __add_subscription(ml, other_ml, false, 0, false); + __add_subscription(ml, other_ml, false, 0, false, false); } g_queue_clear(&q); } @@ -1647,7 +1651,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ if (!ml->subscriptions.length) { other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active"); if (other_ml) - __add_subscription(ml, other_ml, true, 0, false); + __add_subscription(ml, other_ml, true, 0, false, false); } if (json_build_list(&q, c, "other_tags", i, tags, root_reader)) @@ -1717,6 +1721,8 @@ static int json_link_streams(struct call *c, struct redis_list *streams, if (!sink) return -1; struct call_subscription *cs = __find_subscriber(ps_ml, sink); + if (cs->egress) + continue; __add_sink_handler(&ps->rtp_sinks, sink, cs ? cs->rtcp_only : false, false); } g_queue_clear(&q); @@ -2483,11 +2489,12 @@ char* redis_encode_json(struct call *c) { json_builder_begin_array(builder); for (k = ml->subscriptions.head; k; k = k->next) { struct call_subscription *cs = k->data; - JSON_ADD_STRING("%u/%u/%u/%u", + JSON_ADD_STRING("%u/%u/%u/%u/%u", cs->monologue->unique_id, cs->media_offset, cs->offer_answer, - cs->rtcp_only); + cs->rtcp_only, + cs->egress); } json_builder_end_array(builder); } diff --git a/include/call.h b/include/call.h index 3fe60a8b8..a2d22c679 100644 --- a/include/call.h +++ b/include/call.h @@ -328,6 +328,7 @@ struct packet_stream { GQueue rtp_sinks; // LOCK: call->master_lock, in_lock for streamhandler GQueue rtcp_sinks; // LOCK: call->master_lock, in_lock for streamhandler struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */ + GQueue rtp_mirrors; // LOCK: call->master_lock, in_lock for streamhandler struct endpoint endpoint; /* LOCK: out_lock */ struct endpoint detected_endpoints[4]; /* LOCK: out_lock */ struct timeval ep_detect_signal; /* LOCK: out_lock */ @@ -431,6 +432,7 @@ struct call_subscription { unsigned int offer_answer:1; // bidirectional, exclusive unsigned int rtcp_only:1; unsigned int transcoding:1; + unsigned int egress:1; }; /* half a dialogue */ @@ -643,7 +645,7 @@ void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); void __add_subscription(struct call_monologue *ml, struct call_monologue *other, bool offer_answer, - unsigned int media_offset, bool rtcp_only); + unsigned int media_offset, bool rtcp_only, bool egress); void free_sink_handler(void *); void __add_sink_handler(GQueue *, struct packet_stream *, bool rtcp_only, bool transcoding); diff --git a/include/call_interfaces.h b/include/call_interfaces.h index e789fde7e..e531c6ee1 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -123,6 +123,7 @@ struct sdp_ng_flags { osrtp_offer:1, reset:1, all:1, + egress:1, siprec:1, fragment:1, record_call:1, diff --git a/include/codec.h b/include/codec.h index 6a8d379a9..ffe6f612c 100644 --- a/include/codec.h +++ b/include/codec.h @@ -112,6 +112,8 @@ bool codec_store_is_full_answer(const struct codec_store *src, const struct code void codec_add_raw_packet(struct media_packet *mp, unsigned int clockrate); void codec_packet_free(void *); +struct codec_packet *codec_packet_dup(struct codec_packet *p); +bool codec_packet_copy(struct codec_packet *p); void payload_type_free(struct rtp_payload_type *p); struct rtp_payload_type *rtp_payload_type_dup(const struct rtp_payload_type *pt); diff --git a/t/auto-daemon-tests-pubsub.pl b/t/auto-daemon-tests-pubsub.pl index 6d100ec64..1d1f89b9f 100755 --- a/t/auto-daemon-tests-pubsub.pl +++ b/t/auto-daemon-tests-pubsub.pl @@ -27,6 +27,206 @@ use_json(1); +($sock_a, $sock_b, $sock_c) = + new_call( + [qw(198.51.100.17 6146)], + [qw(198.51.100.17 6148)], + [qw(198.51.100.17 6150)], + ); + +($port_a) = offer('egress sub', + { }, < ft(), flags => ['egress'] }, < $ttr }, < {transcode => ['PCMA']} }, < ft(), flags => ['egress'] }, < $ttr }, < \$options{'all'}, 'siprec' => \$options{'SIPREC'}, 'SIPREC' => \$options{'SIPREC'}, + 'egress' => \$options{'egress'}, 'address=s' => \$options{'address'}, 'pad-crypto' => \$options{'pad crypto'}, 'generate-mid' => \$options{'generate mid'}, @@ -137,7 +138,7 @@ for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address, for my $x (split(/,/, 'TOS,delete-delay,delay-buffer,volume,frequency,trigger-end-time,trigger-end-digits,DTMF-delay')) { defined($options{$x}) and $packet{$x} = $options{$x}; } -for my $x (split(/,/, 'trust address,symmetric,asymmetric,unidirectional,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,full rtcp attribute,loop protect,record call,always transcode,all,SIPREC,pad crypto,generate mid,fragment,original sendrecv,symmetric codecs,asymmetric codecs,inject DTMF,detect DTMF,generate RTCP,single codec,no codec renegotiation,pierce NAT,SIP-source-address,allow transcoding,trickle ICE,reject ICE')) { +for my $x (split(/,/, 'trust address,symmetric,asymmetric,unidirectional,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,full rtcp attribute,loop protect,record call,always transcode,all,SIPREC,pad crypto,generate mid,fragment,original sendrecv,symmetric codecs,asymmetric codecs,inject DTMF,detect DTMF,generate RTCP,single codec,no codec renegotiation,pierce NAT,SIP-source-address,allow transcoding,trickle ICE,reject ICE,egress')) { defined($options{$x}) and push(@{$packet{flags}}, $x); } for my $x (split(/,/, 'origin,session connection,sdp version,username,session-name,zero-address')) {