Browse Source

MT#62571 split out TLS send code

Change-Id: Icb645aae9cab775b86fa50c7d832dd621af2a3a1
pull/1967/head
Richard Fuchs 6 months ago
parent
commit
b39a43a576
6 changed files with 257 additions and 228 deletions
  1. +1
    -1
      recording-daemon/Makefile
  2. +1
    -0
      recording-daemon/decoder.c
  3. +2
    -224
      recording-daemon/packet.c
  4. +0
    -3
      recording-daemon/packet.h
  5. +241
    -0
      recording-daemon/tls_send.c
  6. +12
    -0
      recording-daemon/tls_send.h

+ 1
- 1
recording-daemon/Makefile View File

@ -46,7 +46,7 @@ CFLAGS+= $(CFLAGS_BCG729)
LDLIBS+= $(LDLIBS_BCG729)
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c notify.c
decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c notify.c tls_send.c
LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \
dtmflib.c bufferpool.c bencode.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S


+ 1
- 0
recording-daemon/decoder.c View File

@ -19,6 +19,7 @@
#include "main.h"
#include "packet.h"
#include "tag.h"
#include "tls_send.h"
int resample_audio;


+ 2
- 224
recording-daemon/packet.c View File

@ -18,63 +18,9 @@
#include "resample.h"
#include "tag.h"
#include "fix_frame_channel_layout.h"
#include "tls_send.h"
static ssize_t ssrc_tls_write(void *, const void *, size_t);
static ssize_t ssrc_tls_read(void *, void *, size_t);
static struct streambuf_funcs ssrc_tls_funcs = {
.write = ssrc_tls_write,
.read = ssrc_tls_read,
};
static void ssrc_tls_log_errors(void) {
int i;
char err[160];
while ((i = ERR_get_error())) {
ERR_error_string(i, err);
dbg("TLS error: %s", err);
}
}
static int ssrc_tls_check_blocked(SSL *ssl, int ret) {
if (!ssl)
return 0;
int err = SSL_get_error(ssl, ret);
dbg("TLS error code: %i -> %i", ret, err);
switch (err) {
case SSL_ERROR_ZERO_RETURN:
return 0; // eof
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_ACCEPT:
errno = EAGAIN;
return -1;
case SSL_ERROR_SYSCALL:
return -1;
}
errno = EFAULT;
return -1;
}
static ssize_t ssrc_tls_write(void *fd, const void *b, size_t s) {
SSL *ssl = fd;
ssrc_tls_log_errors();
int ret = SSL_write(ssl, b, s);
if (ret > 0)
return ret;
return ssrc_tls_check_blocked(ssl, ret);
}
static ssize_t ssrc_tls_read(void *fd, void *b, size_t s) {
SSL *ssl = fd;
ssrc_tls_log_errors();
int ret = SSL_read(ssl, b, s);
if (ret > 0)
return ret;
return ssrc_tls_check_blocked(ssl, ret);
}
static void packet_free(void *p) {
packet_t *packet = p;
if (!packet)
@ -83,120 +29,6 @@ static void packet_free(void *p) {
g_free(packet);
}
static void ssrc_tls_shutdown(ssrc_t *ssrc) {
if (!ssrc->tls_fwd_stream)
return;
streambuf_destroy(ssrc->tls_fwd_stream);
ssrc->tls_fwd_stream = NULL;
resample_shutdown(&ssrc->tls_fwd_resampler);
if (ssrc->ssl) {
SSL_free(ssrc->ssl);
ssrc->ssl = NULL;
}
if (ssrc->ssl_ctx) {
SSL_CTX_free(ssrc->ssl_ctx);
ssrc->ssl_ctx = NULL;
}
close_socket(&ssrc->tls_fwd_sock);
ssrc->sent_intro = 0;
}
void ssrc_tls_state(ssrc_t *ssrc) {
int ret;
ssrc_tls_log_errors();
if (ssrc->tls_fwd_poller.state == PS_CONNECTING) {
int status = connect_socket_retry(&ssrc->tls_fwd_sock);
if (status == 0) {
if (tls_disable) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
} else {
dbg("TLS connection to %s doing handshake",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_HANDSHAKE;
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
}
}
else if (status < 0) {
ilog(LOG_ERR, "Failed to connect TLS/TCP socket: %s", strerror(errno));
ssrc_tls_shutdown(ssrc);
}
}
else if (ssrc->tls_fwd_poller.state == PS_HANDSHAKE) {
if (!tls_disable) {
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
}
}
else if (ssrc->tls_fwd_poller.state == PS_WRITE_BLOCKED) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else if (ssrc->tls_fwd_poller.state == PS_ERROR)
ssrc_tls_shutdown(ssrc);
ssrc_tls_log_errors();
}
void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto) {
unsigned int silence_samples = ssrc->tls_fwd_format.clockrate / 100;
while (ssrc->tls_in_pts < upto) {
if (G_UNLIKELY(upto - ssrc->tls_in_pts > ssrc->tls_fwd_format.clockrate * 30)) {
ilog(LOG_WARN, "More than 30 seconds of silence needed to fill mix buffer, resetting");
ssrc->tls_in_pts = upto;
break;
}
if (G_UNLIKELY(!ssrc->tls_silence_frame)) {
ssrc->tls_silence_frame = av_frame_alloc();
ssrc->tls_silence_frame->format = ssrc->tls_fwd_format.format;
DEF_CH_LAYOUT(&ssrc->tls_silence_frame->CH_LAYOUT, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->nb_samples = silence_samples;
ssrc->tls_silence_frame->sample_rate = ssrc->tls_fwd_format.clockrate;
if (av_frame_get_buffer(ssrc->tls_silence_frame, 0) < 0) {
ilog(LOG_ERR, "Failed to get silence frame buffers");
return;
}
int planes = av_sample_fmt_is_planar(ssrc->tls_silence_frame->format) ? ssrc->tls_fwd_format.channels : 1;
for (int i = 0; i < planes; i++)
memset(ssrc->tls_silence_frame->extended_data[i], 0, ssrc->tls_silence_frame->linesize[0]);
}
dbg("pushing silence frame into TLS-formward stream (%lli < %llu)",
(long long unsigned) ssrc->tls_in_pts,
(long long unsigned) upto);
ssrc->tls_silence_frame->pts = ssrc->tls_in_pts;
ssrc->tls_silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_in_pts);
ssrc->tls_in_pts += ssrc->tls_silence_frame->nb_samples;
CH_LAYOUT_T channel_layout;
DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->CH_LAYOUT = channel_layout;
int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_silence_frame->nb_samples;
dbg("Writing %u bytes PCM to TLS", linesize);
streambuf_write(ssrc->tls_fwd_stream, (char *) ssrc->tls_silence_frame->extended_data[0], linesize);
}
}
// appropriate lock must be held (ssrc or metafile)
void ssrc_close(ssrc_t *s) {
output_close(s->metafile, s->output, tag_get(s->metafile, s->stream->tag), s->metafile->discard);
@ -250,62 +82,8 @@ out:
ret->output = output_new_ext(mf, buf, "single", tag->label);
db_do_stream(mf, ret->output, stream, ssrc);
}
if ((stream->forwarding_on || mf->forwarding_on) && !ret->tls_fwd_stream && tls_send_to_ep.port) {
// initialise the connection
ZERO(ret->tls_fwd_poller);
if (!tls_disable) {
dbg("Starting TLS connection to %s", endpoint_print_buf(&tls_send_to_ep));
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
ret->ssl_ctx = SSL_CTX_new(TLS_client_method());
#else
ret->ssl_ctx = SSL_CTX_new(SSLv23_client_method());
#endif
if (!ret->ssl_ctx) {
ilog(LOG_ERR, "Failed to create TLS context");
ssrc_tls_shutdown(ret);
goto tls_out;
}
ret->ssl = SSL_new(ret->ssl_ctx);
if (!ret->ssl) {
ilog(LOG_ERR, "Failed to create TLS connection");
ssrc_tls_shutdown(ret);
goto tls_out;
}
} else {
dbg("Starting TCP connection to %s", endpoint_print_buf(&tls_send_to_ep));
}
int status = connect_socket_nb(&ret->tls_fwd_sock, SOCK_STREAM, &tls_send_to_ep);
if (status < 0) {
ilog(LOG_ERR, "Failed to open/connect TLS/TCP socket to %s: %s",
endpoint_print_buf(&tls_send_to_ep),
strerror(errno));
ssrc_tls_shutdown(ret);
goto tls_out;
}
ret->tls_fwd_poller.state = PS_CONNECTING;
if (!tls_disable) {
if (SSL_set_fd(ret->ssl, ret->tls_fwd_sock.fd) != 1) {
ilog(LOG_ERR, "Failed to set TLS fd");
ssrc_tls_shutdown(ret);
goto tls_out;
}
ret->tls_fwd_stream = streambuf_new_ptr(&ret->tls_fwd_poller, ret->ssl, &ssrc_tls_funcs);
} else {
ret->tls_fwd_stream = streambuf_new(&ret->tls_fwd_poller, ret->tls_fwd_sock.fd);
}
ssrc_tls_state(ret);
ret->tls_fwd_format = (format_t) {
.clockrate = tls_resample,
.channels = 1,
.format = AV_SAMPLE_FMT_S16,
};
tls_out:
;
}
else if (!(stream->forwarding_on || mf->forwarding_on) && ret->tls_fwd_stream)
ssrc_tls_shutdown(ret);
tls_fwd_init(stream, mf, ret);
return ret;
}


+ 0
- 3
recording-daemon/packet.h View File

@ -9,7 +9,4 @@ void ssrc_free(void *p);
void packet_process(stream_t *, unsigned char *, unsigned len);
void ssrc_tls_state(ssrc_t *ssrc);
void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto);
#endif

+ 241
- 0
recording-daemon/tls_send.c View File

@ -0,0 +1,241 @@
#include "tls_send.h"
#include <glib.h>
#include <unistd.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include "log.h"
#include "types.h"
#include "resample.h"
#include "main.h"
#include "streambuf.h"
#include "fix_frame_channel_layout.h"
static ssize_t ssrc_tls_write(void *, const void *, size_t);
static ssize_t ssrc_tls_read(void *, void *, size_t);
static struct streambuf_funcs ssrc_tls_funcs = {
.write = ssrc_tls_write,
.read = ssrc_tls_read,
};
static void ssrc_tls_log_errors(void) {
int i;
char err[160];
while ((i = ERR_get_error())) {
ERR_error_string(i, err);
dbg("TLS error: %s", err);
}
}
static int ssrc_tls_check_blocked(SSL *ssl, int ret) {
if (!ssl)
return 0;
int err = SSL_get_error(ssl, ret);
dbg("TLS error code: %i -> %i", ret, err);
switch (err) {
case SSL_ERROR_ZERO_RETURN:
return 0; // eof
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_ACCEPT:
errno = EAGAIN;
return -1;
case SSL_ERROR_SYSCALL:
return -1;
}
errno = EFAULT;
return -1;
}
static ssize_t ssrc_tls_write(void *fd, const void *b, size_t s) {
SSL *ssl = fd;
ssrc_tls_log_errors();
int ret = SSL_write(ssl, b, s);
if (ret > 0)
return ret;
return ssrc_tls_check_blocked(ssl, ret);
}
static ssize_t ssrc_tls_read(void *fd, void *b, size_t s) {
SSL *ssl = fd;
ssrc_tls_log_errors();
int ret = SSL_read(ssl, b, s);
if (ret > 0)
return ret;
return ssrc_tls_check_blocked(ssl, ret);
}
void ssrc_tls_shutdown(ssrc_t *ssrc) {
if (!ssrc->tls_fwd_stream)
return;
streambuf_destroy(ssrc->tls_fwd_stream);
ssrc->tls_fwd_stream = NULL;
resample_shutdown(&ssrc->tls_fwd_resampler);
if (ssrc->ssl) {
SSL_free(ssrc->ssl);
ssrc->ssl = NULL;
}
if (ssrc->ssl_ctx) {
SSL_CTX_free(ssrc->ssl_ctx);
ssrc->ssl_ctx = NULL;
}
close_socket(&ssrc->tls_fwd_sock);
ssrc->sent_intro = 0;
}
void ssrc_tls_state(ssrc_t *ssrc) {
int ret;
ssrc_tls_log_errors();
if (ssrc->tls_fwd_poller.state == PS_CONNECTING) {
int status = connect_socket_retry(&ssrc->tls_fwd_sock);
if (status == 0) {
if (tls_disable) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
} else {
dbg("TLS connection to %s doing handshake",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_HANDSHAKE;
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
}
}
else if (status < 0) {
ilog(LOG_ERR, "Failed to connect TLS/TCP socket: %s", strerror(errno));
ssrc_tls_shutdown(ssrc);
}
}
else if (ssrc->tls_fwd_poller.state == PS_HANDSHAKE) {
if (!tls_disable) {
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
}
}
else if (ssrc->tls_fwd_poller.state == PS_WRITE_BLOCKED) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
}
else if (ssrc->tls_fwd_poller.state == PS_ERROR)
ssrc_tls_shutdown(ssrc);
ssrc_tls_log_errors();
}
void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto) {
unsigned int silence_samples = ssrc->tls_fwd_format.clockrate / 100;
while (ssrc->tls_in_pts < upto) {
if (G_UNLIKELY(upto - ssrc->tls_in_pts > ssrc->tls_fwd_format.clockrate * 30)) {
ilog(LOG_WARN, "More than 30 seconds of silence needed to fill mix buffer, resetting");
ssrc->tls_in_pts = upto;
break;
}
if (G_UNLIKELY(!ssrc->tls_silence_frame)) {
ssrc->tls_silence_frame = av_frame_alloc();
ssrc->tls_silence_frame->format = ssrc->tls_fwd_format.format;
DEF_CH_LAYOUT(&ssrc->tls_silence_frame->CH_LAYOUT, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->nb_samples = silence_samples;
ssrc->tls_silence_frame->sample_rate = ssrc->tls_fwd_format.clockrate;
if (av_frame_get_buffer(ssrc->tls_silence_frame, 0) < 0) {
ilog(LOG_ERR, "Failed to get silence frame buffers");
return;
}
int planes = av_sample_fmt_is_planar(ssrc->tls_silence_frame->format) ? ssrc->tls_fwd_format.channels : 1;
for (int i = 0; i < planes; i++)
memset(ssrc->tls_silence_frame->extended_data[i], 0, ssrc->tls_silence_frame->linesize[0]);
}
dbg("pushing silence frame into TLS-formward stream (%lli < %llu)",
(long long unsigned) ssrc->tls_in_pts,
(long long unsigned) upto);
ssrc->tls_silence_frame->pts = ssrc->tls_in_pts;
ssrc->tls_silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_in_pts);
ssrc->tls_in_pts += ssrc->tls_silence_frame->nb_samples;
CH_LAYOUT_T channel_layout;
DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->CH_LAYOUT = channel_layout;
int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_silence_frame->nb_samples;
dbg("Writing %u bytes PCM to TLS", linesize);
streambuf_write(ssrc->tls_fwd_stream, (char *) ssrc->tls_silence_frame->extended_data[0], linesize);
}
}
void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) {
if ((!stream->forwarding_on && !mf->forwarding_on) || !tls_send_to_ep.port) {
ssrc_tls_shutdown(ssrc);
return;
}
if (ssrc->tls_fwd_stream)
return;
// initialise the connection
ZERO(ssrc->tls_fwd_poller);
if (!tls_disable) {
dbg("Starting TLS connection to %s", endpoint_print_buf(&tls_send_to_ep));
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
ssrc->ssl_ctx = SSL_CTX_new(TLS_client_method());
#else
ssrc->ssl_ctx = SSL_CTX_new(SSLv23_client_method());
#endif
if (!ssrc->ssl_ctx) {
ilog(LOG_ERR, "Failed to create TLS context");
ssrc_tls_shutdown(ssrc);
return;
}
ssrc->ssl = SSL_new(ssrc->ssl_ctx);
if (!ssrc->ssl) {
ilog(LOG_ERR, "Failed to create TLS connection");
ssrc_tls_shutdown(ssrc);
return;
}
} else {
dbg("Starting TCP connection to %s", endpoint_print_buf(&tls_send_to_ep));
}
int status = connect_socket_nb(&ssrc->tls_fwd_sock, SOCK_STREAM, &tls_send_to_ep);
if (status < 0) {
ilog(LOG_ERR, "Failed to open/connect TLS/TCP socket to %s: %s",
endpoint_print_buf(&tls_send_to_ep),
strerror(errno));
ssrc_tls_shutdown(ssrc);
return;
}
ssrc->tls_fwd_poller.state = PS_CONNECTING;
if (!tls_disable) {
if (SSL_set_fd(ssrc->ssl, ssrc->tls_fwd_sock.fd) != 1) {
ilog(LOG_ERR, "Failed to set TLS fd");
ssrc_tls_shutdown(ssrc);
return;
}
ssrc->tls_fwd_stream = streambuf_new_ptr(&ssrc->tls_fwd_poller, ssrc->ssl, &ssrc_tls_funcs);
} else {
ssrc->tls_fwd_stream = streambuf_new(&ssrc->tls_fwd_poller, ssrc->tls_fwd_sock.fd);
}
ssrc_tls_state(ssrc);
ssrc->tls_fwd_format = (format_t) {
.clockrate = tls_resample,
.channels = 1,
.format = AV_SAMPLE_FMT_S16,
};
}

+ 12
- 0
recording-daemon/tls_send.h View File

@ -0,0 +1,12 @@
#ifndef _TLS_SEND_H_
#define _TLS_SEND_H_
#include "types.h"
void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc);
void ssrc_tls_shutdown(ssrc_t *ssrc);
void ssrc_tls_state(ssrc_t *ssrc);
void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto);
#endif

Loading…
Cancel
Save