diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index d7adeec08..c52baeb58 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -1004,8 +1004,10 @@ static const char *call_offer_answer_ng(bencode_item_t *input, bencode_buffer_destroy_add(output->buffer, (free_func_t) sdp_chopper_destroy, chopper); detect_setup_recording(call, &flags.record_call_str, &flags.metadata); - if (flags.record_call) + if (flags.record_call) { + call->recording_on = 1; recording_start(call, NULL, &flags.metadata); + } ret = monologue_offer_answer(monologue, &streams, &flags); if (!ret) { @@ -1438,6 +1440,7 @@ const char *call_start_recording_ng(bencode_item_t *input, bencode_item_t *outpu if (!call) return "Unknown call-id"; + call->recording_on = 1; recording_start(call, NULL, &metadata); rwlock_unlock_w(&call->master_lock); @@ -1456,6 +1459,7 @@ const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output if (!call) return "Unknown call-id"; + call->recording_on = 0; recording_stop(call); rwlock_unlock_w(&call->master_lock); @@ -1512,6 +1516,80 @@ found: return NULL; } +// XXX these are all identical - unify and use a flags int and/or callback +const char *call_start_forwarding_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + str metadata; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + bencode_dictionary_get_str(input, "metadata", &metadata); + + if (monologue) { + ilog(LOG_INFO, "Start forwarding for single party (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->rec_forwarding = 1; + } + else { + ilog(LOG_INFO, "Start forwarding (entire call)"); + call->rec_forwarding = 1; + } + + recording_start(call, NULL, &metadata); + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + return errstr; +} + +const char *call_stop_forwarding_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + struct sdp_ng_flags flags; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + call_ng_process_flags(&flags, input, OP_OTHER); + + if (monologue) { + ilog(LOG_INFO, "Stop forwarding for single party (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->rec_forwarding = 0; + } + else { + ilog(LOG_INFO, "Stop forwarding (entire call)"); + call->rec_forwarding = 0; + if (flags.all) { + for (GList *l = call->monologues.head; l; l = l->next) { + monologue = l->data; + monologue->rec_forwarding = 0; + } + } + } + + recording_stop(call); + + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + return NULL; +} + const char *call_block_dtmf_ng(bencode_item_t *input, bencode_item_t *output) { struct call *call; struct call_monologue *monologue; diff --git a/daemon/codec.c b/daemon/codec.c index 9c5e59fd9..09fcd36bb 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -975,7 +975,7 @@ static int __packet_encoded(encoder_t *enc, void *u1, void *u2) { return 0; } -static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2, void *u3) { +static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) { struct codec_ssrc_handler *ch = u1; ilog(LOG_DEBUG, "RTP media successfully decoded: TS %llu, samples %u", diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 59c4a0efa..39f56dbd3 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -219,6 +219,14 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin errstr = call_stop_recording_ng(dict, resp); g_atomic_int_inc(&cur->stop_recording); break; + case CSH_LOOKUP("start forwarding"): + errstr = call_start_forwarding_ng(dict, resp); + g_atomic_int_inc(&cur->start_forwarding); + break; + case CSH_LOOKUP("stop forwarding"): + errstr = call_stop_forwarding_ng(dict, resp); + g_atomic_int_inc(&cur->stop_forwarding); + break; case CSH_LOOKUP("block DTMF"): errstr = call_block_dtmf_ng(dict, resp); g_atomic_int_inc(&cur->block_dtmf); diff --git a/daemon/recording.c b/daemon/recording.c index da3d4af71..30ac94405 100644 --- a/daemon/recording.c +++ b/daemon/recording.c @@ -35,6 +35,9 @@ struct pcap_format { static int check_main_spool_dir(const char *spoolpath); static char *recording_setup_file(struct recording *recording); static char *meta_setup_file(struct recording *recording); +static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, + const char *label_fmt, ...) + __attribute__((format(printf,4,5))); // pcap methods static int pcap_create_spool_dir(const char *dirpath); @@ -58,6 +61,9 @@ static void kernel_info_proc(struct packet_stream *, struct rtpengine_target_inf static void pcap_eth_header(unsigned char *, struct packet_stream *); +#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f) +#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f) +#define append_meta_chunk_null(r,f...) append_meta_chunk(r, "", 0, f) static const struct recording_method methods[] = { @@ -224,12 +230,26 @@ static void update_metadata(struct call *call, str *metadata) { } } +// lock must be held +static void recording_update_flags(struct call *call) { + append_meta_chunk_null(call->recording, "RECORDING %u", call->recording_on ? 1 : 0); + append_meta_chunk_null(call->recording, "FORWARDING %u", call->rec_forwarding ? 1 : 0); + for (GList *l = call->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + append_meta_chunk_null(call->recording, "STREAM %u FORWARDING %u", + ps->unique_id, ps->media->monologue->rec_forwarding ? 1 : 0); + } +} + // lock must be held void recording_start(struct call *call, const char *prefix, str *metadata) { update_metadata(call, metadata); - if (call->recording) // already active + if (call->recording) { + // already active + recording_update_flags(call); return; + } if (!spooldir) { ilog(LOG_ERR, "Call recording requested, but no spool directory configured"); @@ -267,11 +287,27 @@ void recording_start(struct call *call, const char *prefix, str *metadata) { __unkernelize(ps); ps->handler = NULL; } + + recording_update_flags(call); } void recording_stop(struct call *call) { if (!call->recording) return; + // check if all recording options are disabled + if (call->recording_on || call->rec_forwarding) { + recording_update_flags(call); + return; + } + + for (GList *l = call->monologues.head; l; l = l->next) { + struct call_monologue *ml = l->data; + if (ml->rec_forwarding) { + recording_update_flags(call); + return; + } + } + ilog(LOG_NOTICE, "Turning off call recording."); recording_finish(call); } @@ -628,10 +664,6 @@ static int vappend_meta_chunk_iov(struct recording *recording, struct iovec *in_ return 0; } -static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, - const char *label_fmt, ...) - __attribute__((format(printf,4,5))); - static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, const char *label_fmt, ...) { @@ -646,8 +678,6 @@ static int append_meta_chunk(struct recording *recording, const char *buf, unsig return ret; } -#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f) -#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f) static void proc_init(struct call *call) { struct recording *recording = call->recording; @@ -694,8 +724,14 @@ static void finish_proc(struct call *call) { struct recording *recording = call->recording; if (!kernel.is_open) return; - if (recording->u.proc.call_idx != UNINIT_IDX) + if (recording->u.proc.call_idx != UNINIT_IDX) { kernel_del_call(recording->u.proc.call_idx); + recording->u.proc.call_idx = UNINIT_IDX; + } + for (GList *l = call->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + ps->recording.u.proc.stream_idx = UNINIT_IDX; + } unlink(recording->meta_filepath); } diff --git a/include/call.h b/include/call.h index 83ab0e6ab..e9ebab8a4 100644 --- a/include/call.h +++ b/include/call.h @@ -364,6 +364,7 @@ struct call_monologue { int block_dtmf:1; int block_media:1; + int rec_forwarding:1; }; struct call { @@ -402,6 +403,8 @@ struct call { int block_dtmf:1; int block_media:1; + int recording_on:1; + int rec_forwarding:1; }; diff --git a/include/call_interfaces.h b/include/call_interfaces.h index 10a0e0b4b..edbc6ada1 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -102,6 +102,8 @@ const char *call_query_ng(bencode_item_t *, bencode_item_t *); const char *call_list_ng(bencode_item_t *, bencode_item_t *); const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *); const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *); +const char *call_start_forwarding_ng(bencode_item_t *, bencode_item_t *); +const char *call_stop_forwarding_ng(bencode_item_t *, bencode_item_t *); const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_block_media_ng(bencode_item_t *, bencode_item_t *); diff --git a/include/control_ng.h b/include/control_ng.h index afe409640..ef8b3489f 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -19,6 +19,8 @@ struct control_ng_stats { int list; int start_recording; int stop_recording; + int start_forwarding; + int stop_forwarding; int block_dtmf; int unblock_dtmf; int block_media; diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c index c36e4a242..7dd5c4fee 100644 --- a/recording-daemon/decoder.c +++ b/recording-daemon/decoder.c @@ -16,6 +16,7 @@ #include "resample.h" #include "codeclib.h" #include "streambuf.h" +#include "main.h" int resample_audio; @@ -84,6 +85,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) ssrc_t *ssrc = sp; metafile_t *metafile = ssrc->metafile; output_t *output = ssrc->output; + stream_t *stream = ssrc->stream; decode_t *deco = dp; dbg("got frame pts %llu samples %u contents %02x%02x%02x%02x...", (unsigned long long) frame->pts, frame->nb_samples, @@ -98,6 +100,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) // handle mix output pthread_mutex_lock(&metafile->mix_lock); if (metafile->mix_out) { + dbg("adding packet from stream #%lu to mix output", stream->id); if (G_UNLIKELY(deco->mixer_idx == (unsigned int) -1)) deco->mixer_idx = mix_get_index(metafile->mix); format_t actual_format; @@ -117,6 +120,7 @@ no_mix_out: pthread_mutex_unlock(&metafile->mix_lock); if (output) { + dbg("SSRC %lx of stream #%lu has single output", ssrc->ssrc, stream->id); if (output_config(output, &dec->out_format, NULL)) goto err; if (output_add(output, frame)) @@ -126,6 +130,7 @@ no_mix_out: no_recording: if (ssrc->tcp_fwd_stream) { // XXX might be a second resampling to same format + dbg("SSRC %lx of stream #%lu has TCP forwarding stream", ssrc->ssrc, stream->id); AVFrame *dec_frame = resample_frame(&ssrc->tcp_fwd_resampler, frame, &ssrc->tcp_fwd_format); if (!ssrc->tcp_fwd_poller.connected) { @@ -133,6 +138,8 @@ no_recording: if (status == 0) { ssrc->tcp_fwd_poller.connected = 1; ssrc->tcp_fwd_poller.blocked = 0; + dbg("TCP connection to %s established", + endpoint_print_buf(&tcp_send_to_ep)); } else if (status < 0) { ilog(LOG_ERR, "Failed to connect TCP socket: %s", strerror(errno)); @@ -141,16 +148,24 @@ no_recording: } } - if (!ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) { + if (ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) { ssrc->tcp_fwd_poller.blocked = 0; streambuf_writeable(ssrc->tcp_fwd_stream); } if (!ssrc->tcp_fwd_poller.intro) { - streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1); + if (metafile->metadata) { + dbg("Writing metadata header to TCP"); + streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1); + } + else { + ilog(LOG_WARN, "No metadata present for forwarding connection"); + streambuf_write(ssrc->tcp_fwd_stream, "\0", 1); + } ssrc->tcp_fwd_poller.intro = 1; } + dbg("Writing %u bytes PCM to TCP", dec_frame->linesize[0]); streambuf_write(ssrc->tcp_fwd_stream, (char *) dec_frame->extended_data[0], dec_frame->linesize[0]); av_frame_free(&dec_frame); @@ -171,6 +186,8 @@ int decoder_input(decode_t *deco, const str *data, unsigned long ts, ssrc_t *ssr } void decoder_free(decode_t *deco) { + if (!deco) + return; decoder_close(deco->dec); resample_shutdown(&deco->mix_resampler); g_slice_free1(sizeof(*deco), deco); diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index c94cdc610..481bcd4cc 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -147,9 +147,11 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned else if (sscanf_match(section, "LABEL %lu", &lu) == 1) tag_label(mf, lu, content); else if (sscanf_match(section, "RECORDING %u", &u) == 1) - mf->recording_on = u; + mf->recording_on = u ? 1 : 0; else if (sscanf_match(section, "FORWARDING %u", &u) == 1) - mf->forwarding_on = u; + mf->forwarding_on = u ? 1 : 0; + else if (sscanf_match(section, "STREAM %lu FORWARDING %u", &lu, &u) == 2) + stream_forwarding_on(mf, lu, u); } diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 3e5e9077b..dbe5fec58 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -63,31 +63,39 @@ out: pthread_mutex_lock(&ret->lock); pthread_mutex_unlock(&mf->lock); + dbg("Init for SSRC %lx of stream #%lu", ret->ssrc, stream->id); + if (mf->recording_on && !ret->output && output_single) { char buf[256]; snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); ret->output = output_new(output_dir, buf); db_do_stream(mf, ret->output, "single", stream, ssrc); } - if (mf->forwarding_on && !ret->tcp_fwd_stream) { + if ((stream->forwarding_on || mf->forwarding_on) && !ret->tcp_fwd_stream) { ZERO(ret->tcp_fwd_poller); + dbg("Starting TCP connection to %s", endpoint_print_buf(&tcp_send_to_ep)); int status = connect_socket_nb(&ret->tcp_fwd_sock, SOCK_STREAM, &tcp_send_to_ep); if (status >= 0) { ret->tcp_fwd_stream = streambuf_new(&ret->tcp_fwd_poller, ret->tcp_fwd_sock.fd); if (status == 1) ret->tcp_fwd_poller.blocked = 1; - else + else { + dbg("TCP connection to %s established", + endpoint_print_buf(&tcp_send_to_ep)); ret->tcp_fwd_poller.connected = 1; + } } else - ilog(LOG_ERR, "Failed to open/connect TCP socket: %s", strerror(errno)); + ilog(LOG_ERR, "Failed to open/connect TCP socket to %s: %s", + endpoint_print_buf(&tcp_send_to_ep), + strerror(errno)); ret->tcp_fwd_format = (format_t) { .clockrate = tcp_resample, .channels = 1, .format = AV_SAMPLE_FMT_S16, }; } - else if (!mf->forwarding_on && ret->tcp_fwd_stream) { + else if (!(stream->forwarding_on || mf->forwarding_on) && ret->tcp_fwd_stream) { // XXX same as above - unify close_socket(&ret->tcp_fwd_sock); streambuf_destroy(ret->tcp_fwd_stream); diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c index 52abe71e9..93111f650 100644 --- a/recording-daemon/stream.c +++ b/recording-daemon/stream.c @@ -140,3 +140,9 @@ void stream_details(metafile_t *mf, unsigned long id, unsigned int tag) { stream_t *stream = stream_get(mf, id); stream->tag = tag; } + +void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on) { + stream_t *stream = stream_get(mf, id); + dbg("Setting forwarding flag to %u for stream #%lu", on, stream->id); + stream->forwarding_on = on ? 1 : 0; +} diff --git a/recording-daemon/stream.h b/recording-daemon/stream.h index e1859cc86..499c99eb6 100644 --- a/recording-daemon/stream.h +++ b/recording-daemon/stream.h @@ -5,6 +5,7 @@ void stream_open(metafile_t *mf, unsigned long id, char *name); void stream_details(metafile_t *mf, unsigned long id, unsigned int tag); +void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on); void stream_close(stream_t *stream); void stream_free(stream_t *stream); diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 4e9c113ed..41ce7a87b 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -52,6 +52,7 @@ struct stream_s { unsigned long tag; int fd; handler_t handler; + int forwarding_on:1; }; typedef struct stream_s stream_t; diff --git a/t/amr-decode-test.c b/t/amr-decode-test.c index e2c3b07eb..315601be6 100644 --- a/t/amr-decode-test.c +++ b/t/amr-decode-test.c @@ -8,7 +8,7 @@ static void hexdump(const unsigned char *buf, int len) { printf("\n"); } -static int frame_cb(decoder_t *dec, AVFrame *frame, void *u1, void *u2, void *u3) { +static int frame_cb(decoder_t *dec, AVFrame *frame, void *u1, void *u2) { char **expect = u1; int *expect_len = u2; assert(expect);