Browse Source

TT#10156 write recordings metadata into mysql database

Change-Id: If50b0528520fe816ed63494dc6c6e707fe9eca2c
changes/94/11294/26
Richard Fuchs 9 years ago
parent
commit
f5cc21f92b
12 changed files with 337 additions and 36 deletions
  1. +1
    -0
      debian/control
  2. +3
    -1
      recording-daemon/Makefile
  3. +231
    -0
      recording-daemon/db.c
  4. +14
    -0
      recording-daemon/db.h
  5. +12
    -0
      recording-daemon/epoll.c
  6. +13
    -0
      recording-daemon/main.c
  7. +5
    -0
      recording-daemon/main.h
  8. +7
    -2
      recording-daemon/metafile.c
  9. +10
    -26
      recording-daemon/output.c
  10. +1
    -1
      recording-daemon/output.h
  11. +10
    -5
      recording-daemon/packet.c
  12. +30
    -1
      recording-daemon/types.h

+ 1
- 0
debian/control View File

@ -4,6 +4,7 @@ Priority: extra
Maintainer: Sipwise Development Team <support@sipwise.com>
Build-Depends:
debhelper (>= 9~),
default-libmysqlclient-dev | libmysqlclient-dev,
iptables-dev (>= 1.4),
libavcodec-dev (>= 6:10),
libavfilter-dev (>= 6:10),


+ 3
- 1
recording-daemon/Makefile View File

@ -10,6 +10,7 @@ CFLAGS+= `pkg-config --cflags libavformat`
CFLAGS+= `pkg-config --cflags libavutil`
CFLAGS+= `pkg-config --cflags libavresample`
CFLAGS+= `pkg-config --cflags libavfilter`
CFLAGS+= `mysql_config --cflags`
LDFLAGS= -lm
LDFLAGS+= `pkg-config --libs glib-2.0`
@ -19,11 +20,12 @@ LDFLAGS+= `pkg-config --libs libavformat`
LDFLAGS+= `pkg-config --libs libavutil`
LDFLAGS+= `pkg-config --libs libavresample`
LDFLAGS+= `pkg-config --libs libavfilter`
LDFLAGS+= `mysql_config --libs`
include ../lib/lib.Makefile
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c resample.c
decoder.c output.c mix.c resample.c db.c
LIBSRCS= loglib.c auxlib.c rtplib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)


+ 231
- 0
recording-daemon/db.c View File

@ -0,0 +1,231 @@
#include "db.h"
#include <mysql.h>
#include <glib.h>
#include <string.h>
#include "types.h"
#include "main.h"
#include "log.h"
static MYSQL __thread *mysql_conn;
static MYSQL_STMT __thread
*stm_insert_call,
*stm_close_call,
*stm_insert_stream,
*stm_close_stream,
*stm_config_stream;
static void my_stmt_close(MYSQL_STMT **st) {
if (!*st)
return;
mysql_stmt_close(*st);
*st = NULL;
}
static void reset_conn() {
my_stmt_close(&stm_insert_call);
my_stmt_close(&stm_close_call);
my_stmt_close(&stm_insert_stream);
my_stmt_close(&stm_close_stream);
my_stmt_close(&stm_config_stream);
mysql_close(mysql_conn);
mysql_conn = NULL;
}
INLINE int prep(MYSQL_STMT **st, const char *str) {
*st = mysql_stmt_init(mysql_conn);
if (!*st)
return -1;
if (mysql_stmt_prepare(*st, str, strlen(str))) {
ilog(LOG_ERR, "Failed to prepare statement '%s': %s", str, mysql_stmt_error(*st));
return -1;
}
return 0;
}
static int check_conn() {
if (mysql_conn)
return 0;
if (!c_mysql_host || !c_mysql_db)
return -1;
ilog(LOG_DEBUG, "connecting to MySQL");
mysql_conn = mysql_init(NULL);
if (!mysql_conn)
goto err;
if (!mysql_real_connect(mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port,
NULL, CLIENT_IGNORE_SIGPIPE))
goto err;
if (mysql_select_db(mysql_conn, c_mysql_db))
goto err;
if (mysql_autocommit(mysql_conn, 0))
goto err;
if (prep(&stm_insert_call, "insert into recording_calls (call_id) values (?)"))
goto err;
if (prep(&stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \
"file_format, " \
"output_type, " \
"stream_id, ssrc) values (?,?,?,?,?,?,?)"))
goto err;
if (prep(&stm_close_call, "update recording_calls set end_time = now() where id = ?"))
goto err;
if (prep(&stm_close_stream, "update recording_streams set end_time = now() where id = ?"))
goto err;
if (prep(&stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?"))
goto err;
ilog(LOG_INFO, "Connection to MySQL established");
return 0;
err:
if (mysql_conn) {
ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(mysql_conn));
reset_conn();
}
else
ilog(LOG_ERR, "Failed to connect to MySQL: out of memory");
return -1;
}
INLINE void my_str(MYSQL_BIND *b, const char *s) {
*b = (MYSQL_BIND) {
.buffer_type = MYSQL_TYPE_STRING,
.buffer = (void *) s,
.buffer_length = strlen(s),
.length = &b->buffer_length,
};
}
INLINE void my_ull(MYSQL_BIND *b, const unsigned long long *ull) {
*b = (MYSQL_BIND) {
.buffer_type = MYSQL_TYPE_LONGLONG,
.buffer = (void *) ull,
.buffer_length = sizeof(*ull),
.is_unsigned = 1,
};
}
INLINE void my_i(MYSQL_BIND *b, const int *i) {
*b = (MYSQL_BIND) {
.buffer_type = MYSQL_TYPE_LONG,
.buffer = (void *) i,
.buffer_length = sizeof(*i),
.is_unsigned = 0,
};
}
static void execute_wrap(MYSQL_STMT **stmt, MYSQL_BIND *binds, unsigned long long *auto_id) {
for (int retr = 0; retr < 5; retr++) {
if (mysql_stmt_bind_param(*stmt, binds))
goto err;
if (mysql_stmt_execute(*stmt))
goto err;
if (auto_id) {
*auto_id = mysql_insert_id(mysql_conn);
if (*auto_id <= 0)
goto err;
}
if (mysql_commit(mysql_conn))
goto err;
return;
err:
ilog(LOG_ERR, "Failed to bind or execute prepared statement: %s",
mysql_stmt_error(*stmt));
if (retr > 2) {
reset_conn();
if (check_conn())
return;
}
}
}
void db_do_call(metafile_t *mf) {
if (check_conn())
return;
if (mf->db_id > 0)
return;
MYSQL_BIND b[1];
my_str(&b[0], mf->call_id);
execute_wrap(&stm_insert_call, b, &mf->db_id);
}
void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int id, unsigned long ssrc) {
if (check_conn())
return;
if (mf->db_id <= 0)
return;
if (op->db_id > 0)
return;
MYSQL_BIND b[7];
my_ull(&b[0], &mf->db_id);
my_str(&b[1], op->file_name);
my_str(&b[2], op->full_filename);
my_str(&b[3], op->file_format);
my_str(&b[4], type);
b[5] = (MYSQL_BIND) {
.buffer_type = MYSQL_TYPE_LONG,
.buffer = &id,
.buffer_length = sizeof(id),
.is_unsigned = 1,
};
b[6] = (MYSQL_BIND) {
.buffer_type = MYSQL_TYPE_LONG,
.buffer = &ssrc,
.buffer_length = sizeof(ssrc),
.is_unsigned = 1,
};
execute_wrap(&stm_insert_stream, b, &op->db_id);
}
void db_close_call(metafile_t *mf) {
if (check_conn())
return;
if (mf->db_id <= 0)
return;
MYSQL_BIND b[1];
my_ull(&b[0], &mf->db_id);
execute_wrap(&stm_close_call, b, NULL);
}
void db_close_stream(output_t *op) {
if (check_conn())
return;
if (op->db_id <= 0)
return;
MYSQL_BIND b[1];
my_ull(&b[0], &op->db_id);
execute_wrap(&stm_close_stream, b, NULL);
}
void db_config_stream(output_t *op) {
if (check_conn())
return;
if (op->db_id <= 0)
return;
MYSQL_BIND b[3];
my_i(&b[0], &op->actual_format.channels);
my_i(&b[1], &op->actual_format.clockrate);
my_ull(&b[2], &op->db_id);
execute_wrap(&stm_config_stream, b, NULL);
}

+ 14
- 0
recording-daemon/db.h View File

@ -0,0 +1,14 @@
#ifndef _DB_H_
#define _DB_H_
#include "types.h"
void db_do_call(metafile_t *);
void db_close_call(metafile_t *);
void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int id, unsigned long ssrc);
void db_close_stream(output_t *op);
void db_config_stream(output_t *op);
#endif

+ 12
- 0
recording-daemon/epoll.c View File

@ -3,6 +3,7 @@
#include <glib.h>
#include <pthread.h>
#include <unistd.h>
#include <mysql.h>
#include "log.h"
#include "main.h"
#include "garbage.h"
@ -31,12 +32,21 @@ void epoll_del(int fd) {
}
static void poller_thread_end(void *ptr) {
mysql_thread_end();
}
void *poller_thread(void *ptr) {
struct epoll_event epev;
unsigned int me_num = GPOINTER_TO_UINT(ptr);
dbg("poller thread %u running", me_num);
mysql_thread_init();
pthread_cleanup_push(poller_thread_end, NULL);
while (!shutdown_flag) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
int ret = epoll_wait(epoll_fd, &epev, 1, 10000);
@ -58,6 +68,8 @@ void *poller_thread(void *ptr) {
garbage_collect(me_num);
}
pthread_cleanup_pop(1);
return NULL;
}


+ 13
- 0
recording-daemon/main.c View File

@ -11,6 +11,7 @@
#include <libavfilter/avfilter.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <mysql.h>
#include "log.h"
#include "epoll.h"
#include "inotify.h"
@ -30,6 +31,11 @@ const char *output_dir = "/var/lib/rtpengine-recording";
static const char *output_format = "wav";
int output_mixed;
int output_single;
const char *c_mysql_host,
*c_mysql_user,
*c_mysql_pass,
*c_mysql_db;
int c_mysql_port;
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
@ -68,6 +74,7 @@ static void setup(void) {
avcodec_register_all();
avfilter_register_all();
avformat_network_init();
mysql_library_init(0, NULL, NULL);
signals();
metafile_setup();
epoll_setup();
@ -130,6 +137,7 @@ static void cleanup(void) {
metafile_cleanup();
inotify_cleanup();
epoll_cleanup();
mysql_library_end();
}
@ -144,6 +152,11 @@ static void options(int *argc, char ***argv) {
{ "mp3-bitrate", 0, 0, G_OPTION_ARG_INT, &mp3_bitrate, "Bits per second for MP3 encoding", "INT" },
{ "output-mixed", 0, 0, G_OPTION_ARG_NONE, &output_mixed, "Mix participating sources into a single output",NULL },
{ "output-single", 0, 0, G_OPTION_ARG_NONE, &output_single, "Create one output file for each source",NULL },
{ "mysql-host", 0, 0, G_OPTION_ARG_STRING, &c_mysql_host, "MySQL host for storage of call metadata","HOST|IP" },
{ "mysql-port", 0, 0, G_OPTION_ARG_INT, &c_mysql_port, "MySQL port" ,"INT" },
{ "mysql-user", 0, 0, G_OPTION_ARG_STRING, &c_mysql_user, "MySQL connection credentials", "USERNAME" },
{ "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &c_mysql_pass, "MySQL connection credentials", "PASSWORD" },
{ "mysql-db", 0, 0, G_OPTION_ARG_STRING, &c_mysql_db, "MySQL database name", "STRING" },
{ NULL, }
};


+ 5
- 0
recording-daemon/main.h View File

@ -8,6 +8,11 @@ extern const char *spool_dir;
extern const char *output_dir;
extern int output_mixed;
extern int output_single;
extern const char *c_mysql_host,
*c_mysql_user,
*c_mysql_pass,
*c_mysql_db;
extern int c_mysql_port;
extern volatile int shutdown_flag;


+ 7
- 2
recording-daemon/metafile.c View File

@ -14,6 +14,7 @@
#include "packet.h"
#include "output.h"
#include "mix.h"
#include "db.h"
static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER;
@ -47,17 +48,21 @@ static void meta_destroy(metafile_t *mf) {
stream_close(stream);
pthread_mutex_unlock(&stream->lock);
}
db_close_call(mf);
}
// mf is locked
static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) {
db_do_call(mf);
pthread_mutex_lock(&mf->mix_lock);
if (!mf->mix && output_mixed) {
char buf[256];
snprintf(buf, sizeof(buf), "%s/%s-mix", output_dir, mf->parent);
mf->mix_out = output_new(buf);
snprintf(buf, sizeof(buf), "%s-mix", mf->parent);
mf->mix_out = output_new(output_dir, buf);
mf->mix = mix_new();
db_do_stream(mf, mf->mix_out, "mixed", 0, 0);
}
pthread_mutex_unlock(&mf->mix_lock);


+ 10
- 26
recording-daemon/output.c View File

@ -1,32 +1,11 @@
#include "output.h"
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/audio_fifo.h>
#include <libavutil/channel_layout.h>
#include <libavutil/samplefmt.h>
#include <limits.h>
#include <string.h>
#include <stdint.h>
#include <glib.h>
#include "log.h"
struct output_s {
char filename[PATH_MAX];
format_t requested_format,
actual_format;
AVCodecContext *avcctx;
AVFormatContext *fmtctx;
AVStream *avst;
AVPacket avpkt;
AVAudioFifo *fifo;
int64_t fifo_pts; // pts of first data in fifo
int64_t mux_dts; // last dts passed to muxer
AVFrame *frame;
};
#include "db.h"
static int output_codec_id;
@ -147,11 +126,14 @@ int output_add(output_t *output, AVFrame *frame) {
}
output_t *output_new(const char *filename) {
output_t *output_new(const char *path, const char *filename) {
output_t *ret = g_slice_alloc0(sizeof(*ret));
g_strlcpy(ret->filename, filename, sizeof(ret->filename));
g_strlcpy(ret->file_path, path, sizeof(ret->file_path));
g_strlcpy(ret->file_name, filename, sizeof(ret->file_name));
snprintf(ret->full_filename, sizeof(ret->full_filename), "%s/%s", path, filename);
format_init(&ret->requested_format);
format_init(&ret->actual_format);
ret->file_format = output_file_format;
return ret;
}
@ -171,7 +153,7 @@ int output_config(output_t *output, const format_t *requested_format, format_t *
output->fmtctx = avformat_alloc_context();
if (!output->fmtctx)
goto err;
output->fmtctx->oformat = av_guess_format(output_file_format, NULL, NULL);
output->fmtctx->oformat = av_guess_format(output->file_format, NULL, NULL);
err = "failed to determine output format";
if (!output->fmtctx->oformat)
goto err;
@ -222,7 +204,7 @@ int output_config(output_t *output, const format_t *requested_format, format_t *
char full_fn[PATH_MAX];
char suff[16] = "";
for (int i = 1; i < 20; i++) {
snprintf(full_fn, sizeof(full_fn), "%s%s.%s", output->filename, suff, output_file_format);
snprintf(full_fn, sizeof(full_fn), "%s%s.%s", output->full_filename, suff, output->file_format);
if (!g_file_test(full_fn, G_FILE_TEST_EXISTS))
goto got_fn;
snprintf(suff, sizeof(suff), "-%i", i);
@ -263,6 +245,7 @@ got_fn:
done:
*actual_format = output->actual_format;
db_config_stream(output);
return 0;
err:
@ -304,6 +287,7 @@ void output_close(output_t *output) {
if (!output)
return;
output_shutdown(output);
db_close_stream(output);
g_slice_free1(sizeof(*output), output);
}


+ 1
- 1
recording-daemon/output.h View File

@ -10,7 +10,7 @@ extern int mp3_bitrate;
void output_init(const char *format);
output_t *output_new(const char *filename);
output_t *output_new(const char *path, const char *filename);
void output_close(output_t *);
int output_config(output_t *output, const format_t *requested_format, format_t *actual_format);


+ 10
- 5
recording-daemon/packet.c View File

@ -12,6 +12,7 @@
#include "rtcplib.h"
#include "main.h"
#include "output.h"
#include "db.h"
static int ptr_cmp(const void *a, const void *b, void *dummy) {
@ -43,7 +44,8 @@ void ssrc_free(void *p) {
// mf must be unlocked; returns ssrc locked
static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) {
static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) {
metafile_t *mf = stream->metafile;
pthread_mutex_lock(&mf->lock);
ssrc_t *ret = g_hash_table_lookup(mf->ssrc_hash, GUINT_TO_POINTER(ssrc));
if (ret)
@ -52,14 +54,17 @@ static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) {
ret = g_slice_alloc0(sizeof(*ret));
pthread_mutex_init(&ret->lock, NULL);
ret->metafile = mf;
ret->stream = stream;
ret->ssrc = ssrc;
ret->packets = g_tree_new_full(ptr_cmp, NULL, NULL, packet_free);
ret->seq = -1;
char buf[256];
snprintf(buf, sizeof(buf), "%s/%s-%08lx", output_dir, mf->parent, ssrc);
if (output_single)
ret->output = output_new(buf);
snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc);
if (output_single) {
ret->output = output_new(output_dir, buf);
db_do_stream(mf, ret->output, "single", stream->id, ssrc);
}
g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret);
@ -228,7 +233,7 @@ void packet_process(stream_t *stream, unsigned char *buf, unsigned len) {
dbg("packet parsed successfully, seq %u", packet->seq);
// insert into ssrc queue
ssrc_t *ssrc = ssrc_get(stream->metafile, ntohl(packet->rtp->ssrc));
ssrc_t *ssrc = ssrc_get(stream, ntohl(packet->rtp->ssrc));
// check seq for dupes
if (G_UNLIKELY(ssrc->seq == -1)) {


+ 30
- 1
recording-daemon/types.h View File

@ -7,6 +7,10 @@
#include <glib.h>
#include <libavutil/frame.h>
#include <libavresample/avresample.h>
#include <libavformat/avformat.h>
#include <libavutil/channel_layout.h>
#include <libavutil/samplefmt.h>
#include <libavutil/audio_fifo.h>
#include "str.h"
@ -28,6 +32,8 @@ struct mix_s;
typedef struct mix_s mix_t;
struct resample_s;
typedef struct resample_s resample_t;
struct format_s;
typedef struct format_s format_t;
typedef void handler_func(handler_t *);
@ -66,6 +72,7 @@ typedef struct packet_s packet_t;
struct ssrc_s {
pthread_mutex_t lock;
stream_t *stream;
metafile_t *metafile;
unsigned long ssrc;
GTree *packets; // contains packet_t objects
@ -82,6 +89,7 @@ struct metafile_s {
char *parent;
char *call_id;
off_t pos;
unsigned long long db_id;
GStringChunk *gsc; // XXX limit max size
@ -109,7 +117,28 @@ struct format_s {
int channels;
int format; // enum AVSampleFormat
};
typedef struct format_s format_t;
struct output_s {
char full_filename[PATH_MAX], // path + filename
file_path[PATH_MAX],
file_name[PATH_MAX];
const char *file_format;
unsigned long long db_id;
format_t requested_format,
actual_format;
AVCodecContext *avcctx;
AVFormatContext *fmtctx;
AVStream *avst;
AVPacket avpkt;
AVAudioFifo *fifo;
int64_t fifo_pts; // pts of first data in fifo
int64_t mux_dts; // last dts passed to muxer
AVFrame *frame;
};
INLINE int format_eq(const format_t *a, const format_t *b) {
if (G_UNLIKELY(a->clockrate != b->clockrate))


Loading…
Cancel
Save