From ed68ee3ca59833e41c42170f59fe25fea9e25d5e Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 26 Aug 2025 14:04:30 -0400 Subject: [PATCH] MT#61977 add GCS storage option Change-Id: Idc97ea18bae6215379072bbec05a7b5afae73f00 --- docs/rtpengine-recording.md | 32 ++++++- etc/rtpengine-recording.conf | 9 +- recording-daemon/Makefile | 7 +- recording-daemon/gcs.c | 162 +++++++++++++++++++++++++++++++++++ recording-daemon/gcs.h | 11 +++ recording-daemon/main.c | 24 +++++- recording-daemon/main.h | 6 ++ recording-daemon/output.c | 2 + utils/gen-common-flags | 1 + 9 files changed, 248 insertions(+), 6 deletions(-) create mode 100644 recording-daemon/gcs.c create mode 100644 recording-daemon/gcs.h diff --git a/docs/rtpengine-recording.md b/docs/rtpengine-recording.md index c83dbf1d9..7bf898a97 100644 --- a/docs/rtpengine-recording.md +++ b/docs/rtpengine-recording.md @@ -138,7 +138,7 @@ sufficient for a standard installation of rtpengine. Points to the shared object file (__.so__) containing the reference implementation for the EVS codec. See the `README` for more details. -- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__none__ +- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__gcs__\|__none__ Where to store media files. This option can be given multiple times (or, in the config file, using a comma-separated list) to enable multiple storage @@ -171,6 +171,9 @@ sufficient for a standard installation of rtpengine. HTTPS, for example to AWS or MiniO. See the related option below for configuration. + Use the __gcs__ storage option to enable uploads to Google Cloud Storage or + a compatible service. See the related options below. + - __\-\-output-dir=__*PATH* Path for media files to be written to if file output is enabled. Defaults to @@ -442,6 +445,33 @@ sufficient for a standard installation of rtpengine. TLS certificates are verified by default, unless the `no-verify` option is set. +- __\-\-gcs-uri=__*URI* +- __\-\-gcs-key=__*STR* +- __\-\-gcs-service-account=__*FILE* +- __\-\-gcs-scope=__*STR* +- __\-\-gcs-no-verify__ + + Configure these settings to use GCS storage. At least the URI and an + authentication method must be set. + + The __gcs-uri__ must point to the full URI to post uploads to. Typically it + would contain the name of the storage bucket. The URI must not contain a + query string (i.e. no `?`). For example: + `https://storage.googleapis.com/upload/storage/v1/b/examplebucket/o` + + For authentication, either an API key must be provided via __gcs-key__, or + OAuth2 authentication via a service account file must be configured. + + To use OAuth2/JWT authentication, the __gcs-service-account__ setting must + point to a service account file in JSON format. This file should at least + contain a `client_email`, a `private_key` in RSA/PEM format, and a + `token_uri`. The authentication scope defaults to + `https://www.googleapis.com/auth/cloud-platform` but can be changed via + __gcs-scope__. + + Set __gcs-no-verify__ to disable TLS certificate verification. Note that + this only applies to uploads themselves, not to OAuth2 requests. + ## EXIT STATUS - __0__ diff --git a/etc/rtpengine-recording.conf b/etc/rtpengine-recording.conf index c3fc51ad1..f24067573 100644 --- a/etc/rtpengine-recording.conf +++ b/etc/rtpengine-recording.conf @@ -8,7 +8,7 @@ table = 0 ### where to forward to (unix socket) # forward-to = /run/rtpengine/sock -### where to store recordings: file (default), db, memory, s3 +### where to store recordings: file (default), db, memory, s3, gcs # output-storage = db;memory;s3 ### format of stored recordings: wav (default), mp3 @@ -77,3 +77,10 @@ table = 0 # s3-secret-key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY # s3-region = us-east-1 # s3-no-verify = false + +### GCS storage options +# gcs-uri = https://storage.googleapis.com/upload/storage/v1/b/examplebucket/o +# gcs-key = +# gcs-service-account = /etc/rtpengine/secret-service-account.json +# gcs-scope = https://www.googleapis.com/auth/cloud-platform +# gcs-no-verify = false diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index dcae56bae..6d121fb27 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -27,6 +27,7 @@ CFLAGS+= $(CFLAGS_OPUS) CFLAGS+= $(CFLAGS_MYSQL) CFLAGS+= $(CFLAGS_OPENSSL) CFLAGS+= $(CFLAGS_LIBCURL) +CFLAGS+= $(CFLAGS_LIBJWT) LDLIBS:= -lm -ldl LDLIBS+= $(LDLIBS_GLIB) @@ -41,6 +42,7 @@ LDLIBS+= $(LDLIBS_OPUS) LDLIBS+= $(LDLIBS_MYSQL) LDLIBS+= $(LDLIBS_OPENSSL) LDLIBS+= $(LDLIBS_LIBCURL) +LDLIBS+= $(LDLIBS_LIBJWT) CFLAGS+= $(CFLAGS_BCG729) LDLIBS+= $(LDLIBS_BCG729) @@ -48,9 +50,10 @@ LDLIBS+= $(LDLIBS_BCG729) CFLAGS+= -DCUSTOM_POLLER SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c + decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c \ + gcs.c LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \ - dtmflib.c bufferpool.c bencode.c http.c s3utils.c + dtmflib.c bufferpool.c bencode.c http.c s3utils.c oauth.c LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S MDS= rtpengine-recording.ronn diff --git a/recording-daemon/gcs.c b/recording-daemon/gcs.c new file mode 100644 index 000000000..626664c42 --- /dev/null +++ b/recording-daemon/gcs.c @@ -0,0 +1,162 @@ +#include "gcs.h" +#include "notify.h" +#include "main.h" +#include "output.h" +#include "http.h" +#include "oauth.h" + + +static oauth_context_t auth_ctx; + + +static void gcs_setup(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) { + req->object_name = g_strdup_printf("%s.%s", o->file_name, o->file_format); + req->content = output_get_content(o); +} + + +static void gcs_failed(notif_req_t *req) { + if (req->content) + output_content_failure(req->content); +} + + +static void gcs_cleanup(notif_req_t *req) { + obj_release(req->content); + g_free(req->object_name); +} + + +static bool gcs_perform(notif_req_t *req) { + if (!req->content) { + ilog(LOG_ERR, "Content for GCS upload unavailable ('%s%s%s')", FMT_M(req->name)); + return true; // no point in retrying + } + + ilog(LOG_DEBUG, "Launching GCS upload for '%s%s%s' as '%s'", FMT_M(req->name), + req->object_name); + + const char *err = NULL; + CURLcode ret; + + struct curl_slist *headers = NULL; + + if (gcs_service_account && gcs_service_account[0]) { + g_autoptr(char) jwt_err = NULL; + oauth_add_auth(&headers, &auth_ctx, &jwt_err); + if (jwt_err) { + ilog(LOG_ERR, "Failed to obtain OAuth/JWT token: %s", jwt_err); + return false; + } + } + + http_add_header(&headers, "Content-length: %zu", req->content->s->len); + http_add_header(&headers, "Content-type: application/data"); + + g_autoptr(GString) response = g_string_new(""); + + g_autoptr(char) uri; + + if (gcs_key && gcs_key[0]) + uri = g_strdup_printf("%s?name=%s&uploadType=media&key=%s", + gcs_uri, req->object_name, gcs_key); + else + uri = g_strdup_printf("%s?name=%s&uploadType=media", + gcs_uri, req->object_name); + + g_autoptr(CURL) c = http_create_req(uri, + http_download_write, + response, + http_upload_read, + &(http_upload) {.s = STR_GS(req->content->s) }, + headers, !gcs_nverify, &ret, &err); + if (!c) + goto err; + + + // POST + err = "setting CURLOPT_POST"; + if ((ret = curl_easy_setopt(c, CURLOPT_POST, 1L)) != CURLE_OK) + goto err; + + err = "performing request"; + if ((ret = curl_easy_perform(c)) != CURLE_OK) + goto err; + + long code; + err = "getting CURLINFO_RESPONSE_CODE"; + if ((ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code)) != CURLE_OK) + goto err; + + err = "checking response code (not 2xx)"; + if (code < 200 || code >= 300) { + ilog(LOG_ERR, "GCS upload returned code %ld, with body: '%s%.*s%s'", + code, FMT_M((int) response->len, response->str)); + goto err; + } + + ilog(LOG_DEBUG, "GCS upload for '%s%s%s' successful", FMT_M(req->name)); + + return true; + +err: + ilog(LOG_ERR, "Failed to perform GCS upload for '%s%s%s': " + "Error while %s: %s", + FMT_M(req->name), + err, curl_easy_strerror(ret)); + + curl_slist_free_all(headers); + + return false; +} + + +static const notif_action_t action = { + .name = "GCS", + .setup = gcs_setup, + .perform = gcs_perform, + .failed = gcs_failed, + .cleanup = gcs_cleanup, +}; + + +void gcs_store(output_t *o, metafile_t *mf) { + if ((output_storage & OUTPUT_STORAGE_GCS)) + notify_push_setup(&action, o, mf, NULL); +} + + +bool gcs_init(void) { + if (!(output_storage & OUTPUT_STORAGE_GCS)) + return true; + + if (gcs_service_account && gcs_service_account[0]) { + if (gcs_key && gcs_key[0]) { + ilog(LOG_ERR, "Both GCS service account file and API key are configured"); + return false; + } + + auth_ctx = (oauth_context_t) { + .service_account_file = gcs_service_account, + .scope = gcs_scope, + .algorithm = "RS256", + }; + + g_autoptr(char) err = oauth_init(&auth_ctx); + if (err) { + ilog(LOG_ERR, "Failed to initialise OAuth/JWT context: %s", err); + return false; + } + } + else if (!gcs_key || !gcs_key[0]) { + ilog(LOG_ERR, "No GCS service account file and no API key configured"); + return false; + } + + return true; +} + + +void gcs_shutdown(void) { + oauth_cleanup(&auth_ctx); +} diff --git a/recording-daemon/gcs.h b/recording-daemon/gcs.h new file mode 100644 index 000000000..9746cc5f2 --- /dev/null +++ b/recording-daemon/gcs.h @@ -0,0 +1,11 @@ +#ifndef _GCS_H_ +#define _GCS_H_ + +#include "types.h" + +void gcs_store(output_t *, metafile_t *); + +bool gcs_init(void); +void gcs_shutdown(void); + +#endif diff --git a/recording-daemon/main.c b/recording-daemon/main.c index 572c2d3e5..f3ac5fe95 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -30,6 +30,7 @@ #include "socket.h" #include "ssllib.h" #include "notify.h" +#include "gcs.h" @@ -77,6 +78,11 @@ char *s3_access_key; char *s3_secret_key; char *s3_region; gboolean s3_nverify; +char *gcs_uri; +char *gcs_key; +char *gcs_service_account; +char *gcs_scope; +gboolean gcs_nverify; static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -116,7 +122,8 @@ static void setup(void) { metafile_setup(); epoll_setup(); inotify_setup(); - + if (!gcs_init()) + die("GCS failure"); } @@ -210,7 +217,7 @@ static void options(int *argc, char ***argv) { { "table", 't', 0, G_OPTION_ARG_INT, &ktable, "Kernel table rtpengine uses", "INT" }, { "spool-dir", 0, 0, G_OPTION_ARG_FILENAME, &spool_dir, "Directory containing rtpengine metadata files", "PATH" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" }, - { "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|memory"}, + { "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|gcs|memory"}, { "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" }, { "output-pattern", 0, 0, G_OPTION_ARG_STRING, &output_pattern,"File name pattern for recordings", "STRING" }, { "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" }, @@ -256,6 +263,11 @@ static void options(int *argc, char ***argv) { { "s3-secret-key", 0, 0, G_OPTION_ARG_STRING, &s3_secret_key, "Secret key for S3 authentication", "STRING" }, { "s3-region", 0, 0, G_OPTION_ARG_STRING, &s3_region, "Region configuration for S3 storage", "STRING" }, { "s3-no-verify", 0, 0, G_OPTION_ARG_NONE, &s3_nverify, "Disable TLS verification for S3", NULL }, + { "gcs-uri", 0, 0, G_OPTION_ARG_STRING, &gcs_uri, "URI for GCS uploads", "STRING" }, + { "gcs-key", 0, 0, G_OPTION_ARG_STRING, &gcs_key, "API key for GCS uploads", "STRING" }, + { "gcs-service-account", 0, 0, G_OPTION_ARG_FILENAME, &gcs_service_account,"Service account JSON file for GCS JWT authentication","FILE" }, + { "gcs-scope", 0, 0, G_OPTION_ARG_STRING, &gcs_scope, "Scope for GCS JWT authentication", "STRING" }, + { "gcs-no-verify", 0, 0, G_OPTION_ARG_NONE, &gcs_nverify, "Disable TLS verification for GCS", NULL }, { NULL, } }; @@ -303,6 +315,8 @@ static void options(int *argc, char ***argv) { #endif else if (!strcmp(*iter, "s3")) output_storage |= OUTPUT_STORAGE_S3; + else if (!strcmp(*iter, "gcs")) + output_storage |= OUTPUT_STORAGE_GCS; else if (!strcmp(*iter, "db")) output_storage |= OUTPUT_STORAGE_DB; else if (!strcmp(*iter, "db-mem")) @@ -332,6 +346,8 @@ static void options(int *argc, char ***argv) { die("Notify storage is enabled but notify URI is not set"); if ((output_storage & OUTPUT_STORAGE_S3) && (!s3_host || !s3_access_key || !s3_secret_key || !s3_path || !s3_region)) die("S3 storage is enabled but S3 config is incomplete"); + if ((output_storage & OUTPUT_STORAGE_GCS) && !gcs_uri) + die("GCS storage is enabled but GCS config is incomplete"); if ((output_storage & OUTPUT_STORAGE_MASK) == 0) { if (output_mixed || output_single) @@ -377,6 +393,9 @@ static void options(int *argc, char ***argv) { if (notify_purge && (output_storage & OUTPUT_STORAGE_FILE)) output_storage &= ~OUTPUT_STORAGE_FILE; + if (!gcs_scope || !gcs_scope[0]) + gcs_scope = g_strdup("https://www.googleapis.com/auth/cloud-platform"); + if (!mix_method_str || !mix_method_str[0] || !strcmp(mix_method_str, "direct")) mix_method = MM_DIRECT; else if (!strcmp(mix_method_str, "channels")) @@ -474,6 +493,7 @@ int main(int argc, char **argv) { if (decoding_enabled) codeclib_free(); + gcs_shutdown(); cleanup(); log_free(); options_free(); diff --git a/recording-daemon/main.h b/recording-daemon/main.h index aae6cbf48..e855f89a0 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -12,6 +12,7 @@ enum output_storage_enum { OUTPUT_STORAGE_DB = 0x2, OUTPUT_STORAGE_NOTIFY = 0x4, OUTPUT_STORAGE_S3 = 0x8, + OUTPUT_STORAGE_GCS = 0x10, OUTPUT_STORAGE_MASK = 0xff, @@ -66,6 +67,11 @@ extern char *s3_access_key; extern char *s3_secret_key; extern char *s3_region; extern gboolean s3_nverify; +extern char *gcs_uri; +extern char *gcs_key; +extern char *gcs_service_account; +extern char *gcs_scope; +extern gboolean gcs_nverify; extern struct rtpengine_common_config rtpe_common_config; diff --git a/recording-daemon/output.c b/recording-daemon/output.c index b1c4228b0..f71ad96c8 100644 --- a/recording-daemon/output.c +++ b/recording-daemon/output.c @@ -15,6 +15,7 @@ #include "resample.h" #include "fix_frame_channel_layout.h" #include "s3.h" +#include "gcs.h" #define DEFAULT_AVIO_BUFSIZE 4096 @@ -676,6 +677,7 @@ void output_close(metafile_t *mf, output_t *output, tag_t *tag, bool discard) { db_close_stream(output); notify_push_output(output, mf, tag); s3_store(output, mf); + gcs_store(output, mf); } else db_delete_stream(mf, output); diff --git a/utils/gen-common-flags b/utils/gen-common-flags index 7db0ed958..320208ac0 100755 --- a/utils/gen-common-flags +++ b/utils/gen-common-flags @@ -86,6 +86,7 @@ gen-pkgconf-flags NCURSESW ncursesw gen-pkgconf-flags OPENSSL openssl gen-pkgconf-flags OPUS opus gen-pkgconf-flags SPANDSP spandsp +gen-pkgconf-flags LIBJWT libjwt echo "export CFLAGS_MYSQL := $(mysql_config --cflags)" echo "export LDLIBS_MYSQL := $(mysql_config --libs)"