From 442c48f627df1d706639a82948d1c9356f13eb12 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 14 Feb 2019 15:43:23 -0500 Subject: [PATCH] TT#52651 produce output for TCP forwarding feature Change-Id: I18543921577faf655679829684f5af46c0af5054 --- daemon/codec.c | 2 +- lib/codeclib.c | 5 ++-- lib/codeclib.h | 3 +-- recording-daemon/decoder.c | 45 +++++++++++++++++++++++++++++---- recording-daemon/decoder.h | 2 +- recording-daemon/main.c | 10 ++++---- recording-daemon/metafile.c | 2 ++ recording-daemon/output.c | 9 ------- recording-daemon/packet.c | 50 +++++++++++++++++++++++++++++++------ recording-daemon/poller.c | 3 ++- recording-daemon/poller.h | 8 +++++- recording-daemon/types.h | 10 ++++++++ t/amr-decode-test.c | 2 +- 13 files changed, 114 insertions(+), 37 deletions(-) diff --git a/daemon/codec.c b/daemon/codec.c index 06caed979..9c5e59fd9 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -989,7 +989,7 @@ static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void * static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { - return decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, mp, NULL); + return decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, mp); } static int handler_func_transcode(struct codec_handler *h, struct media_packet *mp) { diff --git a/lib/codeclib.c b/lib/codeclib.c index b9c8758f7..cab7ba0cb 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -591,8 +591,7 @@ err: } int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts, - int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2, void *u3), - void *u1, void *u2, void *u3) + int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2), void *u1, void *u2) { GQueue frames = G_QUEUE_INIT; @@ -634,7 +633,7 @@ int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts, ret = -1; } else { - if (callback(dec, rsmp_frame, u1, u2, u3)) + if (callback(dec, rsmp_frame, u1, u2)) ret = -1; } av_frame_free(&frame); diff --git a/lib/codeclib.h b/lib/codeclib.h index eba039414..4920dcd3d 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -199,8 +199,7 @@ decoder_t *decoder_new_fmtp(const codec_def_t *def, int clockrate, int channels, const str *fmtp); void decoder_close(decoder_t *dec); int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts, - int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2, void *u3), - void *u1, void *u2, void *u3); + int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2), void *u1, void *u2); encoder_t *encoder_new(); diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c index bda8465e7..c36e4a242 100644 --- a/recording-daemon/decoder.c +++ b/recording-daemon/decoder.c @@ -15,6 +15,7 @@ #include "mix.h" #include "resample.h" #include "codeclib.h" +#include "streambuf.h" int resample_audio; @@ -79,9 +80,10 @@ decode_t *decoder_new(const char *payload_str, output_t *outp) { } -static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *op, void *mp, void *dp) { - metafile_t *metafile = mp; - output_t *output = op; +static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) { + ssrc_t *ssrc = sp; + metafile_t *metafile = ssrc->metafile; + output_t *output = ssrc->output; decode_t *deco = dp; dbg("got frame pts %llu samples %u contents %02x%02x%02x%02x...", (unsigned long long) frame->pts, frame->nb_samples, @@ -122,6 +124,39 @@ no_mix_out: } no_recording: + if (ssrc->tcp_fwd_stream) { + // XXX might be a second resampling to same format + AVFrame *dec_frame = resample_frame(&ssrc->tcp_fwd_resampler, frame, &ssrc->tcp_fwd_format); + + if (!ssrc->tcp_fwd_poller.connected) { + int status = connect_socket_retry(&ssrc->tcp_fwd_sock); + if (status == 0) { + ssrc->tcp_fwd_poller.connected = 1; + ssrc->tcp_fwd_poller.blocked = 0; + } + else if (status < 0) { + ilog(LOG_ERR, "Failed to connect TCP socket: %s", strerror(errno)); + streambuf_destroy(ssrc->tcp_fwd_stream); + ssrc->tcp_fwd_stream = NULL; + } + } + + if (!ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) { + ssrc->tcp_fwd_poller.blocked = 0; + streambuf_writeable(ssrc->tcp_fwd_stream); + } + + if (!ssrc->tcp_fwd_poller.intro) { + streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1); + ssrc->tcp_fwd_poller.intro = 1; + } + + streambuf_write(ssrc->tcp_fwd_stream, (char *) dec_frame->extended_data[0], + dec_frame->linesize[0]); + av_frame_free(&dec_frame); + + } + av_frame_free(&frame); return 0; @@ -131,8 +166,8 @@ err: } -int decoder_input(decode_t *deco, const str *data, unsigned long ts, output_t *output, metafile_t *metafile) { - return decoder_input_data(deco->dec, data, ts, decoder_got_frame, output, metafile, deco); +int decoder_input(decode_t *deco, const str *data, unsigned long ts, ssrc_t *ssrc) { + return decoder_input_data(deco->dec, data, ts, decoder_got_frame, ssrc, deco); } void decoder_free(decode_t *deco) { diff --git a/recording-daemon/decoder.h b/recording-daemon/decoder.h index e7b7a2a88..955d509ee 100644 --- a/recording-daemon/decoder.h +++ b/recording-daemon/decoder.h @@ -9,7 +9,7 @@ extern int resample_audio; decode_t *decoder_new(const char *payload_str, output_t *); -int decoder_input(decode_t *, const str *, unsigned long ts, output_t *, metafile_t *); +int decoder_input(decode_t *, const str *, unsigned long ts, ssrc_t *); void decoder_free(decode_t *); diff --git a/recording-daemon/main.c b/recording-daemon/main.c index 34ed7819e..0eb5eb739 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -70,11 +70,11 @@ static void signals(void) { static void setup(void) { log_init("rtpengine-recording"); - if (decoding_enabled) { + socket_init(); + if (decoding_enabled) codeclib_init(0); - output_init(output_format); - } if (output_enabled) { + output_init(output_format); if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) { ilog(LOG_INFO, "Creating output dir '%s'", output_dir); if (mkdir(output_dir, 0700)) @@ -178,7 +178,7 @@ static void options(int *argc, char ***argv) { output_enabled = 0; if (output_mixed || output_single) die("Output is disabled, but output-mixed or output-single is set"); - if (!forward_to && !tcp_send_to_ep.address.family) { + if (!forward_to && !tcp_send_to_ep.port) { //the daemon has no function die("Both output and forwarding are disabled"); } @@ -186,7 +186,7 @@ static void options(int *argc, char ***argv) { } else if (!output_mixed && !output_single) output_mixed = output_single = 1; - if (output_enabled || tcp_send_to_ep.address.family) + if (output_enabled || tcp_send_to_ep.port) decoding_enabled = 1; if (!os_str || !strcmp(os_str, "file")) diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index b6f77e56d..c94cdc610 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -148,6 +148,8 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned tag_label(mf, lu, content); else if (sscanf_match(section, "RECORDING %u", &u) == 1) mf->recording_on = u; + else if (sscanf_match(section, "FORWARDING %u", &u) == 1) + mf->forwarding_on = u; } diff --git a/recording-daemon/output.c b/recording-daemon/output.c index 4de81144c..be49de762 100644 --- a/recording-daemon/output.c +++ b/recording-daemon/output.c @@ -12,8 +12,6 @@ static const codec_def_t *output_codec; static const char *output_file_format; -static const codec_def_t *tcp_send_codec; - int mp3_bitrate; @@ -170,13 +168,6 @@ void output_close(output_t *output) { void output_init(const char *format) { str codec; - str_init(&codec, "PCM-S16LE"); - tcp_send_codec = codec_find(&codec, MT_AUDIO); - assert(tcp_send_codec != NULL); - - if (!format) - return; - if (!strcmp(format, "wav")) { str_init(&codec, "PCM-S16LE"); output_file_format = "wav"; diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 5af094605..3e5e9077b 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -13,6 +13,8 @@ #include "main.h" #include "output.h" #include "db.h" +#include "streambuf.h" +#include "resample.h" static void packet_free(void *p) { @@ -30,6 +32,12 @@ void ssrc_free(void *p) { output_close(s->output); for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) decoder_free(s->decoders[i]); + if (s->tcp_fwd_stream) { + close_socket(&s->tcp_fwd_sock); + streambuf_destroy(s->tcp_fwd_stream); + s->tcp_fwd_stream = NULL; + resample_shutdown(&s->tcp_fwd_resampler); + } g_slice_free1(sizeof(*s), s); } @@ -49,18 +57,44 @@ static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) { ret->ssrc = ssrc; packet_sequencer_init(&ret->sequencer, packet_free); - char buf[256]; - snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); - if (output_single) { - ret->output = output_new(output_dir, buf); - db_do_stream(mf, ret->output, "single", stream, ssrc); - } - g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret); out: pthread_mutex_lock(&ret->lock); pthread_mutex_unlock(&mf->lock); + + if (mf->recording_on && !ret->output && output_single) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); + ret->output = output_new(output_dir, buf); + db_do_stream(mf, ret->output, "single", stream, ssrc); + } + if (mf->forwarding_on && !ret->tcp_fwd_stream) { + ZERO(ret->tcp_fwd_poller); + int status = connect_socket_nb(&ret->tcp_fwd_sock, SOCK_STREAM, &tcp_send_to_ep); + if (status >= 0) { + ret->tcp_fwd_stream = streambuf_new(&ret->tcp_fwd_poller, ret->tcp_fwd_sock.fd); + if (status == 1) + ret->tcp_fwd_poller.blocked = 1; + else + ret->tcp_fwd_poller.connected = 1; + } + else + ilog(LOG_ERR, "Failed to open/connect TCP socket: %s", strerror(errno)); + ret->tcp_fwd_format = (format_t) { + .clockrate = tcp_resample, + .channels = 1, + .format = AV_SAMPLE_FMT_S16, + }; + } + else if (!mf->forwarding_on && ret->tcp_fwd_stream) { + // XXX same as above - unify + close_socket(&ret->tcp_fwd_sock); + streambuf_destroy(ret->tcp_fwd_stream); + ret->tcp_fwd_stream = NULL; + resample_shutdown(&ret->tcp_fwd_resampler); + } + return ret; } @@ -103,7 +137,7 @@ static void packet_decode(ssrc_t *ssrc, packet_t *packet) { } if (decoder_input(ssrc->decoders[payload_type], &packet->payload, ntohl(packet->rtp->timestamp), - ssrc->output, ssrc->metafile)) + ssrc)) ilog(LOG_ERR, "Failed to decode media packet"); } diff --git a/recording-daemon/poller.c b/recording-daemon/poller.c index 29b7c168b..e6d7cfa42 100644 --- a/recording-daemon/poller.c +++ b/recording-daemon/poller.c @@ -1,9 +1,10 @@ #include "poller.h" void poller_blocked(struct poller *p, int fd) { + p->blocked = 1; } int poller_isblocked(struct poller *p, int fd) { - return 0; + return p->blocked ? 1 : 0; } void poller_error(struct poller *p, int fd) { } diff --git a/recording-daemon/poller.h b/recording-daemon/poller.h index 1bf240749..91e242300 100644 --- a/recording-daemon/poller.h +++ b/recording-daemon/poller.h @@ -2,7 +2,13 @@ #define __POLLER_H__ -struct poller; +// dummy poller +struct poller { + int blocked:1; + int connected:1; + int error:1; + int intro:1; +}; void poller_blocked(struct poller *, int); int poller_isblocked(struct poller *, int); diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 2f184d6e3..4e9c113ed 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -12,12 +12,15 @@ #include #include "str.h" #include "codeclib.h" +#include "poller.h" +#include "socket.h" struct iphdr; struct ip6_hdr; struct udphdr; struct rtp_header; +struct streambuf; struct handler_s; @@ -75,6 +78,12 @@ struct ssrc_s { packet_sequencer_t sequencer; decode_t *decoders[128]; output_t *output; + + format_t tcp_fwd_format; + resample_t tcp_fwd_resampler; + socket_t tcp_fwd_sock; + struct streambuf *tcp_fwd_stream; + struct poller tcp_fwd_poller; }; typedef struct ssrc_s ssrc_t; @@ -114,6 +123,7 @@ struct metafile_s { char *payload_types[128]; int recording_on:1; + int forwarding_on:1; }; diff --git a/t/amr-decode-test.c b/t/amr-decode-test.c index 13851ef02..e2c3b07eb 100644 --- a/t/amr-decode-test.c +++ b/t/amr-decode-test.c @@ -54,7 +54,7 @@ static void do_test_amr_xx(const char *file, int line, decoder_t *d = decoder_new_fmtp(def, clockrate, 1, &fmt, fmtp); assert(d); const str data = { data_s, data_len }; - int ret = decoder_input_data(d, &data, 1, frame_cb, &expect_s, &expect_len, NULL); + int ret = decoder_input_data(d, &data, 1, frame_cb, &expect_s, &expect_len); assert(!ret); assert(expect_s == NULL); decoder_close(d);