diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c index 66efff075..86caf9fb8 100644 --- a/recording-daemon/decoder.c +++ b/recording-daemon/decoder.c @@ -117,7 +117,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) } no_recording: - if (ssrc->tls_fwd_stream) { + if (ssrc->tls_fwd.stream) { // XXX might be a second resampling to same format dbg("SSRC %lx of stream #%lu has TLS forwarding stream", ssrc->ssrc, stream->id); @@ -125,12 +125,12 @@ no_recording: // if we're in the middle of a disconnect then ssrc_tls_state may have destroyed the streambuf // so we need to skip the below to ensure we only send metadata for the new connection // once we've got a new streambuf - if (!ssrc->tls_fwd_stream) + if (!ssrc->tls_fwd.stream) goto err; - AVFrame *dec_frame = resample_frame(&ssrc->tls_fwd_resampler, frame, &ssrc->tls_fwd_format); + AVFrame *dec_frame = resample_frame(&ssrc->tls_fwd.resampler, frame, &ssrc->tls_fwd.format); - if (!ssrc->sent_intro) { + if (!ssrc->tls_fwd.sent_intro) { tag_t *tag = NULL; if (ssrc->stream) @@ -138,27 +138,27 @@ no_recording: if (tag && tag->metadata) { dbg("Writing tag metadata header to TLS"); - streambuf_write(ssrc->tls_fwd_stream, tag->metadata, strlen(tag->metadata) + 1); + streambuf_write(ssrc->tls_fwd.stream, tag->metadata, strlen(tag->metadata) + 1); } else if (metafile->metadata) { dbg("Writing call metadata header to TLS"); - streambuf_write(ssrc->tls_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1); + streambuf_write(ssrc->tls_fwd.stream, metafile->metadata, strlen(metafile->metadata) + 1); } else { ilog(LOG_WARN, "No metadata present for forwarding connection"); - streambuf_write(ssrc->tls_fwd_stream, "\0", 1); + streambuf_write(ssrc->tls_fwd.stream, "\0", 1); } - ssrc->sent_intro = 1; + ssrc->tls_fwd.sent_intro = 1; } ssrc_tls_fwd_silence_frames_upto(ssrc, dec_frame, dec_frame->pts); uint64_t next_pts = dec_frame->pts + dec_frame->nb_samples; - if (next_pts > ssrc->tls_in_pts) - ssrc->tls_in_pts = next_pts; + if (next_pts > ssrc->tls_fwd.in_pts) + ssrc->tls_fwd.in_pts = next_pts; int linesize = av_get_bytes_per_sample(dec_frame->format) * dec_frame->nb_samples; dbg("Writing %u bytes PCM to TLS", linesize); - streambuf_write(ssrc->tls_fwd_stream, (char *) dec_frame->extended_data[0], linesize); + streambuf_write(ssrc->tls_fwd.stream, (char *) dec_frame->extended_data[0], linesize); if (dec_frame != frame) av_frame_free(&dec_frame); diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index af07481ec..26dad4600 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -43,7 +43,7 @@ void ssrc_close(ssrc_t *s) { void ssrc_free(void *p) { ssrc_t *s = p; - av_frame_free(&s->tls_silence_frame); + av_frame_free(&s->tls_fwd.silence_frame); packet_sequencer_destroy(&s->sequencer); ssrc_close(s); g_free(s); diff --git a/recording-daemon/tls_send.c b/recording-daemon/tls_send.c index e68796b4a..2c679273d 100644 --- a/recording-daemon/tls_send.c +++ b/recording-daemon/tls_send.c @@ -68,21 +68,21 @@ static ssize_t ssrc_tls_read(void *fd, void *b, size_t s) { void ssrc_tls_shutdown(ssrc_t *ssrc) { - if (!ssrc->tls_fwd_stream) + if (!ssrc->tls_fwd.stream) return; - streambuf_destroy(ssrc->tls_fwd_stream); - ssrc->tls_fwd_stream = NULL; - resample_shutdown(&ssrc->tls_fwd_resampler); - if (ssrc->ssl) { - SSL_free(ssrc->ssl); - ssrc->ssl = NULL; + streambuf_destroy(ssrc->tls_fwd.stream); + ssrc->tls_fwd.stream = NULL; + resample_shutdown(&ssrc->tls_fwd.resampler); + if (ssrc->tls_fwd.ssl) { + SSL_free(ssrc->tls_fwd.ssl); + ssrc->tls_fwd.ssl = NULL; } - if (ssrc->ssl_ctx) { - SSL_CTX_free(ssrc->ssl_ctx); - ssrc->ssl_ctx = NULL; + if (ssrc->tls_fwd.ssl_ctx) { + SSL_CTX_free(ssrc->tls_fwd.ssl_ctx); + ssrc->tls_fwd.ssl_ctx = NULL; } - close_socket(&ssrc->tls_fwd_sock); - ssrc->sent_intro = 0; + close_socket(&ssrc->tls_fwd.sock); + ssrc->tls_fwd.sent_intro = 0; } @@ -90,24 +90,24 @@ void ssrc_tls_state(ssrc_t *ssrc) { int ret; ssrc_tls_log_errors(); - if (ssrc->tls_fwd_poller.state == PS_CONNECTING) { - int status = connect_socket_retry(&ssrc->tls_fwd_sock); + if (ssrc->tls_fwd.poller.state == PS_CONNECTING) { + int status = connect_socket_retry(&ssrc->tls_fwd.sock); if (status == 0) { if (tls_disable) { - ssrc->tls_fwd_poller.state = PS_OPEN; - streambuf_writeable(ssrc->tls_fwd_stream); + ssrc->tls_fwd.poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd.stream); } else { dbg("TLS connection to %s doing handshake", endpoint_print_buf(&tls_send_to_ep)); - ssrc->tls_fwd_poller.state = PS_HANDSHAKE; - if ((ret = SSL_connect(ssrc->ssl)) == 1) { + ssrc->tls_fwd.poller.state = PS_HANDSHAKE; + if ((ret = SSL_connect(ssrc->tls_fwd.ssl)) == 1) { dbg("TLS connection to %s established", endpoint_print_buf(&tls_send_to_ep)); - ssrc->tls_fwd_poller.state = PS_OPEN; - streambuf_writeable(ssrc->tls_fwd_stream); + ssrc->tls_fwd.poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd.stream); } else - ssrc_tls_check_blocked(ssrc->ssl, ret); + ssrc_tls_check_blocked(ssrc->tls_fwd.ssl, ret); } } else if (status < 0) { @@ -115,67 +115,67 @@ void ssrc_tls_state(ssrc_t *ssrc) { ssrc_tls_shutdown(ssrc); } } - else if (ssrc->tls_fwd_poller.state == PS_HANDSHAKE) { + else if (ssrc->tls_fwd.poller.state == PS_HANDSHAKE) { if (!tls_disable) { - if ((ret = SSL_connect(ssrc->ssl)) == 1) { + if ((ret = SSL_connect(ssrc->tls_fwd.ssl)) == 1) { dbg("TLS connection to %s established", endpoint_print_buf(&tls_send_to_ep)); - ssrc->tls_fwd_poller.state = PS_OPEN; - streambuf_writeable(ssrc->tls_fwd_stream); + ssrc->tls_fwd.poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd.stream); } else - ssrc_tls_check_blocked(ssrc->ssl, ret); + ssrc_tls_check_blocked(ssrc->tls_fwd.ssl, ret); } } - else if (ssrc->tls_fwd_poller.state == PS_WRITE_BLOCKED) { - ssrc->tls_fwd_poller.state = PS_OPEN; - streambuf_writeable(ssrc->tls_fwd_stream); + else if (ssrc->tls_fwd.poller.state == PS_WRITE_BLOCKED) { + ssrc->tls_fwd.poller.state = PS_OPEN; + streambuf_writeable(ssrc->tls_fwd.stream); } - else if (ssrc->tls_fwd_poller.state == PS_ERROR) + else if (ssrc->tls_fwd.poller.state == PS_ERROR) ssrc_tls_shutdown(ssrc); ssrc_tls_log_errors(); } void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto) { - unsigned int silence_samples = ssrc->tls_fwd_format.clockrate / 100; + unsigned int silence_samples = ssrc->tls_fwd.format.clockrate / 100; - while (ssrc->tls_in_pts < upto) { - if (G_UNLIKELY(upto - ssrc->tls_in_pts > ssrc->tls_fwd_format.clockrate * 30)) { + while (ssrc->tls_fwd.in_pts < upto) { + if (G_UNLIKELY(upto - ssrc->tls_fwd.in_pts > ssrc->tls_fwd.format.clockrate * 30)) { ilog(LOG_WARN, "More than 30 seconds of silence needed to fill mix buffer, resetting"); - ssrc->tls_in_pts = upto; + ssrc->tls_fwd.in_pts = upto; break; } - if (G_UNLIKELY(!ssrc->tls_silence_frame)) { - ssrc->tls_silence_frame = av_frame_alloc(); - ssrc->tls_silence_frame->format = ssrc->tls_fwd_format.format; - DEF_CH_LAYOUT(&ssrc->tls_silence_frame->CH_LAYOUT, ssrc->tls_fwd_format.channels); - ssrc->tls_silence_frame->nb_samples = silence_samples; - ssrc->tls_silence_frame->sample_rate = ssrc->tls_fwd_format.clockrate; - if (av_frame_get_buffer(ssrc->tls_silence_frame, 0) < 0) { + if (G_UNLIKELY(!ssrc->tls_fwd.silence_frame)) { + ssrc->tls_fwd.silence_frame = av_frame_alloc(); + ssrc->tls_fwd.silence_frame->format = ssrc->tls_fwd.format.format; + DEF_CH_LAYOUT(&ssrc->tls_fwd.silence_frame->CH_LAYOUT, ssrc->tls_fwd.format.channels); + ssrc->tls_fwd.silence_frame->nb_samples = silence_samples; + ssrc->tls_fwd.silence_frame->sample_rate = ssrc->tls_fwd.format.clockrate; + if (av_frame_get_buffer(ssrc->tls_fwd.silence_frame, 0) < 0) { ilog(LOG_ERR, "Failed to get silence frame buffers"); return; } - int planes = av_sample_fmt_is_planar(ssrc->tls_silence_frame->format) ? ssrc->tls_fwd_format.channels : 1; + int planes = av_sample_fmt_is_planar(ssrc->tls_fwd.silence_frame->format) ? ssrc->tls_fwd.format.channels : 1; for (int i = 0; i < planes; i++) - memset(ssrc->tls_silence_frame->extended_data[i], 0, ssrc->tls_silence_frame->linesize[0]); + memset(ssrc->tls_fwd.silence_frame->extended_data[i], 0, ssrc->tls_fwd.silence_frame->linesize[0]); } dbg("pushing silence frame into TLS-formward stream (%lli < %llu)", - (long long unsigned) ssrc->tls_in_pts, + (long long unsigned) ssrc->tls_fwd.in_pts, (long long unsigned) upto); - ssrc->tls_silence_frame->pts = ssrc->tls_in_pts; - ssrc->tls_silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_in_pts); - ssrc->tls_in_pts += ssrc->tls_silence_frame->nb_samples; + ssrc->tls_fwd.silence_frame->pts = ssrc->tls_fwd.in_pts; + ssrc->tls_fwd.silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_fwd.in_pts); + ssrc->tls_fwd.in_pts += ssrc->tls_fwd.silence_frame->nb_samples; CH_LAYOUT_T channel_layout; - DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd_format.channels); - ssrc->tls_silence_frame->CH_LAYOUT = channel_layout; + DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd.format.channels); + ssrc->tls_fwd.silence_frame->CH_LAYOUT = channel_layout; - int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_silence_frame->nb_samples; + int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_fwd.silence_frame->nb_samples; dbg("Writing %u bytes PCM to TLS", linesize); - streambuf_write(ssrc->tls_fwd_stream, (char *) ssrc->tls_silence_frame->extended_data[0], linesize); + streambuf_write(ssrc->tls_fwd.stream, (char *) ssrc->tls_fwd.silence_frame->extended_data[0], linesize); } } @@ -185,25 +185,25 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) { ssrc_tls_shutdown(ssrc); return; } - if (ssrc->tls_fwd_stream) + if (ssrc->tls_fwd.stream) return; // initialise the connection - ZERO(ssrc->tls_fwd_poller); + ZERO(ssrc->tls_fwd.poller); if (!tls_disable) { dbg("Starting TLS connection to %s", endpoint_print_buf(&tls_send_to_ep)); #if OPENSSL_VERSION_NUMBER >= 0x10100000L - ssrc->ssl_ctx = SSL_CTX_new(TLS_client_method()); + ssrc->tls_fwd.ssl_ctx = SSL_CTX_new(TLS_client_method()); #else - ssrc->ssl_ctx = SSL_CTX_new(SSLv23_client_method()); + ssrc->tls_fwd.ssl_ctx = SSL_CTX_new(SSLv23_client_method()); #endif - if (!ssrc->ssl_ctx) { + if (!ssrc->tls_fwd.ssl_ctx) { ilog(LOG_ERR, "Failed to create TLS context"); ssrc_tls_shutdown(ssrc); return; } - ssrc->ssl = SSL_new(ssrc->ssl_ctx); - if (!ssrc->ssl) { + ssrc->tls_fwd.ssl = SSL_new(ssrc->tls_fwd.ssl_ctx); + if (!ssrc->tls_fwd.ssl) { ilog(LOG_ERR, "Failed to create TLS connection"); ssrc_tls_shutdown(ssrc); return; @@ -211,7 +211,7 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) { } else { dbg("Starting TCP connection to %s", endpoint_print_buf(&tls_send_to_ep)); } - int status = connect_socket_nb(&ssrc->tls_fwd_sock, SOCK_STREAM, &tls_send_to_ep); + int status = connect_socket_nb(&ssrc->tls_fwd.sock, SOCK_STREAM, &tls_send_to_ep); if (status < 0) { ilog(LOG_ERR, "Failed to open/connect TLS/TCP socket to %s: %s", endpoint_print_buf(&tls_send_to_ep), @@ -220,20 +220,20 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) { return; } - ssrc->tls_fwd_poller.state = PS_CONNECTING; + ssrc->tls_fwd.poller.state = PS_CONNECTING; if (!tls_disable) { - if (SSL_set_fd(ssrc->ssl, ssrc->tls_fwd_sock.fd) != 1) { + if (SSL_set_fd(ssrc->tls_fwd.ssl, ssrc->tls_fwd.sock.fd) != 1) { ilog(LOG_ERR, "Failed to set TLS fd"); ssrc_tls_shutdown(ssrc); return; } - ssrc->tls_fwd_stream = streambuf_new_ptr(&ssrc->tls_fwd_poller, ssrc->ssl, &ssrc_tls_funcs); + ssrc->tls_fwd.stream = streambuf_new_ptr(&ssrc->tls_fwd.poller, ssrc->tls_fwd.ssl, &ssrc_tls_funcs); } else { - ssrc->tls_fwd_stream = streambuf_new(&ssrc->tls_fwd_poller, ssrc->tls_fwd_sock.fd); + ssrc->tls_fwd.stream = streambuf_new(&ssrc->tls_fwd.poller, ssrc->tls_fwd.sock.fd); } ssrc_tls_state(ssrc); - ssrc->tls_fwd_format = (format_t) { + ssrc->tls_fwd.format = (format_t) { .clockrate = tls_resample, .channels = 1, .format = AV_SAMPLE_FMT_S16, diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 8c0b9117c..7b52b7465 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -34,6 +34,7 @@ typedef struct packet_s packet_t; typedef struct stream_s stream_t; typedef struct ssrc_s ssrc_t; typedef struct sink_s sink_t; +typedef struct tls_fwd_s tls_fwd_t; typedef void handler_func(handler_t *); @@ -90,6 +91,20 @@ struct packet_s { }; +struct tls_fwd_s { + format_t format; + resample_t resampler; + socket_t sock; + uint64_t in_pts; + AVFrame *silence_frame; + SSL_CTX *ssl_ctx; + SSL *ssl; + struct streambuf *stream; + struct poller poller; + unsigned int sent_intro:1; +}; + + struct ssrc_s { pthread_mutex_t lock; stream_t *stream; @@ -98,18 +113,7 @@ struct ssrc_s { packet_sequencer_t sequencer; decode_t *decoders[128]; output_t *output; - - // TLS output - format_t tls_fwd_format; - resample_t tls_fwd_resampler; - socket_t tls_fwd_sock; - uint64_t tls_in_pts; - AVFrame *tls_silence_frame; - SSL_CTX *ssl_ctx; - SSL *ssl; - struct streambuf *tls_fwd_stream; - struct poller tls_fwd_poller; - unsigned int sent_intro:1; + tls_fwd_t tls_fwd; };