Browse Source

MT#62571 encapsulate tls_fwd

Change-Id: I06b925e2eb2bbdb7a4741a93a64701f495409616
pull/1967/head
Richard Fuchs 6 months ago
parent
commit
ad063360bd
4 changed files with 92 additions and 88 deletions
  1. +11
    -11
      recording-daemon/decoder.c
  2. +1
    -1
      recording-daemon/packet.c
  3. +64
    -64
      recording-daemon/tls_send.c
  4. +16
    -12
      recording-daemon/types.h

+ 11
- 11
recording-daemon/decoder.c View File

@ -117,7 +117,7 @@ static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp)
}
no_recording:
if (ssrc->tls_fwd_stream) {
if (ssrc->tls_fwd.stream) {
// XXX might be a second resampling to same format
dbg("SSRC %lx of stream #%lu has TLS forwarding stream", ssrc->ssrc, stream->id);
@ -125,12 +125,12 @@ no_recording:
// if we're in the middle of a disconnect then ssrc_tls_state may have destroyed the streambuf
// so we need to skip the below to ensure we only send metadata for the new connection
// once we've got a new streambuf
if (!ssrc->tls_fwd_stream)
if (!ssrc->tls_fwd.stream)
goto err;
AVFrame *dec_frame = resample_frame(&ssrc->tls_fwd_resampler, frame, &ssrc->tls_fwd_format);
AVFrame *dec_frame = resample_frame(&ssrc->tls_fwd.resampler, frame, &ssrc->tls_fwd.format);
if (!ssrc->sent_intro) {
if (!ssrc->tls_fwd.sent_intro) {
tag_t *tag = NULL;
if (ssrc->stream)
@ -138,27 +138,27 @@ no_recording:
if (tag && tag->metadata) {
dbg("Writing tag metadata header to TLS");
streambuf_write(ssrc->tls_fwd_stream, tag->metadata, strlen(tag->metadata) + 1);
streambuf_write(ssrc->tls_fwd.stream, tag->metadata, strlen(tag->metadata) + 1);
}
else if (metafile->metadata) {
dbg("Writing call metadata header to TLS");
streambuf_write(ssrc->tls_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1);
streambuf_write(ssrc->tls_fwd.stream, metafile->metadata, strlen(metafile->metadata) + 1);
}
else {
ilog(LOG_WARN, "No metadata present for forwarding connection");
streambuf_write(ssrc->tls_fwd_stream, "\0", 1);
streambuf_write(ssrc->tls_fwd.stream, "\0", 1);
}
ssrc->sent_intro = 1;
ssrc->tls_fwd.sent_intro = 1;
}
ssrc_tls_fwd_silence_frames_upto(ssrc, dec_frame, dec_frame->pts);
uint64_t next_pts = dec_frame->pts + dec_frame->nb_samples;
if (next_pts > ssrc->tls_in_pts)
ssrc->tls_in_pts = next_pts;
if (next_pts > ssrc->tls_fwd.in_pts)
ssrc->tls_fwd.in_pts = next_pts;
int linesize = av_get_bytes_per_sample(dec_frame->format) * dec_frame->nb_samples;
dbg("Writing %u bytes PCM to TLS", linesize);
streambuf_write(ssrc->tls_fwd_stream, (char *) dec_frame->extended_data[0], linesize);
streambuf_write(ssrc->tls_fwd.stream, (char *) dec_frame->extended_data[0], linesize);
if (dec_frame != frame)
av_frame_free(&dec_frame);


+ 1
- 1
recording-daemon/packet.c View File

@ -43,7 +43,7 @@ void ssrc_close(ssrc_t *s) {
void ssrc_free(void *p) {
ssrc_t *s = p;
av_frame_free(&s->tls_silence_frame);
av_frame_free(&s->tls_fwd.silence_frame);
packet_sequencer_destroy(&s->sequencer);
ssrc_close(s);
g_free(s);


+ 64
- 64
recording-daemon/tls_send.c View File

@ -68,21 +68,21 @@ static ssize_t ssrc_tls_read(void *fd, void *b, size_t s) {
void ssrc_tls_shutdown(ssrc_t *ssrc) {
if (!ssrc->tls_fwd_stream)
if (!ssrc->tls_fwd.stream)
return;
streambuf_destroy(ssrc->tls_fwd_stream);
ssrc->tls_fwd_stream = NULL;
resample_shutdown(&ssrc->tls_fwd_resampler);
if (ssrc->ssl) {
SSL_free(ssrc->ssl);
ssrc->ssl = NULL;
streambuf_destroy(ssrc->tls_fwd.stream);
ssrc->tls_fwd.stream = NULL;
resample_shutdown(&ssrc->tls_fwd.resampler);
if (ssrc->tls_fwd.ssl) {
SSL_free(ssrc->tls_fwd.ssl);
ssrc->tls_fwd.ssl = NULL;
}
if (ssrc->ssl_ctx) {
SSL_CTX_free(ssrc->ssl_ctx);
ssrc->ssl_ctx = NULL;
if (ssrc->tls_fwd.ssl_ctx) {
SSL_CTX_free(ssrc->tls_fwd.ssl_ctx);
ssrc->tls_fwd.ssl_ctx = NULL;
}
close_socket(&ssrc->tls_fwd_sock);
ssrc->sent_intro = 0;
close_socket(&ssrc->tls_fwd.sock);
ssrc->tls_fwd.sent_intro = 0;
}
@ -90,24 +90,24 @@ void ssrc_tls_state(ssrc_t *ssrc) {
int ret;
ssrc_tls_log_errors();
if (ssrc->tls_fwd_poller.state == PS_CONNECTING) {
int status = connect_socket_retry(&ssrc->tls_fwd_sock);
if (ssrc->tls_fwd.poller.state == PS_CONNECTING) {
int status = connect_socket_retry(&ssrc->tls_fwd.sock);
if (status == 0) {
if (tls_disable) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
ssrc->tls_fwd.poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd.stream);
} else {
dbg("TLS connection to %s doing handshake",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_HANDSHAKE;
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
ssrc->tls_fwd.poller.state = PS_HANDSHAKE;
if ((ret = SSL_connect(ssrc->tls_fwd.ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
ssrc->tls_fwd.poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd.stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
ssrc_tls_check_blocked(ssrc->tls_fwd.ssl, ret);
}
}
else if (status < 0) {
@ -115,67 +115,67 @@ void ssrc_tls_state(ssrc_t *ssrc) {
ssrc_tls_shutdown(ssrc);
}
}
else if (ssrc->tls_fwd_poller.state == PS_HANDSHAKE) {
else if (ssrc->tls_fwd.poller.state == PS_HANDSHAKE) {
if (!tls_disable) {
if ((ret = SSL_connect(ssrc->ssl)) == 1) {
if ((ret = SSL_connect(ssrc->tls_fwd.ssl)) == 1) {
dbg("TLS connection to %s established",
endpoint_print_buf(&tls_send_to_ep));
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
ssrc->tls_fwd.poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd.stream);
}
else
ssrc_tls_check_blocked(ssrc->ssl, ret);
ssrc_tls_check_blocked(ssrc->tls_fwd.ssl, ret);
}
}
else if (ssrc->tls_fwd_poller.state == PS_WRITE_BLOCKED) {
ssrc->tls_fwd_poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd_stream);
else if (ssrc->tls_fwd.poller.state == PS_WRITE_BLOCKED) {
ssrc->tls_fwd.poller.state = PS_OPEN;
streambuf_writeable(ssrc->tls_fwd.stream);
}
else if (ssrc->tls_fwd_poller.state == PS_ERROR)
else if (ssrc->tls_fwd.poller.state == PS_ERROR)
ssrc_tls_shutdown(ssrc);
ssrc_tls_log_errors();
}
void ssrc_tls_fwd_silence_frames_upto(ssrc_t *ssrc, AVFrame *frame, int64_t upto) {
unsigned int silence_samples = ssrc->tls_fwd_format.clockrate / 100;
unsigned int silence_samples = ssrc->tls_fwd.format.clockrate / 100;
while (ssrc->tls_in_pts < upto) {
if (G_UNLIKELY(upto - ssrc->tls_in_pts > ssrc->tls_fwd_format.clockrate * 30)) {
while (ssrc->tls_fwd.in_pts < upto) {
if (G_UNLIKELY(upto - ssrc->tls_fwd.in_pts > ssrc->tls_fwd.format.clockrate * 30)) {
ilog(LOG_WARN, "More than 30 seconds of silence needed to fill mix buffer, resetting");
ssrc->tls_in_pts = upto;
ssrc->tls_fwd.in_pts = upto;
break;
}
if (G_UNLIKELY(!ssrc->tls_silence_frame)) {
ssrc->tls_silence_frame = av_frame_alloc();
ssrc->tls_silence_frame->format = ssrc->tls_fwd_format.format;
DEF_CH_LAYOUT(&ssrc->tls_silence_frame->CH_LAYOUT, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->nb_samples = silence_samples;
ssrc->tls_silence_frame->sample_rate = ssrc->tls_fwd_format.clockrate;
if (av_frame_get_buffer(ssrc->tls_silence_frame, 0) < 0) {
if (G_UNLIKELY(!ssrc->tls_fwd.silence_frame)) {
ssrc->tls_fwd.silence_frame = av_frame_alloc();
ssrc->tls_fwd.silence_frame->format = ssrc->tls_fwd.format.format;
DEF_CH_LAYOUT(&ssrc->tls_fwd.silence_frame->CH_LAYOUT, ssrc->tls_fwd.format.channels);
ssrc->tls_fwd.silence_frame->nb_samples = silence_samples;
ssrc->tls_fwd.silence_frame->sample_rate = ssrc->tls_fwd.format.clockrate;
if (av_frame_get_buffer(ssrc->tls_fwd.silence_frame, 0) < 0) {
ilog(LOG_ERR, "Failed to get silence frame buffers");
return;
}
int planes = av_sample_fmt_is_planar(ssrc->tls_silence_frame->format) ? ssrc->tls_fwd_format.channels : 1;
int planes = av_sample_fmt_is_planar(ssrc->tls_fwd.silence_frame->format) ? ssrc->tls_fwd.format.channels : 1;
for (int i = 0; i < planes; i++)
memset(ssrc->tls_silence_frame->extended_data[i], 0, ssrc->tls_silence_frame->linesize[0]);
memset(ssrc->tls_fwd.silence_frame->extended_data[i], 0, ssrc->tls_fwd.silence_frame->linesize[0]);
}
dbg("pushing silence frame into TLS-formward stream (%lli < %llu)",
(long long unsigned) ssrc->tls_in_pts,
(long long unsigned) ssrc->tls_fwd.in_pts,
(long long unsigned) upto);
ssrc->tls_silence_frame->pts = ssrc->tls_in_pts;
ssrc->tls_silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_in_pts);
ssrc->tls_in_pts += ssrc->tls_silence_frame->nb_samples;
ssrc->tls_fwd.silence_frame->pts = ssrc->tls_fwd.in_pts;
ssrc->tls_fwd.silence_frame->nb_samples = MIN(silence_samples, upto - ssrc->tls_fwd.in_pts);
ssrc->tls_fwd.in_pts += ssrc->tls_fwd.silence_frame->nb_samples;
CH_LAYOUT_T channel_layout;
DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd_format.channels);
ssrc->tls_silence_frame->CH_LAYOUT = channel_layout;
DEF_CH_LAYOUT(&channel_layout, ssrc->tls_fwd.format.channels);
ssrc->tls_fwd.silence_frame->CH_LAYOUT = channel_layout;
int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_silence_frame->nb_samples;
int linesize = av_get_bytes_per_sample(frame->format) * ssrc->tls_fwd.silence_frame->nb_samples;
dbg("Writing %u bytes PCM to TLS", linesize);
streambuf_write(ssrc->tls_fwd_stream, (char *) ssrc->tls_silence_frame->extended_data[0], linesize);
streambuf_write(ssrc->tls_fwd.stream, (char *) ssrc->tls_fwd.silence_frame->extended_data[0], linesize);
}
}
@ -185,25 +185,25 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) {
ssrc_tls_shutdown(ssrc);
return;
}
if (ssrc->tls_fwd_stream)
if (ssrc->tls_fwd.stream)
return;
// initialise the connection
ZERO(ssrc->tls_fwd_poller);
ZERO(ssrc->tls_fwd.poller);
if (!tls_disable) {
dbg("Starting TLS connection to %s", endpoint_print_buf(&tls_send_to_ep));
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
ssrc->ssl_ctx = SSL_CTX_new(TLS_client_method());
ssrc->tls_fwd.ssl_ctx = SSL_CTX_new(TLS_client_method());
#else
ssrc->ssl_ctx = SSL_CTX_new(SSLv23_client_method());
ssrc->tls_fwd.ssl_ctx = SSL_CTX_new(SSLv23_client_method());
#endif
if (!ssrc->ssl_ctx) {
if (!ssrc->tls_fwd.ssl_ctx) {
ilog(LOG_ERR, "Failed to create TLS context");
ssrc_tls_shutdown(ssrc);
return;
}
ssrc->ssl = SSL_new(ssrc->ssl_ctx);
if (!ssrc->ssl) {
ssrc->tls_fwd.ssl = SSL_new(ssrc->tls_fwd.ssl_ctx);
if (!ssrc->tls_fwd.ssl) {
ilog(LOG_ERR, "Failed to create TLS connection");
ssrc_tls_shutdown(ssrc);
return;
@ -211,7 +211,7 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) {
} else {
dbg("Starting TCP connection to %s", endpoint_print_buf(&tls_send_to_ep));
}
int status = connect_socket_nb(&ssrc->tls_fwd_sock, SOCK_STREAM, &tls_send_to_ep);
int status = connect_socket_nb(&ssrc->tls_fwd.sock, SOCK_STREAM, &tls_send_to_ep);
if (status < 0) {
ilog(LOG_ERR, "Failed to open/connect TLS/TCP socket to %s: %s",
endpoint_print_buf(&tls_send_to_ep),
@ -220,20 +220,20 @@ void tls_fwd_init(stream_t *stream, metafile_t *mf, ssrc_t *ssrc) {
return;
}
ssrc->tls_fwd_poller.state = PS_CONNECTING;
ssrc->tls_fwd.poller.state = PS_CONNECTING;
if (!tls_disable) {
if (SSL_set_fd(ssrc->ssl, ssrc->tls_fwd_sock.fd) != 1) {
if (SSL_set_fd(ssrc->tls_fwd.ssl, ssrc->tls_fwd.sock.fd) != 1) {
ilog(LOG_ERR, "Failed to set TLS fd");
ssrc_tls_shutdown(ssrc);
return;
}
ssrc->tls_fwd_stream = streambuf_new_ptr(&ssrc->tls_fwd_poller, ssrc->ssl, &ssrc_tls_funcs);
ssrc->tls_fwd.stream = streambuf_new_ptr(&ssrc->tls_fwd.poller, ssrc->tls_fwd.ssl, &ssrc_tls_funcs);
} else {
ssrc->tls_fwd_stream = streambuf_new(&ssrc->tls_fwd_poller, ssrc->tls_fwd_sock.fd);
ssrc->tls_fwd.stream = streambuf_new(&ssrc->tls_fwd.poller, ssrc->tls_fwd.sock.fd);
}
ssrc_tls_state(ssrc);
ssrc->tls_fwd_format = (format_t) {
ssrc->tls_fwd.format = (format_t) {
.clockrate = tls_resample,
.channels = 1,
.format = AV_SAMPLE_FMT_S16,


+ 16
- 12
recording-daemon/types.h View File

@ -34,6 +34,7 @@ typedef struct packet_s packet_t;
typedef struct stream_s stream_t;
typedef struct ssrc_s ssrc_t;
typedef struct sink_s sink_t;
typedef struct tls_fwd_s tls_fwd_t;
typedef void handler_func(handler_t *);
@ -90,6 +91,20 @@ struct packet_s {
};
struct tls_fwd_s {
format_t format;
resample_t resampler;
socket_t sock;
uint64_t in_pts;
AVFrame *silence_frame;
SSL_CTX *ssl_ctx;
SSL *ssl;
struct streambuf *stream;
struct poller poller;
unsigned int sent_intro:1;
};
struct ssrc_s {
pthread_mutex_t lock;
stream_t *stream;
@ -98,18 +113,7 @@ struct ssrc_s {
packet_sequencer_t sequencer;
decode_t *decoders[128];
output_t *output;
// TLS output
format_t tls_fwd_format;
resample_t tls_fwd_resampler;
socket_t tls_fwd_sock;
uint64_t tls_in_pts;
AVFrame *tls_silence_frame;
SSL_CTX *ssl_ctx;
SSL *ssl;
struct streambuf *tls_fwd_stream;
struct poller tls_fwd_poller;
unsigned int sent_intro:1;
tls_fwd_t tls_fwd;
};


Loading…
Cancel
Save