From f04332915bb1baeea9846e50e060a9924c5808fd Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 12 Feb 2021 10:01:40 -0500 Subject: [PATCH] TT#91151 add publish/subscribe commands Change-Id: I1842b89efea7fa3af0bd4d045e49da31285cd0e1 --- README.md | 102 +++++- daemon/call.c | 683 ++++++++++++++++++++++++++------------ daemon/call_interfaces.c | 185 ++++++++++- daemon/codec.c | 15 + daemon/control_ng.c | 19 ++ daemon/sdp.c | 3 +- include/call.h | 9 + include/call_interfaces.h | 9 +- include/codec.h | 2 + include/control_ng.h | 4 + lib/auxlib.h | 3 + utils/rtpengine-ng-client | 18 +- 12 files changed, 829 insertions(+), 223 deletions(-) diff --git a/README.md b/README.md index aef25a1be..d9799e6f9 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,8 @@ the following additional features are available: - Playback of pre-recorded streams/announcements - Transcoding between T.38 and PCM (G.711 or other audio codecs) - Silence detection and comfort noise (RFC 3389) payloads +* Media forking +* Publish/subscribe mechanism for N-to-N media forwarding *Rtpengine* does not (yet) support: @@ -579,6 +581,10 @@ a string and determines the type of message. Currently the following commands ar * stop media * play DTMF * statistics +* publish +* subscribe request +* subscribe answer +* unsubscribe The response dictionary must contain at least one key called `result`. The value can be either `ok` or `error`. For the `ping` command, the additional value `pong` is allowed. If the result is `error`, then another key @@ -632,10 +638,19 @@ Optionally included keys are: The SIP `Via` branch as string. Used to additionally refine the matching logic between media streams and calls and call branches. -* `label` +* `label` or `from-label` A custom free-form string which *rtpengine* remembers for this participating endpoint and reports - back in logs and statistics output. + back in logs and statistics output. For some commands (e.g. `block media`) the given label is not + used to set the label of the call participant, but rather to select an existing call participant. + +* `set-label` or `to-label` + + Some commands (e.g. `block media`) use the given `label` to select + an existing call participant. For these commands, `set-label` instead + of `label` can be used to set the label at the same time, either for + the selected call participant (if selected via `from-tag`) or for the + newly created participant (e.g. for `subscribe request`). * `flags` @@ -751,6 +766,14 @@ Optionally included keys are: even if the re-offer lists other codecs as preferred, or in a different order. Recommended to be combined with `single codec`. + - `allow transcoding` + + This flag is only useful in commands that provide an explicit answer SDP to *rtpengine* + (e.g. `subscribe answer`). For these commands, if the answer SDP does not accept all + codecs that were offered, the default behaviour is to reject the answer. With this flag + given, the answer will be accepted even if some codecs were rejected, and codecs will be + transcoded as required. + - `all` Only relevant to the `unblock media` and `unsilence media` @@ -876,6 +899,12 @@ Optionally included keys are: This special keyword is provided only for legacy support and should be considered obsolete. It will be removed in future versions. +* `interface` + + Contains a single string naming one of the configured interfaces, just like `direction` does. The + `interface` option is used instead of `direction` where only one interface is required (e.g. outside + of an offer/answer scenario), for example in the `publish` or `subscribe request` commands. + * `received from` Contains a list of exactly two elements. The first element denotes the address family and the second @@ -2003,6 +2032,75 @@ command. Sample return dictionary: "result": "ok" } +`publish` Message +----------------- + +Similar to an `offer` message except that it is used outside of an offer/answer +scenario. The media described by the SDP is published to *rtpengine* directly, +and other peer can then subscribe to the published media to receive a copy. + +The message must include the key `sdp` which should describe `sendonly` media; +and the key `call-id` and `from-tag` to identify the publisher. Most other keys +and options supported by `offer` are also supported for `publish`. + +The reply message will contain an answer SDP in `sdp`, but unlike with `offer` +this is not a rewritten version of the received SDP, but rather a `recvonly` +answer SDP generated by *rtpengine* locally. Only one codec for each media +section will be listed, and by default this will be the first supported codec +from the published media. This can be influenced with the `codec` options +described above. + +`subscribe request` Message +--------------------------- + +This message is used to request subscription (i.e. receiving a copy of the +media) to an existing call participant, which must have been created either +through the offer/answer mechanism, or through the publish mechanism. + +The call participant is selected in the same way as described under `block +DTMF` except that one call participant must be selected (i.e. the `all` keyword +cannot be used). This message then creates a new call participant, which +corresponds to the subscription. This new call participant will be identified +by a newly generated unique tag, or by the tag given in the `to-tag` key. If a +label is to be set for the newly created subscription, it can be set through +`set-label`. + +The reply message will contain a sendonly offer SDP in `sdp` which by default +will mirror the SDP of the call participant being subscribed to. This offer SDP +can be manipulated with the same flags as used in an `offer` message, including +the option to manipulate the codecs. The reply message will also contain the +`from-tag` (corresponding to the call participant being subscribed to) and the +`to-tag` (corresponding to the subscription, either generated or taken from the +received message). + +`subscribe answer` Message +-------------------------- + +This message is expected to be received after responding to a `subscribe +request` message. The message should contain the same `from-tag` and `to-tag` +is the reply to the `subscribe request` (although `label` etc can also be used +instead of the `from-tag`), as well as the answer SDP in `sdp`. + +By default, the answer SDP must accept all codecs that were presented in the +offer SDP (given in the reply to `subscribe request`). If not all codecs were +accepted, then the `subscribe answer` will be rejected. This behavious can be +changed by including the `allow transcoding` flag in the message. If this flag +is present, then the answer SDP will be accepted as long as at least one valid +codec is present, and the media will be transcoded as required. This also holds +true if some codecs were added for transcoding in the `subscribe request` +message, which means that `allow transcoding` must always be included in +`subscribe answer` if any transcoding is to be allowed. + +The reply message will simply indicate success or failure. If successful, media +forwarding will start to the endpoint given in the answer SDP. + +`unsubscribe` Message +--------------------- + +This message is a counterpart to `subsscribe answer` to stop an established +subscription. The subscription to be stopped is identified by `from-tag` and +`to`tag`. + The *tcp-ng* Control Protocol ========================= diff --git a/daemon/call.c b/daemon/call.c index ce0e67354..465393881 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -83,6 +83,7 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval static void __call_free(void *p); static void __call_cleanup(struct call *c); static void __monologue_stop(struct call_monologue *ml); +static void __dialogue_unkernelize(struct call_monologue *ml); static void media_stop(struct call_media *m); /* called with call->master_lock held in R */ @@ -821,7 +822,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con } static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports, - const struct endpoint *ep, const struct sdp_ng_flags *flags) + const struct endpoint *ep, const struct sdp_ng_flags *flags, bool always_resuse) { GList *l; struct endpoint_map *em; @@ -834,7 +835,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne em = l->data; if (em->logical_intf != media->logical_intf) continue; - if (em->wildcard && em->num_ports >= num_ports) { + if ((em->wildcard || always_resuse) && em->num_ports >= num_ports) { __C_DBG("found a wildcard endpoint map%s", ep ? " and filling it in" : ""); if (ep) { em->endpoint = *ep; @@ -960,7 +961,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_ports) { struct endpoint_map *em; - em = __get_endpoint_map(media, num_ports, NULL, NULL); + em = __get_endpoint_map(media, num_ports, NULL, NULL, false); if (!em) return -1; @@ -1235,12 +1236,11 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru unsigned int port_off = 0; la = A->streams.head; - lb = B->streams.head; + lb = B ? B->streams.head : NULL; while (la) { - assert(lb != NULL); a = la->data; - b = lb->data; + b = lb ? lb->data : NULL; /* RTP */ // reflect media - pretend reflection also for blackhole, as otherwise @@ -1248,7 +1248,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru // XXX still necessary for blackhole? if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) __add_sink_handler(&a->rtp_sinks, a); - else + else if (b) __add_sink_handler(&a->rtp_sinks, b); PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */ @@ -1261,18 +1261,20 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru } bf_copy_same(&a->ps_flags, &A->media_flags, SHARED_FLAG_ICE); - PS_CLEAR(b, ZERO_ADDR); - if (is_addr_unspecified(&a->advertised_endpoint.address) - && !(is_trickle_ice_address(&a->advertised_endpoint) - && MEDIA_ISSET(A, TRICKLE_ICE)) - && !(flags && flags->replace_zero_address)) - PS_SET(b, ZERO_ADDR); + if (b) { + PS_CLEAR(b, ZERO_ADDR); + if (is_addr_unspecified(&a->advertised_endpoint.address) + && !(is_trickle_ice_address(&a->advertised_endpoint) + && MEDIA_ISSET(A, TRICKLE_ICE)) + && !(flags && flags->replace_zero_address)) + PS_SET(b, ZERO_ADDR); + } if (__init_stream(a)) return -1; /* RTCP */ - if (!MEDIA_ISSET(B, RTCP_MUX)) { + if (B && lb && b && !MEDIA_ISSET(B, RTCP_MUX)) { lb = lb->next; assert(lb != NULL); b = lb->data; @@ -1283,7 +1285,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru else { if (MEDIA_ISSET(A, ECHO) || MEDIA_ISSET(A, BLACKHOLE)) { /* RTCP sink handler added below */ } - else + else if (b) __add_sink_handler(&a->rtcp_sinks, b); PS_SET(a, RTCP); PS_CLEAR(a, IMPLICIT_RTCP); @@ -1302,7 +1304,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru if (MEDIA_ISSET(A, RTCP_MUX)) __add_sink_handler(&ax->rtcp_sinks, a); } - else + else if (b) __add_sink_handler(&a->rtcp_sinks, b); PS_CLEAR(a, RTP); PS_SET(a, RTCP); @@ -1326,11 +1328,13 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru bf_copy_same(&a->ps_flags, &A->media_flags, SHARED_FLAG_ICE); PS_CLEAR(a, ZERO_ADDR); - if (is_addr_unspecified(&b->advertised_endpoint.address) - && !(is_trickle_ice_address(&b->advertised_endpoint) - && MEDIA_ISSET(B, TRICKLE_ICE)) - && !(flags && flags->replace_zero_address)) - PS_SET(a, ZERO_ADDR); + if (b) { + if (is_addr_unspecified(&b->advertised_endpoint.address) + && !(is_trickle_ice_address(&b->advertised_endpoint) + && MEDIA_ISSET(B, TRICKLE_ICE)) + && !(flags && flags->replace_zero_address)) + PS_SET(a, ZERO_ADDR); + } if (__init_stream(a)) return -1; @@ -1339,7 +1343,7 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru recording_setup_stream(a); // RTCP la = la->next; - lb = lb->next; + lb = lb ? lb->next : NULL; port_off += 2; } @@ -1394,7 +1398,7 @@ static void __ice_offer(const struct sdp_ng_flags *flags, struct call_media *thi } } - if (flags->opmode == OP_OFFER) { + if (flags->opmode == OP_OFFER || flags->opmode == OP_REQUEST) { switch (flags->ice_lite_option) { case ICE_LITE_OFF: MEDIA_CLEAR(this, ICE_LITE_SELF); @@ -1419,6 +1423,21 @@ static void __ice_offer(const struct sdp_ng_flags *flags, struct call_media *thi if (flags->trickle_ice) MEDIA_SET(this, TRICKLE_ICE); } + else if (flags->opmode == OP_REQUEST) { + // leave source media (`other`) alone + switch (flags->ice_lite_option) { + case ICE_LITE_OFF: + case ICE_LITE_BKW: + MEDIA_CLEAR(this, ICE_LITE_SELF); + break; + case ICE_LITE_FWD: + case ICE_LITE_BOTH: + MEDIA_SET(this, ICE_LITE_SELF); + break; + default: + break; + } + } /* determine roles (even if we don't actually do ICE) */ /* this = receiver, other = sender */ @@ -1434,16 +1453,18 @@ static void __ice_offer(const struct sdp_ng_flags *flags, struct call_media *thi MEDIA_CLEAR(this, ICE_CONTROLLING); } - /* roles are reversed for the other side */ - if (MEDIA_ISSET(other, ICE_LITE_PEER) && !MEDIA_ISSET(other, ICE_LITE_SELF)) - MEDIA_SET(other, ICE_CONTROLLING); - else if (!MEDIA_ISSET(other, INITIALIZED)) { - if (MEDIA_ISSET(other, ICE_LITE_SELF)) - MEDIA_CLEAR(other, ICE_CONTROLLING); - else if (flags->opmode == OP_OFFER) - MEDIA_CLEAR(other, ICE_CONTROLLING); - else + if (flags->opmode == OP_OFFER || flags->opmode == OP_ANSWER) { + /* roles are reversed for the other side */ + if (MEDIA_ISSET(other, ICE_LITE_PEER) && !MEDIA_ISSET(other, ICE_LITE_SELF)) MEDIA_SET(other, ICE_CONTROLLING); + else if (!MEDIA_ISSET(other, INITIALIZED)) { + if (MEDIA_ISSET(other, ICE_LITE_SELF)) + MEDIA_CLEAR(other, ICE_CONTROLLING); + else if (flags->opmode == OP_OFFER) + MEDIA_CLEAR(other, ICE_CONTROLLING); + else + MEDIA_SET(other, ICE_CONTROLLING); + } } } @@ -1472,11 +1493,13 @@ static void __generate_crypto(const struct sdp_ng_flags *flags, struct call_medi { GQueue *cpq = &this->sdes_out; GQueue *cpq_in = &this->sdes_in; - GQueue *offered_cpq = &other->sdes_in; + const GQueue *offered_cpq = &other->sdes_in; if (!flags) return; + bool is_offer = (flags->opmode == OP_OFFER || flags->opmode == OP_REQUEST); + if (!this->protocol || !this->protocol->srtp || MEDIA_ISSET(this, PASSTHRU)) { crypto_params_sdes_queue_clear(cpq); /* clear crypto for the this leg b/c we are in passthrough mode */ @@ -1497,7 +1520,7 @@ static void __generate_crypto(const struct sdp_ng_flags *flags, struct call_medi return; } - if (flags->opmode == OP_OFFER) { + if (is_offer) { /* we always must offer actpass */ MEDIA_SET(this, SETUP_PASSIVE); MEDIA_SET(this, SETUP_ACTIVE); @@ -1510,7 +1533,7 @@ static void __generate_crypto(const struct sdp_ng_flags *flags, struct call_medi MEDIA_CLEAR(this, SETUP_PASSIVE); } - if (flags->opmode == OP_OFFER) { + if (is_offer) { // if neither is enabled yet... if (!MEDIA_ISSET2(this, DTLS, SDES)) { /* we offer both DTLS and SDES by default */ @@ -1539,7 +1562,7 @@ static void __generate_crypto(const struct sdp_ng_flags *flags, struct call_medi /* SDES parameters below */ - if (flags->opmode == OP_OFFER) { + if (is_offer) { // generate full set of params // re-create the entire list - steal for later flushing GQueue cpq_orig = *cpq; @@ -1699,7 +1722,7 @@ cps_match: } skip_sdes: - if (flags->opmode == OP_OFFER) { + if (is_offer) { if (MEDIA_ISSET(this, DTLS) && !this->fp_hash_func && flags->dtls_fingerprint.len) this->fp_hash_func = dtls_find_hash_func(&flags->dtls_fingerprint); } @@ -1773,6 +1796,13 @@ static void __disable_streams(struct call_media *media, unsigned int num_ports) } } +static void __rtcp_mux_set(const struct sdp_ng_flags *flags, struct call_media *media) { + if (flags->rtcp_mux_offer || flags->rtcp_mux_require) + MEDIA_SET(media, RTCP_MUX); + else if (flags->rtcp_mux_demux) + MEDIA_CLEAR(media, RTCP_MUX); +} + static void __rtcp_mux_logic(const struct sdp_ng_flags *flags, struct call_media *media, struct call_media *other_media) { @@ -1797,10 +1827,7 @@ static void __rtcp_mux_logic(const struct sdp_ng_flags *flags, struct call_media if (!MEDIA_ISSET(media, RTCP_MUX)) bf_copy_same(&media->media_flags, &other_media->media_flags, MEDIA_FLAG_RTCP_MUX); /* in our offer, we can override the client's choice */ - if (flags->rtcp_mux_offer || flags->rtcp_mux_require) - MEDIA_SET(media, RTCP_MUX); - else if (flags->rtcp_mux_demux) - MEDIA_CLEAR(media, RTCP_MUX); + __rtcp_mux_set(flags, media); /* we can also control what's going to happen in the answer. it * depends on what was offered, but by default we go with the other @@ -1987,6 +2014,8 @@ static void __endpoint_loop_protect(struct stream_params *sp, struct call_media // if (other_media->protocol && other_media->protocol->tcp) // intf_addr.type = socktype_tcp; intf_addr.addr = sp->rtp_endpoint.address; + if (!intf_addr.addr.family) // dummy/empty address + return; if (!is_local_endpoint(&intf_addr, sp->rtp_endpoint.port)) return; @@ -2002,11 +2031,13 @@ static void __update_media_id(struct call_media *media, struct call_media *other if (!flags) return; - struct call *call = media->call; - struct call_monologue *ml = media->monologue; + struct call *call = other_media->call; + struct call_monologue *ml = media ? media->monologue : NULL; struct call_monologue *other_ml = other_media->monologue; - if (flags->opmode == OP_OFFER) { + if (flags->opmode == OP_OFFER || flags->opmode == OP_OTHER || flags->opmode == OP_PUBLISH + || flags->opmode == OP_REQUEST) + { if (!other_media->media_id.s) { // incoming side: we copy what we received if (sp->media_id.s) @@ -2033,7 +2064,7 @@ static void __update_media_id(struct call_media *media, struct call_media *other ; } } - if (!media->media_id.s) { + if (media && !media->media_id.s) { // outgoing side: we copy from the other side if (other_media->media_id.s) call_str_cpy(call, &media->media_id, &other_media->media_id); @@ -2092,8 +2123,10 @@ static void __update_media_protocol(struct call_media *media, struct call_media STR_FMT(&other_media->type), STR_FMT(&sp->type)); call_str_cpy(other_media->call, &other_media->type, &sp->type); other_media->type_id = codec_get_type(&other_media->type); - call_str_cpy(media->call, &media->type, &sp->type); - media->type_id = other_media->type_id; + if (media) { + call_str_cpy(media->call, &media->type, &sp->type); + media->type_id = other_media->type_id; + } } /* deduct protocol from stream parameters received */ @@ -2107,29 +2140,31 @@ static void __update_media_protocol(struct call_media *media, struct call_media * Answers are a special case: handle OSRTP answer/accept, but otherwise * answer with the same protocol that was offered, unless we're instructed * not to. */ - if (flags && flags->opmode == OP_ANSWER) { - // OSRTP? - if (other_media->protocol && other_media->protocol->rtp - && !other_media->protocol->srtp - && media->protocol && media->protocol->osrtp) - { - // accept it? - if (flags->osrtp_accept) + if (media) { + if (flags && flags->opmode == OP_ANSWER) { + // OSRTP? + if (other_media->protocol && other_media->protocol->rtp + && !other_media->protocol->srtp + && media->protocol && media->protocol->osrtp) + { + // accept it? + if (flags->osrtp_accept) + ; + else + media->protocol = NULL; // reject + } + // pass through any other protocol change? + else if (!flags->protocol_accept) ; else - media->protocol = NULL; // reject + media->protocol = NULL; } - // pass through any other protocol change? - else if (!flags->protocol_accept) - ; else media->protocol = NULL; } - else - media->protocol = NULL; } /* default is to leave the protocol unchanged */ - if (!media->protocol) + if (media && !media->protocol) media->protocol = other_media->protocol; // handler overrides requested by the user @@ -2138,19 +2173,19 @@ static void __update_media_protocol(struct call_media *media, struct call_media /* allow override of outgoing protocol even if we know it already */ /* but only if this is an RTP-based protocol */ - if (flags->transport_protocol + if (media && flags->transport_protocol && proto_is_rtp(other_media->protocol)) media->protocol = flags->transport_protocol; // OSRTP offer requested? - if (media->protocol && media->protocol->rtp && !media->protocol->srtp + if (media && media->protocol && media->protocol->rtp && !media->protocol->srtp && media->protocol->osrtp_proto && flags->osrtp_offer && flags->opmode == OP_OFFER) { media->protocol = &transport_protocols[media->protocol->osrtp_proto]; } // T.38 decoder? - if (other_media->type_id == MT_IMAGE && proto_is(other_media->protocol, PROTO_UDPTL) + if (media && other_media->type_id == MT_IMAGE && proto_is(other_media->protocol, PROTO_UDPTL) && flags->t38_decode) { media->protocol = flags->transport_protocol; @@ -2162,7 +2197,7 @@ static void __update_media_protocol(struct call_media *media, struct call_media } // T.38 encoder? - if (other_media->type_id == MT_AUDIO && proto_is_rtp(other_media->protocol) + if (media && other_media->type_id == MT_AUDIO && proto_is_rtp(other_media->protocol) && flags->t38_force) { media->protocol = &transport_protocols[PROTO_UDPTL]; @@ -2173,7 +2208,7 @@ static void __update_media_protocol(struct call_media *media, struct call_media } // previous T.38 gateway but now stopping? - if (flags->t38_stop) { + if (media && flags->t38_stop) { if (other_media->type_id == MT_AUDIO && proto_is_rtp(other_media->protocol) && media->type_id == MT_IMAGE && proto_is(media->protocol, PROTO_UDPTL)) @@ -2191,7 +2226,7 @@ void codecs_offer_answer(struct call_media *media, struct call_media *other_medi { if (!flags || flags->opmode != OP_ANSWER) { // offer - ilogs(codec, LOG_DEBUG, "Updating receiver side codecs for offerer " STR_FORMAT " #%u", + ilogs(codec, LOG_DEBUG, "Updating codecs for offerer " STR_FORMAT " #%u", STR_FMT(&other_media->monologue->tag), other_media->index); if (flags) { @@ -2218,7 +2253,7 @@ void codecs_offer_answer(struct call_media *media, struct call_media *other_medi if (update_answerer) { // update/create answer/receiver side - ilogs(codec, LOG_DEBUG, "Updating receiver side codecs for answerer " STR_FORMAT " #%u", + ilogs(codec, LOG_DEBUG, "Updating codecs for answerer " STR_FORMAT " #%u", STR_FMT(&media->monologue->tag), media->index); if (flags && flags->reuse_codec) @@ -2249,7 +2284,7 @@ void codecs_offer_answer(struct call_media *media, struct call_media *other_medi } else { // answer - ilogs(codec, LOG_DEBUG, "Updating receiver side codecs for answerer " STR_FORMAT " #%u", + ilogs(codec, LOG_DEBUG, "Updating codecs for answerer " STR_FORMAT " #%u", STR_FMT(&other_media->monologue->tag), other_media->index); if (flags->reuse_codec) @@ -2333,6 +2368,159 @@ static void __update_init_subscribers(struct call_monologue *ml, GQueue *streams } } +static void __call_monologue_init_from_flags(struct call_monologue *ml, struct sdp_ng_flags *flags) { + struct call *call = ml->call; + + call->last_signal = rtpe_now.tv_sec; + call->deleted = 0; + + // reset offer ipv4/ipv6/mixed media stats + if (flags && flags->opmode == OP_OFFER) { + statistics_update_ip46_inc_dec(call, CMC_DECREMENT); + call->is_ipv4_media_offer = 0; + call->is_ipv6_media_offer = 0; + + // reset answer ipv4/ipv6/mixed media stats + } else if (flags && flags->opmode == OP_ANSWER) { + statistics_update_ip46_inc_dec(call, CMC_DECREMENT); + call->is_ipv4_media_answer = 0; + call->is_ipv6_media_answer = 0; + } + + __tos_change(call, flags); + + if (flags && flags->label.s) { + call_str_cpy(call, &ml->label, &flags->label); + g_hash_table_replace(call->labels, &ml->label, ml); + } + +} + +// `media` can be NULL +static int __media_init_from_flags(struct call_media *other_media, struct call_media *media, + struct stream_params *sp, struct sdp_ng_flags *flags) +{ + struct call *call = other_media->call; + + if (flags && flags->fragment) { + // trickle ICE SDP fragment. don't do anything other than update + // the ICE stuff. + if (!MEDIA_ISSET(other_media, TRICKLE_ICE)) + return ERROR_NO_ICE_AGENT; + if (!other_media->ice_agent) + return ERROR_NO_ICE_AGENT; + ice_update(other_media->ice_agent, sp); + return 1; // done, continue + } + + if (flags && flags->opmode == OP_OFFER && flags->reset) { + if (media) + MEDIA_CLEAR(media, INITIALIZED); + MEDIA_CLEAR(other_media, INITIALIZED); + if (media && media->ice_agent) + ice_restart(media->ice_agent); + if (other_media->ice_agent) + ice_restart(other_media->ice_agent); + } + + if (flags && flags->generate_rtcp) { + if (media) + MEDIA_SET(media, RTCP_GEN); + MEDIA_SET(other_media, RTCP_GEN); + } + else if (flags && flags->generate_rtcp_off) { + if (media) + MEDIA_CLEAR(media, RTCP_GEN); + MEDIA_CLEAR(other_media, RTCP_GEN); + } + + if (flags) { + switch (flags->media_echo) { + case MEO_FWD: + MEDIA_SET(media, ECHO); + MEDIA_SET(other_media, BLACKHOLE); + MEDIA_CLEAR(media, BLACKHOLE); + MEDIA_CLEAR(other_media, ECHO); + break; + case MEO_BKW: + MEDIA_SET(media, BLACKHOLE); + MEDIA_SET(other_media, ECHO); + MEDIA_CLEAR(media, ECHO); + MEDIA_CLEAR(other_media, BLACKHOLE); + break; + case MEO_BOTH: + MEDIA_SET(media, ECHO); + MEDIA_SET(other_media, ECHO); + MEDIA_CLEAR(media, BLACKHOLE); + MEDIA_CLEAR(other_media, BLACKHOLE); + break; + case MEO_BLACKHOLE: + MEDIA_SET(media, BLACKHOLE); + MEDIA_SET(other_media, BLACKHOLE); + MEDIA_CLEAR(media, ECHO); + MEDIA_CLEAR(other_media, ECHO); + case MEO_DEFAULT: + break; + } + } + + __update_media_protocol(media, other_media, sp, flags); + __update_media_id(media, other_media, sp, flags); + __endpoint_loop_protect(sp, other_media); + + if (sp->rtp_endpoint.port) { + /* copy parameters advertised by the sender of this message */ + bf_copy_same(&other_media->media_flags, &sp->sp_flags, + SHARED_FLAG_RTCP_MUX | SHARED_FLAG_ASYMMETRIC | SHARED_FLAG_UNIDIRECTIONAL | + SHARED_FLAG_ICE | SHARED_FLAG_TRICKLE_ICE | SHARED_FLAG_ICE_LITE_PEER | + SHARED_FLAG_RTCP_FB); + + // duplicate the entire queue of offered crypto params + crypto_params_sdes_queue_clear(&other_media->sdes_in); + crypto_params_sdes_queue_copy(&other_media->sdes_in, &sp->sdes_params); + + if (other_media->sdes_in.length) { + MEDIA_SET(other_media, SDES); + __sdes_accept(other_media, flags); + } + } + + // codec and RTP payload types handling + if (sp->ptime > 0) { + if (media && !MEDIA_ISSET(media, PTIME_OVERRIDE)) + media->ptime = sp->ptime; + if (!MEDIA_ISSET(other_media, PTIME_OVERRIDE)) + other_media->ptime = sp->ptime; + } + if (media && flags && flags->ptime > 0) { + media->ptime = flags->ptime; + MEDIA_SET(media, PTIME_OVERRIDE); + MEDIA_SET(other_media, PTIME_OVERRIDE); + } + if (flags && flags->rev_ptime > 0) { + other_media->ptime = flags->rev_ptime; + if (media) + MEDIA_SET(media, PTIME_OVERRIDE); + MEDIA_SET(other_media, PTIME_OVERRIDE); + } + if (str_cmp_str(&other_media->format_str, &sp->format_str)) + call_str_cpy(call, &other_media->format_str, &sp->format_str); + if (media && str_cmp_str(&media->format_str, &sp->format_str)) { + // update opposite side format string only if protocols match + if (media->protocol == other_media->protocol) + call_str_cpy(call, &media->format_str, &sp->format_str); + } + + // deduct address family from stream parameters received + other_media->desired_family = sp->rtp_endpoint.address.family; + // for outgoing SDP, use "direction"/DF or default to what was offered + if (media && !media->desired_family) + media->desired_family = other_media->desired_family; + if (media && sp->desired_family) + media->desired_family = sp->desired_family; + + return 0; +} /* called with call->master_lock held in W */ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, @@ -2342,7 +2530,6 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, GList *media_iter, *ml_media, *other_ml_media; struct call_media *media, *other_media; struct endpoint_map *em; - struct call *call; struct call_monologue *other_ml = dialogue[0]; struct call_monologue *monologue = dialogue[1]; @@ -2353,35 +2540,12 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, return -1; } - call = monologue->call; - - call->last_signal = MAX(call->last_signal, rtpe_now.tv_sec); - call->deleted = 0; + __call_monologue_init_from_flags(other_ml, flags); __C_DBG("this="STR_FORMAT" other="STR_FORMAT, STR_FMT(&monologue->tag), STR_FMT(&other_ml->tag)); - __tos_change(call, flags); - - if (flags && flags->label.s) { - call_str_cpy(call, &other_ml->label, &flags->label); - g_hash_table_replace(call->labels, &other_ml->label, other_ml); - } - ml_media = other_ml_media = NULL; - // reset offer ipv4/ipv6/mixed media stats - if (flags && flags->opmode == OP_OFFER) { - statistics_update_ip46_inc_dec(call, CMC_DECREMENT); - call->is_ipv4_media_offer = 0; - call->is_ipv6_media_offer = 0; - - // reset answer ipv4/ipv6/mixed media stats - } else if (flags && flags->opmode == OP_ANSWER) { - statistics_update_ip46_inc_dec(call, CMC_DECREMENT); - call->is_ipv4_media_answer = 0; - call->is_ipv6_media_answer = 0; - } - for (media_iter = streams->head; media_iter; media_iter = media_iter->next) { sp = media_iter->data; __C_DBG("processing media stream #%u", sp->index); @@ -2397,110 +2561,8 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, * THIS side (recipient) before, then the structs will be populated with * details already. */ - if (flags && flags->fragment) { - // trickle ICE SDP fragment. don't do anything other than update - // the ICE stuff. - if (!MEDIA_ISSET(other_media, TRICKLE_ICE)) - return ERROR_NO_ICE_AGENT; - if (!other_media->ice_agent) - return ERROR_NO_ICE_AGENT; - ice_update(other_media->ice_agent, sp); + if (__media_init_from_flags(other_media, media, sp, flags) == 1) continue; - } - - if (flags && flags->opmode == OP_OFFER && flags->reset) { - MEDIA_CLEAR(media, INITIALIZED); - MEDIA_CLEAR(other_media, INITIALIZED); - if (media->ice_agent) - ice_restart(media->ice_agent); - if (other_media->ice_agent) - ice_restart(other_media->ice_agent); - } - - if (flags && flags->generate_rtcp) { - MEDIA_SET(media, RTCP_GEN); - MEDIA_SET(other_media, RTCP_GEN); - } - else if (flags && flags->generate_rtcp_off) { - MEDIA_CLEAR(media, RTCP_GEN); - MEDIA_CLEAR(other_media, RTCP_GEN); - } - - if (flags) { - switch (flags->media_echo) { - case MEO_FWD: - MEDIA_SET(media, ECHO); - MEDIA_SET(other_media, BLACKHOLE); - MEDIA_CLEAR(media, BLACKHOLE); - MEDIA_CLEAR(other_media, ECHO); - break; - case MEO_BKW: - MEDIA_SET(media, BLACKHOLE); - MEDIA_SET(other_media, ECHO); - MEDIA_CLEAR(media, ECHO); - MEDIA_CLEAR(other_media, BLACKHOLE); - break; - case MEO_BOTH: - MEDIA_SET(media, ECHO); - MEDIA_SET(other_media, ECHO); - MEDIA_CLEAR(media, BLACKHOLE); - MEDIA_CLEAR(other_media, BLACKHOLE); - break; - case MEO_BLACKHOLE: - MEDIA_SET(media, BLACKHOLE); - MEDIA_SET(other_media, BLACKHOLE); - MEDIA_CLEAR(media, ECHO); - MEDIA_CLEAR(other_media, ECHO); - case MEO_DEFAULT: - break; - } - } - - __update_media_protocol(media, other_media, sp, flags); - __update_media_id(media, other_media, sp, flags); - __endpoint_loop_protect(sp, other_media); - - if (sp->rtp_endpoint.port) { - /* copy parameters advertised by the sender of this message */ - bf_copy_same(&other_media->media_flags, &sp->sp_flags, - SHARED_FLAG_RTCP_MUX | SHARED_FLAG_ASYMMETRIC | SHARED_FLAG_UNIDIRECTIONAL | - SHARED_FLAG_ICE | SHARED_FLAG_TRICKLE_ICE | SHARED_FLAG_ICE_LITE_PEER | - SHARED_FLAG_RTCP_FB); - - // duplicate the entire queue of offered crypto params - crypto_params_sdes_queue_clear(&other_media->sdes_in); - crypto_params_sdes_queue_copy(&other_media->sdes_in, &sp->sdes_params); - - if (other_media->sdes_in.length) { - MEDIA_SET(other_media, SDES); - __sdes_accept(other_media, flags); - } - } - - // codec and RTP payload types handling - if (sp->ptime > 0) { - if (!MEDIA_ISSET(media, PTIME_OVERRIDE)) - media->ptime = sp->ptime; - if (!MEDIA_ISSET(other_media, PTIME_OVERRIDE)) - other_media->ptime = sp->ptime; - } - if (flags && flags->ptime > 0) { - media->ptime = flags->ptime; - MEDIA_SET(media, PTIME_OVERRIDE); - MEDIA_SET(other_media, PTIME_OVERRIDE); - } - if (flags && flags->rev_ptime > 0) { - other_media->ptime = flags->rev_ptime; - MEDIA_SET(media, PTIME_OVERRIDE); - MEDIA_SET(other_media, PTIME_OVERRIDE); - } - if (str_cmp_str(&other_media->format_str, &sp->format_str)) - call_str_cpy(call, &other_media->format_str, &sp->format_str); - if (str_cmp_str(&media->format_str, &sp->format_str)) { - // update opposite side format string only if protocols match - if (media->protocol == other_media->protocol) - call_str_cpy(call, &media->format_str, &sp->format_str); - } codecs_offer_answer(media, other_media, sp, flags); @@ -2510,14 +2572,6 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, bf_copy(&other_media->media_flags, MEDIA_FLAG_RECV, &sp->sp_flags, SP_FLAG_SEND); bf_copy(&other_media->media_flags, MEDIA_FLAG_SEND, &sp->sp_flags, SP_FLAG_RECV); - /* deduct address family from stream parameters received */ - other_media->desired_family = sp->rtp_endpoint.address.family; - /* for outgoing SDP, use "direction"/DF or default to what was offered */ - if (!media->desired_family) - media->desired_family = other_media->desired_family; - if (sp->desired_family) - media->desired_family = sp->desired_family; - if (sp->rtp_endpoint.port) { /* DTLS stuff */ __dtls_logic(flags, other_media, sp); @@ -2527,7 +2581,6 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, /* SDES and DTLS */ __generate_crypto(flags, media, other_media); - } if (media->desired_family->af == AF_INET) { @@ -2579,7 +2632,7 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, /* get that many ports for each side, and one packet stream for each port, then * assign the ports to the streams */ - em = __get_endpoint_map(media, sp->num_ports, &sp->rtp_endpoint, flags); + em = __get_endpoint_map(media, sp->num_ports, &sp->rtp_endpoint, flags, false); if (!em) { goto error_ports; } @@ -2604,7 +2657,7 @@ int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, // set ipv4/ipv6/mixed media stats if (flags && (flags->opmode == OP_OFFER || flags->opmode == OP_ANSWER)) { - statistics_update_ip46_inc_dec(call, CMC_INCREMENT); + statistics_update_ip46_inc_dec(monologue->call, CMC_INCREMENT); } return 0; @@ -2686,6 +2739,199 @@ static void __subscribe_only_one_offer_answer(struct call_monologue *which, stru __add_subscription(which, to, true); } + + +/* called with call->master_lock held in W */ +int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags) { + __call_monologue_init_from_flags(ml, flags); + + GList *media_iter = NULL; + + for (GList *l = streams->head; l; l = l->next) { + struct stream_params *sp = l->data; + struct call_media *media = __get_media(ml, &media_iter, sp, flags); + + __media_init_from_flags(media, NULL, sp, flags); + + codec_store_populate(&media->codecs, &sp->codecs, NULL); + if (codec_store_accept_one(&media->codecs, &flags->codec_accept)) + return -1; + + // the most we can do is receive + bf_copy(&media->media_flags, MEDIA_FLAG_RECV, &sp->sp_flags, SP_FLAG_SEND); + + if (sp->rtp_endpoint.port) { + /* DTLS stuff */ + __dtls_logic(flags, media, sp); + } + + /* local interface selection */ + __init_interface(media, &flags->interface, sp->num_ports); + + if (media->logical_intf == NULL) + return -1; // XXX return error code + + /* ICE stuff - must come after interface and address family selection */ + __ice_offer(flags, media, media); + + MEDIA_SET(media, INITIALIZED); + + if (!sp->rtp_endpoint.port) { + /* Zero port: stream has been rejected. + * RFC 3264, chapter 6: + * If a stream is rejected, the offerer and answerer MUST NOT + * generate media (or RTCP packets) for that stream. */ + __disable_streams(media, sp->num_ports); + continue; + } + + struct endpoint_map *em = __get_endpoint_map(media, sp->num_ports, NULL, flags, true); + if (!em) + return -1; // XXX error - no ports + + __num_media_streams(media, sp->num_ports); + __assign_stream_fds(media, &em->intf_sfds); + + // XXX this should be covered by __update_init_subscribers ? + if (__init_streams(media, NULL, sp, flags)) + return -1; + __ice_start(media); + ice_update(media->ice_agent, sp); + } + + return 0; +} + +/* called with call->master_lock held in W */ +int monologue_subscribe_request(struct call_monologue *src_ml, struct call_monologue *dst_ml, + struct sdp_ng_flags *flags) +{ + __call_monologue_init_from_flags(dst_ml, flags); + + GList *dst_media_it = NULL; + GList *src_media_it = NULL; + + for (GList *l = src_ml->last_in_sdp_streams.head; l; l = l->next) { + struct stream_params *sp = l->data; + + struct call_media *dst_media = __get_media(dst_ml, &dst_media_it, sp, flags); + struct call_media *src_media = __get_media(src_ml, &src_media_it, sp, flags); + + if (__media_init_from_flags(src_media, dst_media, sp, flags) == 1) + continue; + + codec_store_populate(&dst_media->codecs, &src_media->codecs, NULL); + codec_store_strip(&dst_media->codecs, &flags->codec_strip, flags->codec_except); + codec_store_strip(&dst_media->codecs, &flags->codec_consume, flags->codec_except); + codec_store_strip(&dst_media->codecs, &flags->codec_mask, flags->codec_except); + codec_store_offer(&dst_media->codecs, &flags->codec_offer, &sp->codecs); + codec_store_transcode(&dst_media->codecs, &flags->codec_transcode, &sp->codecs); + codec_store_synthesise(&dst_media->codecs, &src_media->codecs); + + codec_handlers_update(dst_media, src_media, flags, sp); + + MEDIA_SET(dst_media, SEND); + MEDIA_CLEAR(dst_media, RECV); + + __rtcp_mux_set(flags, dst_media); + __generate_crypto(flags, dst_media, src_media); + + // interface selection + __init_interface(dst_media, &flags->interface, sp->num_ports); + if (dst_media->logical_intf == NULL) + return -1; // XXX return error code + + __ice_offer(flags, dst_media, src_media); + + struct endpoint_map *em = __get_endpoint_map(dst_media, sp->num_ports, NULL, flags, true); + if (!em) + return -1; // XXX error - no ports + + __num_media_streams(dst_media, sp->num_ports); + __assign_stream_fds(dst_media, &em->intf_sfds); + + if (__init_streams(dst_media, NULL, NULL, flags)) + return -1; + } + + __update_init_subscribers(src_ml, NULL, NULL); + __update_init_subscribers(dst_ml, NULL, NULL); + + return 0; +} +/* called with call->master_lock held in W */ +int monologue_subscribe_answer(struct call_monologue *src_ml, struct call_monologue *dst_ml, + struct sdp_ng_flags *flags, GQueue *streams) +{ + GList *dst_media_it = NULL; + GList *src_media_it = NULL; + + for (GList *l = streams->head; l; l = l->next) { + struct stream_params *sp = l->data; + + struct call_media *dst_media = __get_media(dst_ml, &dst_media_it, sp, flags); + struct call_media *src_media = __get_media(src_ml, &src_media_it, sp, flags); + + if (__media_init_from_flags(dst_media, NULL, sp, flags) == 1) + continue; + + if (flags && flags->allow_transcoding) { + codec_store_populate(&dst_media->codecs, &sp->codecs, flags->codec_set); + codec_store_strip(&dst_media->codecs, &flags->codec_strip, flags->codec_except); + codec_store_offer(&dst_media->codecs, &flags->codec_offer, &sp->codecs); + } + else { + codec_store_populate(&dst_media->codecs, &sp->codecs, NULL); + if (!codec_store_is_full_answer(&src_media->codecs, &dst_media->codecs)) + return -1; + } + + codec_handlers_update(src_media, dst_media, NULL, NULL); + codec_handlers_update(dst_media, src_media, flags, sp); + + __dtls_logic(flags, dst_media, sp); + + if (__init_streams(dst_media, NULL, sp, flags)) + return -1; + + MEDIA_CLEAR(dst_media, RECV); + + // XXX check answer SDP parameters + + MEDIA_SET(dst_media, INITIALIZED); + } + + __unsubscribe_one(dst_ml, src_ml); + __add_subscription(dst_ml, src_ml, false); + + __update_init_subscribers(dst_ml, streams, flags); + __update_init_subscribers(src_ml, NULL, NULL); + + __dialogue_unkernelize(src_ml); + __dialogue_unkernelize(dst_ml); + + return 0; +} + +/* called with call->master_lock held in W */ +int monologue_unsubscribe(struct call_monologue *src_ml, struct call_monologue *dst_ml, + struct sdp_ng_flags *flags) +{ + if (!__unsubscribe_one(dst_ml, src_ml)) + return -1; + + __update_init_subscribers(dst_ml, NULL, NULL); + __update_init_subscribers(src_ml, NULL, NULL); + + __dialogue_unkernelize(src_ml); + __dialogue_unkernelize(dst_ml); + + return 0; +} + + + + static int __rtp_stats_sort(const void *ap, const void *bp) { const struct rtp_stats *a = ap, *b = bp; @@ -3433,6 +3679,15 @@ static void __fix_other_tags(struct call_monologue *one) { struct call_monologue *call_get_monologue(struct call *call, const str *fromtag) { return g_hash_table_lookup(call->tags, fromtag); } +/* must be called with call->master_lock held in W */ +struct call_monologue *call_get_or_create_monologue(struct call *call, const str *fromtag) { + struct call_monologue *ret = call_get_monologue(call, fromtag); + if (!ret) { + ret = __monologue_create(call); + __monologue_tag(ret, fromtag); + } + return ret; +} /* must be called with call->master_lock held in W */ static int call_get_monologue_new(struct call_monologue *dialogue[2], struct call *call, @@ -3443,7 +3698,7 @@ static int call_get_monologue_new(struct call_monologue *dialogue[2], struct cal __C_DBG("getting monologue for tag '"STR_FORMAT"' in call '"STR_FORMAT"'", STR_FMT(fromtag), STR_FMT(&call->callid)); - ret = g_hash_table_lookup(call->tags, fromtag); + ret = call_get_monologue(call, fromtag); if (!ret) { ret = __monologue_create(call); __monologue_tag(ret, fromtag); @@ -3523,12 +3778,12 @@ static int call_get_dialogue(struct call_monologue *dialogue[2], struct call *ca STR_FMT(fromtag), STR_FMT(totag), STR_FMT(&call->callid)); /* we start with the to-tag. if it's not known, we treat it as a branched offer */ - tt = g_hash_table_lookup(call->tags, totag); + tt = call_get_monologue(call, totag); if (!tt) return call_get_monologue_new(dialogue, call, fromtag, totag, viabranch); /* if the from-tag is known already, return that */ - ft = g_hash_table_lookup(call->tags, fromtag); + ft = call_get_monologue(call, fromtag); if (ft) { __C_DBG("found existing dialogue"); @@ -3657,7 +3912,7 @@ int call_delete_branch(const str *callid, const str *branch, match_tag = (totag && totag->len) ? totag : fromtag; - ml = g_hash_table_lookup(c->tags, match_tag); + ml = call_get_monologue(c, match_tag); if (!ml) { if (branch && branch->len) { // also try a via-branch match here @@ -3669,7 +3924,7 @@ int call_delete_branch(const str *callid, const str *branch, // last resort: try the from-tag if we tried the to-tag before and see // if the associated dialogue has an empty tag (unknown) if (match_tag == totag) { - ml = g_hash_table_lookup(c->tags, fromtag); + ml = call_get_monologue(c, fromtag); if (ml && ml->subscriptions.length == 1) { struct call_subscription *cs = ml->subscriptions.head->data; if (cs->monologue->tag.len == 0) diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 3dd4356e2..edca14039 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -871,6 +871,9 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { case CSH_LOOKUP("single-codec"): out->single_codec = 1; break; + case CSH_LOOKUP("allow-transcoding"): + out->allow_transcoding = 1; + break; case CSH_LOOKUP("inject-DTMF"): out->inject_dtmf = 1; break; @@ -907,7 +910,7 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { &out->codec_except)) return; #ifdef WITH_TRANSCODING - if (out->opmode == OP_OFFER) { + if (out->opmode == OP_OFFER || out->opmode == OP_REQUEST || out->opmode == OP_PUBLISH) { if (call_ng_flags_prefix(out, s, "transcode-", call_ng_flags_codec_list, &out->codec_transcode)) return; @@ -957,9 +960,11 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu bencode_dictionary_get_str(input, "from-tag", &out->from_tag); bencode_dictionary_get_str(input, "to-tag", &out->to_tag); bencode_dictionary_get_str(input, "via-branch", &out->via_branch); - bencode_dictionary_get_str(input, "label", &out->label); + bencode_get_alt(input, "label", "from-label", &out->label); + bencode_get_alt(input, "to-label", "set-label", &out->set_label); bencode_dictionary_get_str(input, "address", &out->address); bencode_get_alt(input, "sdp", "SDP", &out->sdp); + bencode_dictionary_get_str(input, "interface", &out->interface); diridx = 0; if ((list = bencode_dictionary_get_expect(input, "direction", BENCODE_LIST))) { @@ -1145,7 +1150,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu call_ng_flags_list(out, dict, "offer", call_ng_flags_codec_list, &out->codec_offer); call_ng_flags_list(out, dict, "except", call_ng_flags_str_ht, &out->codec_except); #ifdef WITH_TRANSCODING - if (opmode == OP_OFFER) { + if (opmode == OP_OFFER || opmode == OP_REQUEST || opmode == OP_PUBLISH) { call_ng_flags_list(out, dict, "transcode", call_ng_flags_codec_list, &out->codec_transcode); call_ng_flags_list(out, dict, "mask", call_ng_flags_codec_list, &out->codec_mask); call_ng_flags_list(out, dict, "set", call_ng_flags_str_ht_split, &out->codec_set); @@ -2044,6 +2049,12 @@ found: __monologue_unkernelize(*monologue); } + // for generic ops, handle set-label here if given + if (opmode == OP_OTHER && flags->set_label.len && *monologue) { + call_str_cpy(*call, &(*monologue)->label, &flags->set_label); + g_hash_table_replace((*call)->labels, &(*monologue)->label, *monologue); + } + return NULL; } @@ -2460,6 +2471,174 @@ found_sink: #endif } + +const char *call_publish_ng(bencode_item_t *input, bencode_item_t *output) { + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + AUTO_CLEANUP(GQueue parsed, sdp_free) = G_QUEUE_INIT; + AUTO_CLEANUP(GQueue streams, sdp_streams_free) = G_QUEUE_INIT; + AUTO_CLEANUP(str sdp_in, str_free_dup) = STR_NULL; + AUTO_CLEANUP(str sdp_out, str_free_dup) = STR_NULL; + + call_ng_process_flags(&flags, input, OP_PUBLISH); + + if (!flags.sdp.s) + return "No SDP body in message"; + if (!flags.call_id.s) + return "No call-id in message"; + if (!flags.from_tag.s) + return "No from-tag in message"; + + str_init_dup_str(&sdp_in, &flags.sdp); + + if (sdp_parse(&sdp_in, &parsed, &flags)) + return "Failed to parse SDP"; + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specification"; + + struct call *call = call_get_or_create(&flags.call_id, false, false); + struct call_monologue *ml = call_get_or_create_monologue(call, &flags.from_tag); + + int ret = monologue_publish(ml, &streams, &flags); + if (ret) + ilog(LOG_ERR, "Publish error"); // XXX close call? handle errors? + + ret = sdp_create(&sdp_out, ml, &flags); + if (!ret) { + save_last_sdp(ml, &sdp_in, &parsed, &streams); + bencode_buffer_destroy_add(output->buffer, g_free, sdp_out.s); + bencode_dictionary_add_str(output, "sdp", &sdp_out); + sdp_out = STR_NULL; // ownership passed to output + } + + rwlock_unlock_w(&call->master_lock); + obj_put(call); + + if (!ret) + return NULL; + return "Failed to create SDP"; +} + + +const char *call_subscribe_request_ng(bencode_item_t *input, bencode_item_t *output) { + const char *err = NULL; + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + char rand_buf[65]; + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + struct call_monologue *source_ml; + + // get source monologue + err = media_block_match(&call, &source_ml, &flags, input, OP_REQUEST); + if (err) + return err; + + if (flags.sdp.len) + ilog(LOG_INFO, "Subscribe-request with SDP received - ignoring SDP"); + + if (!source_ml) + return "No call participant specified"; + if (!source_ml->last_in_sdp.len || !source_ml->last_in_sdp_parsed.length) + return "No SDP known for this from-tag"; + + // the `label=` option was possibly used above to select the from-tag -- + // switch it out with `to-label=` or `set-label=` for monologue_subscribe_request + // below which sets the label based on `label` for a newly created monologue + flags.label = flags.set_label; + + // get destination monologue + if (!flags.to_tag.len) { + // generate one + flags.to_tag = STR_CONST_INIT(rand_buf); + rand_hex_str(flags.to_tag.s, flags.to_tag.len / 2); + } + struct call_monologue *dest_ml = call_get_or_create_monologue(call, &flags.to_tag); + + struct sdp_chopper *chopper = sdp_chopper_new(&source_ml->last_in_sdp); + bencode_buffer_destroy_add(output->buffer, (free_func_t) sdp_chopper_destroy, chopper); + + int ret = monologue_subscribe_request(source_ml, dest_ml, &flags); + if (ret) + return "Failed to request subscription"; + + ret = sdp_replace(chopper, &source_ml->last_in_sdp_parsed, dest_ml, &flags); + if (ret) + return "Failed to rewrite SDP"; + + if (chopper->output->len) + bencode_dictionary_add_string_len(output, "sdp", chopper->output->str, chopper->output->len); + bencode_dictionary_add_str_dup(output, "from-tag", &source_ml->tag); + bencode_dictionary_add_str_dup(output, "to-tag", &dest_ml->tag); + + return NULL; +} + + +const char *call_subscribe_answer_ng(bencode_item_t *input, bencode_item_t *output) { + const char *err = NULL; + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + AUTO_CLEANUP(GQueue parsed, sdp_free) = G_QUEUE_INIT; + AUTO_CLEANUP(GQueue streams, sdp_streams_free) = G_QUEUE_INIT; + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + struct call_monologue *source_ml; + + // get source monologue + err = media_block_match(&call, &source_ml, &flags, input, OP_REQ_ANSWER); + if (err) + return err; + + if (!source_ml) + return "No call participant specified"; + if (!flags.to_tag.s) + return "No to-tag in message"; + if (!flags.sdp.len) + return "No SDP body in message"; + + // get destination monologue + struct call_monologue *dest_ml = call_get_monologue(call, &flags.to_tag); + if (!dest_ml) + return "To-tag not found"; + + if (sdp_parse(&flags.sdp, &parsed, &flags)) + return "Failed to parse SDP"; + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specification"; + + int ret = monologue_subscribe_answer(source_ml, dest_ml, &flags, &streams); + if (ret) + return "Failed to process subscription answer"; + + return NULL; +} + + +const char *call_unsubscribe_ng(bencode_item_t *input, bencode_item_t *output) { + const char *err = NULL; + AUTO_CLEANUP(struct sdp_ng_flags flags, call_ng_free_flags); + AUTO_CLEANUP_NULL(struct call *call, call_unlock_release); + struct call_monologue *source_ml; + + // get source monologue + err = media_block_match(&call, &source_ml, &flags, input, OP_OTHER); + if (err) + return err; + + if (!source_ml) + return "No call participant specified"; + if (!flags.to_tag.s) + return "No to-tag in message"; + + // get destination monologue + struct call_monologue *dest_ml = call_get_or_create_monologue(call, &flags.to_tag); + if (!dest_ml) + return "To-tag not found"; + + int ret = monologue_unsubscribe(source_ml, dest_ml, &flags); + if (ret) + return "Failed to unsubscribe"; + + return NULL; +} + + void call_interfaces_free() { if (info_re) { pcre_free(info_re); diff --git a/daemon/codec.c b/daemon/codec.c index 4fe526685..70ef31380 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -3849,6 +3849,21 @@ void codec_store_synthesise(struct codec_store *dst, struct codec_store *opposit } } +// check all codecs listed in the source are also be present in the answer (dst) +bool codec_store_is_full_answer(const struct codec_store *src, const struct codec_store *dst) { + for (GList *l = src->codec_prefs.head; l; l = l->next) { + const struct rtp_payload_type *src_pt = l->data; + const struct rtp_payload_type *dst_pt = g_hash_table_lookup(dst->codecs, + GINT_TO_POINTER(src_pt->payload_type)); + if (!dst_pt || rtp_payload_type_cmp(src_pt, dst_pt)) { + ilogs(codec, LOG_DEBUG, "Source codec " STR_FORMAT " is not present in the answer", + STR_FMT(&src_pt->encoding_with_params)); + return false; + } + } + return true; +} + static void codec_timers_run(void *p) { struct codec_timer *ct = p; ct->func(ct); diff --git a/daemon/control_ng.c b/daemon/control_ng.c index f5ce5ae4f..b5569d6cd 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -39,12 +39,15 @@ const char *ng_command_strings[NGC_COUNT] = { "stop recording", "start forwarding", "stop forwarding", "block DTMF", "unblock DTMF", "block media", "unblock media", "play media", "stop media", "play DTMF", "statistics", "silence media", "unsilence media", + "publish", "subscribe request", + "subscribe answer", "unsubscribe", }; const char *ng_command_strings_short[NGC_COUNT] = { "Ping", "Offer", "Answer", "Delete", "Query", "List", "StartRec", "StopRec", "StartFwd", "StopFwd", "BlkDTMF", "UnblkDTMF", "BlkMedia", "UnblkMedia", "PlayMedia", "StopMedia", "PlayDTMF", "Stats", "SlnMedia", "UnslnMedia", + "Pub", "SubReq", "SubAns", "Unsub", }; static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { @@ -306,6 +309,22 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, errstr = statistics_ng(dict, resp); command = NGC_STATISTICS; break; + case CSH_LOOKUP("publish"): + errstr = call_publish_ng(dict, resp); + command = NGC_PUBLISH; + break; + case CSH_LOOKUP("subscribe request"): + errstr = call_subscribe_request_ng(dict, resp); + command = NGC_SUBSCRIBE_REQ; + break; + case CSH_LOOKUP("subscribe answer"): + errstr = call_subscribe_answer_ng(dict, resp); + command = NGC_SUBSCRIBE_ANS; + break; + case CSH_LOOKUP("unsubscribe"): + errstr = call_unsubscribe_ng(dict, resp); + command = NGC_UNSUBSCRIBE; + break; default: errstr = "Unrecognized command"; } diff --git a/daemon/sdp.c b/daemon/sdp.c index f81a223c3..89abe103b 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -2496,7 +2496,8 @@ struct packet_stream *print_rtcp(GString *s, struct call_media *media, GList *rt if (proto_is_rtp(media->protocol)) { if (MEDIA_ISSET(media, RTCP_MUX) && (flags->opmode == OP_ANSWER || flags->opmode == OP_OTHER - || (flags->opmode == OP_OFFER + || flags->opmode == OP_PUBLISH + || ((flags->opmode == OP_OFFER || flags->opmode == OP_REQUEST) && flags->rtcp_mux_require))) { insert_rtcp_attr(s, ps, flags); diff --git a/include/call.h b/include/call.h index 9d5ca2550..a8ee9874e 100644 --- a/include/call.h +++ b/include/call.h @@ -50,6 +50,9 @@ enum stream_address_format { enum call_opmode { OP_OFFER = 0, OP_ANSWER = 1, + OP_REQUEST, + OP_REQ_ANSWER, + OP_PUBLISH, OP_OTHER, }; @@ -562,10 +565,16 @@ int call_get_mono_dialogue(struct call_monologue *dialogue[2], struct call *call const str *totag, const str *viabranch); struct call_monologue *call_get_monologue(struct call *call, const str *fromtag); +struct call_monologue *call_get_or_create_monologue(struct call *call, const str *fromtag); struct call *call_get(const str *callid); int monologue_offer_answer(struct call_monologue *dialogue[2], GQueue *streams, struct sdp_ng_flags *flags); void codecs_offer_answer(struct call_media *media, struct call_media *other_media, struct stream_params *sp, struct sdp_ng_flags *flags); +int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags); +int monologue_subscribe_request(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *); +int monologue_subscribe_answer(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *, + GQueue *); +int monologue_unsubscribe(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *); int call_delete_branch(const str *callid, const str *branch, const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay); void call_destroy(struct call *); diff --git a/include/call_interfaces.h b/include/call_interfaces.h index 930fd1ec6..fefc689a7 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -32,11 +32,13 @@ struct sdp_ng_flags { sockaddr_t parsed_received_from; sockaddr_t parsed_media_address; str direction[2]; + str interface; sockfamily_t *address_family; int tos; str record_call_str; str metadata; str label; + str set_label; str address; sockaddr_t xmlrpc_callback; GQueue codec_strip; @@ -110,7 +112,8 @@ struct sdp_ng_flags { loop_protect:1, original_sendrecv:1, single_codec:1, - reuse_codec:1, + reuse_codec:1, + allow_transcoding:1, inject_dtmf:1, t38_decode:1, t38_force:1, @@ -177,6 +180,10 @@ const char *call_stop_media_ng(bencode_item_t *, bencode_item_t *); const char *call_play_dtmf_ng(bencode_item_t *, bencode_item_t *); void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output, struct call_stats *totals); +const char *call_publish_ng(bencode_item_t *, bencode_item_t *); +const char *call_subscribe_request_ng(bencode_item_t *, bencode_item_t *); +const char *call_subscribe_answer_ng(bencode_item_t *, bencode_item_t *); +const char *call_unsubscribe_ng(bencode_item_t *, bencode_item_t *); int call_interfaces_init(void); void call_interfaces_free(void); diff --git a/include/codec.h b/include/codec.h index 1b1505e28..6d7f4bb70 100644 --- a/include/codec.h +++ b/include/codec.h @@ -4,6 +4,7 @@ #include #include +#include #include "str.h" #include "codeclib.h" #include "aux.h" @@ -98,6 +99,7 @@ void codec_store_track(struct codec_store *, GQueue *); void codec_store_transcode(struct codec_store *, GQueue *, struct codec_store *); void codec_store_answer(struct codec_store *dst, struct codec_store *src, struct sdp_ng_flags *flags); void codec_store_synthesise(struct codec_store *dst, struct codec_store *opposite); +bool codec_store_is_full_answer(const struct codec_store *src, const struct codec_store *dst); void codec_add_raw_packet(struct media_packet *mp, unsigned int clockrate); void codec_packet_free(void *); diff --git a/include/control_ng.h b/include/control_ng.h index 0ecf4144d..de3c1ed63 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -31,6 +31,10 @@ enum ng_command { NGC_STATISTICS, NGC_SILENCE_MEDIA, NGC_UNSILENCE_MEDIA, + NGC_PUBLISH, + NGC_SUBSCRIBE_REQ, + NGC_SUBSCRIBE_ANS, + NGC_UNSUBSCRIBE, NGC_COUNT // last, number of elements }; diff --git a/lib/auxlib.h b/lib/auxlib.h index 2bc817b59..6d1b72bec 100644 --- a/lib/auxlib.h +++ b/lib/auxlib.h @@ -301,6 +301,9 @@ INLINE void g_tree_clear(GTree *t) { g_tree_remove(t, k); } } +INLINE void g_string_free_true(GString *s) { + g_string_free(s, TRUE); +} INLINE void __g_string_free(GString **s) { g_string_free(*s, TRUE); } diff --git a/utils/rtpengine-ng-client b/utils/rtpengine-ng-client index 8d3a68b88..8bedb6a18 100755 --- a/utils/rtpengine-ng-client +++ b/utils/rtpengine-ng-client @@ -91,23 +91,27 @@ GetOptions( 'no-jitter-buffer' => \$options{'no jitter buffer'}, 'generate-RTCP' => \$options{'generate RTCP'}, 'single-codec' => \$options{'single codec'}, + 'allow-transcoding' => \$options{'allow transcoding'}, 'reorder-codecs' => \$options{'reorder codecs'}, 'media-echo=s' => \$options{'media echo'}, 'pierce-NAT' => \$options{'pierce NAT'}, 'label=s' => \$options{'label'}, + 'set-label=s' => \$options{'set-label'}, + 'from-label=s' => \$options{'from-label'}, + 'to-label=s' => \$options{'to-label'}, ) or die; my $cmd = shift(@ARGV) or die; my %packet = (command => $cmd); -for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address,file,db-id,code,DTLS-fingerprint,ICE-lite,media echo,label')) { +for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address,file,db-id,code,DTLS-fingerprint,ICE-lite,media echo,label,set-label,from-label,to-label')) { defined($options{$x}) and $packet{$x} = \$options{$x}; } for my $x (split(/,/, 'TOS,delete-delay')) { defined($options{$x}) and $packet{$x} = $options{$x}; } -for my $x (split(/,/, 'trust address,symmetric,asymmetric,unidirectional,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,full rtcp attribute,loop protect,record call,always transcode,all,pad crypto,generate mid,fragment,original sendrecv,symmetric codecs,asymmetric codecs,inject DTMF,generate RTCP,single codec,reorder codecs,pierce NAT,SIP-source-address')) { +for my $x (split(/,/, 'trust address,symmetric,asymmetric,unidirectional,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,full rtcp attribute,loop protect,record call,always transcode,all,pad crypto,generate mid,fragment,original sendrecv,symmetric codecs,asymmetric codecs,inject DTMF,generate RTCP,single codec,reorder codecs,pierce NAT,SIP-source-address,allow transcoding')) { defined($options{$x}) and push(@{$packet{flags}}, $x); } for my $x (split(/,/, 'origin,session connection,sdp version,username,session-name,zero-address')) { @@ -188,9 +192,19 @@ my $resp = $engine->req(\%packet); #print Dumper $resp; #exit; +if (exists($$resp{result}) && $$resp{result} eq 'ok') { + delete $$resp{result}; +} + if (defined($$resp{sdp})) { print("New SDP:\n-----8<-----8<-----8<-----8<-----8<-----\n$$resp{sdp}\n". "----->8----->8----->8----->8----->8-----\n"); + delete $$resp{sdp}; + if (%$resp) { + print("Result dictionary:\n-----8<-----8<-----8<-----8<-----8<-----\n" + . Dumper($resp) + . "----->8----->8----->8----->8----->8-----\n"); + } } else { local $Data::Dumper::Indent = 1;