Browse Source

MT#61977 add GCS storage option

Change-Id: Idc97ea18bae6215379072bbec05a7b5afae73f00
pull/1998/head
Richard Fuchs 4 months ago
parent
commit
ed68ee3ca5
9 changed files with 248 additions and 6 deletions
  1. +31
    -1
      docs/rtpengine-recording.md
  2. +8
    -1
      etc/rtpengine-recording.conf
  3. +5
    -2
      recording-daemon/Makefile
  4. +162
    -0
      recording-daemon/gcs.c
  5. +11
    -0
      recording-daemon/gcs.h
  6. +22
    -2
      recording-daemon/main.c
  7. +6
    -0
      recording-daemon/main.h
  8. +2
    -0
      recording-daemon/output.c
  9. +1
    -0
      utils/gen-common-flags

+ 31
- 1
docs/rtpengine-recording.md View File

@ -138,7 +138,7 @@ sufficient for a standard installation of rtpengine.
Points to the shared object file (__.so__) containing the reference
implementation for the EVS codec. See the `README` for more details.
- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__none__
- __\-\-output-storage=file__\|__db__\|__memory__\|__notify__\|__s3__\|__gcs__\|__none__
Where to store media files. This option can be given multiple times (or, in
the config file, using a comma-separated list) to enable multiple storage
@ -171,6 +171,9 @@ sufficient for a standard installation of rtpengine.
HTTPS, for example to AWS or MiniO. See the related option below for
configuration.
Use the __gcs__ storage option to enable uploads to Google Cloud Storage or
a compatible service. See the related options below.
- __\-\-output-dir=__*PATH*
Path for media files to be written to if file output is enabled. Defaults to
@ -442,6 +445,33 @@ sufficient for a standard installation of rtpengine.
TLS certificates are verified by default, unless the `no-verify` option is
set.
- __\-\-gcs-uri=__*URI*
- __\-\-gcs-key=__*STR*
- __\-\-gcs-service-account=__*FILE*
- __\-\-gcs-scope=__*STR*
- __\-\-gcs-no-verify__
Configure these settings to use GCS storage. At least the URI and an
authentication method must be set.
The __gcs-uri__ must point to the full URI to post uploads to. Typically it
would contain the name of the storage bucket. The URI must not contain a
query string (i.e. no `?`). For example:
`https://storage.googleapis.com/upload/storage/v1/b/examplebucket/o`
For authentication, either an API key must be provided via __gcs-key__, or
OAuth2 authentication via a service account file must be configured.
To use OAuth2/JWT authentication, the __gcs-service-account__ setting must
point to a service account file in JSON format. This file should at least
contain a `client_email`, a `private_key` in RSA/PEM format, and a
`token_uri`. The authentication scope defaults to
`https://www.googleapis.com/auth/cloud-platform` but can be changed via
__gcs-scope__.
Set __gcs-no-verify__ to disable TLS certificate verification. Note that
this only applies to uploads themselves, not to OAuth2 requests.
## EXIT STATUS
- __0__


+ 8
- 1
etc/rtpengine-recording.conf View File

@ -8,7 +8,7 @@ table = 0
### where to forward to (unix socket)
# forward-to = /run/rtpengine/sock
### where to store recordings: file (default), db, memory, s3
### where to store recordings: file (default), db, memory, s3, gcs
# output-storage = db;memory;s3
### format of stored recordings: wav (default), mp3
@ -77,3 +77,10 @@ table = 0
# s3-secret-key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# s3-region = us-east-1
# s3-no-verify = false
### GCS storage options
# gcs-uri = https://storage.googleapis.com/upload/storage/v1/b/examplebucket/o
# gcs-key =
# gcs-service-account = /etc/rtpengine/secret-service-account.json
# gcs-scope = https://www.googleapis.com/auth/cloud-platform
# gcs-no-verify = false

+ 5
- 2
recording-daemon/Makefile View File

@ -27,6 +27,7 @@ CFLAGS+= $(CFLAGS_OPUS)
CFLAGS+= $(CFLAGS_MYSQL)
CFLAGS+= $(CFLAGS_OPENSSL)
CFLAGS+= $(CFLAGS_LIBCURL)
CFLAGS+= $(CFLAGS_LIBJWT)
LDLIBS:= -lm -ldl
LDLIBS+= $(LDLIBS_GLIB)
@ -41,6 +42,7 @@ LDLIBS+= $(LDLIBS_OPUS)
LDLIBS+= $(LDLIBS_MYSQL)
LDLIBS+= $(LDLIBS_OPENSSL)
LDLIBS+= $(LDLIBS_LIBCURL)
LDLIBS+= $(LDLIBS_LIBJWT)
CFLAGS+= $(CFLAGS_BCG729)
LDLIBS+= $(LDLIBS_BCG729)
@ -48,9 +50,10 @@ LDLIBS+= $(LDLIBS_BCG729)
CFLAGS+= -DCUSTOM_POLLER
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c
decoder.c output.c mix.c db.c log.c forward.c tag.c custom_poller.c notify.c tls_send.c s3.c \
gcs.c
LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \
dtmflib.c bufferpool.c bencode.c http.c s3utils.c
dtmflib.c bufferpool.c bencode.c http.c s3utils.c oauth.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
MDS= rtpengine-recording.ronn


+ 162
- 0
recording-daemon/gcs.c View File

@ -0,0 +1,162 @@
#include "gcs.h"
#include "notify.h"
#include "main.h"
#include "output.h"
#include "http.h"
#include "oauth.h"
static oauth_context_t auth_ctx;
static void gcs_setup(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) {
req->object_name = g_strdup_printf("%s.%s", o->file_name, o->file_format);
req->content = output_get_content(o);
}
static void gcs_failed(notif_req_t *req) {
if (req->content)
output_content_failure(req->content);
}
static void gcs_cleanup(notif_req_t *req) {
obj_release(req->content);
g_free(req->object_name);
}
static bool gcs_perform(notif_req_t *req) {
if (!req->content) {
ilog(LOG_ERR, "Content for GCS upload unavailable ('%s%s%s')", FMT_M(req->name));
return true; // no point in retrying
}
ilog(LOG_DEBUG, "Launching GCS upload for '%s%s%s' as '%s'", FMT_M(req->name),
req->object_name);
const char *err = NULL;
CURLcode ret;
struct curl_slist *headers = NULL;
if (gcs_service_account && gcs_service_account[0]) {
g_autoptr(char) jwt_err = NULL;
oauth_add_auth(&headers, &auth_ctx, &jwt_err);
if (jwt_err) {
ilog(LOG_ERR, "Failed to obtain OAuth/JWT token: %s", jwt_err);
return false;
}
}
http_add_header(&headers, "Content-length: %zu", req->content->s->len);
http_add_header(&headers, "Content-type: application/data");
g_autoptr(GString) response = g_string_new("");
g_autoptr(char) uri;
if (gcs_key && gcs_key[0])
uri = g_strdup_printf("%s?name=%s&uploadType=media&key=%s",
gcs_uri, req->object_name, gcs_key);
else
uri = g_strdup_printf("%s?name=%s&uploadType=media",
gcs_uri, req->object_name);
g_autoptr(CURL) c = http_create_req(uri,
http_download_write,
response,
http_upload_read,
&(http_upload) {.s = STR_GS(req->content->s) },
headers, !gcs_nverify, &ret, &err);
if (!c)
goto err;
// POST
err = "setting CURLOPT_POST";
if ((ret = curl_easy_setopt(c, CURLOPT_POST, 1L)) != CURLE_OK)
goto err;
err = "performing request";
if ((ret = curl_easy_perform(c)) != CURLE_OK)
goto err;
long code;
err = "getting CURLINFO_RESPONSE_CODE";
if ((ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code)) != CURLE_OK)
goto err;
err = "checking response code (not 2xx)";
if (code < 200 || code >= 300) {
ilog(LOG_ERR, "GCS upload returned code %ld, with body: '%s%.*s%s'",
code, FMT_M((int) response->len, response->str));
goto err;
}
ilog(LOG_DEBUG, "GCS upload for '%s%s%s' successful", FMT_M(req->name));
return true;
err:
ilog(LOG_ERR, "Failed to perform GCS upload for '%s%s%s': "
"Error while %s: %s",
FMT_M(req->name),
err, curl_easy_strerror(ret));
curl_slist_free_all(headers);
return false;
}
static const notif_action_t action = {
.name = "GCS",
.setup = gcs_setup,
.perform = gcs_perform,
.failed = gcs_failed,
.cleanup = gcs_cleanup,
};
void gcs_store(output_t *o, metafile_t *mf) {
if ((output_storage & OUTPUT_STORAGE_GCS))
notify_push_setup(&action, o, mf, NULL);
}
bool gcs_init(void) {
if (!(output_storage & OUTPUT_STORAGE_GCS))
return true;
if (gcs_service_account && gcs_service_account[0]) {
if (gcs_key && gcs_key[0]) {
ilog(LOG_ERR, "Both GCS service account file and API key are configured");
return false;
}
auth_ctx = (oauth_context_t) {
.service_account_file = gcs_service_account,
.scope = gcs_scope,
.algorithm = "RS256",
};
g_autoptr(char) err = oauth_init(&auth_ctx);
if (err) {
ilog(LOG_ERR, "Failed to initialise OAuth/JWT context: %s", err);
return false;
}
}
else if (!gcs_key || !gcs_key[0]) {
ilog(LOG_ERR, "No GCS service account file and no API key configured");
return false;
}
return true;
}
void gcs_shutdown(void) {
oauth_cleanup(&auth_ctx);
}

+ 11
- 0
recording-daemon/gcs.h View File

@ -0,0 +1,11 @@
#ifndef _GCS_H_
#define _GCS_H_
#include "types.h"
void gcs_store(output_t *, metafile_t *);
bool gcs_init(void);
void gcs_shutdown(void);
#endif

+ 22
- 2
recording-daemon/main.c View File

@ -30,6 +30,7 @@
#include "socket.h"
#include "ssllib.h"
#include "notify.h"
#include "gcs.h"
@ -77,6 +78,11 @@ char *s3_access_key;
char *s3_secret_key;
char *s3_region;
gboolean s3_nverify;
char *gcs_uri;
char *gcs_key;
char *gcs_service_account;
char *gcs_scope;
gboolean gcs_nverify;
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
@ -116,7 +122,8 @@ static void setup(void) {
metafile_setup();
epoll_setup();
inotify_setup();
if (!gcs_init())
die("GCS failure");
}
@ -210,7 +217,7 @@ static void options(int *argc, char ***argv) {
{ "table", 't', 0, G_OPTION_ARG_INT, &ktable, "Kernel table rtpengine uses", "INT" },
{ "spool-dir", 0, 0, G_OPTION_ARG_FILENAME, &spool_dir, "Directory containing rtpengine metadata files", "PATH" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" },
{ "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|memory"},
{ "output-storage", 0, 0, G_OPTION_ARG_STRING_ARRAY,&os_a, "Where to store audio streams", "file|db|notify|s3|gcs|memory"},
{ "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" },
{ "output-pattern", 0, 0, G_OPTION_ARG_STRING, &output_pattern,"File name pattern for recordings", "STRING" },
{ "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" },
@ -256,6 +263,11 @@ static void options(int *argc, char ***argv) {
{ "s3-secret-key", 0, 0, G_OPTION_ARG_STRING, &s3_secret_key, "Secret key for S3 authentication", "STRING" },
{ "s3-region", 0, 0, G_OPTION_ARG_STRING, &s3_region, "Region configuration for S3 storage", "STRING" },
{ "s3-no-verify", 0, 0, G_OPTION_ARG_NONE, &s3_nverify, "Disable TLS verification for S3", NULL },
{ "gcs-uri", 0, 0, G_OPTION_ARG_STRING, &gcs_uri, "URI for GCS uploads", "STRING" },
{ "gcs-key", 0, 0, G_OPTION_ARG_STRING, &gcs_key, "API key for GCS uploads", "STRING" },
{ "gcs-service-account", 0, 0, G_OPTION_ARG_FILENAME, &gcs_service_account,"Service account JSON file for GCS JWT authentication","FILE" },
{ "gcs-scope", 0, 0, G_OPTION_ARG_STRING, &gcs_scope, "Scope for GCS JWT authentication", "STRING" },
{ "gcs-no-verify", 0, 0, G_OPTION_ARG_NONE, &gcs_nverify, "Disable TLS verification for GCS", NULL },
{ NULL, }
};
@ -303,6 +315,8 @@ static void options(int *argc, char ***argv) {
#endif
else if (!strcmp(*iter, "s3"))
output_storage |= OUTPUT_STORAGE_S3;
else if (!strcmp(*iter, "gcs"))
output_storage |= OUTPUT_STORAGE_GCS;
else if (!strcmp(*iter, "db"))
output_storage |= OUTPUT_STORAGE_DB;
else if (!strcmp(*iter, "db-mem"))
@ -332,6 +346,8 @@ static void options(int *argc, char ***argv) {
die("Notify storage is enabled but notify URI is not set");
if ((output_storage & OUTPUT_STORAGE_S3) && (!s3_host || !s3_access_key || !s3_secret_key || !s3_path || !s3_region))
die("S3 storage is enabled but S3 config is incomplete");
if ((output_storage & OUTPUT_STORAGE_GCS) && !gcs_uri)
die("GCS storage is enabled but GCS config is incomplete");
if ((output_storage & OUTPUT_STORAGE_MASK) == 0) {
if (output_mixed || output_single)
@ -377,6 +393,9 @@ static void options(int *argc, char ***argv) {
if (notify_purge && (output_storage & OUTPUT_STORAGE_FILE))
output_storage &= ~OUTPUT_STORAGE_FILE;
if (!gcs_scope || !gcs_scope[0])
gcs_scope = g_strdup("https://www.googleapis.com/auth/cloud-platform");
if (!mix_method_str || !mix_method_str[0] || !strcmp(mix_method_str, "direct"))
mix_method = MM_DIRECT;
else if (!strcmp(mix_method_str, "channels"))
@ -474,6 +493,7 @@ int main(int argc, char **argv) {
if (decoding_enabled)
codeclib_free();
gcs_shutdown();
cleanup();
log_free();
options_free();


+ 6
- 0
recording-daemon/main.h View File

@ -12,6 +12,7 @@ enum output_storage_enum {
OUTPUT_STORAGE_DB = 0x2,
OUTPUT_STORAGE_NOTIFY = 0x4,
OUTPUT_STORAGE_S3 = 0x8,
OUTPUT_STORAGE_GCS = 0x10,
OUTPUT_STORAGE_MASK = 0xff,
@ -66,6 +67,11 @@ extern char *s3_access_key;
extern char *s3_secret_key;
extern char *s3_region;
extern gboolean s3_nverify;
extern char *gcs_uri;
extern char *gcs_key;
extern char *gcs_service_account;
extern char *gcs_scope;
extern gboolean gcs_nverify;
extern struct rtpengine_common_config rtpe_common_config;


+ 2
- 0
recording-daemon/output.c View File

@ -15,6 +15,7 @@
#include "resample.h"
#include "fix_frame_channel_layout.h"
#include "s3.h"
#include "gcs.h"
#define DEFAULT_AVIO_BUFSIZE 4096
@ -676,6 +677,7 @@ void output_close(metafile_t *mf, output_t *output, tag_t *tag, bool discard) {
db_close_stream(output);
notify_push_output(output, mf, tag);
s3_store(output, mf);
gcs_store(output, mf);
}
else
db_delete_stream(mf, output);


+ 1
- 0
utils/gen-common-flags View File

@ -86,6 +86,7 @@ gen-pkgconf-flags NCURSESW ncursesw
gen-pkgconf-flags OPENSSL openssl
gen-pkgconf-flags OPUS opus
gen-pkgconf-flags SPANDSP spandsp
gen-pkgconf-flags LIBJWT libjwt
echo "export CFLAGS_MYSQL := $(mysql_config --cflags)"
echo "export LDLIBS_MYSQL := $(mysql_config --libs)"


Loading…
Cancel
Save