diff --git a/daemon/call.c b/daemon/call.c index f34e30072..3603ec640 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -176,7 +176,6 @@ static void call_timer_iterator(gpointer data, gpointer user_data) { struct call *c = data; struct iterator_helper *hlp = user_data; GList *it; - struct callmaster *cm; unsigned int check; int good = 0; struct packet_stream *ps; @@ -189,11 +188,10 @@ static void call_timer_iterator(gpointer data, gpointer user_data) { rwlock_lock_r(&c->master_lock); log_info_call(c); - cm = c->callmaster; - rwlock_lock_r(&cm->conf.config_lock); + rwlock_lock_r(&rtpe_config.config_lock); // final timeout applicable to all calls (own and foreign) - if (cm->conf.final_timeout && rtpe_now.tv_sec >= (c->created.tv_sec + cm->conf.final_timeout)) { + if (rtpe_config.final_timeout && rtpe_now.tv_sec >= (c->created.tv_sec + rtpe_config.final_timeout)) { ilog(LOG_INFO, "Closing call due to final timeout"); tmp_t_reason = FINAL_TIMEOUT; for (it = c->monologues.head; it; it = it->next) { @@ -248,10 +246,10 @@ no_sfd: if (good) goto next; - check = cm->conf.timeout; + check = rtpe_config.timeout; tmp_t_reason = TIMEOUT; if (!MEDIA_ISSET(ps->media, RECV) || !sfd || !PS_ISSET(ps, FILLED)) { - check = cm->conf.silent_timeout; + check = rtpe_config.silent_timeout; tmp_t_reason = SILENT_TIMEOUT; } @@ -286,7 +284,7 @@ delete: goto out; out: - rwlock_unlock_r(&cm->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); rwlock_unlock_r(&c->master_lock); log_info_clear(); } @@ -397,7 +395,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) { return; /* if m is NULL, it's the scheduled deletions, otherwise it's the timeouts */ - url = m ? m->conf.b2b_url : NULL; + url = m ? rtpe_config.b2b_url : NULL; if (url) { xh = g_slice_alloc(sizeof(*xh)); xh->c = g_string_chunk_new(64); @@ -410,7 +408,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) { else url_suffix = g_string_chunk_insert(xh->c, url); xh->tags_urls = NULL; - xh->fmt = m->conf.fmt; + xh->fmt = rtpe_config.fmt; } while (list) { @@ -429,7 +427,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) { else snprintf(url_buf, sizeof(url_buf), "%s", url_suffix); - switch (m->conf.fmt) { + switch (rtpe_config.fmt) { case XF_SEMS: for (csl = ca->monologues.head; csl; csl = csl->next) { cm = csl->data; @@ -1343,7 +1341,7 @@ static void __tos_change(struct call *call, const struct sdp_ng_flags *flags) { return; if (!flags || flags->tos > 255) - new_tos = call->callmaster->conf.default_tos; + new_tos = rtpe_config.default_tos; else new_tos = flags->tos; @@ -2017,7 +2015,7 @@ static struct call *call_create(const str *callid, struct callmaster *m) { call_str_cpy(c, &c->callid, callid); c->created = rtpe_now; c->dtls_cert = dtls_cert(); - c->tos = m->conf.default_tos; + c->tos = rtpe_config.default_tos; c->ssrc_hash = create_ssrc_hash(); return c; @@ -2363,7 +2361,7 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc GList *i; if (delete_delay < 0) - delete_delay = m->conf.delete_delay; + delete_delay = rtpe_config.delete_delay; c = call_get(callid, m); if (!c) { diff --git a/daemon/call.h b/daemon/call.h index ca77b17f8..b11ad5452 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -61,11 +61,6 @@ enum transport_protocol_index { __PROTO_LAST, }; -enum xmlrpc_format { - XF_SEMS = 0, - XF_CALLID, -}; - enum call_stream_state { CSS_UNKNOWN = 0, CSS_SHUTDOWN, @@ -388,29 +383,12 @@ struct call { }; struct callmaster_config { - /* everything below protected by config_lock */ - rwlock_t config_lock; - int max_sessions; - unsigned int timeout; - unsigned int silent_timeout; - unsigned int final_timeout; - - unsigned int delete_delay; struct redis *redis; struct redis *redis_write; struct redis *redis_notify; - struct event_base *redis_notify_event_base; - GQueue *redis_subscribed_keyspaces; + + struct event_base *redis_notify_event_base; struct redisAsyncContext *redis_notify_async_context; - unsigned int redis_expires_secs; - char *b2b_url; - unsigned char default_tos; - unsigned char control_tos; - enum xmlrpc_format fmt; - endpoint_t graphite_ep; - int graphite_interval; - - int redis_num_threads; }; struct callmaster { diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index a6334df3d..6eecb0fc0 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -24,6 +24,7 @@ #include "ssrc.h" #include "tcp_listener.h" #include "streambuf.h" +#include "main.h" static pcre *info_re; @@ -396,12 +397,12 @@ str *call_query_udp(char **out, struct callmaster *m) { rwlock_unlock_w(&c->master_lock); - rwlock_lock_r(&m->conf.config_lock); + rwlock_lock_r(&rtpe_config.config_lock); ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE], - (long long int) m->conf.silent_timeout - (rtpe_now.tv_sec - stats.last_packet), + (long long int) rtpe_config.silent_timeout - (rtpe_now.tv_sec - stats.last_packet), atomic64_get_na(&stats.totals[0].packets), atomic64_get_na(&stats.totals[1].packets), atomic64_get_na(&stats.totals[2].packets), atomic64_get_na(&stats.totals[3].packets)); - rwlock_unlock_r(&m->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); goto out; err: @@ -814,25 +815,25 @@ out: const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output, const char* addr, const endpoint_t *sin) { - rwlock_lock_r(&m->conf.config_lock); - if (m->conf.max_sessions>=0) { + rwlock_lock_r(&rtpe_config.config_lock); + if (rtpe_config.max_sessions>=0) { rwlock_lock_r(&rtpe_callhash_lock); if (g_hash_table_size(rtpe_callhash) - - atomic64_get(&rtpe_stats.foreign_sessions) >= m->conf.max_sessions) { + atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions) { rwlock_unlock_r(&rtpe_callhash_lock); /* foreign calls can't get rejected * total_rejected_sess applies only to "own" sessions */ atomic64_inc(&rtpe_totalstats.total_rejected_sess); atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess); - ilog(LOG_ERROR, "Parallel session limit reached (%i)",m->conf.max_sessions); + ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions); - rwlock_unlock_r(&m->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); return "Parallel session limit reached"; } rwlock_unlock_r(&rtpe_callhash_lock); } - rwlock_unlock_r(&m->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); return call_offer_answer_ng(input, m, output, OP_OFFER, addr, sin); } diff --git a/daemon/cli.c b/daemon/cli.c index dec6102bb..b6a82397f 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -23,6 +23,7 @@ #include "tcp_listener.h" #include "str.h" #include "statistics.h" +#include "main.h" #include "rtpengine_config.h" @@ -263,7 +264,7 @@ static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, stru static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", m->conf.max_sessions); + streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions); return ; } @@ -287,14 +288,14 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, str } static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - rwlock_lock_r(&m->conf.config_lock); + rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "TIMEOUT=%u\n", m->conf.timeout); - streambuf_printf(replybuffer, "SILENT_TIMEOUT=%u\n", m->conf.silent_timeout); - streambuf_printf(replybuffer, "FINAL_TIMEOUT=%u\n", m->conf.final_timeout); + streambuf_printf(replybuffer, "TIMEOUT=%u\n", rtpe_config.timeout); + streambuf_printf(replybuffer, "SILENT_TIMEOUT=%u\n", rtpe_config.silent_timeout); + streambuf_printf(replybuffer, "FINAL_TIMEOUT=%u\n", rtpe_config.final_timeout); - rwlock_unlock_r(&m->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); return ; } @@ -525,22 +526,22 @@ static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struc } else if (maxsessions_num < disabled) { streambuf_printf(replybuffer, "Fail setting maxsessions to %ld; either positive or -1 values allowed\n", maxsessions_num); } else if (maxsessions_num == disabled) { - rwlock_lock_w(&m->conf.config_lock); - m->conf.max_sessions = maxsessions_num; - rwlock_unlock_w(&m->conf.config_lock); + rwlock_lock_w(&rtpe_config.config_lock); + rtpe_config.max_sessions = maxsessions_num; + rwlock_unlock_w(&rtpe_config.config_lock); streambuf_printf(replybuffer, "Success setting maxsessions to %ld; disable feature\n", maxsessions_num); } else { - rwlock_lock_w(&m->conf.config_lock); - m->conf.max_sessions = maxsessions_num; - rwlock_unlock_w(&m->conf.config_lock); + rwlock_lock_w(&rtpe_config.config_lock); + rtpe_config.max_sessions = maxsessions_num; + rwlock_unlock_w(&rtpe_config.config_lock); streambuf_printf(replybuffer, "Success setting maxsessions to %ld\n", maxsessions_num); } return; } -static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, unsigned int *conf_timeout) { - unsigned long timeout_num; +static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, int *conf_timeout) { + long timeout_num; char *endptr; if (str_shift(instr, 1)) { @@ -548,31 +549,30 @@ static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct return; } - timeout_num = strtoul(instr->s, &endptr, 10); + timeout_num = strtol(instr->s, &endptr, 10); - if ((errno == ERANGE && (timeout_num == ULONG_MAX)) || (errno != 0 && timeout_num == 0)) { + if ((errno == ERANGE && (timeout_num == ULONG_MAX)) || (errno != 0 && timeout_num == 0) || timeout_num < 0 || timeout_num >= INT_MAX) { streambuf_printf(replybuffer, "Fail setting timeout to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { streambuf_printf(replybuffer, "Fail setting timeout to %s; no digists found\n", instr->s); return; } else { - /* don't lock anything while writing the value - only this command modifies its value */ - rwlock_lock_w(&m->conf.config_lock); - *conf_timeout = timeout_num; - rwlock_unlock_w(&m->conf.config_lock); + rwlock_lock_w(&rtpe_config.config_lock); + *conf_timeout = (int) timeout_num; + rwlock_unlock_w(&rtpe_config.config_lock); streambuf_printf(replybuffer, "Success setting timeout to %lu\n", timeout_num); } } static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.timeout); + cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.timeout); } static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.silent_timeout); + cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.silent_timeout); } static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.final_timeout); + cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.final_timeout); } static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer) { @@ -681,15 +681,15 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu } else if (endptr == instr->s) { streambuf_printf(replybuffer, "Fail adding keyspace %s to redis notifications; no digists found\n", instr->s); } else { - rwlock_lock_w(&m->conf.config_lock); - if (!g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { - g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); + rwlock_lock_w(&rtpe_config.config_lock); + if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { + g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db); streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); } else { streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); } - rwlock_unlock_w(&m->conf.config_lock); + rwlock_unlock_w(&rtpe_config.config_lock); } } @@ -705,15 +705,15 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf uint_keyspace_db = strtoul(instr->s, &endptr, 10); - rwlock_lock_w(&m->conf.config_lock); + rwlock_lock_w(&rtpe_config.config_lock); if ((errno == ERANGE && (uint_keyspace_db == ULONG_MAX)) || (errno != 0 && uint_keyspace_db == 0)) { streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; errono=%d\n", instr->s, errno); } else if (endptr == instr->s) { streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); - } else if ((l = g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { + } else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { // remove this keyspace redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); - g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data); + g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data); streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); // destroy foreign calls for this keyspace @@ -724,7 +724,7 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf } else { streambuf_printf(replybuffer, "Keyspace %lu is not among redis notifications.\n", uint_keyspace_db); } - rwlock_unlock_w(&m->conf.config_lock); + rwlock_unlock_w(&rtpe_config.config_lock); } @@ -733,11 +733,11 @@ static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streamb streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n"); - rwlock_lock_r(&m->conf.config_lock); - for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) { + rwlock_lock_r(&rtpe_config.config_lock); + for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { streambuf_printf(replybuffer, "%u ", GPOINTER_TO_UINT(l->data)); } - rwlock_unlock_r(&m->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); streambuf_printf(replybuffer, "\n"); } diff --git a/daemon/graphite.c b/daemon/graphite.c index 3699c6a0e..af6141a5a 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -21,6 +21,7 @@ #include "graphite.h" #include "socket.h" #include "statistics.h" +#include "main.h" struct timeval rtpe_latest_graphite_interval_start; @@ -334,13 +335,13 @@ void graphite_loop(void *d) { return ; } - if (cm->conf.graphite_interval <= 0) { + if (rtpe_config.graphite_interval <= 0) { ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); - cm->conf.graphite_interval=1; + rtpe_config.graphite_interval=1; } - connect_to_graphite_server(&cm->conf.graphite_ep); + connect_to_graphite_server(&rtpe_config.graphite_ep); while (!rtpe_shutdown) - graphite_loop_run(cm, &cm->conf.graphite_ep, cm->conf.graphite_interval); // time in seconds + graphite_loop_run(cm, &rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds } diff --git a/daemon/main.c b/daemon/main.c index 9689a9dd9..8d4797312 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -50,40 +50,36 @@ struct poller *rtpe_poller; +struct rtpengine_config rtpe_config = { + // non-zero defaults + .kernel_table = -1, + .max_sessions = -1, + .delete_delay = 30, + .redis_subscribed_keyspaces = G_QUEUE_INIT, + .redis_expires_secs = 86400, +}; + + + static GQueue interfaces = G_QUEUE_INIT; -static GQueue keyspaces = G_QUEUE_INIT; static endpoint_t tcp_listen_ep; static endpoint_t udp_listen_ep; static endpoint_t ng_listen_ep; static endpoint_t cli_listen_ep; -static endpoint_t graphite_ep; static endpoint_t redis_ep; static endpoint_t redis_write_ep; static endpoint_t homer_ep; static int homer_protocol = SOCK_DGRAM; static int homer_id = 2001; -static int tos; -static int control_tos; -static int table = -1; static int no_fallback; -static unsigned int timeout; -static unsigned int silent_timeout; -static unsigned int final_timeout; -static unsigned int redis_expires = 86400; static int port_min = 30000; static int port_max = 40000; -static int max_sessions = -1; static int redis_db = -1; static int redis_write_db = -1; -static int redis_num_threads; static int no_redis_required; static char *redis_auth; static char *redis_write_auth; -static char *b2b_url; -static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; -static int delete_delay = 30; -static int graphite_interval = 0; static char *spooldir; static char *rec_method = "pcap"; static char *rec_format = "raw"; @@ -266,7 +262,7 @@ static void options(int *argc, char ***argv) { char *endptr; GOptionEntry e[] = { - { "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" }, + { "table", 't', 0, G_OPTION_ARG_INT, &rtpe_config.kernel_table, "Kernel table to use", "INT" }, { "no-fallback",'F', 0, G_OPTION_ARG_NONE, &no_fallback, "Only start when kernel module is available", NULL }, { "interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY,&if_a, "Local interface for RTP", "[NAME/]IP[!IP]"}, { "subscribe-keyspace", 'k', 0, G_OPTION_ARG_STRING_ARRAY,&ks_a, "Subscription keyspace list", "INT INT ..."}, @@ -275,29 +271,29 @@ static void options(int *argc, char ***argv) { { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" }, { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" }, - { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, + { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" }, { "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"}, - { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, - { "control-tos",0 , 0, G_OPTION_ARG_INT, &control_tos, "Default TOS value to set on control-ng", "INT" }, - { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, - { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, - { "final-timeout",'a',0,G_OPTION_ARG_INT, &final_timeout, "Call timeout", "SECS" }, + { "tos", 'T', 0, G_OPTION_ARG_INT, &rtpe_config.default_tos, "Default TOS value to set on streams", "INT" }, + { "control-tos",0 , 0, G_OPTION_ARG_INT, &rtpe_config.control_tos, "Default TOS value to set on control-ng", "INT" }, + { "timeout", 'o', 0, G_OPTION_ARG_INT, &rtpe_config.timeout, "RTP timeout", "SECS" }, + { "silent-timeout",'s',0,G_OPTION_ARG_INT, &rtpe_config.silent_timeout,"RTP timeout for muted", "SECS" }, + { "final-timeout",'a',0,G_OPTION_ARG_INT, &rtpe_config.final_timeout, "Call timeout", "SECS" }, { "port-min", 'm', 0, G_OPTION_ARG_INT, &port_min, "Lowest port to use for RTP", "INT" }, { "port-max", 'M', 0, G_OPTION_ARG_INT, &port_max, "Highest port to use for RTP", "INT" }, { "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "[PW@]IP:PORT/INT" }, { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, - { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, - { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, + { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_num_threads, "Number of Redis restore threads", "INT" }, + { "redis-expires", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_expires_secs, "Expire time in seconds for redis keys", "INT" }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, - { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, + { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"}, { "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"}, - { "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" }, + { "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &rtpe_config.fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads to create", "INT" }, - { "delete-delay", 'd', 0, G_OPTION_ARG_INT, &delete_delay, "Delay for deleting a session from memory.", "INT" }, + { "delete-delay", 'd', 0, G_OPTION_ARG_INT, &rtpe_config.delete_delay, "Delay for deleting a session from memory.", "INT" }, { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, - { "max-sessions", 0, 0, G_OPTION_ARG_INT, &max_sessions, "Limit of maximum number of sessions", "INT" }, + { "max-sessions", 0, 0, G_OPTION_ARG_INT, &rtpe_config.max_sessions, "Limit of maximum number of sessions", "INT" }, { "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46|HOSTNAME:PORT"}, { "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" }, { "homer-id", 0, 0, G_OPTION_ARG_STRING, &homer_id, "'Capture ID' to use within the HEP protocol", "INT" }, @@ -337,7 +333,7 @@ static void options(int *argc, char ***argv) { } else if (endptr == str_keyspace_db.s) { ilog(LOG_ERR, "Fail adding keyspace %.*s to redis notifications; no digits found\n", str_keyspace_db.len, str_keyspace_db.s); } else { - g_queue_push_tail(&keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); + g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); } } } @@ -359,7 +355,7 @@ static void options(int *argc, char ***argv) { die("Invalid IP or port (--listen-cli)"); } - if (graphitep) {if (endpoint_parse_any_getaddrinfo_full(&graphite_ep, graphitep)) + if (graphitep) {if (endpoint_parse_any_getaddrinfo_full(&rtpe_config.graphite_ep, graphitep)) die("Invalid IP or port (--graphite)"); } @@ -379,20 +375,20 @@ static void options(int *argc, char ***argv) { die("Invalid protocol (--homer-protocol)"); } - if (tos < 0 || tos > 255) + if (rtpe_config.default_tos < 0 || rtpe_config.default_tos > 255) die("Invalid TOS value"); - if (control_tos < 0 || control_tos > 255) + if (rtpe_config.control_tos < 0 || rtpe_config.control_tos > 255) die("Invalid control-ng TOS value"); - if (timeout <= 0) - timeout = 60; + if (rtpe_config.timeout <= 0) + rtpe_config.timeout = 60; - if (silent_timeout <= 0) - silent_timeout = 3600; + if (rtpe_config.silent_timeout <= 0) + rtpe_config.silent_timeout = 3600; - if (final_timeout <= 0) - final_timeout = 0; + if (rtpe_config.final_timeout <= 0) + rtpe_config.final_timeout = 0; if (redisps) if (redis_ep_parse(&redis_ep, &redis_db, &redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps)) @@ -403,7 +399,7 @@ static void options(int *argc, char ***argv) { "RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write)) die("Invalid Redis endpoint [IP:PORT/INT] (--redis-write)"); - if (xmlrpc_fmt > 1) + if (rtpe_config.fmt > 1) die("Invalid XMLRPC format"); if ((log_level < LOG_EMERG) || (log_level > LOG_DEBUG)) @@ -505,7 +501,6 @@ static void init_everything() { static void create_everything(struct main_context *ctx) { - struct callmaster_config mc; struct control_tcp *ct; struct control_udp *cu; struct control_ng *cn; @@ -514,9 +509,9 @@ static void create_everything(struct main_context *ctx) { struct timeval redis_start, redis_stop; double redis_diff = 0; - if (table < 0) + if (rtpe_config.kernel_table < 0) goto no_kernel; - if (kernel_setup_table(table)) { + if (kernel_setup_table(rtpe_config.kernel_table)) { if (no_fallback) { ilog(LOG_CRIT, "Userspace fallback disallowed - exiting"); exit(-1); @@ -535,33 +530,19 @@ no_kernel: dtls_timer(rtpe_poller); - ZERO(mc); - rwlock_init(&mc.config_lock); - if (max_sessions < -1) { - max_sessions = -1; + rwlock_init(&rtpe_config.config_lock); + if (rtpe_config.max_sessions < -1) { + rtpe_config.max_sessions = -1; } - mc.max_sessions = max_sessions; - mc.timeout = timeout; - mc.silent_timeout = silent_timeout; - mc.final_timeout = final_timeout; - mc.delete_delay = delete_delay; - mc.default_tos = tos; - mc.control_tos = control_tos; - mc.b2b_url = b2b_url; - mc.fmt = xmlrpc_fmt; - mc.graphite_ep = graphite_ep; - mc.graphite_interval = graphite_interval; - mc.redis_subscribed_keyspaces = g_queue_copy(&keyspaces); - - if (redis_num_threads < 1) { + + if (rtpe_config.redis_num_threads < 1) { #ifdef _SC_NPROCESSORS_ONLN - redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN ); + rtpe_config.redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN ); #endif - if (redis_num_threads < 1) { - redis_num_threads = REDIS_RESTORE_NUM_THREADS; + if (rtpe_config.redis_num_threads < 1) { + rtpe_config.redis_num_threads = REDIS_RESTORE_NUM_THREADS; } } - mc.redis_num_threads = redis_num_threads; ct = NULL; if (tcp_listen_ep.port) { @@ -581,7 +562,7 @@ no_kernel: cn = NULL; if (ng_listen_ep.port) { interfaces_exclude_port(ng_listen_ep.port); - cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, control_tos); + cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, rtpe_config.control_tos); if (!cn) die("Failed to open UDP control connection port"); } @@ -595,27 +576,23 @@ no_kernel: } if (!is_addr_unspecified(&redis_write_ep.address)) { - mc.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); - if (!mc.redis_write) + ctx->m->conf.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); + if (!ctx->m->conf.redis_write) die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&redis_write_ep)); } if (!is_addr_unspecified(&redis_ep.address)) { - mc.redis = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); - mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); - if (!mc.redis || !mc.redis_notify) + ctx->m->conf.redis = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + ctx->m->conf.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + if (!ctx->m->conf.redis || !ctx->m->conf.redis_notify) die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&redis_ep)); - if (!mc.redis_write) - mc.redis_write = mc.redis; + if (!ctx->m->conf.redis_write) + ctx->m->conf.redis_write = ctx->m->conf.redis; } - mc.redis_expires_secs = redis_expires; - - ctx->m->conf = mc; - daemonize(); wpidfile(); @@ -623,12 +600,12 @@ no_kernel: rtcp_init(); // must come after Homer init - if (mc.redis) { + if (ctx->m->conf.redis) { // start redis restore timer gettimeofday(&redis_start, NULL); // restore - if (redis_restore(ctx->m, mc.redis)) + if (redis_restore(ctx->m, ctx->m->conf.redis)) die("Refusing to continue without working Redis database"); // stop redis restore timer @@ -641,7 +618,7 @@ no_kernel: gettimeofday(&rtpe_latest_graphite_interval_start, NULL); - timeval_from_us(&tmp_tv, (long long) graphite_interval*1000000); + timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000); set_graphite_interval_tv(&tmp_tv); } @@ -663,7 +640,7 @@ int main(int argc, char **argv) { if (!is_addr_unspecified(&redis_ep.address)) thread_create_detach(redis_notify_loop, ctx.m); - if (!is_addr_unspecified(&graphite_ep.address)) + if (!is_addr_unspecified(&rtpe_config.graphite_ep.address)) thread_create_detach(graphite_loop, ctx.m); thread_create_detach(ice_thread_run, NULL); diff --git a/daemon/main.h b/daemon/main.h index 3ddba0c7a..55124f1cc 100644 --- a/daemon/main.h +++ b/daemon/main.h @@ -1,7 +1,43 @@ #ifndef _MAIN_H_ #define _MAIN_H_ + +#include "aux.h" +#include +#include "socket.h" + +enum xmlrpc_format { + XF_SEMS = 0, + XF_CALLID, +}; + +struct rtpengine_config { + /* everything below protected by config_lock */ + rwlock_t config_lock; + int kernel_table; + int max_sessions; + int timeout; + int silent_timeout; + int final_timeout; + int delete_delay; + GQueue redis_subscribed_keyspaces; + int redis_expires_secs; + char *b2b_url; + int default_tos; + int control_tos; + enum xmlrpc_format fmt; + endpoint_t graphite_ep; + int graphite_interval; + int redis_num_threads; +}; + + struct poller; extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer? + +extern struct rtpengine_config rtpe_config; + + + #endif diff --git a/daemon/redis.c b/daemon/redis.c index f3c258286..5649e099e 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -30,6 +30,7 @@ #include "rtplib.h" #include "str.h" #include "ssrc.h" +#include "main.h" INLINE redisReply *redis_expect(int type, redisReply *r) { @@ -543,11 +544,11 @@ static int redis_notify(struct callmaster *cm) { } // subscribe to the values in the configured keyspaces - rwlock_lock_r(&cm->conf.config_lock); - for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) { + rwlock_lock_r(&rtpe_config.config_lock); + for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); } - rwlock_unlock_r(&cm->conf.config_lock); + rwlock_unlock_r(&rtpe_config.config_lock); // dispatch event base => thread blocks here if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) { @@ -1649,9 +1650,9 @@ int redis_restore(struct callmaster *m, struct redis *r) { ctx.m = m; mutex_init(&ctx.r_m); g_queue_init(&ctx.r_q); - for (i = 0; i < m->conf.redis_num_threads; i++) + for (i = 0; i < rtpe_config.redis_num_threads; i++) g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required)); - gtp = g_thread_pool_new(restore_thread, &ctx, m->conf.redis_num_threads, TRUE, NULL); + gtp = g_thread_pool_new(restore_thread, &ctx, rtpe_config.redis_num_threads, TRUE, NULL); for (i = 0; i < calls->elements; i++) { call = calls->element[i]; @@ -2073,7 +2074,7 @@ void redis_update_onekey(struct call *c, struct redis *r) { rwlock_lock_r(&c->master_lock); - redis_expires_s = c->callmaster->conf.redis_expires_secs; + redis_expires_s = rtpe_config.redis_expires_secs; c->redis_hosted_db = r->db; if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { diff --git a/daemon/statistics.c b/daemon/statistics.c index de788b554..74bfd04de 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -1,6 +1,8 @@ #include "call.h" #include "statistics.h" #include "graphite.h" +#include "main.h" + struct totalstats rtpe_totalstats; struct totalstats rtpe_totalstats_interval; @@ -108,7 +110,6 @@ void statistics_update_foreignown_inc(struct call* c) { } void statistics_update_oneway(struct call* c) { - struct callmaster *m; struct packet_stream *ps = NULL, *ps2 = NULL; struct call_monologue *ml; struct call_media *md; @@ -117,8 +118,6 @@ void statistics_update_oneway(struct call* c) { GList *l; struct timeval tim_result_duration; - m = c->callmaster; - // --- for statistics getting one way stream or no relay at all int total_nopacket_relayed_sess = 0; @@ -205,7 +204,7 @@ void statistics_update_oneway(struct call* c) { timeval_totalstats_interval_call_duration_add( &rtpe_totalstats_interval, &ml->started, &ml->terminated, &rtpe_latest_graphite_interval_start, - m->conf.graphite_interval); + rtpe_config.graphite_interval); } if (ml->term_reason==FINAL_TIMEOUT) {