Browse Source

TT#43557 implement media blocking

Change-Id: I336cf7203c1236b3e596310690a89ce1c3fd8bf4
changes/40/23540/8
Richard Fuchs 7 years ago
parent
commit
04a83027a0
12 changed files with 233 additions and 3 deletions
  1. +24
    -0
      README.md
  2. +1
    -1
      daemon/call.c
  3. +123
    -0
      daemon/call_interfaces.c
  4. +7
    -0
      daemon/codec.c
  5. +8
    -0
      daemon/control_ng.c
  6. +2
    -0
      daemon/media_socket.c
  7. +7
    -0
      daemon/redis.c
  8. +10
    -0
      include/call.h
  9. +3
    -0
      include/call_interfaces.h
  10. +2
    -0
      include/control_ng.h
  11. +42
    -0
      t/transcode-test.c
  12. +4
    -2
      utils/rtpengine-ng-client

+ 24
- 0
README.md View File

@ -1058,6 +1058,8 @@ a string and determines the type of message. Currently the following commands ar
* stop recording
* block DTMF
* unblock DTMF
* block media
* unblock media
The response dictionary must contain at least one key called `result`. The value can be either `ok` or `error`.
For the `ping` command, the additional value `pong` is allowed. If the result is `error`, then another key
@ -1231,6 +1233,12 @@ Optionally included keys are:
This flag should be given as part of the `answer` message.
- `all`
Only relevant to the `unblock media` message. Instructs *rtpengine* to remove not only a
full-call media block, but also remove directional media blocks that were imposed on
individual participants.
* `replace`
Similar to the `flags` list. Controls which parts of the SDP body should be rewritten.
@ -1904,3 +1912,19 @@ events (RFC 4733 type packets) for a call, respectively.
When DTMF blocking is enabled for a call, DTMF event packets will not be forwarded to the receiving peer.
If DTMF logging is enabled, DTMF events will still be logged to syslog while blocking is enabled. Blocking
of DTMF events happens for an entire call and can be enabled and disabled at any time during call runtime.
`block media` and `unblock media` Messages
------------------------------------------
Analogous to `block DTMF` and `unblock DTMF` but blocks media packets instead of DTMF packets. DTMF packets
can still pass through when media blocking is enabled.
Media can be blocked for an entire call if only the `call-id` key is present in the message, or can be blocked
directionally for individual participants. Participants can be selected by their SIP tag if the `from-tag` key
is included in the message, or they can be selected by their SDP media address if the `address` key is included
in the message. In the latter case, the address can be an IPv4 or IPv6 address, and any participant that is
found to have a matching address advertised as their SDP media address will have their originating media
packets blocked (or unblocked).
Unblocking media for the entire call (i.e. only `call-id` is given) does not automatically unblock media for
participants which had their media blocked directionally, unless the string `all` is included in the `flags`
section of the message.

+ 1
- 1
daemon/call.c View File

@ -2334,7 +2334,7 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) {
}
/* must be called with call->master_lock held in W */
static void __monologue_unkernelize(struct call_monologue *monologue) {
void __monologue_unkernelize(struct call_monologue *monologue) {
GList *l, *m;
struct call_media *media;
struct packet_stream *stream;


+ 123
- 0
daemon/call_interfaces.c View File

@ -607,6 +607,8 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) {
out->media_handover = 1;
else if (!str_cmp(s, "reset"))
out->reset = 1;
else if (!str_cmp(s, "all"))
out->all = 1;
else if (!str_cmp(s, "port-latching"))
out->port_latching = 1;
else if (!str_cmp(s, "record-call"))
@ -1355,6 +1357,54 @@ const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output
return NULL;
}
static const char *media_block_match(struct call **call, struct call_monologue **monologue,
bencode_item_t *input)
{
str callid;
str s;
*call = NULL;
*monologue = NULL;
if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message";
*call = call_get_opmode(&callid, OP_OTHER);
if (!*call)
return "Unknown call-id";
// directional block?
if (bencode_dictionary_get_str(input, "from-tag", &s)) {
*monologue = call_get_mono_dialogue(*call, &s, NULL, NULL);
if (!*monologue)
return "From-tag given, but no such tag exists";
}
else if (bencode_dictionary_get_str(input, "address", &s)) {
sockaddr_t addr;
if (sockaddr_parse_any_str(&addr, &s))
return "Failed to parse network address";
// walk our structures to find a matching stream
for (GList *l = (*call)->monologues.head; l; l = l->next) {
*monologue = l->data;
for (GList *k = (*monologue)->medias.head; k; k = k->next) {
struct call_media *media = k->data;
if (!media->streams.head)
continue;
struct packet_stream *ps = media->streams.head->data;
if (!sockaddr_eq(&addr, &ps->advertised_endpoint.address))
continue;
ilog(LOG_DEBUG, "Matched address %s to tag '" STR_FORMAT "'",
sockaddr_print_buf(&addr), STR_FMT(&(*monologue)->tag));
goto found;
}
}
return "Failed to match address to any tag";
found:
;
}
return NULL;
}
const char *call_block_dtmf_ng(bencode_item_t *input, bencode_item_t *output) {
str callid;
struct call *call;
@ -1393,6 +1443,79 @@ const char *call_unblock_dtmf_ng(bencode_item_t *input, bencode_item_t *output)
return NULL;
}
const char *call_block_media_ng(bencode_item_t *input, bencode_item_t *output) {
struct call *call;
struct call_monologue *monologue;
const char *errstr = NULL;
errstr = media_block_match(&call, &monologue, input);
if (errstr)
goto out;
if (monologue) {
ilog(LOG_INFO, "Blocking directional media (tag '" STR_FORMAT ")",
STR_FMT(&monologue->tag));
monologue->block_media = 1;
__monologue_unkernelize(monologue);
}
else {
ilog(LOG_INFO, "Blocking media (entire call)");
call->block_media = 1;
__call_unkernelize(call);
}
errstr = NULL;
out:
if (call) {
rwlock_unlock_w(&call->master_lock);
obj_put(call);
}
return errstr;
}
const char *call_unblock_media_ng(bencode_item_t *input, bencode_item_t *output) {
struct call *call;
struct call_monologue *monologue;
const char *errstr = NULL;
struct sdp_ng_flags flags;
errstr = media_block_match(&call, &monologue, input);
if (errstr)
goto out;
call_ng_process_flags(&flags, input);
if (monologue) {
ilog(LOG_INFO, "Unblocking directional media (tag '" STR_FORMAT ")",
STR_FMT(&monologue->tag));
monologue->block_media = 0;
__monologue_unkernelize(monologue);
}
else {
ilog(LOG_INFO, "Unblocking media (entire call)");
call->block_media = 0;
if (flags.all) {
for (GList *l = call->monologues.head; l; l = l->next) {
monologue = l->data;
monologue->block_media = 0;
}
}
__call_unkernelize(call);
}
errstr = NULL;
out:
if (call) {
rwlock_unlock_w(&call->master_lock);
obj_put(call);
}
return NULL;
}
int call_interfaces_init() {
const char *errptr;


+ 7
- 0
daemon/codec.c View File

@ -516,6 +516,9 @@ void codec_add_raw_packet(struct media_packet *mp) {
g_queue_push_tail(&mp->packets_out, p);
}
static int handler_func_passthrough(struct codec_handler *h, struct media_packet *mp) {
if (mp->call->block_media || mp->media->monologue->block_media)
return 0;
codec_add_raw_packet(mp);
return 0;
}
@ -806,6 +809,8 @@ struct rtp_payload_type *codec_make_payload_type(const str *codec_str, struct ca
static int handler_func_passthrough_ssrc(struct codec_handler *h, struct media_packet *mp) {
if (G_UNLIKELY(!mp->rtp))
return handler_func_passthrough(h, mp);
if (mp->call->block_media || mp->media->monologue->block_media)
return 0;
// substitute out SSRC
mp->rtp->ssrc = htonl(mp->ssrc_in->ssrc_map_out);
@ -978,6 +983,8 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet
static int handler_func_transcode(struct codec_handler *h, struct media_packet *mp) {
if (G_UNLIKELY(!mp->rtp))
return handler_func_passthrough(h, mp);
if (mp->call->block_media || mp->media->monologue->block_media)
return 0;
assert((mp->rtp->m_pt & 0x7f) == h->source_pt.payload_type);


+ 8
- 0
daemon/control_ng.c View File

@ -224,6 +224,14 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
errstr = call_unblock_dtmf_ng(dict, resp);
g_atomic_int_inc(&cur->unblock_dtmf);
}
else if (!str_cmp(&cmd, "block media")) {
errstr = call_block_media_ng(dict, resp);
g_atomic_int_inc(&cur->block_media);
}
else if (!str_cmp(&cmd, "unblock media")) {
errstr = call_unblock_media_ng(dict, resp);
g_atomic_int_inc(&cur->unblock_media);
}
else
{
errstr = "Unrecognized command";


+ 2
- 0
daemon/media_socket.c View File

@ -979,6 +979,8 @@ void kernelize(struct packet_stream *stream) {
goto no_kernel;
if (!stream->selected_sfd)
goto no_kernel;
if (stream->media->monologue->block_media || call->block_media)
goto no_kernel;
ilog(LOG_INFO, "Kernelizing media stream: %s:%d", sockaddr_print_buf(&stream->endpoint.address), stream->endpoint.port);


+ 7
- 0
daemon/redis.c View File

@ -1199,6 +1199,7 @@ static int redis_streams(struct call *c, struct redis_list *streams) {
static int redis_tags(struct call *c, struct redis_list *tags) {
unsigned int i;
int ii;
struct redis_hash *rh;
struct call_monologue *ml;
str s;
@ -1219,6 +1220,8 @@ static int redis_tags(struct call *c, struct redis_list *tags) {
if (!redis_hash_get_str(&s, rh, "label"))
call_str_cpy(c, &ml->label, &s);
redis_hash_get_time_t(&ml->deleted, rh, "deleted");
if (!redis_hash_get_int(&ii, rh, "block_media"))
ml->block_media = ii ? 1 : 0;
tags->ptrs[i] = ml;
}
@ -1581,6 +1584,8 @@ static void json_restore_call(struct redis *r, const str *callid, enum call_type
sockaddr_parse_any_str(&c->created_from_addr, &id);
if (!redis_hash_get_int(&i, &call, "block_dtmf"))
c->block_dtmf = i ? 1 : 0;
if (!redis_hash_get_int(&i, &call, "block_media"))
c->block_media = i ? 1 : 0;
err = "missing 'redis_hosted_db' value";
if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db"))
@ -1862,6 +1867,7 @@ char* redis_encode_json(struct call *c) {
JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db);
JSON_SET_SIMPLE_STR("recording_metadata",&c->metadata);
JSON_SET_SIMPLE("block_dtmf","%i",c->block_dtmf ? 1 : 0);
JSON_SET_SIMPLE("block_media","%i",c->block_media ? 1 : 0);
if ((rec = c->recording)) {
JSON_SET_SIMPLE_CSTR("recording_meta_prefix",rec->meta_prefix);
@ -1959,6 +1965,7 @@ char* redis_encode_json(struct call *c) {
JSON_SET_SIMPLE("created","%llu",(long long unsigned) ml->created);
JSON_SET_SIMPLE("active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1);
JSON_SET_SIMPLE("deleted","%llu",(long long unsigned) ml->deleted);
JSON_SET_SIMPLE("block_media","%i",ml->block_media ? 1 : 0);
if (ml->tag.s)
JSON_SET_SIMPLE_STR("tag",&ml->tag);


+ 10
- 0
include/call.h View File

@ -364,6 +364,8 @@ struct call_monologue {
GHashTable *other_tags;
struct call_monologue *active_dialogue;
GQueue medias;
int block_media:1;
};
struct call {
@ -401,6 +403,7 @@ struct call {
str metadata;
int block_dtmf:1;
int block_media:1;
};
@ -434,6 +437,7 @@ struct call_media *call_media_new(struct call *call);
enum call_stream_state call_stream_state_machine(struct packet_stream *);
void call_media_state_machine(struct call_media *m);
void call_media_unkernelize(struct call_media *media);
void __monologue_unkernelize(struct call_monologue *monologue);
int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address_format format,
int *len, const struct local_intf *ifa, int keep_unspec);
@ -505,5 +509,11 @@ INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) {
ret = ps->rtcp_sink;
return ret;
}
INLINE void __call_unkernelize(struct call *call) {
for (GList *l = call->monologues.head; l; l = l->next) {
struct call_monologue *ml = l->data;
__monologue_unkernelize(ml);
}
}
#endif

+ 3
- 0
include/call_interfaces.h View File

@ -57,6 +57,7 @@ struct sdp_ng_flags {
media_handover:1,
dtls_passive:1,
reset:1,
all:1,
record_call:1,
loop_protect:1,
always_transcode:1,
@ -96,6 +97,8 @@ const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *);
const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *);
const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_block_media_ng(bencode_item_t *, bencode_item_t *);
const char *call_unblock_media_ng(bencode_item_t *, bencode_item_t *);
void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output,
struct call_stats *totals);


+ 2
- 0
include/control_ng.h View File

@ -21,6 +21,8 @@ struct control_ng_stats {
int stop_recording;
int block_dtmf;
int unblock_dtmf;
int block_media;
int unblock_media;
int errors;
};


+ 42
- 0
t/transcode-test.c View File

@ -858,6 +858,20 @@ int main() {
dtmf("");
// final audio RTP test
packet_seq_exp(A, 8, PCMA_payload, 1000960, 212, 8, PCMA_payload, 5); // expected seq is 207+5 for PT 8
packet_seq_exp(A, 8, PCMA_payload, 1001120, 213, 8, PCMA_payload, 1);
// media blocking
ml_A.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1001280, 214, -1, "", 0);
packet_seq_exp(A, 8, PCMA_payload, 1001440, 215, -1, "", 0);
ml_A.block_media = 0;
packet_seq_exp(A, 8, PCMA_payload, 1001600, 216, 8, PCMA_payload, 3);
call.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1001760, 217, -1, "", 0);
packet_seq_exp(A, 8, PCMA_payload, 1001920, 218, -1, "", 0);
call.block_media = 0;
packet_seq_exp(A, 8, PCMA_payload, 1002080, 219, 8, PCMA_payload, 3);
ml_B.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1002240, 220, 8, PCMA_payload, 1);
end();
// DTMF passthrough w/ transcoding - blocking
@ -913,6 +927,20 @@ int main() {
dtmf("");
// final audio RTP test
packet_seq_exp(A, 8, PCMA_payload, 1000960, 212, 0, PCMU_payload, 1); // expected seq is 207+1 for PT 8
packet_seq_exp(A, 8, PCMA_payload, 1001120, 213, 0, PCMU_payload, 1);
// media blocking
ml_A.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1001280, 214, -1, "", 0);
packet_seq_exp(A, 8, PCMA_payload, 1001440, 215, -1, "", 0);
ml_A.block_media = 0;
packet_seq_exp(A, 8, PCMA_payload, 1001600, 214, 0, PCMU_payload, 1); // cheat with the seq here - 216 would get held by the jitter buffer
call.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1001760, 215, -1, "", 0);
packet_seq_exp(A, 8, PCMA_payload, 1001920, 216, -1, "", 0);
call.block_media = 0;
packet_seq_exp(A, 8, PCMA_payload, 1002080, 215, 0, PCMU_payload, 1);
ml_B.block_media = 1;
packet_seq_exp(A, 8, PCMA_payload, 1002240, 216, 0, PCMU_payload, 1);
end();
// plain DTMF passthrough w/o transcoding w/ implicit primary payload type - blocking
@ -967,6 +995,20 @@ int main() {
dtmf("");
// final audio RTP test
packet_seq_exp(A, 0, PCMU_payload, 1000960, 212, 0, PCMU_payload, 5); // expected seq is 207+5 for PT 8
packet_seq_exp(A, 0, PCMU_payload, 1001120, 213, 0, PCMU_payload, 1);
// media blocking
ml_A.block_media = 1;
packet_seq_exp(A, 0, PCMU_payload, 1001280, 214, -1, "", 0);
packet_seq_exp(A, 0, PCMU_payload, 1001440, 215, -1, "", 0);
ml_A.block_media = 0;
packet_seq_exp(A, 0, PCMU_payload, 1001600, 216, 0, PCMU_payload, 3);
call.block_media = 1;
packet_seq_exp(A, 0, PCMU_payload, 1001760, 217, -1, "", 0);
packet_seq_exp(A, 0, PCMU_payload, 1001920, 218, -1, "", 0);
call.block_media = 0;
packet_seq_exp(A, 0, PCMU_payload, 1002080, 219, 0, PCMU_payload, 3);
ml_B.block_media = 1;
packet_seq_exp(A, 0, PCMU_payload, 1002240, 220, 0, PCMU_payload, 1);
end();
return 0;


+ 4
- 2
utils/rtpengine-ng-client View File

@ -58,19 +58,21 @@ GetOptions(
'xmlrpc-callback=s' => \$options{'xmlrpc-callback'},
'always-transcode' => \$options{'always transcode'},
'metadata=s' => \$options{'metadata'},
'all' => \$options{'all'},
'address=s' => \$options{'address'},
) or die;
my $cmd = shift(@ARGV) or die;
my %packet = (command => $cmd);
for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata')) {
for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address')) {
defined($options{$x}) and $packet{$x} = \$options{$x};
}
for my $x (split(/,/, 'TOS,delete-delay')) {
defined($options{$x}) and $packet{$x} = $options{$x};
}
for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,loop protect,record call,always transcode')) {
for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset,port latching,no rtcp attribute,loop protect,record call,always transcode,all')) {
defined($options{$x}) and push(@{$packet{flags}}, $x);
}
for my $x (split(/,/, 'origin,session connection')) {


Loading…
Cancel
Save