diff --git a/docs/rtpengine-recording.md b/docs/rtpengine-recording.md index 59e5a2034..c83dbf1d9 100644 --- a/docs/rtpengine-recording.md +++ b/docs/rtpengine-recording.md @@ -138,7 +138,7 @@ sufficient for a standard installation of rtpengine. Points to the shared object file (__.so__) containing the reference implementation for the EVS codec. See the `README` for more details. -- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__none__ +- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__none__ Where to store media files. This option can be given multiple times (or, in the config file, using a comma-separated list) to enable multiple storage @@ -167,6 +167,10 @@ sufficient for a standard installation of rtpengine. __db-mem__ can be used as a shortcut to setting both __db__ and __memory__. + The __s3__ storage option enables upload to an S3-compatible service via + HTTPS, for example to AWS or MiniO. See the related option below for + configuration. + - __\-\-output-dir=__*PATH* Path for media files to be written to if file output is enabled. Defaults to @@ -407,7 +411,37 @@ sufficient for a standard installation of rtpengine. - __\-\-flush-packets__ - Forces that the output buffer will be flushed after every packet, ensuring that the recording file grows steadily and becomes available for processing without delays. + Forces that the output buffer will be flushed after every packet, ensuring + that the recording file grows steadily and becomes available for processing + without delays. + +- __\-\-s3-host=__*HOST* +- __\-\-s3-port=__*INT* +- __\-\-s3-path=__*STR* +- __\-\-s3-access-key=__*STR* +- __\-\-s3-secret-key=__*STR* +- __\-\-s3-region=__*STR* +- __\-\-s3-no-verify__ + + These options must be configured if S3 storage upload is enabled. The port + is optional and defaults to 443 (HTTPS) if not configured. + + The host name may or may not need to include the bucket name. For example + `minio.example.com` or `examplebucket.s3.amazonaws.com`. + + The path can be blank if the host name includes the bucket name. Otherwise + the bucket name becomes part of the path (such as `/examplebucket`). This + option must still be set even if the path is blank. + + Access key and secret key are the credentials provided by the service (e.g. + `AKIAIOSFODNN7EXAMPLE` and `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`). + + The region code is part of the credentials (e.g. `us-east-1`). If the + service doesn't use a region code, then this must be set to a blank string. + + TLS certificates are verified by default, unless the `no-verify` option is + set. + ## EXIT STATUS - __0__ diff --git a/etc/rtpengine-recording.conf b/etc/rtpengine-recording.conf index a3dd277bb..c3fc51ad1 100644 --- a/etc/rtpengine-recording.conf +++ b/etc/rtpengine-recording.conf @@ -8,8 +8,8 @@ table = 0 ### where to forward to (unix socket) # forward-to = /run/rtpengine/sock -### where to store recordings: file (default), db, memory -# output-storage = db;memory +### where to store recordings: file (default), db, memory, s3 +# output-storage = db;memory;s3 ### format of stored recordings: wav (default), mp3 # output-format = mp3 @@ -68,3 +68,12 @@ table = 0 # notify-purge = false # notify-concurrency = 5 # notify-retries = 10 + +### S3 storage options +# s3-host = examplebucket.s3.amazonaws.com +# s3-port = 0 +# s3-path = +# s3-access-key = AKIAIOSFODNN7EXAMPLE +# s3-secret-key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY +# s3-region = us-east-1 +# s3-no-verify = false diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index 6a91fb7e1..dcae56bae 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -48,9 +48,9 @@ LDLIBS+= $(LDLIBS_BCG729) CFLAGS+= -DCUSTOM_POLLER SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c + decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \ - dtmflib.c bufferpool.c bencode.c http.c + dtmflib.c bufferpool.c bencode.c http.c s3utils.c LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S MDS= rtpengine-recording.ronn diff --git a/recording-daemon/main.c b/recording-daemon/main.c index 720d09e85..572c2d3e5 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -70,6 +70,14 @@ char *notify_command; gboolean mix_output_per_media = 0; gboolean flush_packets = 0; int resample_audio; +char *s3_host; +unsigned int s3_port; +char *s3_path; +char *s3_access_key; +char *s3_secret_key; +char *s3_region; +gboolean s3_nverify; + static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -202,7 +210,7 @@ static void options(int *argc, char ***argv) { { "table", 't', 0, G_OPTION_ARG_INT, &ktable, "Kernel table rtpengine uses", "INT" }, { "spool-dir", 0, 0, G_OPTION_ARG_FILENAME, &spool_dir, "Directory containing rtpengine metadata files", "PATH" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" }, - { "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|memory"}, + { "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|memory"}, { "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" }, { "output-pattern", 0, 0, G_OPTION_ARG_STRING, &output_pattern,"File name pattern for recordings", "STRING" }, { "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" }, @@ -241,6 +249,13 @@ static void options(int *argc, char ***argv) { { "notify-purge", 0, 0, G_OPTION_ARG_NONE, ¬ify_purge, "Remove the local file if notify success", NULL }, #endif { "flush-packets", 0, 0, G_OPTION_ARG_NONE, &flush_packets, "Output buffer will be flushed after every packet", NULL }, + { "s3-host", 0, 0, G_OPTION_ARG_STRING, &s3_host, "Host name of S3 service", "HOST" }, + { "s3-port", 0, 0, G_OPTION_ARG_INT, &s3_port, "S3 service port if non-standard", "INT" }, + { "s3-path", 0, 0, G_OPTION_ARG_STRING, &s3_path, "Path prefix for S3 storage or bucket", "STRING" }, + { "s3-access-key", 0, 0, G_OPTION_ARG_STRING, &s3_access_key, "Access key for S3 storage", "STRING" }, + { "s3-secret-key", 0, 0, G_OPTION_ARG_STRING, &s3_secret_key, "Secret key for S3 authentication", "STRING" }, + { "s3-region", 0, 0, G_OPTION_ARG_STRING, &s3_region, "Region configuration for S3 storage", "STRING" }, + { "s3-no-verify", 0, 0, G_OPTION_ARG_NONE, &s3_nverify, "Disable TLS verification for S3", NULL }, { NULL, } }; @@ -286,6 +301,8 @@ static void options(int *argc, char ***argv) { #else die("cURL version too old to support notify storage"); #endif + else if (!strcmp(*iter, "s3")) + output_storage |= OUTPUT_STORAGE_S3; else if (!strcmp(*iter, "db")) output_storage |= OUTPUT_STORAGE_DB; else if (!strcmp(*iter, "db-mem")) @@ -313,6 +330,8 @@ static void options(int *argc, char ***argv) { die("DB output storage is enabled but no DB is configured"); if ((output_storage & OUTPUT_STORAGE_NOTIFY) && !notify_uri) die("Notify storage is enabled but notify URI is not set"); + if ((output_storage & OUTPUT_STORAGE_S3) && (!s3_host || !s3_access_key || !s3_secret_key || !s3_path || !s3_region)) + die("S3 storage is enabled but S3 config is incomplete"); if ((output_storage & OUTPUT_STORAGE_MASK) == 0) { if (output_mixed || output_single) @@ -329,6 +348,32 @@ static void options(int *argc, char ***argv) { if ((output_storage & OUTPUT_STORAGE_MASK) || tls_send_to_ep.port) decoding_enabled = true; + // make sure S3 path always leads with a slash and always ends with one + if (!s3_path) + s3_path = g_strdup("/"); + else { + char *tail = s3_path; + // skip heading slashes + while (tail[0] == '/') + tail++; + + size_t len = strlen(tail); + + // trim trailing slashes + while (len > 0 && tail[len - 1] == '/') + len--; + + char *np; + if (len == 0) + np = g_strdup("/"); // nothing left, blank path + else + np = g_strdup_printf("/%.*s/", (int) len, tail); + + g_free(s3_path); + s3_path = np; + + } + if (notify_purge && (output_storage & OUTPUT_STORAGE_FILE)) output_storage &= ~OUTPUT_STORAGE_FILE; diff --git a/recording-daemon/main.h b/recording-daemon/main.h index 9dab3da64..aae6cbf48 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -11,6 +11,7 @@ enum output_storage_enum { OUTPUT_STORAGE_FILE = 0x1, OUTPUT_STORAGE_DB = 0x2, OUTPUT_STORAGE_NOTIFY = 0x4, + OUTPUT_STORAGE_S3 = 0x8, OUTPUT_STORAGE_MASK = 0xff, @@ -58,6 +59,13 @@ extern gboolean mix_output_per_media; extern volatile int shutdown_flag; extern gboolean flush_packets; extern int resample_audio; +extern char *s3_host; +extern unsigned int s3_port; +extern char *s3_path; +extern char *s3_access_key; +extern char *s3_secret_key; +extern char *s3_region; +extern gboolean s3_nverify; extern struct rtpengine_common_config rtpe_common_config; diff --git a/recording-daemon/output.c b/recording-daemon/output.c index 4ed0a134a..b1c4228b0 100644 --- a/recording-daemon/output.c +++ b/recording-daemon/output.c @@ -14,6 +14,7 @@ #include "notify.h" #include "resample.h" #include "fix_frame_channel_layout.h" +#include "s3.h" #define DEFAULT_AVIO_BUFSIZE 4096 @@ -674,6 +675,7 @@ void output_close(metafile_t *mf, output_t *output, tag_t *tag, bool discard) { if (output_shutdown(output)) { db_close_stream(output); notify_push_output(output, mf, tag); + s3_store(output, mf); } else db_delete_stream(mf, output); diff --git a/recording-daemon/s3.c b/recording-daemon/s3.c new file mode 100644 index 000000000..aafb66741 --- /dev/null +++ b/recording-daemon/s3.c @@ -0,0 +1,163 @@ +#include "s3.h" +#include "output.h" +#include "main.h" +#include "notify.h" +#include "http.h" +#include "s3utils.h" + + +static void s3_setup(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) { + req->object_name = g_strdup_printf("%s.%s", o->file_name, o->file_format); + req->content = output_get_content(o); +} + + +static bool s3_perform(notif_req_t *req) { + const char *err = NULL; + CURLcode ret; + + if (!req->content) { + ilog(LOG_ERR, "Content for S3 upload unavailable ('%s%s%s')", FMT_M(req->name)); + return true; // no point in retrying + } + + if (!req->content_sha256) { + // do this here, in a separate thread + req->content_sha256 = g_string_sized_new(SHA256_DIGEST_LENGTH * 2 + 1); + g_string_set_size(req->content_sha256, SHA256_DIGEST_LENGTH * 2); + sha256_digest_hex(req->content_sha256->str, + req->content->s->str, req->content->s->len); + } + + ilog(LOG_DEBUG, "Launching S3 upload for '%s%s%s' as '%s'", FMT_M(req->name), + req->object_name); + + time_t now = time(NULL); + struct tm tm; + gmtime_r(&now, &tm); + + g_autoptr(GString) auth = s3_make_auth(s3_host, s3_path, req->object_name, + s3_region, &tm, req->content_sha256->str, + s3_access_key, s3_secret_key); + + if (!auth) { + ilog(LOG_ERR, "Failed to create S3 authentication string " + "for '%s%s%s'", FMT_M(req->name)); + return false; + } + + // build headers + struct curl_slist *headers = NULL; + + // hard coded S3 header list, must match s3_make_auth() + http_add_header(&headers, "x-amz-content-sha256: %s", req->content_sha256->str); + http_add_header(&headers, "x-amz-date: %04d%02d%02dT%02d%02d%02dZ", + tm.tm_year + 1900, + tm.tm_mon + 1, + tm.tm_mday, + tm.tm_hour, + tm.tm_min, + tm.tm_sec); + + http_add_header(&headers, "Authorization: %s", auth->str); + + http_add_header(&headers, "Content-length: %zu", req->content->s->len); + http_add_header(&headers, "Content-type: application/data"); + + g_autoptr(char) uri = NULL; + + if (s3_port) + uri = g_strdup_printf("https://%s:%d%s%s", + s3_host, + s3_port, + s3_path, + req->object_name); + else + uri = g_strdup_printf("https://%s%s%s", + s3_host, + s3_path, + req->object_name); + + g_autoptr(GString) response = g_string_new(""); + + g_autoptr(CURL) c = http_create_req(uri, + http_download_write, + response, + http_upload_read, + &(http_upload) {.s = STR_GS(req->content->s) }, + headers, !s3_nverify, &ret, &err); + if (!c) + goto fail; + + // PUT + err = "setting CURLOPT_UPLOAD"; + if ((ret = curl_easy_setopt(c, CURLOPT_UPLOAD, 1L)) != CURLE_OK) + goto fail; + + err = "setting CURLOPT_INFILESIZE_LARGE"; + if ((ret = curl_easy_setopt(c, CURLOPT_INFILESIZE_LARGE, + (curl_off_t) req->content->s->len)) != CURLE_OK) + goto fail; + + + err = "performing request"; + if ((ret = curl_easy_perform(c)) != CURLE_OK) + goto fail; + + long code; + err = "getting CURLINFO_RESPONSE_CODE"; + if ((ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code)) != CURLE_OK) + goto fail; + + err = "checking response code (not 2xx)"; + if (code < 200 || code >= 300) { + ilog(LOG_ERR, "S3 upload returned code %ld, with body: '%s%.*s%s'", + code, FMT_M((int) response->len, response->str)); + goto fail; + } + + ilog(LOG_DEBUG, "S3 upload for '%s%s%s' successful", FMT_M(req->name)); + + curl_slist_free_all(headers); + + return true; + +fail: + ilog(LOG_ERR, "Failed to perform S3 upload for '%s%s%s': " + "Error while %s: %s", + FMT_M(req->name), + err, curl_easy_strerror(ret)); + + curl_slist_free_all(headers); + + return false; +} + + +static void s3_failed(notif_req_t *req) { + if (req->content) + output_content_failure(req->content); +} + + +static void s3_cleanup(notif_req_t *req) { + obj_release(req->content); + if (req->content_sha256) + g_string_free(req->content_sha256, TRUE); + g_free(req->object_name); +} + + +static const notif_action_t action = { + .name = "S3", + .setup = s3_setup, + .perform = s3_perform, + .failed = s3_failed, + .cleanup = s3_cleanup, +}; + + +void s3_store(output_t *o, metafile_t *mf) { + if ((output_storage & OUTPUT_STORAGE_S3)) + notify_push_setup(&action, o, mf, NULL); +} diff --git a/recording-daemon/s3.h b/recording-daemon/s3.h new file mode 100644 index 000000000..69be0d125 --- /dev/null +++ b/recording-daemon/s3.h @@ -0,0 +1,8 @@ +#ifndef _S3_H_ +#define _S3_H_ + +#include "types.h" + +void s3_store(output_t *, metafile_t *); + +#endif diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 83f2d82d7..eee0270de 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -259,11 +259,17 @@ struct notif_req_s { struct { int64_t end_time; }; + + // S3 + struct { + GString *content_sha256; + }; }; // used by multiple actions unsigned long long db_id; content_t *content; + char *object_name; const notif_action_t *action;