From 3dcddf3ffa0d800c77e4afa87f133837491dcd80 Mon Sep 17 00:00:00 2001 From: Claudiu Boriga Date: Wed, 2 Aug 2017 13:14:44 +0300 Subject: [PATCH] recording-daemon: add option to forward calls With this option, the daemon can forward calls to a server via a UNIX domain socket insdead of creating audio files. --- recording-daemon/Makefile | 2 +- recording-daemon/forward.c | 71 +++++++++++++++++++++++++++++++++++++ recording-daemon/forward.h | 9 +++++ recording-daemon/main.c | 41 +++++++++++++-------- recording-daemon/main.h | 2 ++ recording-daemon/metafile.c | 63 +++++++++++++++++++++----------- recording-daemon/stream.c | 10 +++++- recording-daemon/types.h | 4 +++ 8 files changed, 165 insertions(+), 37 deletions(-) create mode 100644 recording-daemon/forward.c create mode 100644 recording-daemon/forward.h diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index e36a2b6a9..9d0e350d9 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -25,7 +25,7 @@ LDFLAGS+= `mysql_config --libs` include ../lib/lib.Makefile SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c resample.c db.c log.c + decoder.c output.c mix.c resample.c db.c log.c forward.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/recording-daemon/forward.c b/recording-daemon/forward.c new file mode 100644 index 000000000..793052365 --- /dev/null +++ b/recording-daemon/forward.c @@ -0,0 +1,71 @@ +#include +#include +#include +#include "forward.h" +#include "main.h" +#include "log.h" + +void start_forwarding_capture(metafile_t *mf, char *meta_info) { + int sock; + struct sockaddr_un addr; + + if (mf->forward_fd >= 0) { + ilog(LOG_INFO, "Connection already established"); + return; + } + + if ((sock = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) { + ilog(LOG_ERR, "Error creating socket: %s", strerror(errno)); + return; + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, forward_to, sizeof(addr.sun_path) - 1); + + if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + ilog(LOG_ERR, "Error setting socket non-blocking: %s", strerror(errno)); + goto err; + } + + if (connect(sock, (struct sockaddr*) &addr, sizeof(addr)) == -1) { + ilog(LOG_ERR, "Error connecting to socket %s : %s", addr.sun_path, + strerror(errno)); + goto err; + } + + if (send(sock, meta_info, strlen(meta_info), 0) == -1) { + ilog(LOG_ERR, "Error sending meta info: %s. Call will not be forwarded", strerror(errno)); + goto err; + } + + mf->forward_fd = sock; + return; +err: + close(sock); +} + +int forward_packet(metafile_t *mf, unsigned char *buf, unsigned len) { + + if (mf->forward_fd == -1) { + ilog(LOG_ERR, + "Trying to send packets, but connection not initialized!"); + goto err; + } + + if (send(mf->forward_fd, buf, len, 0) == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + ilog(LOG_DEBUG, "Dropping packet since call would block"); + else + ilog(LOG_ERR, "Error sending: %s", strerror(errno)); + goto err; + } + + free(buf); + return 0; + +err: + free(buf); + return -1; +} + diff --git a/recording-daemon/forward.h b/recording-daemon/forward.h new file mode 100644 index 000000000..2c871bcb9 --- /dev/null +++ b/recording-daemon/forward.h @@ -0,0 +1,9 @@ +#ifndef _FORWARD_H_ +#define _FORWARD_H_ + +#include "types.h" + +void start_forwarding_capture(metafile_t *mf, char *meta_info); +int forward_packet(metafile_t *mf, unsigned char *buf, unsigned len); + +#endif diff --git a/recording-daemon/main.c b/recording-daemon/main.c index bdc996cd6..4a18a1f26 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -22,6 +22,7 @@ #include "auxlib.h" #include "decoder.h" #include "output.h" +#include "forward.h" @@ -32,12 +33,13 @@ const char *output_dir = "/var/lib/rtpengine-recording"; static const char *output_format = "wav"; int output_mixed; int output_single; +int output_enabled = 1; const char *c_mysql_host, *c_mysql_user, *c_mysql_pass, *c_mysql_db; int c_mysql_port; - +const char *forward_to = NULL; static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -90,23 +92,25 @@ static void avlog_ilog(void *ptr, int loglevel, const char *fmt, va_list ap) { static void setup(void) { log_init("rtpengine-recording"); - av_register_all(); - avcodec_register_all(); - avfilter_register_all(); - avformat_network_init(); + if (output_enabled) { + av_register_all(); + avcodec_register_all(); + avfilter_register_all(); + avformat_network_init(); + av_log_set_callback(avlog_ilog); + output_init(output_format); + if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) { + ilog(LOG_INFO, "Creating output dir '%s'", output_dir); + if (mkdir(output_dir, 0700)) + die_errno("Failed to create output dir '%s'"); + } + } mysql_library_init(0, NULL, NULL); signals(); metafile_setup(); epoll_setup(); inotify_setup(); - av_log_set_callback(avlog_ilog); - output_init(output_format); - if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) { - ilog(LOG_INFO, "Creating output dir '%s'", output_dir); - if (mkdir(output_dir, 0700)) - die_errno("Failed to create output dir '%s'"); - } } @@ -167,7 +171,7 @@ static void options(int *argc, char ***argv) { { "spool-dir", 0, 0, G_OPTION_ARG_STRING, &spool_dir, "Directory containing rtpengine metadata files", "PATH" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" }, { "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" }, - { "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3" }, + { "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" }, { "resample-to", 0, 0, G_OPTION_ARG_INT, &resample_audio,"Resample all output audio", "INT" }, { "mp3-bitrate", 0, 0, G_OPTION_ARG_INT, &mp3_bitrate, "Bits per second for MP3 encoding", "INT" }, { "output-mixed", 0, 0, G_OPTION_ARG_NONE, &output_mixed, "Mix participating sources into a single output",NULL }, @@ -177,13 +181,22 @@ static void options(int *argc, char ***argv) { { "mysql-user", 0, 0, G_OPTION_ARG_STRING, &c_mysql_user, "MySQL connection credentials", "USERNAME" }, { "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &c_mysql_pass, "MySQL connection credentials", "PASSWORD" }, { "mysql-db", 0, 0, G_OPTION_ARG_STRING, &c_mysql_db, "MySQL database name", "STRING" }, + { "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" }, { NULL, } }; config_load(argc, argv, e, " - rtpengine recording daemon", "/etc/rtpengine/rtpengine-recording.conf", "rtpengine-recording"); - if (!output_mixed && !output_single) + if (!strcmp(output_format, "none")) { + output_enabled = 0; + if (output_mixed || output_single) + die("Output is disabled, but output-mixed or output-single is set"); + if (!forward_to) { + //the daemon has no function + die("Both output and packet forwarding are disabled"); + } + } else if (!output_mixed && !output_single) output_mixed = output_single = 1; } diff --git a/recording-daemon/main.h b/recording-daemon/main.h index c8251299d..bf33b8309 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -8,11 +8,13 @@ extern const char *spool_dir; extern const char *output_dir; extern int output_mixed; extern int output_single; +extern int output_enabled; extern const char *c_mysql_host, *c_mysql_user, *c_mysql_pass, *c_mysql_db; extern int c_mysql_port; +extern const char *forward_to; extern volatile int shutdown_flag; diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index 6d59400dc..0c339d6e2 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -15,7 +15,7 @@ #include "output.h" #include "mix.h" #include "db.h" - +#include "forward.h" static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER; static GHashTable *metafiles; @@ -25,8 +25,10 @@ static void meta_free(void *ptr) { metafile_t *mf = ptr; dbg("freeing metafile info for %s", mf->name); - output_close(mf->mix_out); - mix_destroy(mf->mix); + if (output_enabled) { + output_close(mf->mix_out); + mix_destroy(mf->mix); + } g_string_chunk_free(mf->gsc); for (int i = 0; i < mf->streams->len; i++) { stream_t *stream = g_ptr_array_index(mf->streams, i); @@ -34,7 +36,8 @@ static void meta_free(void *ptr) { stream_free(stream); } g_ptr_array_free(mf->streams, TRUE); - g_hash_table_destroy(mf->ssrc_hash); + if (output_enabled) + g_hash_table_destroy(mf->ssrc_hash); g_slice_free1(sizeof(*mf), mf); } @@ -48,6 +51,14 @@ static void meta_destroy(metafile_t *mf) { stream_close(stream); pthread_mutex_unlock(&stream->lock); } + //close forward socket + if (mf->forward_fd >= 0) { + dbg("call [%s] forwarded %d packets. %d failed sends.", mf->call_id, + (int )g_atomic_int_get(&mf->forward_count), + (int )g_atomic_int_get(&mf->forward_failed)); + close(mf->forward_fd); + mf->forward_fd = -1; + } db_close_call(mf); } @@ -55,17 +66,17 @@ static void meta_destroy(metafile_t *mf) { // mf is locked static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) { db_do_call(mf); - - pthread_mutex_lock(&mf->mix_lock); - if (!mf->mix && output_mixed) { - char buf[256]; - snprintf(buf, sizeof(buf), "%s-mix", mf->parent); - mf->mix_out = output_new(output_dir, buf); - mf->mix = mix_new(); - db_do_stream(mf, mf->mix_out, "mixed", 0, 0); + if (output_enabled) { + pthread_mutex_lock(&mf->mix_lock); + if (!mf->mix && output_mixed) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s-mix", mf->parent); + mf->mix_out = output_new(output_dir, buf); + mf->mix = mix_new(); + db_do_stream(mf, mf->mix_out, "mixed", 0, 0); + } + pthread_mutex_unlock(&mf->mix_lock); } - pthread_mutex_unlock(&mf->mix_lock); - dbg("stream %lu interface %s", snum, content); stream_open(mf, snum, content); } @@ -87,10 +98,12 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i ilog(LOG_ERR, "Payload type number %u is invalid", payload_num); return; } - - pthread_mutex_lock(&mf->payloads_lock); - mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, payload_type); - pthread_mutex_unlock(&mf->payloads_lock); + if (output_enabled) { + pthread_mutex_lock(&mf->payloads_lock); + mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, + payload_type); + pthread_mutex_unlock(&mf->payloads_lock); + } } @@ -98,6 +111,8 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i static void meta_metadata(metafile_t *mf, char *content) { mf->metadata = g_string_chunk_insert(mf->gsc, content); db_do_call(mf); + if (forward_to) + start_forwarding_capture(mf, content); } @@ -134,10 +149,16 @@ static metafile_t *metafile_get(char *name) { mf->gsc = g_string_chunk_new(0); mf->name = g_string_chunk_insert(mf->gsc, name); pthread_mutex_init(&mf->lock, NULL); - pthread_mutex_init(&mf->payloads_lock, NULL); - pthread_mutex_init(&mf->mix_lock, NULL); mf->streams = g_ptr_array_new(); - mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free); + mf->forward_fd = -1; + mf->forward_count = 0; + mf->forward_failed = 0; + + if (output_enabled) { + pthread_mutex_init(&mf->payloads_lock, NULL); + pthread_mutex_init(&mf->mix_lock, NULL); + mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free); + } g_hash_table_insert(metafiles, mf->name, mf); diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c index 62f023b9e..9b5377def 100644 --- a/recording-daemon/stream.c +++ b/recording-daemon/stream.c @@ -10,6 +10,7 @@ #include "log.h" #include "main.h" #include "packet.h" +#include "forward.h" #define MAXBUFLEN 65535 @@ -67,7 +68,14 @@ static void stream_handler(handler_t *handler) { // got a packet pthread_mutex_unlock(&stream->lock); - packet_process(stream, buf, ret); + if (output_enabled) + packet_process(stream, buf, ret); + if (forward_to){ + if (forward_packet(stream->metafile,buf,ret)) + g_atomic_int_inc(&stream->metafile->forward_failed); + else + g_atomic_int_inc(&stream->metafile->forward_count); + } log_info_call = NULL; log_info_stream = NULL; return; diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 546054876..364d31a7b 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -101,6 +101,10 @@ struct metafile_s { mix_t *mix; output_t *mix_out; + int forward_fd; + volatile gint forward_count; + volatile gint forward_failed; + pthread_mutex_t payloads_lock; char *payload_types[128]; };