#include "notify.h" #include #include #include "main.h" #include "log.h" #include "recaux.h" #include "output.h" #include "http.h" static GThreadPool *notify_threadpool; static pthread_mutex_t timer_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t timer_cond = PTHREAD_COND_INITIALIZER; static pthread_t notify_waiter; static GTree *notify_timers; static bool do_notify_http(notif_req_t *req) { const char *err = NULL; CURLcode ret; ilog(LOG_DEBUG, "Launching HTTP notification for '%s%s%s'", FMT_M(req->name)); #if CURL_AT_LEAST_VERSION(7,56,0) g_autoptr(curl_mime) mime = NULL; #endif g_autoptr(CURL) c = http_create_req(notify_uri, http_dummy_write, NULL, http_dummy_read, NULL, req->headers, !notify_nverify, &ret, &err); if (!c) goto fail; /* POST vs GET */ if (notify_post) { err = "setting CURLOPT_POST"; if ((ret = curl_easy_setopt(c, CURLOPT_POST, 1L)) != CURLE_OK) goto fail; } #if CURL_AT_LEAST_VERSION(7,56,0) if (req->content) { err = "initializing curl mime&part"; curl_mimepart *part; mime = curl_mime_init(c); part = curl_mime_addpart(mime); if ((ret = curl_mime_name(part, "ngfile")) != CURLE_OK) goto fail; if ((ret = curl_mime_data(part, req->content->s->str, req->content->s->len)) != CURLE_OK) goto fail; if ((ret = curl_easy_setopt(c, CURLOPT_MIMEPOST, mime)) != CURLE_OK) goto fail; } #endif err = "performing request"; if ((ret = curl_easy_perform(c)) != CURLE_OK) goto fail; long code; err = "getting CURLINFO_RESPONSE_CODE"; if ((ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code)) != CURLE_OK) goto fail; err = "checking response code (not 2xx)"; if (code < 200 || code >= 300) goto fail; /* success */ ilog(LOG_NOTICE, "HTTP notification for '%s%s%s' was successful", FMT_M(req->name)); return true; fail: ilog(LOG_ERR, "Failed to perform HTTP notification for '%s%s%s': " "Error while %s: %s", FMT_M(req->name), err, curl_easy_strerror(ret)); return false; } static void failed_http(notif_req_t *req) { if (req->content) output_content_failure(req->content); } static bool do_notify_command(notif_req_t *req) { ilog(LOG_DEBUG, "Executing notification command for '%s%s%s'", FMT_M(req->name)); GError *err = NULL; bool success = g_spawn_sync(NULL, req->argv, NULL, G_SPAWN_SEARCH_PATH | G_SPAWN_STDOUT_TO_DEV_NULL | G_SPAWN_STDERR_TO_DEV_NULL, NULL, NULL, NULL, NULL, NULL, &err); if (!success) { ilog(LOG_ERR, "Failed to execute notification command for '%s%s%s': %s", FMT_M(req->name), err->message); g_error_free(err); } return success; } static void do_notify(void *p, void *u) { notif_req_t *req = p; bool ok = req->action->perform(req); if (!ok) { if (notify_retries >= 0 && req->retries < notify_retries) { /* schedule retry */ req->retries++; ilog(LOG_INFO, "Failure while sending notification for '%s%s%s': " "Will retry in %" PRId64 " seconds (#%u)", FMT_M(req->name), req->falloff_us / 1000000L, req->retries); req->retry_time = now_us() + req->falloff_us; req->falloff_us *= 2; pthread_mutex_lock(&timer_lock); g_tree_insert(notify_timers, req, req); pthread_cond_signal(&timer_cond); pthread_mutex_unlock(&timer_lock); return; // skip cleanup } ilog(LOG_ERR, "Failure while sending notification for '%s%s%s' after %u retries. " "Giving up", FMT_M(req->name), req->retries); if (req->action->failed) req->action->failed(req); } req->action->cleanup(req); g_free(req->name); g_free(req); } static void *notify_timer(void *p) { pthread_mutex_lock(&timer_lock); // notify_timers being NULL acts as our shutdown flag while (notify_timers) { ilog(LOG_DEBUG, "Notification timer thread looping"); // grab first entry in list, check retry time, sleep if it's in the future notif_req_t *first = rtpe_g_tree_first(notify_timers); if (!first) { ilog(LOG_DEBUG, "No scheduled notification retries, sleeping"); pthread_cond_wait(&timer_cond, &timer_lock); continue; } int64_t now = now_us(); if (now < first->retry_time) { ilog(LOG_DEBUG, "Sleeping until next scheduled notification retry in %" PRId64 " seconds", (first->retry_time - now) / 1000000L); cond_timedwait(&timer_cond, &timer_lock, first->retry_time); continue; } // first entry is ready to run g_tree_remove(notify_timers, first); ilog(LOG_DEBUG, "Notification retry for '%s%s%s' is scheduled now", FMT_M(first->name)); g_thread_pool_push(notify_threadpool, first, NULL); } // clean up pthread_mutex_unlock(&timer_lock); pthread_mutex_destroy(&timer_lock); pthread_cond_destroy(&timer_cond); return NULL; } static int notify_req_cmp(const void *A, const void *B) { const notif_req_t *a = A, *b = B; if (a->retry_time < b->retry_time) return -1; if (a->retry_time > b->retry_time) return 1; if (a < b) return -1; if (a > b) return 1; return 0; } void notify_setup(void) { if (notify_threads <= 0) return; notify_threadpool = g_thread_pool_new(do_notify, NULL, notify_threads, false, NULL); notify_timers = g_tree_new(notify_req_cmp); int ret = pthread_create(¬ify_waiter, NULL, notify_timer, NULL); if (ret) ilog(LOG_ERR, "Failed to launch thread for HTTP notification"); } void notify_cleanup(void) { if (notify_waiter && notify_timers) { // get lock, free GTree, signal thread to shut down pthread_mutex_lock(&timer_lock); g_tree_destroy(notify_timers); notify_timers = NULL; pthread_cond_signal(&timer_cond); pthread_mutex_unlock(&timer_lock); } if (notify_threadpool) g_thread_pool_free(notify_threadpool, true, false); notify_threadpool = NULL; } #define notify_add_header(req, f, ...) http_add_header(&(req)->headers, f, __VA_ARGS__) static void notify_req_setup_http(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) { double now = (double) now_us() / 1000000.; notify_add_header(req, "X-Recording-Call-ID: %s", mf->call_id); notify_add_header(req, "X-Recording-File-Name: %s.%s", o->file_name, o->file_format); notify_add_header(req, "X-Recording-Full-File-Name: %s.%s", o->full_filename, o->file_format); notify_add_header(req, "X-Recording-File-Format: %s", o->file_format); notify_add_header(req, "X-Recording-Kind: %s", o->kind); notify_add_header(req, "X-Recording-Call-Start-Time: %.06f", (double) mf->start_time_us / 1000000.); notify_add_header(req, "X-Recording-Stream-Start-Time: %.06f", (double) o->start_time_us / 1000000.); notify_add_header(req, "X-Recording-Call-End-Time: %.06f", now); notify_add_header(req, "X-Recording-Stream-End-Time: %.06f", now); if (mf->db_id) notify_add_header(req, "X-Recording-Call-DB-ID: %llu", mf->db_id); if (o->db_id) notify_add_header(req, "X-Recording-Stream-DB-ID: %llu", o->db_id); if (mf->metadata) notify_add_header(req, "X-Recording-Call-Metadata: %s", mf->metadata); if (mf->metadata) notify_add_header(req, "X-Recording-DB-Metadata: %s", mf->metadata); if (tag) { notify_add_header(req, "X-Recording-Tag: %s", tag->name); if (tag->label) notify_add_header(req, "X-Recording-Label: %s", tag->label); if (tag->metadata) notify_add_header(req, "X-Recording-Tag-Metadata: %s", tag->metadata); } if ((output_storage & OUTPUT_STORAGE_NOTIFY)) req->content = output_get_content(o); } static void cleanup_http(notif_req_t *req) { curl_slist_free_all(req->headers); obj_release(req->content); } static const notif_action_t http_action = { .name = "HTTP", .setup = notify_req_setup_http, .perform = do_notify_http, .cleanup = cleanup_http, .failed = failed_http, }; static void notify_req_setup_command(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) { req->argv = g_new(char *, 4); req->argv[0] = g_strdup(notify_command); if ((output_storage & OUTPUT_STORAGE_FILE)) req->argv[1] = g_strdup_printf("%s.%s", o->full_filename, o->file_format); else req->argv[1] = g_strdup(""); req->argv[2] = g_strdup_printf("%llu", req->db_id); req->argv[3] = NULL; } static void cleanup_command(notif_req_t *req) { g_strfreev(req->argv); } static const notif_action_t command_action = { .name = "command", .setup = notify_req_setup_command, .perform = do_notify_command, .cleanup = cleanup_command, }; void notify_push_setup(const notif_action_t *action, output_t *o, metafile_t *mf, tag_t *tag) { notif_req_t *req = g_new0(__typeof(*req), 1); req->name = g_strdup_printf("%s for '%s'", action->name, o->file_name); req->action = action; req->db_id = o->db_id; action->setup(req, o, mf, tag); req->falloff_us = 5000000LL; // initial retry time g_thread_pool_push(notify_threadpool, req, NULL); } void notify_push_output(output_t *o, metafile_t *mf, tag_t *tag) { if (notify_uri) notify_push_setup(&http_action, o, mf, tag); if (notify_command) notify_push_setup(&command_action, o, mf, tag); }