Browse Source

MT#61977 add S3 storage option

Change-Id: Ifa84b06a88189440b8e16da7f079a1dbdaf87125
pull/1998/head
Richard Fuchs 4 months ago
parent
commit
0d1eeddd14
9 changed files with 282 additions and 7 deletions
  1. +36
    -2
      docs/rtpengine-recording.md
  2. +11
    -2
      etc/rtpengine-recording.conf
  3. +2
    -2
      recording-daemon/Makefile
  4. +46
    -1
      recording-daemon/main.c
  5. +8
    -0
      recording-daemon/main.h
  6. +2
    -0
      recording-daemon/output.c
  7. +163
    -0
      recording-daemon/s3.c
  8. +8
    -0
      recording-daemon/s3.h
  9. +6
    -0
      recording-daemon/types.h

+ 36
- 2
docs/rtpengine-recording.md View File

@ -138,7 +138,7 @@ sufficient for a standard installation of rtpengine.
Points to the shared object file (__.so__) containing the reference
implementation for the EVS codec. See the `README` for more details.
- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__none__
- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__none__
Where to store media files. This option can be given multiple times (or, in
the config file, using a comma-separated list) to enable multiple storage
@ -167,6 +167,10 @@ sufficient for a standard installation of rtpengine.
__db-mem__ can be used as a shortcut to setting both __db__ and __memory__.
The __s3__ storage option enables upload to an S3-compatible service via
HTTPS, for example to AWS or MiniO. See the related option below for
configuration.
- __\-\-output-dir=__*PATH*
Path for media files to be written to if file output is enabled. Defaults to
@ -407,7 +411,37 @@ sufficient for a standard installation of rtpengine.
- __\-\-flush-packets__
Forces that the output buffer will be flushed after every packet, ensuring that the recording file grows steadily and becomes available for processing without delays.
Forces that the output buffer will be flushed after every packet, ensuring
that the recording file grows steadily and becomes available for processing
without delays.
- __\-\-s3-host=__*HOST*
- __\-\-s3-port=__*INT*
- __\-\-s3-path=__*STR*
- __\-\-s3-access-key=__*STR*
- __\-\-s3-secret-key=__*STR*
- __\-\-s3-region=__*STR*
- __\-\-s3-no-verify__
These options must be configured if S3 storage upload is enabled. The port
is optional and defaults to 443 (HTTPS) if not configured.
The host name may or may not need to include the bucket name. For example
`minio.example.com` or `examplebucket.s3.amazonaws.com`.
The path can be blank if the host name includes the bucket name. Otherwise
the bucket name becomes part of the path (such as `/examplebucket`). This
option must still be set even if the path is blank.
Access key and secret key are the credentials provided by the service (e.g.
`AKIAIOSFODNN7EXAMPLE` and `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`).
The region code is part of the credentials (e.g. `us-east-1`). If the
service doesn't use a region code, then this must be set to a blank string.
TLS certificates are verified by default, unless the `no-verify` option is
set.
## EXIT STATUS
- __0__


+ 11
- 2
etc/rtpengine-recording.conf View File

@ -8,8 +8,8 @@ table = 0
### where to forward to (unix socket)
# forward-to = /run/rtpengine/sock
### where to store recordings: file (default), db, memory
# output-storage = db;memory
### where to store recordings: file (default), db, memory, s3
# output-storage = db;memory;s3
### format of stored recordings: wav (default), mp3
# output-format = mp3
@ -68,3 +68,12 @@ table = 0
# notify-purge = false
# notify-concurrency = 5
# notify-retries = 10
### S3 storage options
# s3-host = examplebucket.s3.amazonaws.com
# s3-port = 0
# s3-path =
# s3-access-key = AKIAIOSFODNN7EXAMPLE
# s3-secret-key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# s3-region = us-east-1
# s3-no-verify = false

+ 2
- 2
recording-daemon/Makefile View File

@ -48,9 +48,9 @@ LDLIBS+= $(LDLIBS_BCG729)
CFLAGS+= -DCUSTOM_POLLER
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c
decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c
LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \
dtmflib.c bufferpool.c bencode.c http.c
dtmflib.c bufferpool.c bencode.c http.c s3utils.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
MDS= rtpengine-recording.ronn


+ 46
- 1
recording-daemon/main.c View File

@ -70,6 +70,14 @@ char *notify_command;
gboolean mix_output_per_media = 0;
gboolean flush_packets = 0;
int resample_audio;
char *s3_host;
unsigned int s3_port;
char *s3_path;
char *s3_access_key;
char *s3_secret_key;
char *s3_region;
gboolean s3_nverify;
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
@ -202,7 +210,7 @@ static void options(int *argc, char ***argv) {
{ "table", 't', 0, G_OPTION_ARG_INT, &ktable, "Kernel table rtpengine uses", "INT" },
{ "spool-dir", 0, 0, G_OPTION_ARG_FILENAME, &spool_dir, "Directory containing rtpengine metadata files", "PATH" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" },
{ "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|memory"},
{ "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|memory"},
{ "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" },
{ "output-pattern", 0, 0, G_OPTION_ARG_STRING, &output_pattern,"File name pattern for recordings", "STRING" },
{ "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" },
@ -241,6 +249,13 @@ static void options(int *argc, char ***argv) {
{ "notify-purge", 0, 0, G_OPTION_ARG_NONE, &notify_purge, "Remove the local file if notify success", NULL },
#endif
{ "flush-packets", 0, 0, G_OPTION_ARG_NONE, &flush_packets, "Output buffer will be flushed after every packet", NULL },
{ "s3-host", 0, 0, G_OPTION_ARG_STRING, &s3_host, "Host name of S3 service", "HOST" },
{ "s3-port", 0, 0, G_OPTION_ARG_INT, &s3_port, "S3 service port if non-standard", "INT" },
{ "s3-path", 0, 0, G_OPTION_ARG_STRING, &s3_path, "Path prefix for S3 storage or bucket", "STRING" },
{ "s3-access-key", 0, 0, G_OPTION_ARG_STRING, &s3_access_key, "Access key for S3 storage", "STRING" },
{ "s3-secret-key", 0, 0, G_OPTION_ARG_STRING, &s3_secret_key, "Secret key for S3 authentication", "STRING" },
{ "s3-region", 0, 0, G_OPTION_ARG_STRING, &s3_region, "Region configuration for S3 storage", "STRING" },
{ "s3-no-verify", 0, 0, G_OPTION_ARG_NONE, &s3_nverify, "Disable TLS verification for S3", NULL },
{ NULL, }
};
@ -286,6 +301,8 @@ static void options(int *argc, char ***argv) {
#else
die("cURL version too old to support notify storage");
#endif
else if (!strcmp(*iter, "s3"))
output_storage |= OUTPUT_STORAGE_S3;
else if (!strcmp(*iter, "db"))
output_storage |= OUTPUT_STORAGE_DB;
else if (!strcmp(*iter, "db-mem"))
@ -313,6 +330,8 @@ static void options(int *argc, char ***argv) {
die("DB output storage is enabled but no DB is configured");
if ((output_storage & OUTPUT_STORAGE_NOTIFY) && !notify_uri)
die("Notify storage is enabled but notify URI is not set");
if ((output_storage & OUTPUT_STORAGE_S3) && (!s3_host || !s3_access_key || !s3_secret_key || !s3_path || !s3_region))
die("S3 storage is enabled but S3 config is incomplete");
if ((output_storage & OUTPUT_STORAGE_MASK) == 0) {
if (output_mixed || output_single)
@ -329,6 +348,32 @@ static void options(int *argc, char ***argv) {
if ((output_storage & OUTPUT_STORAGE_MASK) || tls_send_to_ep.port)
decoding_enabled = true;
// make sure S3 path always leads with a slash and always ends with one
if (!s3_path)
s3_path = g_strdup("/");
else {
char *tail = s3_path;
// skip heading slashes
while (tail[0] == '/')
tail++;
size_t len = strlen(tail);
// trim trailing slashes
while (len > 0 && tail[len - 1] == '/')
len--;
char *np;
if (len == 0)
np = g_strdup("/"); // nothing left, blank path
else
np = g_strdup_printf("/%.*s/", (int) len, tail);
g_free(s3_path);
s3_path = np;
}
if (notify_purge && (output_storage & OUTPUT_STORAGE_FILE))
output_storage &= ~OUTPUT_STORAGE_FILE;


+ 8
- 0
recording-daemon/main.h View File

@ -11,6 +11,7 @@ enum output_storage_enum {
OUTPUT_STORAGE_FILE = 0x1,
OUTPUT_STORAGE_DB = 0x2,
OUTPUT_STORAGE_NOTIFY = 0x4,
OUTPUT_STORAGE_S3 = 0x8,
OUTPUT_STORAGE_MASK = 0xff,
@ -58,6 +59,13 @@ extern gboolean mix_output_per_media;
extern volatile int shutdown_flag;
extern gboolean flush_packets;
extern int resample_audio;
extern char *s3_host;
extern unsigned int s3_port;
extern char *s3_path;
extern char *s3_access_key;
extern char *s3_secret_key;
extern char *s3_region;
extern gboolean s3_nverify;
extern struct rtpengine_common_config rtpe_common_config;


+ 2
- 0
recording-daemon/output.c View File

@ -14,6 +14,7 @@
#include "notify.h"
#include "resample.h"
#include "fix_frame_channel_layout.h"
#include "s3.h"
#define DEFAULT_AVIO_BUFSIZE 4096
@ -674,6 +675,7 @@ void output_close(metafile_t *mf, output_t *output, tag_t *tag, bool discard) {
if (output_shutdown(output)) {
db_close_stream(output);
notify_push_output(output, mf, tag);
s3_store(output, mf);
}
else
db_delete_stream(mf, output);


+ 163
- 0
recording-daemon/s3.c View File

@ -0,0 +1,163 @@
#include "s3.h"
#include "output.h"
#include "main.h"
#include "notify.h"
#include "http.h"
#include "s3utils.h"
static void s3_setup(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) {
req->object_name = g_strdup_printf("%s.%s", o->file_name, o->file_format);
req->content = output_get_content(o);
}
static bool s3_perform(notif_req_t *req) {
const char *err = NULL;
CURLcode ret;
if (!req->content) {
ilog(LOG_ERR, "Content for S3 upload unavailable ('%s%s%s')", FMT_M(req->name));
return true; // no point in retrying
}
if (!req->content_sha256) {
// do this here, in a separate thread
req->content_sha256 = g_string_sized_new(SHA256_DIGEST_LENGTH * 2 + 1);
g_string_set_size(req->content_sha256, SHA256_DIGEST_LENGTH * 2);
sha256_digest_hex(req->content_sha256->str,
req->content->s->str, req->content->s->len);
}
ilog(LOG_DEBUG, "Launching S3 upload for '%s%s%s' as '%s'", FMT_M(req->name),
req->object_name);
time_t now = time(NULL);
struct tm tm;
gmtime_r(&now, &tm);
g_autoptr(GString) auth = s3_make_auth(s3_host, s3_path, req->object_name,
s3_region, &tm, req->content_sha256->str,
s3_access_key, s3_secret_key);
if (!auth) {
ilog(LOG_ERR, "Failed to create S3 authentication string "
"for '%s%s%s'", FMT_M(req->name));
return false;
}
// build headers
struct curl_slist *headers = NULL;
// hard coded S3 header list, must match s3_make_auth()
http_add_header(&headers, "x-amz-content-sha256: %s", req->content_sha256->str);
http_add_header(&headers, "x-amz-date: %04d%02d%02dT%02d%02d%02dZ",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
http_add_header(&headers, "Authorization: %s", auth->str);
http_add_header(&headers, "Content-length: %zu", req->content->s->len);
http_add_header(&headers, "Content-type: application/data");
g_autoptr(char) uri = NULL;
if (s3_port)
uri = g_strdup_printf("https://%s:%d%s%s",
s3_host,
s3_port,
s3_path,
req->object_name);
else
uri = g_strdup_printf("https://%s%s%s",
s3_host,
s3_path,
req->object_name);
g_autoptr(GString) response = g_string_new("");
g_autoptr(CURL) c = http_create_req(uri,
http_download_write,
response,
http_upload_read,
&(http_upload) {.s = STR_GS(req->content->s) },
headers, !s3_nverify, &ret, &err);
if (!c)
goto fail;
// PUT
err = "setting CURLOPT_UPLOAD";
if ((ret = curl_easy_setopt(c, CURLOPT_UPLOAD, 1L)) != CURLE_OK)
goto fail;
err = "setting CURLOPT_INFILESIZE_LARGE";
if ((ret = curl_easy_setopt(c, CURLOPT_INFILESIZE_LARGE,
(curl_off_t) req->content->s->len)) != CURLE_OK)
goto fail;
err = "performing request";
if ((ret = curl_easy_perform(c)) != CURLE_OK)
goto fail;
long code;
err = "getting CURLINFO_RESPONSE_CODE";
if ((ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code)) != CURLE_OK)
goto fail;
err = "checking response code (not 2xx)";
if (code < 200 || code >= 300) {
ilog(LOG_ERR, "S3 upload returned code %ld, with body: '%s%.*s%s'",
code, FMT_M((int) response->len, response->str));
goto fail;
}
ilog(LOG_DEBUG, "S3 upload for '%s%s%s' successful", FMT_M(req->name));
curl_slist_free_all(headers);
return true;
fail:
ilog(LOG_ERR, "Failed to perform S3 upload for '%s%s%s': "
"Error while %s: %s",
FMT_M(req->name),
err, curl_easy_strerror(ret));
curl_slist_free_all(headers);
return false;
}
static void s3_failed(notif_req_t *req) {
if (req->content)
output_content_failure(req->content);
}
static void s3_cleanup(notif_req_t *req) {
obj_release(req->content);
if (req->content_sha256)
g_string_free(req->content_sha256, TRUE);
g_free(req->object_name);
}
static const notif_action_t action = {
.name = "S3",
.setup = s3_setup,
.perform = s3_perform,
.failed = s3_failed,
.cleanup = s3_cleanup,
};
void s3_store(output_t *o, metafile_t *mf) {
if ((output_storage & OUTPUT_STORAGE_S3))
notify_push_setup(&action, o, mf, NULL);
}

+ 8
- 0
recording-daemon/s3.h View File

@ -0,0 +1,8 @@
#ifndef _S3_H_
#define _S3_H_
#include "types.h"
void s3_store(output_t *, metafile_t *);
#endif

+ 6
- 0
recording-daemon/types.h View File

@ -259,11 +259,17 @@ struct notif_req_s {
struct {
int64_t end_time;
};
// S3
struct {
GString *content_sha256;
};
};
// used by multiple actions
unsigned long long db_id;
content_t *content;
char *object_name;
const notif_action_t *action;


Loading…
Cancel
Save