diff --git a/debian/control b/debian/control index be3e7c29f..d1bbd4402 100644 --- a/debian/control +++ b/debian/control @@ -4,6 +4,7 @@ Priority: extra Maintainer: Sipwise Development Team Build-Depends: debhelper (>= 9~), + default-libmysqlclient-dev | libmysqlclient-dev, iptables-dev (>= 1.4), libavcodec-dev (>= 6:10), libavfilter-dev (>= 6:10), diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index 4266a54c7..d8e066264 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -10,6 +10,7 @@ CFLAGS+= `pkg-config --cflags libavformat` CFLAGS+= `pkg-config --cflags libavutil` CFLAGS+= `pkg-config --cflags libavresample` CFLAGS+= `pkg-config --cflags libavfilter` +CFLAGS+= `mysql_config --cflags` LDFLAGS= -lm LDFLAGS+= `pkg-config --libs glib-2.0` @@ -19,11 +20,12 @@ LDFLAGS+= `pkg-config --libs libavformat` LDFLAGS+= `pkg-config --libs libavutil` LDFLAGS+= `pkg-config --libs libavresample` LDFLAGS+= `pkg-config --libs libavfilter` +LDFLAGS+= `mysql_config --libs` include ../lib/lib.Makefile SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c resample.c + decoder.c output.c mix.c resample.c db.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/recording-daemon/db.c b/recording-daemon/db.c new file mode 100644 index 000000000..2804212ba --- /dev/null +++ b/recording-daemon/db.c @@ -0,0 +1,231 @@ +#include "db.h" +#include +#include +#include +#include "types.h" +#include "main.h" +#include "log.h" + + +static MYSQL __thread *mysql_conn; +static MYSQL_STMT __thread + *stm_insert_call, + *stm_close_call, + *stm_insert_stream, + *stm_close_stream, + *stm_config_stream; + + +static void my_stmt_close(MYSQL_STMT **st) { + if (!*st) + return; + mysql_stmt_close(*st); + *st = NULL; +} + + +static void reset_conn() { + my_stmt_close(&stm_insert_call); + my_stmt_close(&stm_close_call); + my_stmt_close(&stm_insert_stream); + my_stmt_close(&stm_close_stream); + my_stmt_close(&stm_config_stream); + mysql_close(mysql_conn); + mysql_conn = NULL; +} + + +INLINE int prep(MYSQL_STMT **st, const char *str) { + *st = mysql_stmt_init(mysql_conn); + if (!*st) + return -1; + if (mysql_stmt_prepare(*st, str, strlen(str))) { + ilog(LOG_ERR, "Failed to prepare statement '%s': %s", str, mysql_stmt_error(*st)); + return -1; + } + return 0; +} + + +static int check_conn() { + if (mysql_conn) + return 0; + if (!c_mysql_host || !c_mysql_db) + return -1; + + ilog(LOG_DEBUG, "connecting to MySQL"); + + mysql_conn = mysql_init(NULL); + if (!mysql_conn) + goto err; + if (!mysql_real_connect(mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port, + NULL, CLIENT_IGNORE_SIGPIPE)) + goto err; + if (mysql_select_db(mysql_conn, c_mysql_db)) + goto err; + if (mysql_autocommit(mysql_conn, 0)) + goto err; + + if (prep(&stm_insert_call, "insert into recording_calls (call_id) values (?)")) + goto err; + if (prep(&stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \ + "file_format, " \ + "output_type, " \ + "stream_id, ssrc) values (?,?,?,?,?,?,?)")) + goto err; + if (prep(&stm_close_call, "update recording_calls set end_time = now() where id = ?")) + goto err; + if (prep(&stm_close_stream, "update recording_streams set end_time = now() where id = ?")) + goto err; + if (prep(&stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?")) + goto err; + + ilog(LOG_INFO, "Connection to MySQL established"); + + return 0; + +err: + if (mysql_conn) { + ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(mysql_conn)); + reset_conn(); + } + else + ilog(LOG_ERR, "Failed to connect to MySQL: out of memory"); + + return -1; +} + + +INLINE void my_str(MYSQL_BIND *b, const char *s) { + *b = (MYSQL_BIND) { + .buffer_type = MYSQL_TYPE_STRING, + .buffer = (void *) s, + .buffer_length = strlen(s), + .length = &b->buffer_length, + }; +} +INLINE void my_ull(MYSQL_BIND *b, const unsigned long long *ull) { + *b = (MYSQL_BIND) { + .buffer_type = MYSQL_TYPE_LONGLONG, + .buffer = (void *) ull, + .buffer_length = sizeof(*ull), + .is_unsigned = 1, + }; +} +INLINE void my_i(MYSQL_BIND *b, const int *i) { + *b = (MYSQL_BIND) { + .buffer_type = MYSQL_TYPE_LONG, + .buffer = (void *) i, + .buffer_length = sizeof(*i), + .is_unsigned = 0, + }; +} + + +static void execute_wrap(MYSQL_STMT **stmt, MYSQL_BIND *binds, unsigned long long *auto_id) { + for (int retr = 0; retr < 5; retr++) { + if (mysql_stmt_bind_param(*stmt, binds)) + goto err; + if (mysql_stmt_execute(*stmt)) + goto err; + if (auto_id) { + *auto_id = mysql_insert_id(mysql_conn); + if (*auto_id <= 0) + goto err; + } + if (mysql_commit(mysql_conn)) + goto err; + + return; + +err: + ilog(LOG_ERR, "Failed to bind or execute prepared statement: %s", + mysql_stmt_error(*stmt)); + if (retr > 2) { + reset_conn(); + if (check_conn()) + return; + } + } +} + + +void db_do_call(metafile_t *mf) { + if (check_conn()) + return; + if (mf->db_id > 0) + return; + + MYSQL_BIND b[1]; + my_str(&b[0], mf->call_id); + + execute_wrap(&stm_insert_call, b, &mf->db_id); +} + + +void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int id, unsigned long ssrc) { + if (check_conn()) + return; + if (mf->db_id <= 0) + return; + if (op->db_id > 0) + return; + + MYSQL_BIND b[7]; + my_ull(&b[0], &mf->db_id); + my_str(&b[1], op->file_name); + my_str(&b[2], op->full_filename); + my_str(&b[3], op->file_format); + my_str(&b[4], type); + b[5] = (MYSQL_BIND) { + .buffer_type = MYSQL_TYPE_LONG, + .buffer = &id, + .buffer_length = sizeof(id), + .is_unsigned = 1, + }; + b[6] = (MYSQL_BIND) { + .buffer_type = MYSQL_TYPE_LONG, + .buffer = &ssrc, + .buffer_length = sizeof(ssrc), + .is_unsigned = 1, + }; + + execute_wrap(&stm_insert_stream, b, &op->db_id); +} + +void db_close_call(metafile_t *mf) { + if (check_conn()) + return; + if (mf->db_id <= 0) + return; + + MYSQL_BIND b[1]; + my_ull(&b[0], &mf->db_id); + + execute_wrap(&stm_close_call, b, NULL); +} +void db_close_stream(output_t *op) { + if (check_conn()) + return; + if (op->db_id <= 0) + return; + + MYSQL_BIND b[1]; + my_ull(&b[0], &op->db_id); + + execute_wrap(&stm_close_stream, b, NULL); +} + +void db_config_stream(output_t *op) { + if (check_conn()) + return; + if (op->db_id <= 0) + return; + + MYSQL_BIND b[3]; + my_i(&b[0], &op->actual_format.channels); + my_i(&b[1], &op->actual_format.clockrate); + my_ull(&b[2], &op->db_id); + + execute_wrap(&stm_config_stream, b, NULL); +} diff --git a/recording-daemon/db.h b/recording-daemon/db.h new file mode 100644 index 000000000..7842ca8ce --- /dev/null +++ b/recording-daemon/db.h @@ -0,0 +1,14 @@ +#ifndef _DB_H_ +#define _DB_H_ + +#include "types.h" + + +void db_do_call(metafile_t *); +void db_close_call(metafile_t *); +void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int id, unsigned long ssrc); +void db_close_stream(output_t *op); +void db_config_stream(output_t *op); + + +#endif diff --git a/recording-daemon/epoll.c b/recording-daemon/epoll.c index 5386fc3bc..68e86b9ba 100644 --- a/recording-daemon/epoll.c +++ b/recording-daemon/epoll.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "log.h" #include "main.h" #include "garbage.h" @@ -31,12 +32,21 @@ void epoll_del(int fd) { } +static void poller_thread_end(void *ptr) { + mysql_thread_end(); +} + + void *poller_thread(void *ptr) { struct epoll_event epev; unsigned int me_num = GPOINTER_TO_UINT(ptr); dbg("poller thread %u running", me_num); + mysql_thread_init(); + + pthread_cleanup_push(poller_thread_end, NULL); + while (!shutdown_flag) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); int ret = epoll_wait(epoll_fd, &epev, 1, 10000); @@ -58,6 +68,8 @@ void *poller_thread(void *ptr) { garbage_collect(me_num); } + pthread_cleanup_pop(1); + return NULL; } diff --git a/recording-daemon/main.c b/recording-daemon/main.c index db785deab..965f8c045 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -11,6 +11,7 @@ #include #include #include +#include #include "log.h" #include "epoll.h" #include "inotify.h" @@ -30,6 +31,11 @@ const char *output_dir = "/var/lib/rtpengine-recording"; static const char *output_format = "wav"; int output_mixed; int output_single; +const char *c_mysql_host, + *c_mysql_user, + *c_mysql_pass, + *c_mysql_db; +int c_mysql_port; static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -68,6 +74,7 @@ static void setup(void) { avcodec_register_all(); avfilter_register_all(); avformat_network_init(); + mysql_library_init(0, NULL, NULL); signals(); metafile_setup(); epoll_setup(); @@ -130,6 +137,7 @@ static void cleanup(void) { metafile_cleanup(); inotify_cleanup(); epoll_cleanup(); + mysql_library_end(); } @@ -144,6 +152,11 @@ static void options(int *argc, char ***argv) { { "mp3-bitrate", 0, 0, G_OPTION_ARG_INT, &mp3_bitrate, "Bits per second for MP3 encoding", "INT" }, { "output-mixed", 0, 0, G_OPTION_ARG_NONE, &output_mixed, "Mix participating sources into a single output",NULL }, { "output-single", 0, 0, G_OPTION_ARG_NONE, &output_single, "Create one output file for each source",NULL }, + { "mysql-host", 0, 0, G_OPTION_ARG_STRING, &c_mysql_host, "MySQL host for storage of call metadata","HOST|IP" }, + { "mysql-port", 0, 0, G_OPTION_ARG_INT, &c_mysql_port, "MySQL port" ,"INT" }, + { "mysql-user", 0, 0, G_OPTION_ARG_STRING, &c_mysql_user, "MySQL connection credentials", "USERNAME" }, + { "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &c_mysql_pass, "MySQL connection credentials", "PASSWORD" }, + { "mysql-db", 0, 0, G_OPTION_ARG_STRING, &c_mysql_db, "MySQL database name", "STRING" }, { NULL, } }; diff --git a/recording-daemon/main.h b/recording-daemon/main.h index b3d0070c0..c8251299d 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -8,6 +8,11 @@ extern const char *spool_dir; extern const char *output_dir; extern int output_mixed; extern int output_single; +extern const char *c_mysql_host, + *c_mysql_user, + *c_mysql_pass, + *c_mysql_db; +extern int c_mysql_port; extern volatile int shutdown_flag; diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index e4cc2340a..03e2d10ac 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -14,6 +14,7 @@ #include "packet.h" #include "output.h" #include "mix.h" +#include "db.h" static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER; @@ -47,17 +48,21 @@ static void meta_destroy(metafile_t *mf) { stream_close(stream); pthread_mutex_unlock(&stream->lock); } + db_close_call(mf); } // mf is locked static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) { + db_do_call(mf); + pthread_mutex_lock(&mf->mix_lock); if (!mf->mix && output_mixed) { char buf[256]; - snprintf(buf, sizeof(buf), "%s/%s-mix", output_dir, mf->parent); - mf->mix_out = output_new(buf); + snprintf(buf, sizeof(buf), "%s-mix", mf->parent); + mf->mix_out = output_new(output_dir, buf); mf->mix = mix_new(); + db_do_stream(mf, mf->mix_out, "mixed", 0, 0); } pthread_mutex_unlock(&mf->mix_lock); diff --git a/recording-daemon/output.c b/recording-daemon/output.c index 7eea31e1a..71636a514 100644 --- a/recording-daemon/output.c +++ b/recording-daemon/output.c @@ -1,32 +1,11 @@ #include "output.h" #include -#include -#include -#include -#include #include #include #include #include #include "log.h" - - -struct output_s { - char filename[PATH_MAX]; - - format_t requested_format, - actual_format; - - AVCodecContext *avcctx; - AVFormatContext *fmtctx; - AVStream *avst; - AVPacket avpkt; - AVAudioFifo *fifo; - int64_t fifo_pts; // pts of first data in fifo - int64_t mux_dts; // last dts passed to muxer - AVFrame *frame; -}; - +#include "db.h" static int output_codec_id; @@ -147,11 +126,14 @@ int output_add(output_t *output, AVFrame *frame) { } -output_t *output_new(const char *filename) { +output_t *output_new(const char *path, const char *filename) { output_t *ret = g_slice_alloc0(sizeof(*ret)); - g_strlcpy(ret->filename, filename, sizeof(ret->filename)); + g_strlcpy(ret->file_path, path, sizeof(ret->file_path)); + g_strlcpy(ret->file_name, filename, sizeof(ret->file_name)); + snprintf(ret->full_filename, sizeof(ret->full_filename), "%s/%s", path, filename); format_init(&ret->requested_format); format_init(&ret->actual_format); + ret->file_format = output_file_format; return ret; } @@ -171,7 +153,7 @@ int output_config(output_t *output, const format_t *requested_format, format_t * output->fmtctx = avformat_alloc_context(); if (!output->fmtctx) goto err; - output->fmtctx->oformat = av_guess_format(output_file_format, NULL, NULL); + output->fmtctx->oformat = av_guess_format(output->file_format, NULL, NULL); err = "failed to determine output format"; if (!output->fmtctx->oformat) goto err; @@ -222,7 +204,7 @@ int output_config(output_t *output, const format_t *requested_format, format_t * char full_fn[PATH_MAX]; char suff[16] = ""; for (int i = 1; i < 20; i++) { - snprintf(full_fn, sizeof(full_fn), "%s%s.%s", output->filename, suff, output_file_format); + snprintf(full_fn, sizeof(full_fn), "%s%s.%s", output->full_filename, suff, output->file_format); if (!g_file_test(full_fn, G_FILE_TEST_EXISTS)) goto got_fn; snprintf(suff, sizeof(suff), "-%i", i); @@ -263,6 +245,7 @@ got_fn: done: *actual_format = output->actual_format; + db_config_stream(output); return 0; err: @@ -304,6 +287,7 @@ void output_close(output_t *output) { if (!output) return; output_shutdown(output); + db_close_stream(output); g_slice_free1(sizeof(*output), output); } diff --git a/recording-daemon/output.h b/recording-daemon/output.h index 44dd34268..c7cd7e61b 100644 --- a/recording-daemon/output.h +++ b/recording-daemon/output.h @@ -10,7 +10,7 @@ extern int mp3_bitrate; void output_init(const char *format); -output_t *output_new(const char *filename); +output_t *output_new(const char *path, const char *filename); void output_close(output_t *); int output_config(output_t *output, const format_t *requested_format, format_t *actual_format); diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 7e5dfaac8..4670b7052 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -12,6 +12,7 @@ #include "rtcplib.h" #include "main.h" #include "output.h" +#include "db.h" static int ptr_cmp(const void *a, const void *b, void *dummy) { @@ -43,7 +44,8 @@ void ssrc_free(void *p) { // mf must be unlocked; returns ssrc locked -static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) { +static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) { + metafile_t *mf = stream->metafile; pthread_mutex_lock(&mf->lock); ssrc_t *ret = g_hash_table_lookup(mf->ssrc_hash, GUINT_TO_POINTER(ssrc)); if (ret) @@ -52,14 +54,17 @@ static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) { ret = g_slice_alloc0(sizeof(*ret)); pthread_mutex_init(&ret->lock, NULL); ret->metafile = mf; + ret->stream = stream; ret->ssrc = ssrc; ret->packets = g_tree_new_full(ptr_cmp, NULL, NULL, packet_free); ret->seq = -1; char buf[256]; - snprintf(buf, sizeof(buf), "%s/%s-%08lx", output_dir, mf->parent, ssrc); - if (output_single) - ret->output = output_new(buf); + snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); + if (output_single) { + ret->output = output_new(output_dir, buf); + db_do_stream(mf, ret->output, "single", stream->id, ssrc); + } g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret); @@ -228,7 +233,7 @@ void packet_process(stream_t *stream, unsigned char *buf, unsigned len) { dbg("packet parsed successfully, seq %u", packet->seq); // insert into ssrc queue - ssrc_t *ssrc = ssrc_get(stream->metafile, ntohl(packet->rtp->ssrc)); + ssrc_t *ssrc = ssrc_get(stream, ntohl(packet->rtp->ssrc)); // check seq for dupes if (G_UNLIKELY(ssrc->seq == -1)) { diff --git a/recording-daemon/types.h b/recording-daemon/types.h index f87be6df9..c84f82ea2 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -7,6 +7,10 @@ #include #include #include +#include +#include +#include +#include #include "str.h" @@ -28,6 +32,8 @@ struct mix_s; typedef struct mix_s mix_t; struct resample_s; typedef struct resample_s resample_t; +struct format_s; +typedef struct format_s format_t; typedef void handler_func(handler_t *); @@ -66,6 +72,7 @@ typedef struct packet_s packet_t; struct ssrc_s { pthread_mutex_t lock; + stream_t *stream; metafile_t *metafile; unsigned long ssrc; GTree *packets; // contains packet_t objects @@ -82,6 +89,7 @@ struct metafile_s { char *parent; char *call_id; off_t pos; + unsigned long long db_id; GStringChunk *gsc; // XXX limit max size @@ -109,7 +117,28 @@ struct format_s { int channels; int format; // enum AVSampleFormat }; -typedef struct format_s format_t; + + +struct output_s { + char full_filename[PATH_MAX], // path + filename + file_path[PATH_MAX], + file_name[PATH_MAX]; + const char *file_format; + unsigned long long db_id; + + format_t requested_format, + actual_format; + + AVCodecContext *avcctx; + AVFormatContext *fmtctx; + AVStream *avst; + AVPacket avpkt; + AVAudioFifo *fifo; + int64_t fifo_pts; // pts of first data in fifo + int64_t mux_dts; // last dts passed to muxer + AVFrame *frame; +}; + INLINE int format_eq(const format_t *a, const format_t *b) { if (G_UNLIKELY(a->clockrate != b->clockrate))