diff --git a/daemon/call.c b/daemon/call.c index cee70bdf6..6378d1e7a 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1464,6 +1464,7 @@ static void __rtp_payload_types(struct call_media *media, GQueue *types) { /* we steal the entire list to avoid duplicate allocs */ while ((pt = g_queue_pop_head(types))) { /* but we must duplicate the contents */ + call_str_cpy(call, &pt->encoding_with_params, &pt->encoding_with_params); call_str_cpy(call, &pt->encoding, &pt->encoding); call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters); g_hash_table_replace(media->rtp_payload_types, &pt->payload_type, pt); @@ -1726,6 +1727,8 @@ init: /* we are now ready to fire up ICE if so desired and requested */ ice_update(other_media->ice_agent, sp); ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */ + + recording_setup_media(other_media); } return 0; @@ -1999,13 +2002,9 @@ void call_destroy(struct call *c) { md->protocol ? md->protocol->name : "(unknown)" if (!rtp_pt) ilog(LOG_INFO, MLL_PREFIX "unknown codec", MLL_COMMON); - else if (!rtp_pt->encoding_parameters.s) - ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u", MLL_COMMON, - STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate); else - ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u/"STR_FORMAT"", MLL_COMMON, - STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate, - STR_FMT(&rtp_pt->encoding_parameters)); + ilog(LOG_INFO, MLL_PREFIX STR_FORMAT, MLL_COMMON, + STR_FMT(&rtp_pt->encoding_with_params)); /* add PayloadType(codec) info in CDR logging */ if (_log_facility_cdr && rtp_pt) { diff --git a/daemon/recording.c b/daemon/recording.c index 9d0526536..a838dfd84 100644 --- a/daemon/recording.c +++ b/daemon/recording.c @@ -44,6 +44,7 @@ static void finish_proc(struct call *); static void dump_packet_proc(struct recording *recording, struct packet_stream *sink, const str *s); static void init_stream_proc(struct packet_stream *); static void setup_stream_proc(struct packet_stream *); +static void setup_media_proc(struct call_media *); static void kernel_info_proc(struct packet_stream *, struct rtpengine_target_info *); @@ -71,6 +72,7 @@ static const struct recording_method methods[] = { .finish = finish_proc, .init_stream_struct = init_stream_proc, .setup_stream = setup_stream_proc, + .setup_media = setup_media_proc, .stream_kernel_info = kernel_info_proc, }, }; @@ -672,6 +674,26 @@ static void setup_stream_proc(struct packet_stream *stream) { append_meta_chunk(recording, buf, len, "STREAM %u interface", stream->unique_id); } +static void setup_media_proc(struct call_media *media) { + struct call *call = media->call; + struct recording *recording = call->recording; + + if (!recording) + return; + + GList *pltypes = g_hash_table_get_values(media->rtp_payload_types); + + for (GList *l = pltypes; l; l = l->next) { + struct rtp_payload_type *pt = l->data; + append_meta_chunk(recording, pt->encoding_with_params.s, pt->encoding_with_params.len, + "MEDIA %u PAYLOAD TYPE %u", media->unique_id, pt->payload_type); + } + + g_list_free(pltypes); +} + + + static void dump_packet_proc(struct recording *recording, struct packet_stream *stream, const str *s) { if (stream->recording.proc.stream_idx == UNINIT_IDX) return; diff --git a/daemon/recording.h b/daemon/recording.h index 63bc594ec..fdd6b588d 100644 --- a/daemon/recording.h +++ b/daemon/recording.h @@ -22,6 +22,7 @@ struct call; enum call_opmode; struct rtpengine_target_info; struct call_monologue; +struct call_media; struct recording_pcap { @@ -77,6 +78,7 @@ struct recording_method { void (*init_stream_struct)(struct packet_stream *); void (*setup_stream)(struct packet_stream *); + void (*setup_media)(struct call_media *); void (*stream_kernel_info)(struct packet_stream *, struct rtpengine_target_info *); }; @@ -185,6 +187,7 @@ void recording_finish(struct call *); #define recording_setup_stream(args...) _rm(setup_stream, args) +#define recording_setup_media(args...) _rm(setup_media, args) #define recording_init_stream(args...) _rm(init_stream_struct, args) #define recording_stream_kernel_info(args...) _rm(stream_kernel_info, args) #define recording_meta_chunk(args...) _rm(meta_chunk, args) diff --git a/daemon/rtp.h b/daemon/rtp.h index d5be29bd9..d4a230843 100644 --- a/daemon/rtp.h +++ b/daemon/rtp.h @@ -21,6 +21,7 @@ struct rtp_header { struct rtp_payload_type { unsigned int payload_type; + str encoding_with_params; str encoding; unsigned int clock_rate; str encoding_parameters; diff --git a/daemon/sdp.c b/daemon/sdp.c index 75a609705..2e9dabf22 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -705,6 +705,8 @@ static int parse_attribute_rtpmap(struct sdp_attribute *output) { a = &output->u.rtpmap; pt = &a->rtp_pt; + pt->encoding_with_params = a->encoding_str; + pt->payload_type = strtoul(a->payload_type_str.s, &ep, 10); if (ep == a->payload_type_str.s) return -1; diff --git a/debian/ngcp-rtpengine-daemon.install b/debian/ngcp-rtpengine-daemon.install index 11a8ccb4d..ba2780462 100644 --- a/debian/ngcp-rtpengine-daemon.install +++ b/debian/ngcp-rtpengine-daemon.install @@ -1 +1,2 @@ daemon/rtpengine /usr/sbin/ +recording-daemon/rtpengine-recording /usr/sbin/ diff --git a/debian/rules b/debian/rules index 36087d1b0..43abde60a 100755 --- a/debian/rules +++ b/debian/rules @@ -38,12 +38,14 @@ build-stamp: dh_testdir make -C iptables-extension make -C daemon -j`nproc` + make -C recording-daemon -j`nproc` touch $@ clean: dh_testdir dh_testroot cd daemon && $(MAKE) clean && cd .. + cd recording-daemon && $(MAKE) clean && cd .. rm -f build-stamp rm -f iptables-extension/libxt_RTPENGINE.so rm -f daemon/rtpengine daemon/build_time.h daemon/.depend kernel-module/.xt_RTPENGINE.o.d diff --git a/recording-daemon/.gitignore b/recording-daemon/.gitignore new file mode 100644 index 000000000..8577790bc --- /dev/null +++ b/recording-daemon/.gitignore @@ -0,0 +1,5 @@ +.depend +*.o +core +core.* +.ycm_extra_conf.pyc diff --git a/recording-daemon/.ycm_extra_conf.py b/recording-daemon/.ycm_extra_conf.py new file mode 100644 index 000000000..25eb311e7 --- /dev/null +++ b/recording-daemon/.ycm_extra_conf.py @@ -0,0 +1,102 @@ +import os +import ycm_core +from clang_helpers import PrepareClangFlags + +# Set this to the absolute path to the folder (NOT the file!) containing the +# compile_commands.json file to use that instead of 'flags'. See here for +# more details: http://clang.llvm.org/docs/JSONCompilationDatabase.html +# Most projects will NOT need to set this to anything; you can just change the +# 'flags' list of compilation flags. Notice that YCM itself uses that approach. +compilation_database_folder = '' + +# These are the compilation flags that will be used in case there's no +# compilation database set. +flags = [ +'-g', +'-Wall', +'-pthread', +'-fno-strict-aliasing', +'-I/usr/include/glib-2.0', +'-I/usr/lib/x86_64-linux-gnu/glib-2.0/include', +'-pthread', +'-D_GNU_SOURCE', +'-D__DEBUG=1', +'-D__YCM=1', +'-O2', +'-fstack-protector', +'--param=ssp-buffer-size=4', +'-Wformat', +'-Werror=format-security', +'-D_FORTIFY_SOURCE=2', +# THIS IS IMPORTANT! Without a "-std=" flag, clang won't know which +# language to use when compiling headers. So it will guess. Badly. So C++ +# headers will be compiled as C headers. You don't want that so ALWAYS specify +# a "-std=". +# For a C project, you would set this to something like 'c99' instead of +# 'c++11'. +'-std=c99', +# ...and the same thing goes for the magic -x option which specifies the +# language that the files to be compiled are written in. This is mostly +# relevant for c++ headers. +# For a C project, you would set this to 'c' instead of 'c++'. +'-x', +'c', +] + +if compilation_database_folder: + database = ycm_core.CompilationDatabase( compilation_database_folder ) +else: + database = None + + +def DirectoryOfThisScript(): + return os.path.dirname( os.path.abspath( __file__ ) ) + + +def MakeRelativePathsInFlagsAbsolute( flags, working_directory ): + if not working_directory: + return flags + new_flags = [] + make_next_absolute = False + path_flags = [ '-isystem', '-I', '-iquote', '--sysroot=' ] + for flag in flags: + new_flag = flag + + if make_next_absolute: + make_next_absolute = False + if not flag.startswith( '/' ): + new_flag = os.path.join( working_directory, flag ) + + for path_flag in path_flags: + if flag == path_flag: + make_next_absolute = True + break + + if flag.startswith( path_flag ): + path = flag[ len( path_flag ): ] + new_flag = path_flag + os.path.join( working_directory, path ) + break + + if new_flag: + new_flags.append( new_flag ) + return new_flags + + +def FlagsForFile( filename ): + if database: + # Bear in mind that compilation_info.compiler_flags_ does NOT return a + # python list, but a "list-like" StringVec object + compilation_info = database.GetCompilationInfoForFile( filename ) + final_flags = PrepareClangFlags( + MakeRelativePathsInFlagsAbsolute( + compilation_info.compiler_flags_, + compilation_info.compiler_working_dir_ ), + filename ) + else: + relative_to = DirectoryOfThisScript() + final_flags = MakeRelativePathsInFlagsAbsolute( flags, relative_to ) + + return { + 'flags': final_flags, + 'do_cache': True + } diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile new file mode 100644 index 000000000..cd38772d0 --- /dev/null +++ b/recording-daemon/Makefile @@ -0,0 +1,58 @@ +TARGET= rtpengine-recording + +CC?=gcc +CFLAGS= -g -Wall -pthread +CFLAGS+= -std=c99 +CFLAGS+= -D_GNU_SOURCE -D_POSIX_SOURCE -D_POSIX_C_SOURCE +CFLAGS+= `pkg-config --cflags glib-2.0` +CFLAGS+= `pkg-config --cflags gthread-2.0` +#CFLAGS+= `pcre-config --cflags` + +ifeq ($(DBG),yes) +CFLAGS+= -D__DEBUG=1 +else +CFLAGS+= -O3 +endif + +LDFLAGS= -lm +LDFLAGS+= `pkg-config --libs glib-2.0` +LDFLAGS+= `pkg-config --libs gthread-2.0` +#LDFLAGS+= `pcre-config --libs` + +ifneq ($(DBG),yes) + DPKG_BLDFLGS= $(shell which dpkg-buildflags 2>/dev/null) + ifneq ($(DPKG_BLDFLGS),) + # support http://wiki.debian.org/Hardening for >=wheezy + CFLAGS+= `dpkg-buildflags --get CFLAGS` + CPPFLAGS+= `dpkg-buildflags --get CPPFLAGS` + LDFLAGS+= `dpkg-buildflags --get LDFLAGS` + endif +endif + +SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c aux.c +OBJS= $(SRCS:.c=.o) + + +.PHONY: all dep clean tests debug + +all: + $(MAKE) $(TARGET) + +debug: + $(MAKE) DBG=yes all + +dep: .depend + +clean: + rm -f $(OBJS) $(TARGET) .depend core core.* + +.depend: $(SRCS) Makefile + $(CC) $(CFLAGS) -M $(SRCS) | sed -e 's/:/ .depend:/' > .depend + +$(TARGET): $(OBJS) .depend Makefile + $(CC) $(CFLAGS) -o $@ $(OBJS) $(LDFLAGS) + +$(OBJS): Makefile + + +include .depend diff --git a/recording-daemon/aux.c b/recording-daemon/aux.c new file mode 100644 index 000000000..67bd59502 --- /dev/null +++ b/recording-daemon/aux.c @@ -0,0 +1,21 @@ +#include "aux.h" +#include +#include + + +int __thread __sscanf_hack_var; + + +int __sscanf_match(const char *str, const char *fmt, ...) { + va_list ap; + + __sscanf_hack_var = 0; // to make sure that sscanf consumes the entire string + + va_start(ap, fmt); + int ret = vsscanf(str, fmt, ap); + va_end(ap); + + if (__sscanf_hack_var == 0) + return 0; + return ret; +} diff --git a/recording-daemon/aux.h b/recording-daemon/aux.h new file mode 100644 index 000000000..b00d6240b --- /dev/null +++ b/recording-daemon/aux.h @@ -0,0 +1,9 @@ +#ifndef _AUX_H_ +#define _AUX_H_ + +extern int __thread __sscanf_hack_var; + +#define sscanf_match(str, format, ...) __sscanf_match(str, format "%n", ##__VA_ARGS__, &__sscanf_hack_var) +int __sscanf_match(const char *str, const char *fmt, ...) __attribute__ ((__format__ (__scanf__, 2, 3))); + +#endif diff --git a/recording-daemon/epoll.c b/recording-daemon/epoll.c new file mode 100644 index 000000000..b03933ba6 --- /dev/null +++ b/recording-daemon/epoll.c @@ -0,0 +1,64 @@ +#include "epoll.h" +#include +#include +#include +#include +#include "log.h" +#include "main.h" +#include "garbage.h" + + +static int epoll_fd = -1; + + +void epoll_setup(void) { + epoll_fd = epoll_create1(0); + if (epoll_fd == -1) + die_errno("epoll_create1 failed"); + +} + + +int epoll_add(int fd, uint32_t events, handler_t *handler) { + struct epoll_event epev = { .events = events | EPOLLET, .data = { .ptr = handler } }; + int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &epev); + return ret; +} + + +void epoll_del(int fd) { + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); +} + + +void *poller_thread(void *ptr) { + struct epoll_event epev; + unsigned int me_num = GPOINTER_TO_UINT(ptr); + + dbg("poller thread %u running", me_num); + + while (!shutdown_flag) { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + int ret = epoll_wait(epoll_fd, &epev, 1, 10000); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + + if (ret == -1) + die_errno("epoll_wait failed"); + + if (ret > 0) { + dbg("thread %u handling event", me_num); + + handler_t *handler = epev.data.ptr; + handler->func(handler); + } + + garbage_collect(me_num); + } + + return NULL; +} + + +void epoll_cleanup(void) { + close(epoll_fd); +} diff --git a/recording-daemon/epoll.h b/recording-daemon/epoll.h new file mode 100644 index 000000000..1d5e2b5ef --- /dev/null +++ b/recording-daemon/epoll.h @@ -0,0 +1,20 @@ +#ifndef _EPOLL_H_ +#define _EPOLL_H_ + +#include +#include +#include +#include "types.h" + + +void epoll_setup(void); +void epoll_cleanup(void); + +int epoll_add(int fd, uint32_t events, handler_t *handler); +void epoll_del(int fd); + + +void *poller_thread(void *ptr); + + +#endif diff --git a/recording-daemon/garbage.c b/recording-daemon/garbage.c new file mode 100644 index 000000000..0b76b7775 --- /dev/null +++ b/recording-daemon/garbage.c @@ -0,0 +1,96 @@ +#include "garbage.h" +#include +#include +#include "log.h" + + +typedef struct { + void *ptr; + void (*free_func)(void *); + int *wait_threads; + unsigned int array_len; + unsigned int threads_left; +} garbage_t; + + +static pthread_mutex_t garbage_lock = PTHREAD_MUTEX_INITIALIZER; +static GQueue garbage = G_QUEUE_INIT; +static volatile int garbage_thread_num; + + +unsigned int garbage_new_thread_num(void) { + return g_atomic_int_add(&garbage_thread_num, 1); +} + + +void garbage_add(void *ptr, free_func_t *free_func) { + // Each running poller thread has a unique number associated with it, starting + // with 0. A garbage entry uses an array of boolean flags, one for each running + // thread, to keep track of which threads have seen this entry. Once a garbage + // entry has been seen by all threads, the free function is finally called. + // This is to make sure that all poller threads have left epoll_wait() after + // an fd has been removed from the watch list. + + garbage_t *garb = g_slice_alloc(sizeof(*garb)); + garb->ptr = ptr; + garb->free_func = free_func; + + pthread_mutex_lock(&garbage_lock); + + garb->array_len = g_atomic_int_get(&garbage_thread_num); + garb->threads_left = garb->array_len; + garb->wait_threads = malloc(sizeof(int) * garb->array_len); + memset(garb->wait_threads, 0, sizeof(int) * garb->array_len); + + g_queue_push_tail(&garbage, garb); + + pthread_mutex_unlock(&garbage_lock); +} + + +static void garbage_collect1(garbage_t *garb) { + garb->free_func(garb->ptr); + + free(garb->wait_threads); + g_slice_free1(sizeof(*garb), garb); +} + + +void garbage_collect(unsigned int num) { + dbg("running garbage collection thread %u", num); + +restart: + pthread_mutex_lock(&garbage_lock); + + for (GList *l = garbage.head; l; l = l->next) { + garbage_t *garb = l->data; + // has this been created before we were running? + if (garb->array_len <= num) + continue; + // have we processed this already? + if (garb->wait_threads[num]) + continue; + dbg("marking garbage entry %p as seen by %u with %u threads left", garb, num, + garb->threads_left); + garb->wait_threads[num] = 1; + garb->threads_left--; + // anything left? + if (!garb->threads_left) { + // remove from list and process + g_queue_delete_link(&garbage, l); + pthread_mutex_unlock(&garbage_lock); + garbage_collect1(garb); + + goto restart; + } + } + + pthread_mutex_unlock(&garbage_lock); +} + + +void garbage_collect_all(void) { + garbage_t *garb; + while ((garb = g_queue_pop_head(&garbage))) + garbage_collect1(garb); +} diff --git a/recording-daemon/garbage.h b/recording-daemon/garbage.h new file mode 100644 index 000000000..572b54b9c --- /dev/null +++ b/recording-daemon/garbage.h @@ -0,0 +1,11 @@ +#ifndef _GARBAGE_H_ +#define _GARBAGE_H_ + +typedef void free_func_t(void *); + +unsigned int garbage_new_thread_num(void); +void garbage_add(void *ptr, free_func_t *free_func); +void garbage_collect(unsigned int num); +void garbage_collect_all(void); + +#endif diff --git a/recording-daemon/inotify.c b/recording-daemon/inotify.c new file mode 100644 index 000000000..f7f728d30 --- /dev/null +++ b/recording-daemon/inotify.c @@ -0,0 +1,79 @@ +#include "inotify.h" +#include +#include +#include +#include +#include +#include "log.h" +#include "main.h" +#include "epoll.h" +#include "metafile.h" + + +static int inotify_fd = -1; + + +static handler_func inotify_handler_func; +static handler_t inotify_handler = { + .func = inotify_handler_func, +}; + + +static void inotify_close_write(struct inotify_event *inev) { + dbg("inotify close_write(%s)", inev->name); + metafile_change(inev->name); +} + + +static void inotify_delete(struct inotify_event *inev) { + dbg("inotify delete(%s)", inev->name); + metafile_delete(inev->name); +} + + +static void inotify_handler_func(handler_t *handler) { + char buf[4 * (sizeof(struct inotify_event) + NAME_MAX + 1)]; + + while (1) { + int ret = read(inotify_fd, buf, sizeof(buf)); + if (ret == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) + break; + die_errno("read on inotify fd failed"); + } + if (ret == 0) + die("EOF on inotify fd"); + + char *bufend = buf + ret; + char *bufhead = buf; + while (bufhead < bufend) { + struct inotify_event *inev = (void *) bufhead; + + if ((inev->mask & IN_DELETE)) + inotify_delete(inev); + if ((inev->mask & IN_CLOSE_WRITE)) + inotify_close_write(inev); + + bufhead += sizeof(*inev) + inev->len; + } + } +} + + +void inotify_setup(void) { + inotify_fd = inotify_init1(IN_NONBLOCK); + if (inotify_fd == -1) + die_errno("inotify_init1 failed"); + + int ret = inotify_add_watch(inotify_fd, SPOOL_DIR, IN_CLOSE_WRITE | IN_DELETE); + if (ret == -1) + die_errno("inotify_add_watch failed"); + + if (epoll_add(inotify_fd, EPOLLIN, &inotify_handler)) + die_errno("failed to add inotify_fd to epoll"); +} + + +void inotify_cleanup(void) { + close(inotify_fd); +} diff --git a/recording-daemon/inotify.h b/recording-daemon/inotify.h new file mode 100644 index 000000000..2fba88b79 --- /dev/null +++ b/recording-daemon/inotify.h @@ -0,0 +1,7 @@ +#ifndef _INOTIFY_H_ +#define _INOTIFY_H_ + +void inotify_setup(void); +void inotify_cleanup(void); + +#endif diff --git a/recording-daemon/log.h b/recording-daemon/log.h new file mode 100644 index 000000000..6b446623d --- /dev/null +++ b/recording-daemon/log.h @@ -0,0 +1,15 @@ +#ifndef _LOG_H_ +#define _LOG_H_ + +#include +#include +#include +#include +#include + +#define die(fmt, ...) do { ilog(LOG_CRIT, "Fatal error: " fmt, ##__VA_ARGS__); exit(-1); } while (0) +#define die_errno(msg) die("%s: %s", msg, strerror(errno)) +#define ilog(fclt, fmt, ...) fprintf(stderr, fmt "\n", ##__VA_ARGS__) +#define dbg(fmt, ...) ilog(LOG_DEBUG, fmt, ##__VA_ARGS__) + +#endif diff --git a/recording-daemon/main.c b/recording-daemon/main.c new file mode 100644 index 000000000..9ce523d0d --- /dev/null +++ b/recording-daemon/main.c @@ -0,0 +1,105 @@ +#include "main.h" +#include +#include +#include +#include +#include +#include +#include +#include "log.h" +#include "epoll.h" +#include "inotify.h" +#include "metafile.h" +#include "garbage.h" + + + +static GQueue threads = G_QUEUE_INIT; // only accessed from main thread + +volatile int shutdown_flag; + + +static void signals(void) { + sigset_t ss; + + sigfillset(&ss); + sigdelset(&ss, SIGABRT); + sigdelset(&ss, SIGSEGV); + sigdelset(&ss, SIGQUIT); + sigprocmask(SIG_SETMASK, &ss, NULL); + pthread_sigmask(SIG_SETMASK, &ss, NULL); +} + + +static void setup(void) { + signals(); + metafile_setup(); + epoll_setup(); + inotify_setup(); +} + + +static void start_poller_thread(void) { + pthread_t *thr = g_slice_alloc(sizeof(*thr)); + int ret = pthread_create(thr, NULL, poller_thread, + GUINT_TO_POINTER(garbage_new_thread_num())); + if (ret) + die_errno("pthread_create failed"); + + g_queue_push_tail(&threads, thr); +} + + +static void wait_threads_finish(void) { + pthread_t *thr; + while ((thr = g_queue_pop_head(&threads))) { + pthread_cancel(*thr); + pthread_join(*thr, NULL); + g_slice_free1(sizeof(*thr), thr); + } +} + + +static void wait_for_signal(void) { + sigset_t ss; + int ret, sig; + + sigemptyset(&ss); + sigaddset(&ss, SIGINT); + sigaddset(&ss, SIGTERM); + + while (1) { + ret = sigwait(&ss, &sig); + if (ret == -1) { + if (errno == EAGAIN || errno == EINTR) + continue; + abort(); + } + shutdown_flag = 1; + break; + } +} + + +static void cleanup(void) { + garbage_collect_all(); + metafile_cleanup(); + inotify_cleanup(); + epoll_cleanup(); +} + + +int main() { + setup(); + + for (int i = 0; i < NUM_THREADS; i++) + start_poller_thread(); + + wait_for_signal(); + + dbg("shutting down"); + + wait_threads_finish(); + + cleanup(); +} diff --git a/recording-daemon/main.h b/recording-daemon/main.h new file mode 100644 index 000000000..a25ee1974 --- /dev/null +++ b/recording-daemon/main.h @@ -0,0 +1,13 @@ +#ifndef _MAIN_H_ +#define _MAIN_H_ + + +#define SPOOL_DIR "/var/spool/rtpengine" +#define PROC_DIR "/proc/rtpengine/0/calls" +#define NUM_THREADS 8 + + +extern volatile int shutdown_flag; + + +#endif diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c new file mode 100644 index 000000000..b42e74675 --- /dev/null +++ b/recording-daemon/metafile.c @@ -0,0 +1,241 @@ +#include "metafile.h" +#include +#include +#include +#include +#include +#include +#include +#include "log.h" +#include "stream.h" +#include "garbage.h" +#include "main.h" +#include "aux.h" + + +static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER; +static GHashTable *metafiles; + +//static pcre_t stream_interface_re, + //stream_details_re; + + +static void meta_free(void *ptr) { + metafile_t *mf = ptr; + + dbg("freeing metafile info for %s", mf->name); + g_string_chunk_free(mf->gsc); + for (int i = 0; i < mf->streams->len; i++) { + stream_t *stream = g_ptr_array_index(mf->streams, i); + stream_close(stream); // should be closed already + stream_free(stream); + } + g_ptr_array_free(mf->streams, TRUE); + g_slice_free1(sizeof(*mf), mf); +} + + +// mf is locked +static void meta_destroy(metafile_t *mf) { + // close all streams + for (int i = 0; i < mf->streams->len; i++) { + stream_t *stream = g_ptr_array_index(mf->streams, i); + pthread_mutex_lock(&stream->lock); + stream_close(stream); + pthread_mutex_unlock(&stream->lock); + } +} + + +// mf is locked +static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) { + dbg("stream %lu interface %s", snum, content); + stream_open(mf, snum, content); +} + + +// mf is locked +static void meta_stream_details(metafile_t *mf, unsigned long snum, char *content) { + dbg("stream %lu details %s", snum, content); +} + + +// mf is locked +static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned int payload_num, + char *payload_type) +{ + dbg("payload type in media %lu num %u is %s", mnum, payload_num, payload_type); +} + + +// mf is locked +static void meta_section(metafile_t *mf, char *section, char *content, unsigned long len) { + unsigned long lu; + unsigned int u; + + if (!strcmp(section, "CALL-ID")) + mf->call_id = g_string_chunk_insert(mf->gsc, content); + else if (!strcmp(section, "PARENT")) + mf->parent = g_string_chunk_insert(mf->gsc, content); + else if (sscanf_match(section, "STREAM %lu interface", &lu) == 1) + meta_stream_interface(mf, lu, content); + else if (sscanf_match(section, "STREAM %lu details", &lu) == 1) + meta_stream_details(mf, lu, content); + else if (sscanf_match(section, "MEDIA %lu PAYLOAD TYPE %u", &lu, &u) == 2) + meta_rtp_payload_type(mf, lu, u, content); +} + + +void metafile_change(char *name) { + // get or create metafile metadata + pthread_mutex_lock(&metafiles_lock); + metafile_t *mf = g_hash_table_lookup(metafiles, name); + if (!mf) { + dbg("allocating metafile info for %s", name); + mf = g_slice_alloc0(sizeof(*mf)); + mf->gsc = g_string_chunk_new(0); + mf->name = g_string_chunk_insert(mf->gsc, name); + pthread_mutex_init(&mf->lock, NULL); + mf->streams = g_ptr_array_new(); + g_hash_table_insert(metafiles, mf->name, mf); + } + // switch locks + pthread_mutex_lock(&mf->lock); + pthread_mutex_unlock(&metafiles_lock); + + char fnbuf[PATH_MAX]; + snprintf(fnbuf, sizeof(fnbuf), "%s/%s", SPOOL_DIR, name); + + // open file and seek to last known position + int fd = open(fnbuf, O_RDONLY); + if (fd == -1) { + ilog(LOG_ERR, "Failed to open %s: %s\n", fnbuf, strerror(errno)); + goto out; + } + lseek(fd, mf->pos, SEEK_SET); + + // read the entire file + GString *s = g_string_new(NULL); + char buf[1024]; + while (1) { + int ret = read(fd, buf, sizeof(buf)); + if (ret == 0) + break; + if (ret == -1) + die_errno("read on metadata file failed"); + g_string_append_len(s, buf, ret); + } + + // save read position and close file + mf->pos = lseek(fd, 0, SEEK_CUR); + close(fd); + + // process contents of metadata file + char *head = s->str; + char *endp = s->str + s->len; + while (head < endp) { + // section header + char *nl = memchr(head, '\n', endp - head); + if (!nl || nl == head) { + ilog(LOG_WARN, "Missing section header in %s", name); + break; + } + if (memchr(head, '\0', nl - head)) { + ilog(LOG_WARN, "NUL character in section header in %s", name); + break; + } + *(nl++) = '\0'; + char *section = head; + dbg("section %s", section); + head = nl; + + // content length + nl = memchr(head, ':', endp - head); + if (!nl || nl == head) { + ilog(LOG_WARN, "Content length for section %s missing in %s", section, name); + break; + } + *(nl++) = '\0'; + if (*(nl++) != '\n') { + ilog(LOG_WARN, "Unterminated content length for section %s in %s", section, name); + break; + } + char *errp; + unsigned long slen = strtoul(head, &errp, 10); + if (*errp != '\0') { + ilog(LOG_WARN, "Invalid content length for section %s in %s", section, name); + break; + } + dbg("content length %lu", slen); + head = nl; + + // content + if (endp - head < slen) { + ilog(LOG_WARN, "Content truncated in section %s in %s", section, name); + break; + } + char *content = head; + if (memchr(content, '\0', slen)) { + ilog(LOG_WARN, "NUL character in content in section %s in %s", section, name); + break; + } + + // double newline separator + head += slen; + if (*head != '\n' || *(head + 1) != '\n') { + ilog(LOG_WARN, "Separator missing after section %s in %s", section, name); + break; + } + *head = '\0'; + head += 2; + + meta_section(mf, section, content, slen); + } + + g_string_free(s, TRUE); + +out: + pthread_mutex_unlock(&mf->lock); +} + + +void metafile_delete(char *name) { + // get metafile metadata + pthread_mutex_lock(&metafiles_lock); + metafile_t *mf = g_hash_table_lookup(metafiles, name); + if (!mf) { + // nothing to do + pthread_mutex_unlock(&metafiles_lock); + return; + } + // switch locks and remove entry + pthread_mutex_lock(&mf->lock); + g_hash_table_remove(metafiles, name); + pthread_mutex_unlock(&metafiles_lock); + + meta_destroy(mf); + + // add to garbage + garbage_add(mf, meta_free); + pthread_mutex_unlock(&mf->lock); +} + + +void metafile_setup(void) { + metafiles = g_hash_table_new(g_str_hash, g_str_equal); + //pcre_build(&stream_interface_re, "^STREAM (\\d+) interface$"); + //pcre_build(&stream_interface_re, "^STREAM (\\d+) details$"); +} + + +void metafile_cleanup(void) { + GList *mflist = g_hash_table_get_values(metafiles); + for (GList *l = mflist; l; l = l->next) { + metafile_t *mf = l->data; + meta_destroy(mf); + meta_free(mf); + + } + g_list_free(mflist); + g_hash_table_destroy(metafiles); +} diff --git a/recording-daemon/metafile.h b/recording-daemon/metafile.h new file mode 100644 index 000000000..f1beb0108 --- /dev/null +++ b/recording-daemon/metafile.h @@ -0,0 +1,12 @@ +#ifndef _METAFILE_H_ +#define _METAFILE_H_ + +#include "types.h" + +void metafile_setup(void); +void metafile_cleanup(void); + +void metafile_change(char *name); +void metafile_delete(char *name); + +#endif diff --git a/recording-daemon/pcre.c b/recording-daemon/pcre.c new file mode 100644 index 000000000..fff425a17 --- /dev/null +++ b/recording-daemon/pcre.c @@ -0,0 +1,14 @@ +#include "pcre.h" +#include +#include "log.h" + + +void pcre_build(pcre_t *out, const char *pattern) { + const char *errptr; + int erroff; + + out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); + if (!out->re) + die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff); + out->extra = pcre_study(out->re, 0, &errptr); +} diff --git a/recording-daemon/pcre.h b/recording-daemon/pcre.h new file mode 100644 index 000000000..f982340a7 --- /dev/null +++ b/recording-daemon/pcre.h @@ -0,0 +1,8 @@ +#ifndef _PCRE_H_ +#define _PCRE_H_ + +#include "types.h" + +void pcre_build(pcre_t *out, const char *pattern); + +#endif diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c new file mode 100644 index 000000000..686be84f2 --- /dev/null +++ b/recording-daemon/stream.c @@ -0,0 +1,95 @@ +#include "stream.h" +#include +#include +#include +#include +#include +#include "metafile.h" +#include "epoll.h" +#include "log.h" +#include "main.h" + + +// stream is locked +void stream_close(stream_t *stream) { + if (stream->fd == -1) + return; + epoll_del(stream->fd); + close(stream->fd); + stream->fd = -1; +} + +void stream_free(stream_t *stream) { + g_slice_free1(sizeof(*stream), stream); +} + + +static void stream_handler(handler_t *handler) { + stream_t *stream = handler->ptr; + + //dbg("poll event for %s", stream->name); + + pthread_mutex_lock(&stream->lock); + + if (stream->fd == -1) + goto out; + + char buf[65535]; + int ret = read(stream->fd, buf, sizeof(buf)); + if (ret == 0) { + ilog(LOG_INFO, "EOF on stream %s", stream->name); + stream_close(stream); + goto out; + } + else if (ret < 0) { + ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno)); + stream_close(stream); + goto out; + } + +out: + pthread_mutex_unlock(&stream->lock); +} + + +// mf is locked +static stream_t *stream_get(metafile_t *mf, unsigned long id) { + if (mf->streams->len <= id) + g_ptr_array_set_size(mf->streams, id + 1); + stream_t *ret = g_ptr_array_index(mf->streams, id); + if (ret) + goto out; + + ret = g_slice_alloc0(sizeof(*ret)); + g_ptr_array_index(mf->streams, id) = ret; + pthread_mutex_init(&ret->lock, NULL); + ret->fd = -1; + ret->id = id; + +out: + return ret; +} + + +// mf is locked +void stream_open(metafile_t *mf, unsigned long id, char *name) { + dbg("opening stream %lu/%s", id, name); + + stream_t *stream = stream_get(mf, id); + + stream->name = g_string_chunk_insert(mf->gsc, name); + + char fnbuf[PATH_MAX]; + snprintf(fnbuf, sizeof(fnbuf), "%s/%s/%s", PROC_DIR, mf->parent, name); + + stream->fd = open(fnbuf, O_RDONLY | O_NONBLOCK); + if (stream->fd == -1) { + ilog(LOG_ERR, "Failed to open kernel stream %s: %s", fnbuf, strerror(errno)); + return; + } + + // add to epoll + stream->handler.ptr = stream; + stream->handler.func = stream_handler; + epoll_add(stream->fd, EPOLLIN, &stream->handler); +} diff --git a/recording-daemon/stream.h b/recording-daemon/stream.h new file mode 100644 index 000000000..1af4047c1 --- /dev/null +++ b/recording-daemon/stream.h @@ -0,0 +1,10 @@ +#ifndef _STREAM_H_ +#define _STREAM_H_ + +#include "types.h" + +void stream_open(metafile_t *mf, unsigned long id, char *name); +void stream_close(stream_t *stream); +void stream_free(stream_t *stream); + +#endif diff --git a/recording-daemon/types.h b/recording-daemon/types.h new file mode 100644 index 000000000..ada38263e --- /dev/null +++ b/recording-daemon/types.h @@ -0,0 +1,46 @@ +#ifndef _TYPES_H_ +#define _TYPES_H_ + + +#include +#include +#include +#include + + +typedef struct handler_s handler_t; +typedef void handler_func(handler_t *); + +struct handler_s { + handler_func *func; + void *ptr; +}; + +struct stream_s { + pthread_mutex_t lock; + char *name; + unsigned long id; + int fd; + handler_t handler; +}; +typedef struct stream_s stream_t; + +struct metafile_s { + pthread_mutex_t lock; + char *name; + char *parent; + char *call_id; + off_t pos; + GStringChunk *gsc; // XXX limit max size + GPtrArray *streams; +}; +typedef struct metafile_s metafile_t; + +// struct pcre_s { +// pcre *re; +// pcre_extra *extra; +// }; +// typedef struct pcre_s pcre_t; + + +#endif