Browse Source

TT#52651 add start/stop forwarding commands and party selection logic

Change-Id: I8ef7e288d3a3e485bd2fa14e1a2407a0c8d94bac
changes/72/27372/14
Richard Fuchs 7 years ago
parent
commit
cb2dbd2a92
14 changed files with 183 additions and 19 deletions
  1. +79
    -1
      daemon/call_interfaces.c
  2. +1
    -1
      daemon/codec.c
  3. +8
    -0
      daemon/control_ng.c
  4. +44
    -8
      daemon/recording.c
  5. +3
    -0
      include/call.h
  6. +2
    -0
      include/call_interfaces.h
  7. +2
    -0
      include/control_ng.h
  8. +19
    -2
      recording-daemon/decoder.c
  9. +4
    -2
      recording-daemon/metafile.c
  10. +12
    -4
      recording-daemon/packet.c
  11. +6
    -0
      recording-daemon/stream.c
  12. +1
    -0
      recording-daemon/stream.h
  13. +1
    -0
      recording-daemon/types.h
  14. +1
    -1
      t/amr-decode-test.c

+ 79
- 1
daemon/call_interfaces.c View File

@ -1004,8 +1004,10 @@ static const char *call_offer_answer_ng(bencode_item_t *input,
bencode_buffer_destroy_add(output->buffer, (free_func_t) sdp_chopper_destroy, chopper);
detect_setup_recording(call, &flags.record_call_str, &flags.metadata);
if (flags.record_call)
if (flags.record_call) {
call->recording_on = 1;
recording_start(call, NULL, &flags.metadata);
}
ret = monologue_offer_answer(monologue, &streams, &flags);
if (!ret) {
@ -1438,6 +1440,7 @@ const char *call_start_recording_ng(bencode_item_t *input, bencode_item_t *outpu
if (!call)
return "Unknown call-id";
call->recording_on = 1;
recording_start(call, NULL, &metadata);
rwlock_unlock_w(&call->master_lock);
@ -1456,6 +1459,7 @@ const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output
if (!call)
return "Unknown call-id";
call->recording_on = 0;
recording_stop(call);
rwlock_unlock_w(&call->master_lock);
@ -1512,6 +1516,80 @@ found:
return NULL;
}
// XXX these are all identical - unify and use a flags int and/or callback
const char *call_start_forwarding_ng(bencode_item_t *input, bencode_item_t *output) {
struct call *call;
struct call_monologue *monologue;
const char *errstr = NULL;
str metadata;
errstr = media_block_match(&call, &monologue, input);
if (errstr)
goto out;
bencode_dictionary_get_str(input, "metadata", &metadata);
if (monologue) {
ilog(LOG_INFO, "Start forwarding for single party (tag '" STR_FORMAT ")",
STR_FMT(&monologue->tag));
monologue->rec_forwarding = 1;
}
else {
ilog(LOG_INFO, "Start forwarding (entire call)");
call->rec_forwarding = 1;
}
recording_start(call, NULL, &metadata);
errstr = NULL;
out:
if (call) {
rwlock_unlock_w(&call->master_lock);
obj_put(call);
}
return errstr;
}
const char *call_stop_forwarding_ng(bencode_item_t *input, bencode_item_t *output) {
struct call *call;
struct call_monologue *monologue;
const char *errstr = NULL;
struct sdp_ng_flags flags;
errstr = media_block_match(&call, &monologue, input);
if (errstr)
goto out;
call_ng_process_flags(&flags, input, OP_OTHER);
if (monologue) {
ilog(LOG_INFO, "Stop forwarding for single party (tag '" STR_FORMAT ")",
STR_FMT(&monologue->tag));
monologue->rec_forwarding = 0;
}
else {
ilog(LOG_INFO, "Stop forwarding (entire call)");
call->rec_forwarding = 0;
if (flags.all) {
for (GList *l = call->monologues.head; l; l = l->next) {
monologue = l->data;
monologue->rec_forwarding = 0;
}
}
}
recording_stop(call);
errstr = NULL;
out:
if (call) {
rwlock_unlock_w(&call->master_lock);
obj_put(call);
}
return NULL;
}
const char *call_block_dtmf_ng(bencode_item_t *input, bencode_item_t *output) {
struct call *call;
struct call_monologue *monologue;


+ 1
- 1
daemon/codec.c View File

@ -975,7 +975,7 @@ static int __packet_encoded(encoder_t *enc, void *u1, void *u2) {
return 0;
}
static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2, void *u3) {
static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) {
struct codec_ssrc_handler *ch = u1;
ilog(LOG_DEBUG, "RTP media successfully decoded: TS %llu, samples %u",


+ 8
- 0
daemon/control_ng.c View File

@ -219,6 +219,14 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
errstr = call_stop_recording_ng(dict, resp);
g_atomic_int_inc(&cur->stop_recording);
break;
case CSH_LOOKUP("start forwarding"):
errstr = call_start_forwarding_ng(dict, resp);
g_atomic_int_inc(&cur->start_forwarding);
break;
case CSH_LOOKUP("stop forwarding"):
errstr = call_stop_forwarding_ng(dict, resp);
g_atomic_int_inc(&cur->stop_forwarding);
break;
case CSH_LOOKUP("block DTMF"):
errstr = call_block_dtmf_ng(dict, resp);
g_atomic_int_inc(&cur->block_dtmf);


+ 44
- 8
daemon/recording.c View File

@ -35,6 +35,9 @@ struct pcap_format {
static int check_main_spool_dir(const char *spoolpath);
static char *recording_setup_file(struct recording *recording);
static char *meta_setup_file(struct recording *recording);
static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen,
const char *label_fmt, ...)
__attribute__((format(printf,4,5)));
// pcap methods
static int pcap_create_spool_dir(const char *dirpath);
@ -58,6 +61,9 @@ static void kernel_info_proc(struct packet_stream *, struct rtpengine_target_inf
static void pcap_eth_header(unsigned char *, struct packet_stream *);
#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f)
#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f)
#define append_meta_chunk_null(r,f...) append_meta_chunk(r, "", 0, f)
static const struct recording_method methods[] = {
@ -224,12 +230,26 @@ static void update_metadata(struct call *call, str *metadata) {
}
}
// lock must be held
static void recording_update_flags(struct call *call) {
append_meta_chunk_null(call->recording, "RECORDING %u", call->recording_on ? 1 : 0);
append_meta_chunk_null(call->recording, "FORWARDING %u", call->rec_forwarding ? 1 : 0);
for (GList *l = call->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
append_meta_chunk_null(call->recording, "STREAM %u FORWARDING %u",
ps->unique_id, ps->media->monologue->rec_forwarding ? 1 : 0);
}
}
// lock must be held
void recording_start(struct call *call, const char *prefix, str *metadata) {
update_metadata(call, metadata);
if (call->recording) // already active
if (call->recording) {
// already active
recording_update_flags(call);
return;
}
if (!spooldir) {
ilog(LOG_ERR, "Call recording requested, but no spool directory configured");
@ -267,11 +287,27 @@ void recording_start(struct call *call, const char *prefix, str *metadata) {
__unkernelize(ps);
ps->handler = NULL;
}
recording_update_flags(call);
}
void recording_stop(struct call *call) {
if (!call->recording)
return;
// check if all recording options are disabled
if (call->recording_on || call->rec_forwarding) {
recording_update_flags(call);
return;
}
for (GList *l = call->monologues.head; l; l = l->next) {
struct call_monologue *ml = l->data;
if (ml->rec_forwarding) {
recording_update_flags(call);
return;
}
}
ilog(LOG_NOTICE, "Turning off call recording.");
recording_finish(call);
}
@ -628,10 +664,6 @@ static int vappend_meta_chunk_iov(struct recording *recording, struct iovec *in_
return 0;
}
static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen,
const char *label_fmt, ...)
__attribute__((format(printf,4,5)));
static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen,
const char *label_fmt, ...)
{
@ -646,8 +678,6 @@ static int append_meta_chunk(struct recording *recording, const char *buf, unsig
return ret;
}
#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f)
#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f)
static void proc_init(struct call *call) {
struct recording *recording = call->recording;
@ -694,8 +724,14 @@ static void finish_proc(struct call *call) {
struct recording *recording = call->recording;
if (!kernel.is_open)
return;
if (recording->u.proc.call_idx != UNINIT_IDX)
if (recording->u.proc.call_idx != UNINIT_IDX) {
kernel_del_call(recording->u.proc.call_idx);
recording->u.proc.call_idx = UNINIT_IDX;
}
for (GList *l = call->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
ps->recording.u.proc.stream_idx = UNINIT_IDX;
}
unlink(recording->meta_filepath);
}


+ 3
- 0
include/call.h View File

@ -364,6 +364,7 @@ struct call_monologue {
int block_dtmf:1;
int block_media:1;
int rec_forwarding:1;
};
struct call {
@ -402,6 +403,8 @@ struct call {
int block_dtmf:1;
int block_media:1;
int recording_on:1;
int rec_forwarding:1;
};


+ 2
- 0
include/call_interfaces.h View File

@ -102,6 +102,8 @@ const char *call_query_ng(bencode_item_t *, bencode_item_t *);
const char *call_list_ng(bencode_item_t *, bencode_item_t *);
const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *);
const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *);
const char *call_start_forwarding_ng(bencode_item_t *, bencode_item_t *);
const char *call_stop_forwarding_ng(bencode_item_t *, bencode_item_t *);
const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_block_media_ng(bencode_item_t *, bencode_item_t *);


+ 2
- 0
include/control_ng.h View File

@ -19,6 +19,8 @@ struct control_ng_stats {
int list;
int start_recording;
int stop_recording;
int start_forwarding;
int stop_forwarding;
int block_dtmf;
int unblock_dtmf;
int block_media;


+ 19
- 2
recording-daemon/decoder.c View File

@ -16,6 +16,7 @@
#include "resample.h"
#include "codeclib.h"
#include "streambuf.h"
#include "main.h"
int resample_audio;
@ -84,6 +85,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp)
ssrc_t *ssrc = sp;
metafile_t *metafile = ssrc->metafile;
output_t *output = ssrc->output;
stream_t *stream = ssrc->stream;
decode_t *deco = dp;
dbg("got frame pts %llu samples %u contents %02x%02x%02x%02x...", (unsigned long long) frame->pts, frame->nb_samples,
@ -98,6 +100,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp)
// handle mix output
pthread_mutex_lock(&metafile->mix_lock);
if (metafile->mix_out) {
dbg("adding packet from stream #%lu to mix output", stream->id);
if (G_UNLIKELY(deco->mixer_idx == (unsigned int) -1))
deco->mixer_idx = mix_get_index(metafile->mix);
format_t actual_format;
@ -117,6 +120,7 @@ no_mix_out:
pthread_mutex_unlock(&metafile->mix_lock);
if (output) {
dbg("SSRC %lx of stream #%lu has single output", ssrc->ssrc, stream->id);
if (output_config(output, &dec->out_format, NULL))
goto err;
if (output_add(output, frame))
@ -126,6 +130,7 @@ no_mix_out:
no_recording:
if (ssrc->tcp_fwd_stream) {
// XXX might be a second resampling to same format
dbg("SSRC %lx of stream #%lu has TCP forwarding stream", ssrc->ssrc, stream->id);
AVFrame *dec_frame = resample_frame(&ssrc->tcp_fwd_resampler, frame, &ssrc->tcp_fwd_format);
if (!ssrc->tcp_fwd_poller.connected) {
@ -133,6 +138,8 @@ no_recording:
if (status == 0) {
ssrc->tcp_fwd_poller.connected = 1;
ssrc->tcp_fwd_poller.blocked = 0;
dbg("TCP connection to %s established",
endpoint_print_buf(&tcp_send_to_ep));
}
else if (status < 0) {
ilog(LOG_ERR, "Failed to connect TCP socket: %s", strerror(errno));
@ -141,16 +148,24 @@ no_recording:
}
}
if (!ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) {
if (ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) {
ssrc->tcp_fwd_poller.blocked = 0;
streambuf_writeable(ssrc->tcp_fwd_stream);
}
if (!ssrc->tcp_fwd_poller.intro) {
streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1);
if (metafile->metadata) {
dbg("Writing metadata header to TCP");
streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1);
}
else {
ilog(LOG_WARN, "No metadata present for forwarding connection");
streambuf_write(ssrc->tcp_fwd_stream, "\0", 1);
}
ssrc->tcp_fwd_poller.intro = 1;
}
dbg("Writing %u bytes PCM to TCP", dec_frame->linesize[0]);
streambuf_write(ssrc->tcp_fwd_stream, (char *) dec_frame->extended_data[0],
dec_frame->linesize[0]);
av_frame_free(&dec_frame);
@ -171,6 +186,8 @@ int decoder_input(decode_t *deco, const str *data, unsigned long ts, ssrc_t *ssr
}
void decoder_free(decode_t *deco) {
if (!deco)
return;
decoder_close(deco->dec);
resample_shutdown(&deco->mix_resampler);
g_slice_free1(sizeof(*deco), deco);


+ 4
- 2
recording-daemon/metafile.c View File

@ -147,9 +147,11 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned
else if (sscanf_match(section, "LABEL %lu", &lu) == 1)
tag_label(mf, lu, content);
else if (sscanf_match(section, "RECORDING %u", &u) == 1)
mf->recording_on = u;
mf->recording_on = u ? 1 : 0;
else if (sscanf_match(section, "FORWARDING %u", &u) == 1)
mf->forwarding_on = u;
mf->forwarding_on = u ? 1 : 0;
else if (sscanf_match(section, "STREAM %lu FORWARDING %u", &lu, &u) == 2)
stream_forwarding_on(mf, lu, u);
}


+ 12
- 4
recording-daemon/packet.c View File

@ -63,31 +63,39 @@ out:
pthread_mutex_lock(&ret->lock);
pthread_mutex_unlock(&mf->lock);
dbg("Init for SSRC %lx of stream #%lu", ret->ssrc, stream->id);
if (mf->recording_on && !ret->output && output_single) {
char buf[256];
snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc);
ret->output = output_new(output_dir, buf);
db_do_stream(mf, ret->output, "single", stream, ssrc);
}
if (mf->forwarding_on && !ret->tcp_fwd_stream) {
if ((stream->forwarding_on || mf->forwarding_on) && !ret->tcp_fwd_stream) {
ZERO(ret->tcp_fwd_poller);
dbg("Starting TCP connection to %s", endpoint_print_buf(&tcp_send_to_ep));
int status = connect_socket_nb(&ret->tcp_fwd_sock, SOCK_STREAM, &tcp_send_to_ep);
if (status >= 0) {
ret->tcp_fwd_stream = streambuf_new(&ret->tcp_fwd_poller, ret->tcp_fwd_sock.fd);
if (status == 1)
ret->tcp_fwd_poller.blocked = 1;
else
else {
dbg("TCP connection to %s established",
endpoint_print_buf(&tcp_send_to_ep));
ret->tcp_fwd_poller.connected = 1;
}
}
else
ilog(LOG_ERR, "Failed to open/connect TCP socket: %s", strerror(errno));
ilog(LOG_ERR, "Failed to open/connect TCP socket to %s: %s",
endpoint_print_buf(&tcp_send_to_ep),
strerror(errno));
ret->tcp_fwd_format = (format_t) {
.clockrate = tcp_resample,
.channels = 1,
.format = AV_SAMPLE_FMT_S16,
};
}
else if (!mf->forwarding_on && ret->tcp_fwd_stream) {
else if (!(stream->forwarding_on || mf->forwarding_on) && ret->tcp_fwd_stream) {
// XXX same as above - unify
close_socket(&ret->tcp_fwd_sock);
streambuf_destroy(ret->tcp_fwd_stream);


+ 6
- 0
recording-daemon/stream.c View File

@ -140,3 +140,9 @@ void stream_details(metafile_t *mf, unsigned long id, unsigned int tag) {
stream_t *stream = stream_get(mf, id);
stream->tag = tag;
}
void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on) {
stream_t *stream = stream_get(mf, id);
dbg("Setting forwarding flag to %u for stream #%lu", on, stream->id);
stream->forwarding_on = on ? 1 : 0;
}

+ 1
- 0
recording-daemon/stream.h View File

@ -5,6 +5,7 @@
void stream_open(metafile_t *mf, unsigned long id, char *name);
void stream_details(metafile_t *mf, unsigned long id, unsigned int tag);
void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on);
void stream_close(stream_t *stream);
void stream_free(stream_t *stream);


+ 1
- 0
recording-daemon/types.h View File

@ -52,6 +52,7 @@ struct stream_s {
unsigned long tag;
int fd;
handler_t handler;
int forwarding_on:1;
};
typedef struct stream_s stream_t;


+ 1
- 1
t/amr-decode-test.c View File

@ -8,7 +8,7 @@ static void hexdump(const unsigned char *buf, int len) {
printf("\n");
}
static int frame_cb(decoder_t *dec, AVFrame *frame, void *u1, void *u2, void *u3) {
static int frame_cb(decoder_t *dec, AVFrame *frame, void *u1, void *u2) {
char **expect = u1;
int *expect_len = u2;
assert(expect);


Loading…
Cancel
Save