From 3b28460507fbaa7695ec5c9c2b12abe239a34102 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 22 Feb 2019 15:30:46 -0500 Subject: [PATCH] TT#52651 Squashed commit of the following: commit 20291443683a4bcca198e685aadcd9107d600ae3 Author: Richard Fuchs Date: Thu Feb 21 13:12:50 2019 -0500 TT#52651 switch TCP to TLS Change-Id: Iab6b05d3b5c88553cbd6f531f3189084d9e71995 commit b28e718ee4ae3ac8e72335702e15abbf06649f9c Author: Richard Fuchs Date: Fri Feb 22 09:20:54 2019 -0500 TT#52651 generalise streambuf interface Change-Id: I7d5ab8ffe13e52d4dbb1901531cc13fcc173d60d commit cb2dbd2a92e1fe81d1ab66aa34ce52849e4cc261 Author: Richard Fuchs Date: Tue Feb 19 09:32:56 2019 -0500 TT#52651 add start/stop forwarding commands and party selection logic Change-Id: I8ef7e288d3a3e485bd2fa14e1a2407a0c8d94bac commit 442c48f627df1d706639a82948d1c9356f13eb12 Author: Richard Fuchs Date: Thu Feb 14 15:43:23 2019 -0500 TT#52651 produce output for TCP forwarding feature Change-Id: I18543921577faf655679829684f5af46c0af5054 commit 2ef8028eb27fba0bd7db27c8095645e3a668d0bd Author: Richard Fuchs Date: Thu Feb 14 10:18:21 2019 -0500 TT#52651 make recording to output files optional Change-Id: I12c288b965641352658ce3b499c2ee90593e1322 commit 10a58cd7a060c4a774d3595cfff2bcb8aa604ecd Author: Richard Fuchs Date: Wed Feb 13 16:02:16 2019 -0500 TT#52651 strip streambuf into lib and include in recording daemon Change-Id: I1f6638961e9e767063e0b4e6b5d55d88799366d3 commit 9d3bb5bffc81aef27e69a7f6fdd3cea6c57b3831 Author: Richard Fuchs Date: Wed Feb 13 15:40:12 2019 -0500 TT#52651 extract/move unrelated old legacy decoder struct members Change-Id: Iffd79b43180c30a9e128a460f7ba85ba49dedeaf commit 1bc38e42019b7ca0988343ae7359371cb431bec9 Author: Richard Fuchs Date: Tue Feb 12 16:43:42 2019 -0500 TT#52651 config options for forwarding option Change-Id: Ieaa2ee0e55a0c531158174bc6a534738a64dbee6 commit 06d61cd3dd0d0cb736bb2ddf7bcc304cc1f83107 Author: Richard Fuchs Date: Tue Feb 12 16:29:52 2019 -0500 TT#52651 move socket.[ch] into lib/ includes necessary re-shuffling of additional code pieces Change-Id: I74b314ab5936ac8a0eeaff94e084617b59b28d79 Change-Id: I025e8ec86b90ede79565542dff57ec1559d04200 --- daemon/.gitignore | 2 + daemon/Makefile | 6 +- daemon/aux.c | 34 ------- daemon/call_interfaces.c | 80 ++++++++++++++- daemon/control_ng.c | 8 ++ daemon/log.h | 7 ++ daemon/main.c | 48 +-------- daemon/poller.c | 9 +- daemon/recording.c | 52 ++++++++-- include/aux.h | 150 ---------------------------- include/call.h | 9 +- include/call_interfaces.h | 2 + include/control_ng.h | 2 + include/poller.h | 7 +- include/tcp_listener.h | 1 + lib/auxlib.c | 37 +++++++ lib/auxlib.h | 171 ++++++++++++++++++++++++++++++++ lib/codeclib.c | 2 - lib/codeclib.h | 5 +- {daemon => lib}/socket.c | 8 +- {include => lib}/socket.h | 4 +- lib/ssllib.c | 51 ++++++++++ lib/ssllib.h | 8 ++ {daemon => lib}/streambuf.c | 48 ++++++--- {include => lib}/streambuf.h | 11 ++- recording-daemon/.gitignore | 2 + recording-daemon/Makefile | 6 +- recording-daemon/db.c | 6 +- recording-daemon/decoder.c | 74 +++++++++++--- recording-daemon/decoder.h | 5 +- recording-daemon/log.h | 1 + recording-daemon/main.c | 28 +++++- recording-daemon/main.h | 4 + recording-daemon/metafile.c | 16 ++- recording-daemon/packet.c | 185 +++++++++++++++++++++++++++++++++-- recording-daemon/packet.h | 2 + recording-daemon/poller.c | 11 +++ recording-daemon/poller.h | 22 +++++ recording-daemon/stream.c | 8 +- recording-daemon/stream.h | 1 + recording-daemon/types.h | 32 +++++- t/Makefile | 6 +- t/log.h | 1 + 43 files changed, 850 insertions(+), 322 deletions(-) rename {daemon => lib}/socket.c (99%) rename {include => lib}/socket.h (99%) create mode 100644 lib/ssllib.c create mode 100644 lib/ssllib.h rename {daemon => lib}/streambuf.c (74%) rename {include => lib}/streambuf.h (76%) create mode 100644 recording-daemon/poller.c create mode 100644 recording-daemon/poller.h diff --git a/daemon/.gitignore b/daemon/.gitignore index 21bdd047b..fb8f071b2 100644 --- a/daemon/.gitignore +++ b/daemon/.gitignore @@ -11,3 +11,5 @@ codeclib.c resample.c str.c fix_frame_channel_layout.h +socket.c +streambuf.c diff --git a/daemon/Makefile b/daemon/Makefile index d706f893b..cd53d0a81 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -119,12 +119,12 @@ ifeq ($(have_bcg729),yes) LDLIBS+= $(bcg729_lib) endif -SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ +SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ - crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c socket.c \ + crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ codec.c load.c dtmf.c -LIBSRCS= loglib.c auxlib.c rtplib.c str.c +LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c endif diff --git a/daemon/aux.c b/daemon/aux.c index b81e1cf39..8f6d36430 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -27,9 +27,6 @@ struct detach_thread { const char *scheduler; int priority; }; -struct thread_buf { - char buf[THREAD_BUF_SIZE]; -}; struct scheduler { const char *name; int num; @@ -42,10 +39,6 @@ static GList *threads_to_join; static GList *threads_running; static cond_t threads_cond = COND_STATIC_INIT; -static struct thread_buf __thread t_bufs[NUM_THREAD_BUFS]; -static int __thread t_buf_idx; - -__thread struct timeval rtpe_now; volatile int rtpe_shutdown; #ifdef NEED_ATOMIC64_MUTEX @@ -249,25 +242,6 @@ void thread_create_detach_prio(void (*f)(void *), void *d, const char *scheduler abort(); } -unsigned int in6_addr_hash(const void *p) { - const struct in6_addr *a = p; - return a->s6_addr32[0] ^ a->s6_addr32[3]; -} - -int in6_addr_eq(const void *a, const void *b) { - const struct in6_addr *A = a, *B = b; - return !memcmp(A, B, sizeof(*A)); -} - -char *get_thread_buf(void) { - char *ret; - ret = t_bufs[t_buf_idx].buf; - t_buf_idx++; - if (t_buf_idx >= G_N_ELEMENTS(t_bufs)) - t_buf_idx = 0; - return ret; -} - int g_tree_find_first_cmp(void *k, void *v, void *d) { void **p = d; GEqualFunc f = p[1]; @@ -285,14 +259,6 @@ int g_tree_find_all_cmp(void *k, void *v, void *d) { g_queue_push_tail(q, v); return FALSE; } -unsigned int uint32_hash(const void *p) { - const u_int32_t *a = p; - return *a; -} -int uint32_eq(const void *a, const void *b) { - const u_int32_t *A = a, *B = b; - return (*A == *B) ? TRUE : FALSE; -} void free_buf(char **p) { if (*p) diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index d7adeec08..c52baeb58 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -1004,8 +1004,10 @@ static const char *call_offer_answer_ng(bencode_item_t *input, bencode_buffer_destroy_add(output->buffer, (free_func_t) sdp_chopper_destroy, chopper); detect_setup_recording(call, &flags.record_call_str, &flags.metadata); - if (flags.record_call) + if (flags.record_call) { + call->recording_on = 1; recording_start(call, NULL, &flags.metadata); + } ret = monologue_offer_answer(monologue, &streams, &flags); if (!ret) { @@ -1438,6 +1440,7 @@ const char *call_start_recording_ng(bencode_item_t *input, bencode_item_t *outpu if (!call) return "Unknown call-id"; + call->recording_on = 1; recording_start(call, NULL, &metadata); rwlock_unlock_w(&call->master_lock); @@ -1456,6 +1459,7 @@ const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output if (!call) return "Unknown call-id"; + call->recording_on = 0; recording_stop(call); rwlock_unlock_w(&call->master_lock); @@ -1512,6 +1516,80 @@ found: return NULL; } +// XXX these are all identical - unify and use a flags int and/or callback +const char *call_start_forwarding_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + str metadata; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + bencode_dictionary_get_str(input, "metadata", &metadata); + + if (monologue) { + ilog(LOG_INFO, "Start forwarding for single party (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->rec_forwarding = 1; + } + else { + ilog(LOG_INFO, "Start forwarding (entire call)"); + call->rec_forwarding = 1; + } + + recording_start(call, NULL, &metadata); + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + return errstr; +} + +const char *call_stop_forwarding_ng(bencode_item_t *input, bencode_item_t *output) { + struct call *call; + struct call_monologue *monologue; + const char *errstr = NULL; + struct sdp_ng_flags flags; + + errstr = media_block_match(&call, &monologue, input); + if (errstr) + goto out; + + call_ng_process_flags(&flags, input, OP_OTHER); + + if (monologue) { + ilog(LOG_INFO, "Stop forwarding for single party (tag '" STR_FORMAT ")", + STR_FMT(&monologue->tag)); + monologue->rec_forwarding = 0; + } + else { + ilog(LOG_INFO, "Stop forwarding (entire call)"); + call->rec_forwarding = 0; + if (flags.all) { + for (GList *l = call->monologues.head; l; l = l->next) { + monologue = l->data; + monologue->rec_forwarding = 0; + } + } + } + + recording_stop(call); + + errstr = NULL; +out: + if (call) { + rwlock_unlock_w(&call->master_lock); + obj_put(call); + } + + return NULL; +} + const char *call_block_dtmf_ng(bencode_item_t *input, bencode_item_t *output) { struct call *call; struct call_monologue *monologue; diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 59c4a0efa..39f56dbd3 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -219,6 +219,14 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin errstr = call_stop_recording_ng(dict, resp); g_atomic_int_inc(&cur->stop_recording); break; + case CSH_LOOKUP("start forwarding"): + errstr = call_start_forwarding_ng(dict, resp); + g_atomic_int_inc(&cur->start_forwarding); + break; + case CSH_LOOKUP("stop forwarding"): + errstr = call_stop_forwarding_ng(dict, resp); + g_atomic_int_inc(&cur->stop_forwarding); + break; case CSH_LOOKUP("block DTMF"): errstr = call_block_dtmf_ng(dict, resp); g_atomic_int_inc(&cur->block_dtmf); diff --git a/daemon/log.h b/daemon/log.h index e8864ca07..a5f6ff46a 100644 --- a/daemon/log.h +++ b/daemon/log.h @@ -48,5 +48,12 @@ void dtmflog(GString *s); void log_format(enum log_format); void __ilog(int prio, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); +// call debug +#ifdef __DEBUG +#define __C_DBG(x...) ilog(LOG_DEBUG, x) +#else +#define __C_DBG(x...) ((void)0) +#endif + #endif diff --git a/daemon/main.c b/daemon/main.c index 3b3fce06b..7033ef776 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -12,7 +12,6 @@ #include #include #include -#include #include #include @@ -42,6 +41,7 @@ #include "graphite.h" #include "codeclib.h" #include "load.h" +#include "ssllib.h" @@ -586,59 +586,15 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { } -#if OPENSSL_VERSION_NUMBER < 0x10100000L -static mutex_t *openssl_locks; - -static void cb_openssl_threadid(CRYPTO_THREADID *tid) { - pthread_t me; - - me = pthread_self(); - - if (sizeof(me) == sizeof(void *)) - CRYPTO_THREADID_set_pointer(tid, (void *) me); - else - CRYPTO_THREADID_set_numeric(tid, (unsigned long) me); -} - -static void cb_openssl_lock(int mode, int type, const char *file, int line) { - if ((mode & CRYPTO_LOCK)) - mutex_lock(&openssl_locks[type]); - else - mutex_unlock(&openssl_locks[type]); -} - -static void make_OpenSSL_thread_safe(void) { - int i; - - openssl_locks = malloc(sizeof(*openssl_locks) * CRYPTO_num_locks()); - for (i = 0; i < CRYPTO_num_locks(); i++) - mutex_init(&openssl_locks[i]); - - CRYPTO_THREADID_set_callback(cb_openssl_threadid); - CRYPTO_set_locking_callback(cb_openssl_lock); -} -#else -static void make_OpenSSL_thread_safe(void) { - ; -} -#endif - - static void early_init() { socket_init(); // needed for socktype_udp } static void init_everything() { - struct timespec ts; - log_init("rtpengine"); log_format(rtpe_config.log_format); recording_fs_init(rtpe_config.spooldir, rtpe_config.rec_method, rtpe_config.rec_format); - clock_gettime(CLOCK_REALTIME, &ts); - srandom(ts.tv_sec ^ ts.tv_nsec); - SSL_library_init(); - SSL_load_error_strings(); - make_OpenSSL_thread_safe(); + rtpe_ssl_init(); #if !GLIB_CHECK_VERSION(2,32,0) g_thread_init(NULL); diff --git a/daemon/poller.c b/daemon/poller.c index 2aeac20b6..f3335dc34 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -366,7 +366,8 @@ out: } -void poller_blocked(struct poller *p, int fd) { +void poller_blocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); struct epoll_event e; if (!p || fd < 0) @@ -392,7 +393,8 @@ fail: mutex_unlock(&p->lock); } -void poller_error(struct poller *p, int fd) { +void poller_error(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); if (!p || fd < 0) return; @@ -412,7 +414,8 @@ fail: mutex_unlock(&p->lock); } -int poller_isblocked(struct poller *p, int fd) { +int poller_isblocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); int ret; if (!p || fd < 0) diff --git a/daemon/recording.c b/daemon/recording.c index da3d4af71..30ac94405 100644 --- a/daemon/recording.c +++ b/daemon/recording.c @@ -35,6 +35,9 @@ struct pcap_format { static int check_main_spool_dir(const char *spoolpath); static char *recording_setup_file(struct recording *recording); static char *meta_setup_file(struct recording *recording); +static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, + const char *label_fmt, ...) + __attribute__((format(printf,4,5))); // pcap methods static int pcap_create_spool_dir(const char *dirpath); @@ -58,6 +61,9 @@ static void kernel_info_proc(struct packet_stream *, struct rtpengine_target_inf static void pcap_eth_header(unsigned char *, struct packet_stream *); +#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f) +#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f) +#define append_meta_chunk_null(r,f...) append_meta_chunk(r, "", 0, f) static const struct recording_method methods[] = { @@ -224,12 +230,26 @@ static void update_metadata(struct call *call, str *metadata) { } } +// lock must be held +static void recording_update_flags(struct call *call) { + append_meta_chunk_null(call->recording, "RECORDING %u", call->recording_on ? 1 : 0); + append_meta_chunk_null(call->recording, "FORWARDING %u", call->rec_forwarding ? 1 : 0); + for (GList *l = call->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + append_meta_chunk_null(call->recording, "STREAM %u FORWARDING %u", + ps->unique_id, ps->media->monologue->rec_forwarding ? 1 : 0); + } +} + // lock must be held void recording_start(struct call *call, const char *prefix, str *metadata) { update_metadata(call, metadata); - if (call->recording) // already active + if (call->recording) { + // already active + recording_update_flags(call); return; + } if (!spooldir) { ilog(LOG_ERR, "Call recording requested, but no spool directory configured"); @@ -267,11 +287,27 @@ void recording_start(struct call *call, const char *prefix, str *metadata) { __unkernelize(ps); ps->handler = NULL; } + + recording_update_flags(call); } void recording_stop(struct call *call) { if (!call->recording) return; + // check if all recording options are disabled + if (call->recording_on || call->rec_forwarding) { + recording_update_flags(call); + return; + } + + for (GList *l = call->monologues.head; l; l = l->next) { + struct call_monologue *ml = l->data; + if (ml->rec_forwarding) { + recording_update_flags(call); + return; + } + } + ilog(LOG_NOTICE, "Turning off call recording."); recording_finish(call); } @@ -628,10 +664,6 @@ static int vappend_meta_chunk_iov(struct recording *recording, struct iovec *in_ return 0; } -static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, - const char *label_fmt, ...) - __attribute__((format(printf,4,5))); - static int append_meta_chunk(struct recording *recording, const char *buf, unsigned int buflen, const char *label_fmt, ...) { @@ -646,8 +678,6 @@ static int append_meta_chunk(struct recording *recording, const char *buf, unsig return ret; } -#define append_meta_chunk_str(r, str, f...) append_meta_chunk(r, (str)->s, (str)->len, f) -#define append_meta_chunk_s(r, str, f...) append_meta_chunk(r, (str), strlen(str), f) static void proc_init(struct call *call) { struct recording *recording = call->recording; @@ -694,8 +724,14 @@ static void finish_proc(struct call *call) { struct recording *recording = call->recording; if (!kernel.is_open) return; - if (recording->u.proc.call_idx != UNINIT_IDX) + if (recording->u.proc.call_idx != UNINIT_IDX) { kernel_del_call(recording->u.proc.call_idx); + recording->u.proc.call_idx = UNINIT_IDX; + } + for (GList *l = call->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + ps->recording.u.proc.stream_idx = UNINIT_IDX; + } unlink(recording->meta_filepath); } diff --git a/include/aux.h b/include/aux.h index a9b7e426f..75de0bc45 100644 --- a/include/aux.h +++ b/include/aux.h @@ -49,24 +49,8 @@ G_STATIC_ASSERT (sizeof *(atomic) == sizeof (gint)); \ -/*** HELPER MACROS ***/ - -#define ZERO(x) memset(&(x), 0, sizeof(x)) - -#define UINT64F "%" G_GUINT64_FORMAT - -#define THREAD_BUF_SIZE 64 -#define NUM_THREAD_BUFS 8 - -#define AUTO_CLEANUP(decl, func) decl __attribute__ ((__cleanup__(func))) -#define AUTO_CLEANUP_INIT(decl, func, val) AUTO_CLEANUP(decl, func) = val -#define AUTO_CLEANUP_NULL(decl, func) AUTO_CLEANUP_INIT(decl, func, 0) -#define AUTO_CLEANUP_BUF(var) AUTO_CLEANUP_NULL(char *var, free_buf) - - /*** GLOBALS ***/ -extern __thread struct timeval rtpe_now; extern volatile int rtpe_shutdown; @@ -79,11 +63,6 @@ typedef int (*parse_func)(char **, void **, void *); int pcre_multi_match(pcre *, pcre_extra *, const char *, unsigned int, parse_func, void *, GQueue *); INLINE void strmove(char **, char **); INLINE void strdupfree(char **, const char *); -char *get_thread_buf(void); -unsigned int in6_addr_hash(const void *p); -int in6_addr_eq(const void *a, const void *b); -unsigned int uint32_hash(const void *p); -int uint32_eq(const void *a, const void *b); @@ -274,135 +253,6 @@ void free_buf(char **); -/*** MUTEX ABSTRACTION ***/ - -typedef pthread_mutex_t mutex_t; -typedef pthread_rwlock_t rwlock_t; -typedef pthread_cond_t cond_t; - -#define mutex_init(m) __debug_mutex_init(m, __FILE__, __LINE__) -#define mutex_destroy(m) __debug_mutex_destroy(m, __FILE__, __LINE__) -#define mutex_lock(m) __debug_mutex_lock(m, __FILE__, __LINE__) -#define mutex_trylock(m) __debug_mutex_trylock(m, __FILE__, __LINE__) -#define mutex_unlock(m) __debug_mutex_unlock(m, __FILE__, __LINE__) -#define MUTEX_STATIC_INIT PTHREAD_MUTEX_INITIALIZER - -#define rwlock_init(l) __debug_rwlock_init(l, __FILE__, __LINE__) -#define rwlock_destroy(l) __debug_rwlock_destroy(l, __FILE__, __LINE__) -#define rwlock_lock_r(l) __debug_rwlock_lock_r(l, __FILE__, __LINE__) -#define rwlock_unlock_r(l) __debug_rwlock_unlock_r(l, __FILE__, __LINE__) -#define rwlock_lock_w(l) __debug_rwlock_lock_w(l, __FILE__, __LINE__) -#define rwlock_unlock_w(l) __debug_rwlock_unlock_w(l, __FILE__, __LINE__) - -#define cond_init(c) __debug_cond_init(c, __FILE__, __LINE__) -#define cond_wait(c,m) __debug_cond_wait(c,m, __FILE__, __LINE__) -#define cond_timedwait(c,m,t) __debug_cond_timedwait(c,m,t, __FILE__, __LINE__) -#define cond_signal(c) __debug_cond_signal(c, __FILE__, __LINE__) -#define cond_broadcast(c) __debug_cond_broadcast(c, __FILE__, __LINE__) -#define COND_STATIC_INIT PTHREAD_COND_INITIALIZER - -INLINE int __cond_timedwait_tv(cond_t *c, mutex_t *m, const struct timeval *tv) { - struct timespec ts; - ts.tv_sec = tv->tv_sec; - ts.tv_nsec = tv->tv_usec * 1000; - return pthread_cond_timedwait(c, m, &ts); -} - -#ifndef __THREAD_DEBUG - -#define __debug_mutex_init(m, F, L) pthread_mutex_init(m, NULL) -#define __debug_mutex_destroy(m, F, L) pthread_mutex_destroy(m) -#define __debug_mutex_lock(m, F, L) pthread_mutex_lock(m) -#define __debug_mutex_trylock(m, F, L) pthread_mutex_trylock(m) -#define __debug_mutex_unlock(m, F, L) pthread_mutex_unlock(m) - -#define __debug_rwlock_init(l, F, L) pthread_rwlock_init(l, NULL) -#define __debug_rwlock_destroy(l, F, L) pthread_rwlock_destroy(l) -#define __debug_rwlock_lock_r(l, F, L) pthread_rwlock_rdlock(l) -#define __debug_rwlock_unlock_r(l, F, L) pthread_rwlock_unlock(l) -#define __debug_rwlock_lock_w(l, F, L) pthread_rwlock_wrlock(l) -#define __debug_rwlock_unlock_w(l, F, L) pthread_rwlock_unlock(l) - -#define __debug_cond_init(c, F, L) pthread_cond_init(c, NULL) -#define __debug_cond_wait(c, m, F, L) pthread_cond_wait(c,m) -#define __debug_cond_timedwait(c, m, t, F, L) __cond_timedwait_tv(c,m,t) -#define __debug_cond_signal(c, F, L) pthread_cond_signal(c) -#define __debug_cond_broadcast(c, F, L) pthread_cond_broadcast(c) - -#else - - -#include "log.h" - - - -INLINE int __debug_mutex_init(mutex_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "mutex_init(%p) at %s:%u", m, file, line); - return pthread_mutex_init(m, NULL); -} -INLINE int __debug_mutex_destroy(mutex_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "mutex_destroy(%p) at %s:%u", m, file, line); - return pthread_mutex_destroy(m); -} -INLINE int __debug_mutex_lock(mutex_t *m, const char *file, unsigned int line) { - int ret; - write_log(LOG_DEBUG, "mutex_lock(%p) at %s:%u ...", m, file, line); - ret = pthread_mutex_lock(m); - write_log(LOG_DEBUG, "mutex_lock(%p) at %s:%u returning %i", m, file, line, ret); - return ret; -} -INLINE int __debug_mutex_trylock(mutex_t *m, const char *file, unsigned int line) { - int ret; - write_log(LOG_DEBUG, "mutex_trylock(%p) at %s:%u ...", m, file, line); - ret = pthread_mutex_trylock(m); - write_log(LOG_DEBUG, "mutex_trylock(%p) at %s:%u returning %i", m, file, line, ret); - return ret; -} -INLINE int __debug_mutex_unlock(mutex_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "mutex_unlock(%p) at %s:%u", m, file, line); - return pthread_mutex_unlock(m); -} - -INLINE int __debug_rwlock_init(rwlock_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "rwlock_init(%p) at %s:%u", m, file, line); - return pthread_rwlock_init(m, NULL); -} -INLINE int __debug_rwlock_destroy(rwlock_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "rwlock_destroy(%p) at %s:%u", m, file, line); - return pthread_rwlock_destroy(m); -} -INLINE int __debug_rwlock_lock_r(rwlock_t *m, const char *file, unsigned int line) { - int ret; - write_log(LOG_DEBUG, "rwlock_lock_r(%p) at %s:%u ...", m, file, line); - ret = pthread_rwlock_rdlock(m); - write_log(LOG_DEBUG, "rwlock_lock_r(%p) at %s:%u returning %i", m, file, line, ret); - return ret; -} -INLINE int __debug_rwlock_lock_w(rwlock_t *m, const char *file, unsigned int line) { - int ret; - write_log(LOG_DEBUG, "rwlock_lock_w(%p) at %s:%u ...", m, file, line); - ret = pthread_rwlock_wrlock(m); - write_log(LOG_DEBUG, "rwlock_lock_w(%p) at %s:%u returning %i", m, file, line, ret); - return ret; -} -INLINE int __debug_rwlock_unlock_r(rwlock_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "rwlock_unlock_r(%p) at %s:%u", m, file, line); - return pthread_rwlock_unlock(m); -} -INLINE int __debug_rwlock_unlock_w(rwlock_t *m, const char *file, unsigned int line) { - write_log(LOG_DEBUG, "rwlock_unlock_w(%p) at %s:%u", m, file, line); - return pthread_rwlock_unlock(m); -} - -#define __debug_cond_init(c, F, L) pthread_cond_init(c, NULL) -#define __debug_cond_wait(c, m, F, L) pthread_cond_wait(c,m) -#define __debug_cond_timedwait(c, m, t, F, L) __cond_timedwait_tv(c,m,t) -#define __debug_cond_signal(c, F, L) pthread_cond_signal(c) -#define __debug_cond_broadcast(c, F, L) pthread_cond_broadcast(c) - -#endif - - /*** THREAD HELPERS ***/ diff --git a/include/call.h b/include/call.h index a665d1e69..e9ebab8a4 100644 --- a/include/call.h +++ b/include/call.h @@ -90,12 +90,6 @@ enum call_type { #define RTP_LOOP_MAX_COUNT 30 /* number of consecutively detected dupes to trigger protection */ #endif -#ifdef __DEBUG -#define __C_DBG(x...) ilog(LOG_DEBUG, x) -#else -#define __C_DBG(x...) ((void)0) -#endif - #define IS_FOREIGN_CALL(c) (c->foreign_call) #define IS_OWN_CALL(c) !IS_FOREIGN_CALL(c) @@ -370,6 +364,7 @@ struct call_monologue { int block_dtmf:1; int block_media:1; + int rec_forwarding:1; }; struct call { @@ -408,6 +403,8 @@ struct call { int block_dtmf:1; int block_media:1; + int recording_on:1; + int rec_forwarding:1; }; diff --git a/include/call_interfaces.h b/include/call_interfaces.h index 10a0e0b4b..edbc6ada1 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -102,6 +102,8 @@ const char *call_query_ng(bencode_item_t *, bencode_item_t *); const char *call_list_ng(bencode_item_t *, bencode_item_t *); const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *); const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *); +const char *call_start_forwarding_ng(bencode_item_t *, bencode_item_t *); +const char *call_stop_forwarding_ng(bencode_item_t *, bencode_item_t *); const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_block_media_ng(bencode_item_t *, bencode_item_t *); diff --git a/include/control_ng.h b/include/control_ng.h index afe409640..ef8b3489f 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -19,6 +19,8 @@ struct control_ng_stats { int list; int start_recording; int stop_recording; + int start_forwarding; + int stop_forwarding; int block_dtmf; int unblock_dtmf; int block_media; diff --git a/include/poller.h b/include/poller.h index 8ede2f343..01f72a6ff 100644 --- a/include/poller.h +++ b/include/poller.h @@ -34,9 +34,10 @@ struct poller *poller_new(void); int poller_add_item(struct poller *, struct poller_item *); int poller_update_item(struct poller *, struct poller_item *); int poller_del_item(struct poller *, int); -void poller_blocked(struct poller *, int); -int poller_isblocked(struct poller *, int); -void poller_error(struct poller *, int); + +void poller_blocked(struct poller *, void *); +int poller_isblocked(struct poller *, void *); +void poller_error(struct poller *, void *); int poller_poll(struct poller *, int); void poller_timer_loop(void *); diff --git a/include/tcp_listener.h b/include/tcp_listener.h index 0bb297a79..327e108a7 100644 --- a/include/tcp_listener.h +++ b/include/tcp_listener.h @@ -3,6 +3,7 @@ #include "socket.h" #include "obj.h" +#include "aux.h" struct poller; diff --git a/lib/auxlib.c b/lib/auxlib.c index ceedad0b7..c2f729798 100644 --- a/lib/auxlib.c +++ b/lib/auxlib.c @@ -10,12 +10,21 @@ #ifdef HAVE_LIBSYSTEMD #include #endif +#include +#include #include "log.h" #include "loglib.h" +struct thread_buf { + char buf[THREAD_BUF_SIZE]; +}; static int version; struct rtpengine_common_config *rtpe_common_config_ptr; +__thread struct timeval rtpe_now; + +static struct thread_buf __thread t_bufs[NUM_THREAD_BUFS]; +static int __thread t_buf_idx; void daemonize(void) { @@ -200,3 +209,31 @@ out: err: die("Bad command line: %s", er->message); } + +char *get_thread_buf(void) { + char *ret; + ret = t_bufs[t_buf_idx].buf; + t_buf_idx++; + if (t_buf_idx >= G_N_ELEMENTS(t_bufs)) + t_buf_idx = 0; + return ret; +} + +unsigned int in6_addr_hash(const void *p) { + const struct in6_addr *a = p; + return a->s6_addr32[0] ^ a->s6_addr32[3]; +} + +int in6_addr_eq(const void *a, const void *b) { + const struct in6_addr *A = a, *B = b; + return !memcmp(A, B, sizeof(*A)); +} + +unsigned int uint32_hash(const void *p) { + const u_int32_t *a = p; + return *a; +} +int uint32_eq(const void *a, const void *b) { + const u_int32_t *A = a, *B = b; + return (*A == *B) ? TRUE : FALSE; +} diff --git a/lib/auxlib.h b/lib/auxlib.h index 395fee1be..59f17b1e9 100644 --- a/lib/auxlib.h +++ b/lib/auxlib.h @@ -6,6 +6,11 @@ #include "compat.h" #include + +#define THREAD_BUF_SIZE 64 +#define NUM_THREAD_BUFS 8 + + struct rtpengine_common_config { char *config_file; char *config_section; @@ -18,6 +23,18 @@ struct rtpengine_common_config { extern struct rtpengine_common_config *rtpe_common_config_ptr; + + +/*** GLOBALS ***/ + +extern __thread struct timeval rtpe_now; +extern volatile int rtpe_shutdown; + + + + +/*** PROTOTYPES ***/ + void daemonize(void); void wpidfile(void); void service_notify(const char *message); @@ -25,9 +42,163 @@ void config_load(int *argc, char ***argv, GOptionEntry *entries, const char *des char *default_config, char *default_section, struct rtpengine_common_config *); +char *get_thread_buf(void); + +unsigned int in6_addr_hash(const void *p); +int in6_addr_eq(const void *a, const void *b); +unsigned int uint32_hash(const void *p); +int uint32_eq(const void *a, const void *b); + + +/*** HELPER MACROS ***/ + +#define ZERO(x) memset(&(x), 0, sizeof(x)) + +#define UINT64F "%" G_GUINT64_FORMAT + +#define AUTO_CLEANUP(decl, func) decl __attribute__ ((__cleanup__(func))) +#define AUTO_CLEANUP_INIT(decl, func, val) AUTO_CLEANUP(decl, func) = val +#define AUTO_CLEANUP_NULL(decl, func) AUTO_CLEANUP_INIT(decl, func, 0) +#define AUTO_CLEANUP_BUF(var) AUTO_CLEANUP_NULL(char *var, free_buf) + + +/*** STRING HELPERS ***/ + INLINE void random_string(unsigned char *buf, int len) { int ret = RAND_bytes(buf, len); assert(ret == 1); } + +/*** MUTEX ABSTRACTION ***/ + +typedef pthread_mutex_t mutex_t; +typedef pthread_rwlock_t rwlock_t; +typedef pthread_cond_t cond_t; + +#define mutex_init(m) __debug_mutex_init(m, __FILE__, __LINE__) +#define mutex_destroy(m) __debug_mutex_destroy(m, __FILE__, __LINE__) +#define mutex_lock(m) __debug_mutex_lock(m, __FILE__, __LINE__) +#define mutex_trylock(m) __debug_mutex_trylock(m, __FILE__, __LINE__) +#define mutex_unlock(m) __debug_mutex_unlock(m, __FILE__, __LINE__) +#define MUTEX_STATIC_INIT PTHREAD_MUTEX_INITIALIZER + +#define rwlock_init(l) __debug_rwlock_init(l, __FILE__, __LINE__) +#define rwlock_destroy(l) __debug_rwlock_destroy(l, __FILE__, __LINE__) +#define rwlock_lock_r(l) __debug_rwlock_lock_r(l, __FILE__, __LINE__) +#define rwlock_unlock_r(l) __debug_rwlock_unlock_r(l, __FILE__, __LINE__) +#define rwlock_lock_w(l) __debug_rwlock_lock_w(l, __FILE__, __LINE__) +#define rwlock_unlock_w(l) __debug_rwlock_unlock_w(l, __FILE__, __LINE__) + +#define cond_init(c) __debug_cond_init(c, __FILE__, __LINE__) +#define cond_wait(c,m) __debug_cond_wait(c,m, __FILE__, __LINE__) +#define cond_timedwait(c,m,t) __debug_cond_timedwait(c,m,t, __FILE__, __LINE__) +#define cond_signal(c) __debug_cond_signal(c, __FILE__, __LINE__) +#define cond_broadcast(c) __debug_cond_broadcast(c, __FILE__, __LINE__) +#define COND_STATIC_INIT PTHREAD_COND_INITIALIZER + +INLINE int __cond_timedwait_tv(cond_t *c, mutex_t *m, const struct timeval *tv) { + struct timespec ts; + ts.tv_sec = tv->tv_sec; + ts.tv_nsec = tv->tv_usec * 1000; + return pthread_cond_timedwait(c, m, &ts); +} + +#ifndef __THREAD_DEBUG + +#define __debug_mutex_init(m, F, L) pthread_mutex_init(m, NULL) +#define __debug_mutex_destroy(m, F, L) pthread_mutex_destroy(m) +#define __debug_mutex_lock(m, F, L) pthread_mutex_lock(m) +#define __debug_mutex_trylock(m, F, L) pthread_mutex_trylock(m) +#define __debug_mutex_unlock(m, F, L) pthread_mutex_unlock(m) + +#define __debug_rwlock_init(l, F, L) pthread_rwlock_init(l, NULL) +#define __debug_rwlock_destroy(l, F, L) pthread_rwlock_destroy(l) +#define __debug_rwlock_lock_r(l, F, L) pthread_rwlock_rdlock(l) +#define __debug_rwlock_unlock_r(l, F, L) pthread_rwlock_unlock(l) +#define __debug_rwlock_lock_w(l, F, L) pthread_rwlock_wrlock(l) +#define __debug_rwlock_unlock_w(l, F, L) pthread_rwlock_unlock(l) + +#define __debug_cond_init(c, F, L) pthread_cond_init(c, NULL) +#define __debug_cond_wait(c, m, F, L) pthread_cond_wait(c,m) +#define __debug_cond_timedwait(c, m, t, F, L) __cond_timedwait_tv(c,m,t) +#define __debug_cond_signal(c, F, L) pthread_cond_signal(c) +#define __debug_cond_broadcast(c, F, L) pthread_cond_broadcast(c) + +#else + + +#include "log.h" + + + +INLINE int __debug_mutex_init(mutex_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "mutex_init(%p) at %s:%u", m, file, line); + return pthread_mutex_init(m, NULL); +} +INLINE int __debug_mutex_destroy(mutex_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "mutex_destroy(%p) at %s:%u", m, file, line); + return pthread_mutex_destroy(m); +} +INLINE int __debug_mutex_lock(mutex_t *m, const char *file, unsigned int line) { + int ret; + write_log(LOG_DEBUG, "mutex_lock(%p) at %s:%u ...", m, file, line); + ret = pthread_mutex_lock(m); + write_log(LOG_DEBUG, "mutex_lock(%p) at %s:%u returning %i", m, file, line, ret); + return ret; +} +INLINE int __debug_mutex_trylock(mutex_t *m, const char *file, unsigned int line) { + int ret; + write_log(LOG_DEBUG, "mutex_trylock(%p) at %s:%u ...", m, file, line); + ret = pthread_mutex_trylock(m); + write_log(LOG_DEBUG, "mutex_trylock(%p) at %s:%u returning %i", m, file, line, ret); + return ret; +} +INLINE int __debug_mutex_unlock(mutex_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "mutex_unlock(%p) at %s:%u", m, file, line); + return pthread_mutex_unlock(m); +} + +INLINE int __debug_rwlock_init(rwlock_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "rwlock_init(%p) at %s:%u", m, file, line); + return pthread_rwlock_init(m, NULL); +} +INLINE int __debug_rwlock_destroy(rwlock_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "rwlock_destroy(%p) at %s:%u", m, file, line); + return pthread_rwlock_destroy(m); +} +INLINE int __debug_rwlock_lock_r(rwlock_t *m, const char *file, unsigned int line) { + int ret; + write_log(LOG_DEBUG, "rwlock_lock_r(%p) at %s:%u ...", m, file, line); + ret = pthread_rwlock_rdlock(m); + write_log(LOG_DEBUG, "rwlock_lock_r(%p) at %s:%u returning %i", m, file, line, ret); + return ret; +} +INLINE int __debug_rwlock_lock_w(rwlock_t *m, const char *file, unsigned int line) { + int ret; + write_log(LOG_DEBUG, "rwlock_lock_w(%p) at %s:%u ...", m, file, line); + ret = pthread_rwlock_wrlock(m); + write_log(LOG_DEBUG, "rwlock_lock_w(%p) at %s:%u returning %i", m, file, line, ret); + return ret; +} +INLINE int __debug_rwlock_unlock_r(rwlock_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "rwlock_unlock_r(%p) at %s:%u", m, file, line); + return pthread_rwlock_unlock(m); +} +INLINE int __debug_rwlock_unlock_w(rwlock_t *m, const char *file, unsigned int line) { + write_log(LOG_DEBUG, "rwlock_unlock_w(%p) at %s:%u", m, file, line); + return pthread_rwlock_unlock(m); +} + +#define __debug_cond_init(c, F, L) pthread_cond_init(c, NULL) +#define __debug_cond_wait(c, m, F, L) pthread_cond_wait(c,m) +#define __debug_cond_timedwait(c, m, t, F, L) __cond_timedwait_tv(c,m,t) +#define __debug_cond_signal(c, F, L) pthread_cond_signal(c) +#define __debug_cond_broadcast(c, F, L) pthread_cond_broadcast(c) + +#endif + + + + #endif diff --git a/lib/codeclib.c b/lib/codeclib.c index 35a36c3db..cab7ba0cb 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -456,7 +456,6 @@ decoder_t *decoder_new_fmtp(const codec_def_t *def, int clockrate, int channels, ret->pts = (uint64_t) -1LL; ret->rtp_ts = (unsigned long) -1L; - ret->mixer_idx = (unsigned int) -1; return ret; @@ -488,7 +487,6 @@ void decoder_close(decoder_t *dec) { dec->def->codec_type->decoder_close(dec); resample_shutdown(&dec->resampler); - resample_shutdown(&dec->mix_resampler); g_slice_free1(sizeof(*dec), dec); } diff --git a/lib/codeclib.h b/lib/codeclib.h index 1261c29e8..4920dcd3d 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -131,8 +131,7 @@ struct decoder_s { format_t in_format, out_format; - resample_t resampler, - mix_resampler; // XXX move this out of here - specific to recording-daemon + resample_t resampler; union { struct { @@ -146,8 +145,6 @@ struct decoder_s { unsigned long rtp_ts; uint64_t pts; - - unsigned int mixer_idx; }; struct encoder_s { diff --git a/daemon/socket.c b/lib/socket.c similarity index 99% rename from daemon/socket.c rename to lib/socket.c index b1fe71f94..49c3315ce 100644 --- a/daemon/socket.c +++ b/lib/socket.c @@ -8,9 +8,8 @@ #include #include #include "str.h" -#include "media_socket.h" #include "xt_RTPENGINE.h" -#include "call.h" +#include "log.h" static int __ip4_addr_parse(sockaddr_t *dst, const char *src); static int __ip6_addr_parse(sockaddr_t *dst, const char *src); @@ -697,9 +696,10 @@ int connect_socket_retry(socket_t *r) { int ret = 0; if (r->family->connect(r, &r->remote)) { - if (errno != EINPROGRESS && errno != EALREADY) + if (errno != EINPROGRESS && errno != EALREADY && errno != EISCONN) goto fail; - ret = 1; + if (errno != EISCONN) + ret = 1; } return ret; diff --git a/include/socket.h b/lib/socket.h similarity index 99% rename from include/socket.h rename to lib/socket.h index 7a010d849..e33e238b9 100644 --- a/include/socket.h +++ b/lib/socket.h @@ -4,6 +4,8 @@ #include #include +#include +#include @@ -103,7 +105,7 @@ extern socktype_t *socktype_udp; -#include "aux.h" +#include "auxlib.h" INLINE int sockaddr_print(const sockaddr_t *a, char *buf, size_t len) { diff --git a/lib/ssllib.c b/lib/ssllib.c new file mode 100644 index 000000000..bdecb8183 --- /dev/null +++ b/lib/ssllib.c @@ -0,0 +1,51 @@ +#include "ssllib.h" +#include +#include + + +#if OPENSSL_VERSION_NUMBER < 0x10100000L +static mutex_t *openssl_locks; + +static void cb_openssl_threadid(CRYPTO_THREADID *tid) { + pthread_t me; + + me = pthread_self(); + + if (sizeof(me) == sizeof(void *)) + CRYPTO_THREADID_set_pointer(tid, (void *) me); + else + CRYPTO_THREADID_set_numeric(tid, (unsigned long) me); +} + +static void cb_openssl_lock(int mode, int type, const char *file, int line) { + if ((mode & CRYPTO_LOCK)) + mutex_lock(&openssl_locks[type]); + else + mutex_unlock(&openssl_locks[type]); +} + +static void make_OpenSSL_thread_safe(void) { + int i; + + openssl_locks = malloc(sizeof(*openssl_locks) * CRYPTO_num_locks()); + for (i = 0; i < CRYPTO_num_locks(); i++) + mutex_init(&openssl_locks[i]); + + CRYPTO_THREADID_set_callback(cb_openssl_threadid); + CRYPTO_set_locking_callback(cb_openssl_lock); +} +#else +static void make_OpenSSL_thread_safe(void) { + ; +} +#endif + + +void rtpe_ssl_init(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + srandom(ts.tv_sec ^ ts.tv_nsec); + SSL_library_init(); + SSL_load_error_strings(); + make_OpenSSL_thread_safe(); +} diff --git a/lib/ssllib.h b/lib/ssllib.h new file mode 100644 index 000000000..f0888a87d --- /dev/null +++ b/lib/ssllib.h @@ -0,0 +1,8 @@ +#ifndef __SSLLIB_H__ +#define __SSLLIB_H__ + + +void rtpe_ssl_init(void); + + +#endif diff --git a/daemon/streambuf.c b/lib/streambuf.c similarity index 74% rename from daemon/streambuf.c rename to lib/streambuf.c index 073609fbe..f6fb86308 100644 --- a/daemon/streambuf.c +++ b/lib/streambuf.c @@ -9,31 +9,48 @@ #include #include "poller.h" -#include "aux.h" +#include "auxlib.h" +static ssize_t __fd_write(void *, const void *, size_t); +static ssize_t __fd_read(void *, void *, size_t); -struct streambuf *streambuf_new(struct poller *p, int fd) { +static const struct streambuf_funcs __fd_funcs = { + .write = __fd_write, + .read = __fd_read, +}; + +static ssize_t __fd_write(void *fd, const void *b, size_t s) { + return write(GPOINTER_TO_INT(fd), b, s); +} +static ssize_t __fd_read(void *fd, void *b, size_t s) { + return read(GPOINTER_TO_INT(fd), b, s); +} + +struct streambuf *streambuf_new_ptr(struct poller *p, void *fd_ptr, const struct streambuf_funcs *funcs) { struct streambuf *b; - b = malloc(sizeof(*b)); - ZERO(*b); + b = g_slice_alloc0(sizeof(*b)); mutex_init(&b->lock); b->buf = g_string_new(""); - b->fd = fd; + b->fd_ptr = fd_ptr; b->poller = p; b->active = rtpe_now.tv_sec; + b->funcs = funcs; return b; } +struct streambuf *streambuf_new(struct poller *p, int fd) { + return streambuf_new_ptr(p, GINT_TO_POINTER(fd), &__fd_funcs); +} void streambuf_destroy(struct streambuf *b) { g_string_free(b->buf, TRUE); - free(b); + g_slice_free1(sizeof(*b), b); } @@ -48,7 +65,7 @@ int streambuf_writeable(struct streambuf *b) { break; out = (b->buf->len > 1024) ? 1024 : b->buf->len; - ret = write(b->fd, b->buf->str, out); + ret = b->funcs->write(b->fd_ptr, b->buf->str, out); if (ret < 0) { if (errno == EINTR) @@ -66,7 +83,7 @@ int streambuf_writeable(struct streambuf *b) { } if (ret != out) { - poller_blocked(b->poller, b->fd); + poller_blocked(b->poller, b->fd_ptr); break; } } @@ -82,7 +99,7 @@ int streambuf_readable(struct streambuf *b) { mutex_lock(&b->lock); for (;;) { - ret = read(b->fd, buf, 1024); + ret = b->funcs->read(b->fd_ptr, buf, 1024); if (ret == 0) { // don't discard already read data in the buffer @@ -187,20 +204,23 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { unsigned int out; int ret; + if (!b) + return; + mutex_lock(&b->lock); - while (len && !poller_isblocked(b->poller, b->fd)) { + while (len && !poller_isblocked(b->poller, b->fd_ptr)) { out = (len > 1024) ? 1024 : len; - ret = write(b->fd, s, out); + ret = b->funcs->write(b->fd_ptr, s, out); if (ret < 0) { if (errno == EINTR) continue; if (errno != EAGAIN && errno != EWOULDBLOCK) { - poller_error(b->poller, b->fd); + poller_error(b->poller, b->fd_ptr); break; } - poller_blocked(b->poller, b->fd); + poller_blocked(b->poller, b->fd_ptr); break; } if (ret == 0) @@ -212,7 +232,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { } if (b->buf->len > 5242880) - poller_error(b->poller, b->fd); + poller_error(b->poller, b->fd_ptr); else if (len) g_string_append_len(b->buf, s, len); diff --git a/include/streambuf.h b/lib/streambuf.h similarity index 76% rename from include/streambuf.h rename to lib/streambuf.h index 0b79d95e2..e47810b5d 100644 --- a/include/streambuf.h +++ b/lib/streambuf.h @@ -10,7 +10,7 @@ #include "compat.h" #include "str.h" -#include "aux.h" +#include "auxlib.h" @@ -18,18 +18,25 @@ struct poller; +struct streambuf_funcs { + ssize_t (*write)(void *, const void *, size_t); + ssize_t (*read)(void *, void *, size_t); +}; struct streambuf { mutex_t lock; GString *buf; - int fd; + void *fd_ptr; struct poller *poller; time_t active; int eof; + const struct streambuf_funcs + *funcs; }; struct streambuf *streambuf_new(struct poller *, int); +struct streambuf *streambuf_new_ptr(struct poller *, void *, const struct streambuf_funcs *); void streambuf_destroy(struct streambuf *); int streambuf_writeable(struct streambuf *); int streambuf_readable(struct streambuf *); diff --git a/recording-daemon/.gitignore b/recording-daemon/.gitignore index 828e6510e..ce030d02c 100644 --- a/recording-daemon/.gitignore +++ b/recording-daemon/.gitignore @@ -11,3 +11,5 @@ codeclib.c resample.c str.c fix_frame_channel_layout.h +socket.c +streambuf.c diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index a117f0828..f686b92df 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -1,6 +1,6 @@ TARGET= rtpengine-recording -CFLAGS= -g -Wall -pthread -I. -I../lib/ +CFLAGS= -g -Wall -pthread -I. -I../lib/ -I../kernel-module/ CFLAGS+= -std=c99 CFLAGS+= -D_GNU_SOURCE -D_POSIX_SOURCE -D_POSIX_C_SOURCE CFLAGS+= $(shell pkg-config --cflags glib-2.0) @@ -25,8 +25,8 @@ LDLIBS+= $(shell mysql_config --libs) LDLIBS+= $(shell pkg-config --libs openssl) SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c db.c log.c forward.c tag.c -LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.c resample.c str.c + decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c +LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.c resample.c str.c socket.c streambuf.c ssllib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) include ../lib/common.Makefile diff --git a/recording-daemon/db.c b/recording-daemon/db.c index 7b183c4ba..80d4862e7 100644 --- a/recording-daemon/db.c +++ b/recording-daemon/db.c @@ -214,7 +214,7 @@ static void db_do_call_id(metafile_t *mf) { execute_wrap(&stm_insert_call, b, &mf->db_id); } static void db_do_call_metadata(metafile_t *mf) { - if (!mf->metadata) + if (!mf->metadata_db) return; if (mf->db_id == 0) return; @@ -224,7 +224,7 @@ static void db_do_call_metadata(metafile_t *mf) { // XXX offload this parsing to proxy module -> bencode list/dictionary str all_meta; - str_init(&all_meta, mf->metadata); + str_init(&all_meta, mf->metadata_db); while (all_meta.len > 1) { str token; if (str_token_sep(&token, &all_meta, '|')) @@ -242,7 +242,7 @@ static void db_do_call_metadata(metafile_t *mf) { execute_wrap(&stm_insert_metadata, b, NULL); } - mf->metadata = NULL; + mf->metadata_db = NULL; } void db_do_call(metafile_t *mf) { diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c index 603683fef..d97c955d7 100644 --- a/recording-daemon/decoder.c +++ b/recording-daemon/decoder.c @@ -15,13 +15,16 @@ #include "mix.h" #include "resample.h" #include "codeclib.h" +#include "streambuf.h" +#include "main.h" +#include "packet.h" int resample_audio; -decoder_t *decoder_new(const char *payload_str, output_t *outp) { +decode_t *decoder_new(const char *payload_str, output_t *outp) { str name; char *slash = strchr(payload_str, '/'); if (!slash) { @@ -69,13 +72,22 @@ decoder_t *decoder_new(const char *payload_str, output_t *outp) { outp->encoder->requested_format.format = out_format.format; } - return decoder_new_fmt(def, clockrate, channels, &out_format); + decoder_t *dec = decoder_new_fmt(def, clockrate, channels, &out_format); + if (!dec) + return NULL; + decode_t *deco = g_slice_alloc0(sizeof(decode_t)); + deco->dec = dec; + deco->mixer_idx = (unsigned int) -1; + return deco; } -static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *op, void *mp) { - metafile_t *metafile = mp; - output_t *output = op; +static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) { + ssrc_t *ssrc = sp; + metafile_t *metafile = ssrc->metafile; + output_t *output = ssrc->output; + stream_t *stream = ssrc->stream; + decode_t *deco = dp; dbg("got frame pts %llu samples %u contents %02x%02x%02x%02x...", (unsigned long long) frame->pts, frame->nb_samples, (unsigned int) frame->extended_data[0][0], @@ -83,34 +95,66 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *op, void *mp) (unsigned int) frame->extended_data[0][2], (unsigned int) frame->extended_data[0][3]); + if (!metafile->recording_on) + goto no_recording; + // handle mix output pthread_mutex_lock(&metafile->mix_lock); if (metafile->mix_out) { - if (G_UNLIKELY(dec->mixer_idx == (unsigned int) -1)) - dec->mixer_idx = mix_get_index(metafile->mix); + dbg("adding packet from stream #%lu to mix output", stream->id); + if (G_UNLIKELY(deco->mixer_idx == (unsigned int) -1)) + deco->mixer_idx = mix_get_index(metafile->mix); format_t actual_format; if (output_config(metafile->mix_out, &dec->out_format, &actual_format)) goto no_mix_out; mix_config(metafile->mix, &actual_format); // XXX might be a second resampling to same format - AVFrame *dec_frame = resample_frame(&dec->mix_resampler, frame, &actual_format); + AVFrame *dec_frame = resample_frame(&deco->mix_resampler, frame, &actual_format); if (!dec_frame) { pthread_mutex_unlock(&metafile->mix_lock); goto err; } - if (mix_add(metafile->mix, dec_frame, dec->mixer_idx, metafile->mix_out)) + if (mix_add(metafile->mix, dec_frame, deco->mixer_idx, metafile->mix_out)) ilog(LOG_ERR, "Failed to add decoded packet to mixed output"); } no_mix_out: pthread_mutex_unlock(&metafile->mix_lock); if (output) { + dbg("SSRC %lx of stream #%lu has single output", ssrc->ssrc, stream->id); if (output_config(output, &dec->out_format, NULL)) goto err; if (output_add(output, frame)) ilog(LOG_ERR, "Failed to add decoded packet to individual output"); } +no_recording: + if (ssrc->tls_fwd_stream) { + // XXX might be a second resampling to same format + dbg("SSRC %lx of stream #%lu has TLS forwarding stream", ssrc->ssrc, stream->id); + AVFrame *dec_frame = resample_frame(&ssrc->tls_fwd_resampler, frame, &ssrc->tls_fwd_format); + + ssrc_tls_state(ssrc); + + if (!ssrc->sent_intro) { + if (metafile->metadata) { + dbg("Writing metadata header to TLS"); + streambuf_write(ssrc->tls_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1); + } + else { + ilog(LOG_WARN, "No metadata present for forwarding connection"); + streambuf_write(ssrc->tls_fwd_stream, "\0", 1); + } + ssrc->sent_intro = 1; + } + + dbg("Writing %u bytes PCM to TLS", dec_frame->linesize[0]); + streambuf_write(ssrc->tls_fwd_stream, (char *) dec_frame->extended_data[0], + dec_frame->linesize[0]); + av_frame_free(&dec_frame); + + } + av_frame_free(&frame); return 0; @@ -120,6 +164,14 @@ err: } -int decoder_input(decoder_t *dec, const str *data, unsigned long ts, output_t *output, metafile_t *metafile) { - return decoder_input_data(dec, data, ts, decoder_got_frame, output, metafile); +int decoder_input(decode_t *deco, const str *data, unsigned long ts, ssrc_t *ssrc) { + return decoder_input_data(deco->dec, data, ts, decoder_got_frame, ssrc, deco); +} + +void decoder_free(decode_t *deco) { + if (!deco) + return; + decoder_close(deco->dec); + resample_shutdown(&deco->mix_resampler); + g_slice_free1(sizeof(*deco), deco); } diff --git a/recording-daemon/decoder.h b/recording-daemon/decoder.h index 7f2ba6da4..955d509ee 100644 --- a/recording-daemon/decoder.h +++ b/recording-daemon/decoder.h @@ -8,8 +8,9 @@ extern int resample_audio; -decoder_t *decoder_new(const char *payload_str, output_t *); -int decoder_input(decoder_t *, const str *, unsigned long ts, output_t *, metafile_t *); +decode_t *decoder_new(const char *payload_str, output_t *); +int decoder_input(decode_t *, const str *, unsigned long ts, ssrc_t *); +void decoder_free(decode_t *); #endif diff --git a/recording-daemon/log.h b/recording-daemon/log.h index f005174c4..bb6574823 100644 --- a/recording-daemon/log.h +++ b/recording-daemon/log.h @@ -9,6 +9,7 @@ #include #define dbg(fmt, ...) ilog(LOG_DEBUG, "[%s:%i] " fmt, __FILE__, __LINE__, ##__VA_ARGS__) +#define __C_DBG(x...) ilog(LOG_DEBUG, x) void __ilog(int prio, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); diff --git a/recording-daemon/main.c b/recording-daemon/main.c index 03d03f030..42a9f6632 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -24,6 +24,8 @@ #include "output.h" #include "forward.h" #include "codeclib.h" +#include "socket.h" +#include "ssllib.h" @@ -36,12 +38,16 @@ static const char *output_format = "wav"; int output_mixed; int output_single; int output_enabled = 1; +int decoding_enabled; const char *c_mysql_host, *c_mysql_user, *c_mysql_pass, *c_mysql_db; int c_mysql_port; const char *forward_to = NULL; +static const char *tls_send_to = NULL; +endpoint_t tls_send_to_ep; +int tls_resample = 8000; static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -65,8 +71,11 @@ static void signals(void) { static void setup(void) { log_init("rtpengine-recording"); - if (output_enabled) { + rtpe_ssl_init(); + socket_init(); + if (decoding_enabled) codeclib_init(0); + if (output_enabled) { output_init(output_format); if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) { ilog(LOG_INFO, "Creating output dir '%s'", output_dir); @@ -153,24 +162,35 @@ static void options(int *argc, char ***argv) { { "mysql-user", 0, 0, G_OPTION_ARG_STRING, &c_mysql_user, "MySQL connection credentials", "USERNAME" }, { "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &c_mysql_pass, "MySQL connection credentials", "PASSWORD" }, { "mysql-db", 0, 0, G_OPTION_ARG_STRING, &c_mysql_db, "MySQL database name", "STRING" }, - { "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" }, + { "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" }, + { "tls-send-to", 0, 0, G_OPTION_ARG_STRING, &tls_send_to, "Where to send to (TLS destination)", "IP:PORT" }, + { "tls-resample", 0, 0, G_OPTION_ARG_INT, &tls_resample, "Sampling rate for TLS PCM output", "INT" }, { NULL, } }; config_load(argc, argv, e, " - rtpengine recording daemon", "/etc/rtpengine/rtpengine-recording.conf", "rtpengine-recording", &rtpe_common_config); + if (tls_send_to) { + if (endpoint_parse_any_getaddrinfo_full(&tls_send_to_ep, tls_send_to)) + die("Failed to parse 'tls-send-to' option"); + } + if (!strcmp(output_format, "none")) { output_enabled = 0; if (output_mixed || output_single) die("Output is disabled, but output-mixed or output-single is set"); - if (!forward_to) { + if (!forward_to && !tls_send_to_ep.port) { //the daemon has no function - die("Both output and packet forwarding are disabled"); + die("Both output and forwarding are disabled"); } + output_format = NULL; } else if (!output_mixed && !output_single) output_mixed = output_single = 1; + if (output_enabled || tls_send_to_ep.port) + decoding_enabled = 1; + if (!os_str || !strcmp(os_str, "file")) output_storage = OUTPUT_STORAGE_FILE; else if (!strcmp(os_str, "db")) diff --git a/recording-daemon/main.h b/recording-daemon/main.h index 2973ea17c..aa683b4b0 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -3,6 +3,7 @@ #include "auxlib.h" +#include "socket.h" enum output_storage_enum { @@ -19,12 +20,15 @@ extern const char *output_dir; extern int output_mixed; extern int output_single; extern int output_enabled; +extern int decoding_enabled; extern const char *c_mysql_host, *c_mysql_user, *c_mysql_pass, *c_mysql_db; extern int c_mysql_port; extern const char *forward_to; +extern endpoint_t tls_send_to_ep; +extern int tls_resample; extern volatile int shutdown_flag; diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index 5e1d88f37..451add550 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -70,9 +70,9 @@ static void meta_destroy(metafile_t *mf) { // mf is locked static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) { db_do_call(mf); - if (output_enabled) { + if (output_enabled && output_mixed) { pthread_mutex_lock(&mf->mix_lock); - if (!mf->mix && output_mixed) { + if (!mf->mix) { char buf[256]; snprintf(buf, sizeof(buf), "%s-mix", mf->parent); mf->mix_out = output_new(output_dir, buf); @@ -107,7 +107,7 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i ilog(LOG_ERR, "Payload type number %u is invalid", payload_num); return; } - if (output_enabled) { + if (decoding_enabled) { pthread_mutex_lock(&mf->payloads_lock); mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, payload_type); @@ -119,6 +119,7 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i // mf is locked static void meta_metadata(metafile_t *mf, char *content) { mf->metadata = g_string_chunk_insert(mf->gsc, content); + mf->metadata_db = mf->metadata; db_do_call(mf); if (forward_to) start_forwarding_capture(mf, content); @@ -146,6 +147,12 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned tag_name(mf, lu, content); else if (sscanf_match(section, "LABEL %lu", &lu) == 1) tag_label(mf, lu, content); + else if (sscanf_match(section, "RECORDING %u", &u) == 1) + mf->recording_on = u ? 1 : 0; + else if (sscanf_match(section, "FORWARDING %u", &u) == 1) + mf->forwarding_on = u ? 1 : 0; + else if (sscanf_match(section, "STREAM %lu FORWARDING %u", &lu, &u) == 2) + stream_forwarding_on(mf, lu, u); } @@ -167,8 +174,9 @@ static metafile_t *metafile_get(char *name) { mf->forward_fd = -1; mf->forward_count = 0; mf->forward_failed = 0; + mf->recording_on = 1; - if (output_enabled) { + if (decoding_enabled) { pthread_mutex_init(&mf->payloads_lock, NULL); pthread_mutex_init(&mf->mix_lock, NULL); mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free); diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 301ea3323..6a3e6dc38 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -4,6 +4,7 @@ #include #include #include +#include #include "types.h" #include "log.h" #include "rtplib.h" @@ -13,8 +14,65 @@ #include "main.h" #include "output.h" #include "db.h" +#include "streambuf.h" +#include "resample.h" +static ssize_t ssrc_tls_write(void *, const void *, size_t); +static ssize_t ssrc_tls_read(void *, void *, size_t); + +static struct streambuf_funcs ssrc_tls_funcs = { + .write = ssrc_tls_write, + .read = ssrc_tls_read, +}; + +static void ssrc_tls_log_errors(void) { + int i; + char err[160]; + while ((i = ERR_get_error())) { + ERR_error_string(i, err); + dbg("TLS error: %s", err); + } +} + +static int ssrc_tls_check_blocked(SSL *ssl, int ret) { + if (!ssl) + return 0; + int err = SSL_get_error(ssl, ret); + dbg("TLS error code: %i -> %i", ret, err); + switch (err) { + case SSL_ERROR_ZERO_RETURN: + return 0; // eof + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_CONNECT: + case SSL_ERROR_WANT_ACCEPT: + errno = EAGAIN; + return -1; + case SSL_ERROR_SYSCALL: + return -1; + } + errno = EFAULT; + return -1; +} + +static ssize_t ssrc_tls_write(void *fd, const void *b, size_t s) { + SSL *ssl = fd; + ssrc_tls_log_errors(); + int ret = SSL_write(ssl, b, s); + if (ret > 0) + return ret; + return ssrc_tls_check_blocked(ssl, ret); +} +static ssize_t ssrc_tls_read(void *fd, void *b, size_t s) { + SSL *ssl = fd; + ssrc_tls_log_errors(); + int ret = SSL_read(ssl, b, s); + if (ret > 0) + return ret; + return ssrc_tls_check_blocked(ssl, ret); +} + static void packet_free(void *p) { packet_t *packet = p; if (!packet) @@ -24,12 +82,73 @@ static void packet_free(void *p) { } +static void ssrc_tls_shutdown(ssrc_t *ssrc) { + streambuf_destroy(ssrc->tls_fwd_stream); + ssrc->tls_fwd_stream = NULL; + resample_shutdown(&ssrc->tls_fwd_resampler); + if (ssrc->ssl) + SSL_free(ssrc->ssl); + ssrc->ssl = NULL; + if (ssrc->ssl_ctx) + SSL_CTX_free(ssrc->ssl_ctx); + ssrc->ssl_ctx = NULL; + close_socket(&ssrc->tls_fwd_sock); + ssrc->sent_intro = 0; +} + + +void ssrc_tls_state(ssrc_t *ssrc) { + int ret; + + ssrc_tls_log_errors(); + if (ssrc->tls_fwd_poller.state == PS_CONNECTING) { + int status = connect_socket_retry(&ssrc->tls_fwd_sock); + if (status == 0) { + dbg("TLS connection to %s doing handshake", + endpoint_print_buf(&tls_send_to_ep)); + ssrc->tls_fwd_poller.state = PS_HANDSHAKE; + if ((ret = SSL_connect(ssrc->ssl)) == 1) { + dbg("TLS connection to %s established", + endpoint_print_buf(&tls_send_to_ep)); + ssrc->tls_fwd_poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd_stream); + } + else + ssrc_tls_check_blocked(ssrc->ssl, ret); + } + else if (status < 0) { + ilog(LOG_ERR, "Failed to connect TLS socket: %s", strerror(errno)); + ssrc_tls_shutdown(ssrc); + } + } + else if (ssrc->tls_fwd_poller.state == PS_HANDSHAKE) { + if ((ret = SSL_connect(ssrc->ssl)) == 1) { + dbg("TLS connection to %s established", + endpoint_print_buf(&tls_send_to_ep)); + ssrc->tls_fwd_poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd_stream); + } + else + ssrc_tls_check_blocked(ssrc->ssl, ret); + } + else if (ssrc->tls_fwd_poller.state == PS_WRITE_BLOCKED) { + ssrc->tls_fwd_poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd_stream); + } + else if (ssrc->tls_fwd_poller.state == PS_ERROR) + ssrc_tls_shutdown(ssrc); + ssrc_tls_log_errors(); +} + + void ssrc_free(void *p) { ssrc_t *s = p; packet_sequencer_destroy(&s->sequencer); output_close(s->output); for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) - decoder_close(s->decoders[i]); + decoder_free(s->decoders[i]); + if (s->tls_fwd_stream) + ssrc_tls_shutdown(s); g_slice_free1(sizeof(*s), s); } @@ -49,18 +168,66 @@ static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) { ret->ssrc = ssrc; packet_sequencer_init(&ret->sequencer, packet_free); - char buf[256]; - snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); - if (output_single) { - ret->output = output_new(output_dir, buf); - db_do_stream(mf, ret->output, "single", stream, ssrc); - } - g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret); out: pthread_mutex_lock(&ret->lock); pthread_mutex_unlock(&mf->lock); + + dbg("Init for SSRC %lx of stream #%lu", ret->ssrc, stream->id); + + if (mf->recording_on && !ret->output && output_single) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); + ret->output = output_new(output_dir, buf); + db_do_stream(mf, ret->output, "single", stream, ssrc); + } + if ((stream->forwarding_on || mf->forwarding_on) && !ret->tls_fwd_stream) { + // initialise the connection + ZERO(ret->tls_fwd_poller); + dbg("Starting TLS connection to %s", endpoint_print_buf(&tls_send_to_ep)); + ret->ssl_ctx = SSL_CTX_new(TLS_client_method()); + if (!ret->ssl_ctx) { + ilog(LOG_ERR, "Failed to create TLS context"); + ssrc_tls_shutdown(ret); + goto tls_out; + } + ret->ssl = SSL_new(ret->ssl_ctx); + if (!ret->ssl) { + ilog(LOG_ERR, "Failed to create TLS connection"); + ssrc_tls_shutdown(ret); + goto tls_out; + } + int status = connect_socket_nb(&ret->tls_fwd_sock, SOCK_STREAM, &tls_send_to_ep); + if (status < 0) { + ilog(LOG_ERR, "Failed to open/connect TLS socket to %s: %s", + endpoint_print_buf(&tls_send_to_ep), + strerror(errno)); + ssrc_tls_shutdown(ret); + goto tls_out; + } + + ret->tls_fwd_poller.state = PS_CONNECTING; + if (SSL_set_fd(ret->ssl, ret->tls_fwd_sock.fd) != 1) { + ilog(LOG_ERR, "Failed to set TLS fd"); + ssrc_tls_shutdown(ret); + goto tls_out; + } + ret->tls_fwd_stream = streambuf_new_ptr(&ret->tls_fwd_poller, ret->ssl, &ssrc_tls_funcs); + + ssrc_tls_state(ret); + + ret->tls_fwd_format = (format_t) { + .clockrate = tls_resample, + .channels = 1, + .format = AV_SAMPLE_FMT_S16, + }; +tls_out: + ; + } + else if (!(stream->forwarding_on || mf->forwarding_on) && ret->tls_fwd_stream) + ssrc_tls_shutdown(ret); + return ret; } @@ -103,7 +270,7 @@ static void packet_decode(ssrc_t *ssrc, packet_t *packet) { } if (decoder_input(ssrc->decoders[payload_type], &packet->payload, ntohl(packet->rtp->timestamp), - ssrc->output, ssrc->metafile)) + ssrc)) ilog(LOG_ERR, "Failed to decode media packet"); } diff --git a/recording-daemon/packet.h b/recording-daemon/packet.h index c261189b3..31b0c1640 100644 --- a/recording-daemon/packet.h +++ b/recording-daemon/packet.h @@ -7,4 +7,6 @@ void ssrc_free(void *p); void packet_process(stream_t *, unsigned char *, unsigned len); +void ssrc_tls_state(ssrc_t *ssrc); + #endif diff --git a/recording-daemon/poller.c b/recording-daemon/poller.c new file mode 100644 index 000000000..48d75e3b7 --- /dev/null +++ b/recording-daemon/poller.c @@ -0,0 +1,11 @@ +#include "poller.h" + +void poller_blocked(struct poller *p, void *fdp) { + p->state = PS_WRITE_BLOCKED; +} +int poller_isblocked(struct poller *p, void *fdp) { + return p->state != PS_OPEN; +} +void poller_error(struct poller *p, void *fdp) { + p->state = PS_ERROR; +} diff --git a/recording-daemon/poller.h b/recording-daemon/poller.h new file mode 100644 index 000000000..887a52c16 --- /dev/null +++ b/recording-daemon/poller.h @@ -0,0 +1,22 @@ +#ifndef __POLLER_H__ +#define __POLLER_H__ + + +// dummy poller +struct poller { + enum { + PS_CLOSED = 0, + PS_CONNECTING, + PS_HANDSHAKE, + PS_OPEN, + PS_WRITE_BLOCKED, + PS_ERROR, + } state; +}; + +void poller_blocked(struct poller *, void *); +int poller_isblocked(struct poller *, void *); +void poller_error(struct poller *, void *); + + +#endif diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c index eba560aee..93111f650 100644 --- a/recording-daemon/stream.c +++ b/recording-daemon/stream.c @@ -75,7 +75,7 @@ static void stream_handler(handler_t *handler) { else g_atomic_int_inc(&stream->metafile->forward_count); } - if (output_enabled) + if (decoding_enabled) packet_process(stream, buf, ret); // consumes buf else free(buf); @@ -140,3 +140,9 @@ void stream_details(metafile_t *mf, unsigned long id, unsigned int tag) { stream_t *stream = stream_get(mf, id); stream->tag = tag; } + +void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on) { + stream_t *stream = stream_get(mf, id); + dbg("Setting forwarding flag to %u for stream #%lu", on, stream->id); + stream->forwarding_on = on ? 1 : 0; +} diff --git a/recording-daemon/stream.h b/recording-daemon/stream.h index e1859cc86..499c99eb6 100644 --- a/recording-daemon/stream.h +++ b/recording-daemon/stream.h @@ -5,6 +5,7 @@ void stream_open(metafile_t *mf, unsigned long id, char *name); void stream_details(metafile_t *mf, unsigned long id, unsigned int tag); +void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on); void stream_close(stream_t *stream); void stream_free(stream_t *stream); diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 5215e5444..5217ba7c2 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -10,14 +10,19 @@ #include #include #include +#include +#include #include "str.h" #include "codeclib.h" +#include "poller.h" +#include "socket.h" struct iphdr; struct ip6_hdr; struct udphdr; struct rtp_header; +struct streambuf; struct handler_s; @@ -28,6 +33,8 @@ struct output_s; typedef struct output_s output_t; struct mix_s; typedef struct mix_s mix_t; +struct decode_s; +typedef struct decode_s decode_t; typedef void handler_func(handler_t *); @@ -47,6 +54,7 @@ struct stream_s { unsigned long tag; int fd; handler_t handler; + int forwarding_on:1; }; typedef struct stream_s stream_t; @@ -71,8 +79,19 @@ struct ssrc_s { metafile_t *metafile; unsigned long ssrc; packet_sequencer_t sequencer; - decoder_t *decoders[128]; + decode_t *decoders[128]; output_t *output; + + // TLS output + format_t tls_fwd_format; + resample_t tls_fwd_resampler; + socket_t tls_fwd_sock; + //BIO *bio; + SSL_CTX *ssl_ctx; + SSL *ssl; + struct streambuf *tls_fwd_stream; + struct poller tls_fwd_poller; + int sent_intro:1; }; typedef struct ssrc_s ssrc_t; @@ -91,6 +110,7 @@ struct metafile_s { char *parent; char *call_id; char *metadata; + char *metadata_db; off_t pos; unsigned long long db_id; @@ -110,6 +130,9 @@ struct metafile_s { pthread_mutex_t payloads_lock; char *payload_types[128]; + + int recording_on:1; + int forwarding_on:1; }; @@ -135,5 +158,12 @@ struct output_s { }; +struct decode_s { + decoder_t *dec; + resample_t mix_resampler; + unsigned int mixer_idx; +}; + + #endif diff --git a/t/Makefile b/t/Makefile index 7a52d85cf..f0f904f27 100644 --- a/t/Makefile +++ b/t/Makefile @@ -62,10 +62,10 @@ SRCS+= transcode-test.c ifeq ($(with_amr_tests),yes) SRCS+= amr-decode-test.c amr-encode-test.c endif -LIBSRCS+= codeclib.c resample.c -DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c socket.c poller.c \ +LIBSRCS+= codeclib.c resample.c socket.c streambuf.c +DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ - streambuf.c cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c + cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif diff --git a/t/log.h b/t/log.h index 2ebaa381b..3bfc3bb5e 100644 --- a/t/log.h +++ b/t/log.h @@ -3,6 +3,7 @@ #include "loglib.h" #define __ilog(prio, fmt, ...) fprintf(stderr, fmt "\n", ##__VA_ARGS__) +#define __C_DBG(x...) ilog(LOG_DEBUG, x) INLINE void rtcplog(const char *x) { }