From 04a83027a057c36ef763966bab9845b6fdb52872 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 14 Sep 2018 14:13:47 -0400 Subject: [PATCH] TT#43557 implement media blocking Change-Id: I336cf7203c1236b3e596310690a89ce1c3fd8bf4 --- README.md | 24 ++++++++ daemon/call.c | 2 +- daemon/call_interfaces.c | 123 ++++++++++++++++++++++++++++++++++++++ daemon/codec.c | 7 +++ daemon/control_ng.c | 8 +++ daemon/media_socket.c | 2 + daemon/redis.c | 7 +++ include/call.h | 10 ++++ include/call_interfaces.h | 3 + include/control_ng.h | 2 + t/transcode-test.c | 42 +++++++++++++ utils/rtpengine-ng-client | 6 +- 12 files changed, 233 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 534834739..3ee6f2641 100644 --- a/README.md +++ b/README.md @@ -1058,6 +1058,8 @@ a string and determines the type of message. Currently the following commands ar * stop recording * block DTMF * unblock DTMF +* block media +* unblock media The response dictionary must contain at least one key called `result`. The value can be either `ok` or `error`. For the `ping` command, the additional value `pong` is allowed. If the result is `error`, then another key @@ -1231,6 +1233,12 @@ Optionally included keys are: This flag should be given as part of the `answer` message. + - `all` + + Only relevant to the `unblock media` message. Instructs *rtpengine* to remove not only a + full-call media block, but also remove directional media blocks that were imposed on + individual participants. + * `replace` Similar to the `flags` list. Controls which parts of the SDP body should be rewritten. @@ -1904,3 +1912,19 @@ events (RFC 4733 type packets) for a call, respectively. When DTMF blocking is enabled for a call, DTMF event packets will not be forwarded to the receiving peer. If DTMF logging is enabled, DTMF events will still be logged to syslog while blocking is enabled. Blocking of DTMF events happens for an entire call and can be enabled and disabled at any time during call runtime. + +`block media` and `unblock media` Messages +------------------------------------------ +Analogous to `block DTMF` and `unblock DTMF` but blocks media packets instead of DTMF packets. DTMF packets +can still pass through when media blocking is enabled. + +Media can be blocked for an entire call if only the `call-id` key is present in the message, or can be blocked +directionally for individual participants. Participants can be selected by their SIP tag if the `from-tag` key +is included in the message, or they can be selected by their SDP media address if the `address` key is included +in the message. In the latter case, the address can be an IPv4 or IPv6 address, and any participant that is +found to have a matching address advertised as their SDP media address will have their originating media +packets blocked (or unblocked). + +Unblocking media for the entire call (i.e. only `call-id` is given) does not automatically unblock media for +participants which had their media blocked directionally, unless the string `all` is included in the `flags` +section of the message. diff --git a/daemon/call.c b/daemon/call.c index 97a0a4ada..45422a5b1 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2334,7 +2334,7 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) { } /* must be called with call->master_lock held in W */ -static void __monologue_unkernelize(struct call_monologue *monologue) { +void __monologue_unkernelize(struct call_monologue *monologue) { GList *l, *m; struct call_media *media; struct packet_stream *stream; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index dec2a3cc6..0c628079c 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -607,6 +607,8 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) { out->media_handover = 1; else if (!str_cmp(s, "reset")) out->reset = 1; + else if (!str_cmp(s, "all")) + out->all = 1; else if (!str_cmp(s, "port-latching")) out->port_latching = 1; else if (!str_cmp(s, "record-call")) @@ -1355,6 +1357,54 @@ const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output return NULL; } +static const char *media_block_match(struct call **call, struct call_monologue **monologue, + bencode_item_t *input) +{ + str callid; + str s; + + *call = NULL; + *monologue = NULL; + + if (!bencode_dictionary_get_str(input, "call-id", &callid)) + return "No call-id in message"; + *call = call_get_opmode(&callid, OP_OTHER); + if (!*call) + return "Unknown call-id"; + + // directional block? + if (bencode_dictionary_get_str(input, "from-tag", &s)) { + *monologue = call_get_mono_dialogue(*call, &s, NULL, NULL); + if (!*monologue) + return "From-tag given, but no such tag exists"; + } + else if (bencode_dictionary_get_str(input, "address", &s)) { + sockaddr_t addr; + if (sockaddr_parse_any_str(&addr, &s)) + return "Failed to parse network address"; + // walk our structures to find a matching stream + for (GList *l = (*call)->monologues.head; l; l = l->next) { + *monologue = l->data; + for (GList *k = (*monologue)->medias.head; k; k = k->next) { + struct call_media *media = k->data; + if (!media->streams.head) + continue; + struct packet_stream *ps = media->streams.head->data; + if (!sockaddr_eq(&addr, &ps->advertised_endpoint.address)) + continue; + ilog(LOG_DEBUG, "Matched address %s to tag '" STR_FORMAT "'", + sockaddr_print_buf(&addr), STR_FMT(&(*monologue)->tag)); + goto found; + } + } + return "Failed to match address to any tag"; +found: + ; + } + + return NULL; +} + const char *call_block_dtmf_ng(bencode_item_t *input, bencode_item_t *output) { str callid; struct call *call; @@ -1393,6 +1443,79 @@ const char *call_unblock_dtmf_ng(bencode_item_t *input, bencode_item_t *output) return NULL; } +const char *call_block_media_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + if (monologue) { + ilog(LOG_INFO, "Blocking directional media (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->block_media = 1; + __monologue_unkernelize(monologue); + } + else { + ilog(LOG_INFO, "Blocking media (entire call)"); + call->block_media = 1; + __call_unkernelize(call); + } + + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + return errstr; +} + +const char *call_unblock_media_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + struct sdp_ng_flags flags; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + call_ng_process_flags(&flags, input); + + if (monologue) { + ilog(LOG_INFO, "Unblocking directional media (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->block_media = 0; + __monologue_unkernelize(monologue); + } + else { + ilog(LOG_INFO, "Unblocking media (entire call)"); + call->block_media = 0; + if (flags.all) { + for (GList *l = call->monologues.head; l; l = l->next) { + monologue = l->data; + monologue->block_media = 0; + } + } + __call_unkernelize(call); + } + + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + + + return NULL; +} + int call_interfaces_init() { const char *errptr; diff --git a/daemon/codec.c b/daemon/codec.c index 53d756001..36c3fbf41 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -516,6 +516,9 @@ void codec_add_raw_packet(struct media_packet *mp) { g_queue_push_tail(&mp->packets_out, p); } static int handler_func_passthrough(struct codec_handler *h, struct media_packet *mp) { + if (mp->call->block_media || mp->media->monologue->block_media) + return 0; + codec_add_raw_packet(mp); return 0; } @@ -806,6 +809,8 @@ struct rtp_payload_type *codec_make_payload_type(const str *codec_str, struct ca static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_packet *mp) { if (G_UNLIKELY(!mp->rtp)) return handler_func_passthrough(h, mp); + if (mp->call->block_media || mp->media->monologue->block_media) + return 0; // substitute out SSRC mp->rtp->ssrc = htonl(mp->ssrc_in->ssrc_map_out); @@ -978,6 +983,8 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet static int handler_func_transcode(struct codec_handler *h, struct media_packet *mp) { if (G_UNLIKELY(!mp->rtp)) return handler_func_passthrough(h, mp); + if (mp->call->block_media || mp->media->monologue->block_media) + return 0; assert((mp->rtp->m_pt & 0x7f) == h->source_pt.payload_type); diff --git a/daemon/control_ng.c b/daemon/control_ng.c index cd5607a1c..8a279afa8 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -224,6 +224,14 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin errstr = call_unblock_dtmf_ng(dict, resp); g_atomic_int_inc(&cur->unblock_dtmf); } + else if (!str_cmp(&cmd, "block media")) { + errstr = call_block_media_ng(dict, resp); + g_atomic_int_inc(&cur->block_media); + } + else if (!str_cmp(&cmd, "unblock media")) { + errstr = call_unblock_media_ng(dict, resp); + g_atomic_int_inc(&cur->unblock_media); + } else { errstr = "Unrecognized command"; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index c7d548c3b..1d45a8890 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -979,6 +979,8 @@ void kernelize(struct packet_stream *stream) { goto no_kernel; if (!stream->selected_sfd) goto no_kernel; + if (stream->media->monologue->block_media || call->block_media) + goto no_kernel; ilog(LOG_INFO, "Kernelizing media stream: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port); diff --git a/daemon/redis.c b/daemon/redis.c index 5f28d9c9f..2b2fc72bd 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1199,6 +1199,7 @@ static int redis_streams(struct call *c, struct redis_list *streams) { static int redis_tags(struct call *c, struct redis_list *tags) { unsigned int i; + int ii; struct redis_hash *rh; struct call_monologue *ml; str s; @@ -1219,6 +1220,8 @@ static int redis_tags(struct call *c, struct redis_list *tags) { if (!redis_hash_get_str(&s, rh, "label")) call_str_cpy(c, &ml->label, &s); redis_hash_get_time_t(&ml->deleted, rh, "deleted"); + if (!redis_hash_get_int(&ii, rh, "block_media")) + ml->block_media = ii ? 1 : 0; tags->ptrs[i] = ml; } @@ -1581,6 +1584,8 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type sockaddr_parse_any_str(&c->created_from_addr, &id); if (!redis_hash_get_int(&i, &call, "block_dtmf")) c->block_dtmf = i ? 1 : 0; + if (!redis_hash_get_int(&i, &call, "block_media")) + c->block_media = i ? 1 : 0; err = "missing 'redis_hosted_db' value"; if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) @@ -1862,6 +1867,7 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); JSON_SET_SIMPLE_STR("recording_metadata",&c->metadata); JSON_SET_SIMPLE("block_dtmf","%i",c->block_dtmf ? 1 : 0); + JSON_SET_SIMPLE("block_media","%i",c->block_media ? 1 : 0); if ((rec = c->recording)) { JSON_SET_SIMPLE_CSTR("recording_meta_prefix",rec->meta_prefix); @@ -1959,6 +1965,7 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE("created","%llu",(long long unsigned) ml->created); JSON_SET_SIMPLE("active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1); JSON_SET_SIMPLE("deleted","%llu",(long long unsigned) ml->deleted); + JSON_SET_SIMPLE("block_media","%i",ml->block_media ? 1 : 0); if (ml->tag.s) JSON_SET_SIMPLE_STR("tag",&ml->tag); diff --git a/include/call.h b/include/call.h index c646277c1..5bef7ccfc 100644 --- a/include/call.h +++ b/include/call.h @@ -364,6 +364,8 @@ struct call_monologue { GHashTable *other_tags; struct call_monologue *active_dialogue; GQueue medias; + + int block_media:1; }; struct call { @@ -401,6 +403,7 @@ struct call { str metadata; int block_dtmf:1; + int block_media:1; }; @@ -434,6 +437,7 @@ struct call_media *call_media_new(struct call *call); enum call_stream_state call_stream_state_machine(struct packet_stream *); void call_media_state_machine(struct call_media *m); void call_media_unkernelize(struct call_media *media); +void __monologue_unkernelize(struct call_monologue *monologue); int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address_format format, int *len, const struct local_intf *ifa, int keep_unspec); @@ -505,5 +509,11 @@ INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) { ret = ps->rtcp_sink; return ret; } +INLINE void __call_unkernelize(struct call *call) { + for (GList *l = call->monologues.head; l; l = l->next) { + struct call_monologue *ml = l->data; + __monologue_unkernelize(ml); + } +} #endif diff --git a/include/call_interfaces.h b/include/call_interfaces.h index d735710b3..4667bb954 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -57,6 +57,7 @@ struct sdp_ng_flags { media_handover:1, dtls_passive:1, reset:1, + all:1, record_call:1, loop_protect:1, always_transcode:1, @@ -96,6 +97,8 @@ const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *); const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *); const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *); +const char *call_block_media_ng(bencode_item_t *, bencode_item_t *); +const char *call_unblock_media_ng(bencode_item_t *, bencode_item_t *); void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output, struct call_stats *totals); diff --git a/include/control_ng.h b/include/control_ng.h index 0d2bec3a8..afe409640 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -21,6 +21,8 @@ struct control_ng_stats { int stop_recording; int block_dtmf; int unblock_dtmf; + int block_media; + int unblock_media; int errors; }; diff --git a/t/transcode-test.c b/t/transcode-test.c index fcb950034..6ac72ed27 100644 --- a/t/transcode-test.c +++ b/t/transcode-test.c @@ -858,6 +858,20 @@ int main() { dtmf(""); // final audio RTP test packet_seq_exp(A, 8, PCMA_payload, 1000960, 212, 8, PCMA_payload, 5); // expected seq is 207+5 for PT 8 + packet_seq_exp(A, 8, PCMA_payload, 1001120, 213, 8, PCMA_payload, 1); + // media blocking + ml_A.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1001280, 214, -1, "", 0); + packet_seq_exp(A, 8, PCMA_payload, 1001440, 215, -1, "", 0); + ml_A.block_media = 0; + packet_seq_exp(A, 8, PCMA_payload, 1001600, 216, 8, PCMA_payload, 3); + call.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1001760, 217, -1, "", 0); + packet_seq_exp(A, 8, PCMA_payload, 1001920, 218, -1, "", 0); + call.block_media = 0; + packet_seq_exp(A, 8, PCMA_payload, 1002080, 219, 8, PCMA_payload, 3); + ml_B.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1002240, 220, 8, PCMA_payload, 1); end(); // DTMF passthrough w/ transcoding - blocking @@ -913,6 +927,20 @@ int main() { dtmf(""); // final audio RTP test packet_seq_exp(A, 8, PCMA_payload, 1000960, 212, 0, PCMU_payload, 1); // expected seq is 207+1 for PT 8 + packet_seq_exp(A, 8, PCMA_payload, 1001120, 213, 0, PCMU_payload, 1); + // media blocking + ml_A.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1001280, 214, -1, "", 0); + packet_seq_exp(A, 8, PCMA_payload, 1001440, 215, -1, "", 0); + ml_A.block_media = 0; + packet_seq_exp(A, 8, PCMA_payload, 1001600, 214, 0, PCMU_payload, 1); // cheat with the seq here - 216 would get held by the jitter buffer + call.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1001760, 215, -1, "", 0); + packet_seq_exp(A, 8, PCMA_payload, 1001920, 216, -1, "", 0); + call.block_media = 0; + packet_seq_exp(A, 8, PCMA_payload, 1002080, 215, 0, PCMU_payload, 1); + ml_B.block_media = 1; + packet_seq_exp(A, 8, PCMA_payload, 1002240, 216, 0, PCMU_payload, 1); end(); // plain DTMF passthrough w/o transcoding w/ implicit primary payload type - blocking @@ -967,6 +995,20 @@ int main() { dtmf(""); // final audio RTP test packet_seq_exp(A, 0, PCMU_payload, 1000960, 212, 0, PCMU_payload, 5); // expected seq is 207+5 for PT 8 + packet_seq_exp(A, 0, PCMU_payload, 1001120, 213, 0, PCMU_payload, 1); + // media blocking + ml_A.block_media = 1; + packet_seq_exp(A, 0, PCMU_payload, 1001280, 214, -1, "", 0); + packet_seq_exp(A, 0, PCMU_payload, 1001440, 215, -1, "", 0); + ml_A.block_media = 0; + packet_seq_exp(A, 0, PCMU_payload, 1001600, 216, 0, PCMU_payload, 3); + call.block_media = 1; + packet_seq_exp(A, 0, PCMU_payload, 1001760, 217, -1, "", 0); + packet_seq_exp(A, 0, PCMU_payload, 1001920, 218, -1, "", 0); + call.block_media = 0; + packet_seq_exp(A, 0, PCMU_payload, 1002080, 219, 0, PCMU_payload, 3); + ml_B.block_media = 1; + packet_seq_exp(A, 0, PCMU_payload, 1002240, 220, 0, PCMU_payload, 1); end(); return 0; diff --git a/utils/rtpengine-ng-client b/utils/rtpengine-ng-client index 6fe0d8d6a..b1a61ab45 100755 --- a/utils/rtpengine-ng-client +++ b/utils/rtpengine-ng-client @@ -58,19 +58,21 @@ GetOptions( 'xmlrpc-callback=s' => \$options{'xmlrpc-callback'}, 'always-transcode' => \$options{'always transcode'}, 'metadata=s' => \$options{'metadata'}, + 'all' => \$options{'all'}, + 'address=s' => \$options{'address'}, ) or die; my $cmd = shift(@ARGV) or die; my %packet = (command => $cmd); -for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata')) { +for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address')) { defined($options{$x}) and $packet{$x} = \$options{$x}; } for my $x (split(/,/, 'TOS,delete-delay')) { defined($options{$x}) and $packet{$x} = $options{$x}; } -for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,loop protect,record call,always transcode')) { +for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,loop protect,record call,always transcode,all')) { defined($options{$x}) and push(@{$packet{flags}}, $x); } for my $x (split(/,/, 'origin,session connection')) {