diff --git a/README.md b/README.md index b3971e5f5..da4ab0f41 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,8 @@ option and which are reproduced below: --sip-source Use SIP source address by default --dtls-passive Always prefer DTLS passive role --max-sessions=INT Limit the number of maximum concurrent sessions + --max-load=FLOAT Reject new sessions if load averages exceeds this value + --max-cpu=FLOAT Reject new sessions if CPU usage (in percent) exceeds this value --homer=IP46:PORT Address of Homer server for RTCP stats --homer-protocol=udp|tcp Transport protocol for Homer (default udp) --homer-id=INT 'Capture ID' to use within the HEP protocol @@ -513,6 +515,17 @@ The options are described in more detail below. Disable feature: 'rtpengine-ctl set maxsessions -1' By default, the feature is disabled (i.e. maxsessions == -1). +* --max-load + + If the current 1-minute load average exceeds the value given here, reject new sessions until + the load average drops below the threshold. + +* --max-cpu + + If the current CPU usage (in percent) exceeds the value given here, reject new sessions until + the load average drops below the threshold. CPU usage is sampled in 0.5 second intervals. Only + supported on systems providing a Linux-style `/proc/stat`. + * --homer Enables sending the decoded contents of RTCP packets to a Homer SIP capture server. The transport diff --git a/daemon/Makefile b/daemon/Makefile index 418854e74..fb8808575 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -117,7 +117,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_u bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c + codec.c load.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index c1b3c84f7..14a158a6c 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -26,6 +26,7 @@ #include "tcp_listener.h" #include "streambuf.h" #include "main.h" +#include "load.h" static pcre *info_re; @@ -740,6 +741,24 @@ static enum load_limit_reasons call_offer_session_limit(void) { rwlock_unlock_r(&rtpe_callhash_lock); } + if (!ret && rtpe_config.load_limit) { + int loadavg = g_atomic_int_get(&load_average); + if (loadavg >= rtpe_config.load_limit) { + ilog(LOG_WARN, "Load limit exceeded (%.2f > %.2f)", + (double) loadavg / 100.0, (double) rtpe_config.load_limit / 100.0); + ret = LOAD_LIMIT_LOAD; + } + } + + if (!ret && rtpe_config.cpu_limit) { + int cpu = g_atomic_int_get(&cpu_usage); + if (cpu >= rtpe_config.cpu_limit) { + ilog(LOG_WARN, "CPU usage limit exceeded (%.1f%% > %.1f%%)", + (double) cpu / 100.0, (double) rtpe_config.cpu_limit / 100.0); + ret = LOAD_LIMIT_CPU; + } + } + rwlock_unlock_r(&rtpe_config.config_lock); return ret; diff --git a/daemon/cli.c b/daemon/cli.c index b1f5494ab..2079f8d65 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -44,6 +44,8 @@ static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer); static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer); static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_maxcpu(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_maxload(str *instr, struct streambuf *replybuffer); static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer); static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer); static void cli_incoming_set_offertimeout(str *instr, struct streambuf *replybuffer); @@ -61,6 +63,8 @@ static void cli_incoming_params_diff(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_maxcpu(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_maxload(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer); @@ -90,6 +94,8 @@ static const cli_handler_t cli_top_handlers[] = { static const cli_handler_t cli_set_handlers[] = { { "maxopenfiles", cli_incoming_set_maxopenfiles }, { "maxsessions", cli_incoming_set_maxsessions }, + { "maxcpu", cli_incoming_set_maxcpu }, + { "maxload", cli_incoming_set_maxload }, { "timeout", cli_incoming_set_timeout }, { "silenttimeout", cli_incoming_set_silenttimeout }, { "offertimeout", cli_incoming_set_offertimeout }, @@ -108,6 +114,8 @@ static const cli_handler_t cli_list_handlers[] = { { "totals", cli_incoming_list_totals }, { "maxopenfiles", cli_incoming_list_maxopenfiles }, { "maxsessions", cli_incoming_list_maxsessions }, + { "maxcpu", cli_incoming_list_maxcpu }, + { "maxload", cli_incoming_list_maxload }, { "timeout", cli_incoming_list_timeout }, { "silenttimeout", cli_incoming_list_silenttimeout }, { "offertimeout", cli_incoming_list_offertimeout }, @@ -219,7 +227,9 @@ static void cli_incoming_params_start(str *instr, struct streambuf *replybuffer) "delete-delay = %d\nredis-expires = %d\ntos = %d\ncontrol-tos = %d\ngraphite-interval = %d\nredis-num-threads = %d\n" "homer-protocol = %d\nhomer-id = %d\nno-fallback = %d\nport-min = %d\nport-max = %d\nredis = %s:%d/%d\n" "redis-write = %s:%d/%d\nno-redis-required = %d\nnum-threads = %d\nxmlrpc-format = %d\nlog_format = %d\n" - "redis_allowed_errors = %d\nredis_disable_time = %d\nredis_cmd_timeout = %d\nredis_connect_timeout = %d\n", + "redis_allowed_errors = %d\nredis_disable_time = %d\nredis_cmd_timeout = %d\nredis_connect_timeout = %d\n" + "max-cpu = %.1f\n" + "max-load = %.2f\n", initial_rtpe_config.kernel_table, initial_rtpe_config.max_sessions, initial_rtpe_config.timeout, initial_rtpe_config.silent_timeout, initial_rtpe_config.final_timeout, initial_rtpe_config.offer_timeout, initial_rtpe_config.delete_delay, @@ -230,7 +240,9 @@ static void cli_incoming_params_start(str *instr, struct streambuf *replybuffer) sockaddr_print_buf(&initial_rtpe_config.redis_write_ep.address), initial_rtpe_config.redis_write_ep.port, initial_rtpe_config.redis_write_db, initial_rtpe_config.no_redis_required, initial_rtpe_config.num_threads, initial_rtpe_config.fmt, initial_rtpe_config.log_format, initial_rtpe_config.redis_allowed_errors, - initial_rtpe_config.redis_disable_time, initial_rtpe_config.redis_cmd_timeout, initial_rtpe_config.redis_connect_timeout); + initial_rtpe_config.redis_disable_time, initial_rtpe_config.redis_cmd_timeout, initial_rtpe_config.redis_connect_timeout, + (double) initial_rtpe_config.cpu_limit / 100, + (double) initial_rtpe_config.load_limit / 100); for(s = initial_rtpe_config.interfaces.head; s ; s = s->next) { ifa = s->data; @@ -263,7 +275,9 @@ static void cli_incoming_params_current(str *instr, struct streambuf *replybuffe "delete-delay = %d\nredis-expires = %d\ntos = %d\ncontrol-tos = %d\ngraphite-interval = %d\nredis-num-threads = %d\n" "homer-protocol = %d\nhomer-id = %d\nno-fallback = %d\nport-min = %d\nport-max = %d\nredis-db = %d\n" "redis-write-db = %d\nno-redis-required = %d\nnum-threads = %d\nxmlrpc-format = %d\nlog_format = %d\n" - "redis_allowed_errors = %d\nredis_disable_time = %d\nredis_cmd_timeout = %d\nredis_connect_timeout = %d\n", + "redis_allowed_errors = %d\nredis_disable_time = %d\nredis_cmd_timeout = %d\nredis_connect_timeout = %d\n" + "max-cpu = %.1f\n" + "max-load = %.2f\n", rtpe_config.kernel_table, rtpe_config.max_sessions, rtpe_config.timeout, rtpe_config.silent_timeout, rtpe_config.final_timeout, rtpe_config.offer_timeout, @@ -272,7 +286,9 @@ static void cli_incoming_params_current(str *instr, struct streambuf *replybuffe rtpe_config.homer_id, rtpe_config.no_fallback, rtpe_config.port_min, rtpe_config.port_max, rtpe_config.redis_db, rtpe_config.redis_write_db, rtpe_config.no_redis_required, rtpe_config.num_threads, rtpe_config.fmt, rtpe_config.log_format, rtpe_config.redis_allowed_errors, - rtpe_config.redis_disable_time, rtpe_config.redis_cmd_timeout, rtpe_config.redis_connect_timeout); + rtpe_config.redis_disable_time, rtpe_config.redis_cmd_timeout, rtpe_config.redis_connect_timeout, + (double) rtpe_config.cpu_limit / 100, + (double) rtpe_config.load_limit / 100); for(c = rtpe_config.interfaces.head; c ; c = c->next) { ifa = c->data; @@ -320,6 +336,8 @@ static void cli_incoming_params_diff(str *instr, struct streambuf *replybuffer) int_diff_print(initial_rtpe_config.kernel_table, rtpe_config.kernel_table, "table", replybuffer); int_diff_print(initial_rtpe_config.max_sessions, rtpe_config.max_sessions, "max-sessions", replybuffer); + int_diff_print(initial_rtpe_config.cpu_limit, rtpe_config.cpu_limit, "max-cpu", replybuffer); + int_diff_print(initial_rtpe_config.load_limit, rtpe_config.load_limit, "max-load", replybuffer); int_diff_print(initial_rtpe_config.timeout, rtpe_config.timeout, "timeout", replybuffer); int_diff_print(initial_rtpe_config.silent_timeout, rtpe_config.silent_timeout, "silent-timeout", replybuffer); int_diff_print(initial_rtpe_config.final_timeout, rtpe_config.final_timeout, "final-timeout", replybuffer); @@ -505,6 +523,18 @@ static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuf return ; } +static void cli_incoming_list_maxcpu(str *instr, struct streambuf *replybuffer) { + /* don't lock anything while reading the value */ + streambuf_printf(replybuffer, "Maximum CPU usage configured on rtpengine: %.1f\n", (double) rtpe_config.cpu_limit / 100.0); + + return ; +} +static void cli_incoming_list_maxload(str *instr, struct streambuf *replybuffer) { + /* don't lock anything while reading the value */ + streambuf_printf(replybuffer, "Maximum load average configured on rtpengine: %.2f\n", (double) rtpe_config.load_limit / 100.0); + + return ; +} static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer) { struct rlimit rlim; @@ -807,6 +837,61 @@ static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuff return; } +// XXX lots of code duplication, unify those set functions +static void cli_incoming_set_maxcpu(str *instr, struct streambuf *replybuffer) { + char *endptr; + + if (str_shift(instr, 1)) { + streambuf_printf(replybuffer, "%s\n", "More parameters required."); + return; + } + + errno = 0; + double num = strtod(instr->s, &endptr); + + if ((errno == ERANGE && (num == HUGE_VAL || num == -HUGE_VAL)) || (errno != 0 && num == 0) || isnan(num) || !isfinite(num)) { + streambuf_printf(replybuffer, "Fail setting maxcpu to %s; errno=%d\n", instr->s, errno); + return; + } else if (endptr == instr->s) { + streambuf_printf(replybuffer, "Fail setting maxcpu to %s; no digists found\n", instr->s); + return; + } else { + rwlock_lock_w(&rtpe_config.config_lock); + rtpe_config.cpu_limit = num * 100; + rwlock_unlock_w(&rtpe_config.config_lock); + streambuf_printf(replybuffer, "Success setting maxcpu to %.1f\n", num); + } + + return; +} + +static void cli_incoming_set_maxload(str *instr, struct streambuf *replybuffer) { + char *endptr; + + if (str_shift(instr, 1)) { + streambuf_printf(replybuffer, "%s\n", "More parameters required."); + return; + } + + errno = 0; + double num = strtod(instr->s, &endptr); + + if ((errno == ERANGE && (num == HUGE_VAL || num == -HUGE_VAL)) || (errno != 0 && num == 0) || isnan(num) || !isfinite(num)) { + streambuf_printf(replybuffer, "Fail setting maxload to %s; errno=%d\n", instr->s, errno); + return; + } else if (endptr == instr->s) { + streambuf_printf(replybuffer, "Fail setting maxload to %s; no digists found\n", instr->s); + return; + } else { + rwlock_lock_w(&rtpe_config.config_lock); + rtpe_config.load_limit = num * 100; + rwlock_unlock_w(&rtpe_config.config_lock); + streambuf_printf(replybuffer, "Success setting maxload to %.2f\n", num); + } + + return; +} + static void cli_incoming_set_gentimeout(str *instr, struct streambuf *replybuffer, int *conf_timeout) { long timeout_num; char *endptr; diff --git a/daemon/control_ng.c b/daemon/control_ng.c index aabc43b83..14a5fac2a 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -22,6 +22,8 @@ struct control_ng *rtpe_control_ng; const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = { [LOAD_LIMIT_MAX_SESSIONS] = "Parallel session limit reached", + [LOAD_LIMIT_CPU] = "CPU usage limit exceeded", + [LOAD_LIMIT_LOAD] = "Load limit exceeded", }; diff --git a/daemon/control_ng.h b/daemon/control_ng.h index deea2928a..9e5e3a54d 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -38,6 +38,8 @@ extern struct control_ng *rtpe_control_ng; enum load_limit_reasons { LOAD_LIMIT_NONE = -1, LOAD_LIMIT_MAX_SESSIONS = 0, + LOAD_LIMIT_CPU, + LOAD_LIMIT_LOAD, __LOAD_LIMIT_MAX }; diff --git a/daemon/load.c b/daemon/load.c new file mode 100644 index 000000000..a7afe2fd5 --- /dev/null +++ b/daemon/load.c @@ -0,0 +1,53 @@ +#include "load.h" +#include +#include +#include +#include +#include +#include +#include "aux.h" +#include "log.h" +#include "main.h" + +int load_average; // times 100 +int cpu_usage; // percent times 100 (0 - 9999) + +static long used_last, idle_last; + +void load_thread(void *dummy) { + while (!rtpe_shutdown) { + if (rtpe_config.load_limit) { + double loadavg; + if (getloadavg(&loadavg, 1) >= 1) + g_atomic_int_set(&load_average, (int) (loadavg * 100.0)); + else + ilog(LOG_WARN, "Failed to obtain load average: %s", strerror(errno)); + } + + if (rtpe_config.cpu_limit) { + FILE *f; + f = fopen("/proc/stat", "r"); + if (f) { + long user_now, nice_now, system_now, idle_now; + if (fscanf(f, "cpu %li %li %li %li", + &user_now, &nice_now, &system_now, &idle_now) == 4) + { + long used_now = user_now + nice_now + system_now; + long used_secs = used_now - used_last; + long idle_secs = idle_now - idle_last; + long total_secs = used_secs + idle_secs; + if (used_last && idle_last) + g_atomic_int_set(&cpu_usage, (int) (used_secs + * 10000 / total_secs)); + used_last = used_now; + idle_last = idle_now; + } + else + ilog(LOG_WARN, "Failed to obtain CPU usage"); + fclose(f); + } + } + + usleep(500000); + } +} diff --git a/daemon/load.h b/daemon/load.h new file mode 100644 index 000000000..d11be7756 --- /dev/null +++ b/daemon/load.h @@ -0,0 +1,9 @@ +#ifndef _LOAD_H_ +#define _LOAD_H_ + +extern int load_average; // times 100 +extern int cpu_usage; // times 100 + +void load_thread(void *); + +#endif diff --git a/daemon/main.c b/daemon/main.c index 86faba62f..08113e5a9 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -39,6 +39,7 @@ #include "statistics.h" #include "graphite.h" #include "codeclib.h" +#include "load.h" @@ -249,6 +250,8 @@ static void options(int *argc, char ***argv) { char *homerproto = NULL; char *endptr; int codecs = 0; + double max_load = 0; + double max_cpu = 0; GOptionEntry e[] = { { "table", 't', 0, G_OPTION_ARG_INT, &rtpe_config.kernel_table, "Kernel table to use", "INT" }, @@ -289,6 +292,8 @@ static void options(int *argc, char ***argv) { { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, { "max-sessions", 0, 0, G_OPTION_ARG_INT, &rtpe_config.max_sessions, "Limit of maximum number of sessions", "INT" }, + { "max-load", 0, 0, G_OPTION_ARG_DOUBLE, &max_load, "Reject new sessions if load averages exceeds this value", "FLOAT" }, + { "max-cpu", 0, 0, G_OPTION_ARG_DOUBLE, &max_cpu, "Reject new sessions if CPU usage (in percent) exceeds this value", "FLOAT" }, { "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46|HOSTNAME:PORT"}, { "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" }, { "homer-id", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.homer_id, "'Capture ID' to use within the HEP protocol", "INT" }, @@ -433,6 +438,9 @@ static void options(int *argc, char ***argv) { if (!sip_source) trust_address_def = 1; + + rtpe_config.cpu_limit = max_cpu * 100; + rtpe_config.load_limit = max_load * 100; } void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { @@ -455,6 +463,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->kernel_table = rtpe_config.kernel_table; ini_rtpe_cfg->max_sessions = rtpe_config.max_sessions; + ini_rtpe_cfg->cpu_limit = rtpe_config.cpu_limit; + ini_rtpe_cfg->load_limit = rtpe_config.load_limit; ini_rtpe_cfg->timeout = rtpe_config.timeout; ini_rtpe_cfg->silent_timeout = rtpe_config.silent_timeout; ini_rtpe_cfg->offer_timeout = rtpe_config.offer_timeout; @@ -714,6 +724,7 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(poller_timer_loop, rtpe_poller); + thread_create_detach(load_thread, NULL); if (!is_addr_unspecified(&rtpe_config.redis_ep.address)) thread_create_detach(redis_notify_loop, NULL); diff --git a/daemon/main.h b/daemon/main.h index 5f7089f07..f82ed3f64 100644 --- a/daemon/main.h +++ b/daemon/main.h @@ -68,6 +68,8 @@ struct rtpengine_config { char *rec_method; char *rec_format; char *iptables_chain; + int load_limit; + int cpu_limit; };