Browse Source

move config options into global struct rtpengine_config

Change-Id: Ie566efb6a1b8bedbe33f768bc4cd979b2d2b46cc
pull/432/merge
Richard Fuchs 8 years ago
parent
commit
f2b93f9ef8
9 changed files with 165 additions and 174 deletions
  1. +11
    -13
      daemon/call.c
  2. +2
    -24
      daemon/call.h
  3. +10
    -9
      daemon/call_interfaces.c
  4. +34
    -34
      daemon/cli.c
  5. +5
    -4
      daemon/graphite.c
  6. +57
    -80
      daemon/main.c
  7. +36
    -0
      daemon/main.h
  8. +7
    -6
      daemon/redis.c
  9. +3
    -4
      daemon/statistics.c

+ 11
- 13
daemon/call.c View File

@ -176,7 +176,6 @@ static void call_timer_iterator(gpointer data, gpointer user_data) {
struct call *c = data;
struct iterator_helper *hlp = user_data;
GList *it;
struct callmaster *cm;
unsigned int check;
int good = 0;
struct packet_stream *ps;
@ -189,11 +188,10 @@ static void call_timer_iterator(gpointer data, gpointer user_data) {
rwlock_lock_r(&c->master_lock);
log_info_call(c);
cm = c->callmaster;
rwlock_lock_r(&cm->conf.config_lock);
rwlock_lock_r(&rtpe_config.config_lock);
// final timeout applicable to all calls (own and foreign)
if (cm->conf.final_timeout && rtpe_now.tv_sec >= (c->created.tv_sec + cm->conf.final_timeout)) {
if (rtpe_config.final_timeout && rtpe_now.tv_sec >= (c->created.tv_sec + rtpe_config.final_timeout)) {
ilog(LOG_INFO, "Closing call due to final timeout");
tmp_t_reason = FINAL_TIMEOUT;
for (it = c->monologues.head; it; it = it->next) {
@ -248,10 +246,10 @@ no_sfd:
if (good)
goto next;
check = cm->conf.timeout;
check = rtpe_config.timeout;
tmp_t_reason = TIMEOUT;
if (!MEDIA_ISSET(ps->media, RECV) || !sfd || !PS_ISSET(ps, FILLED)) {
check = cm->conf.silent_timeout;
check = rtpe_config.silent_timeout;
tmp_t_reason = SILENT_TIMEOUT;
}
@ -286,7 +284,7 @@ delete:
goto out;
out:
rwlock_unlock_r(&cm->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
rwlock_unlock_r(&c->master_lock);
log_info_clear();
}
@ -397,7 +395,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) {
return;
/* if m is NULL, it's the scheduled deletions, otherwise it's the timeouts */
url = m ? m->conf.b2b_url : NULL;
url = m ? rtpe_config.b2b_url : NULL;
if (url) {
xh = g_slice_alloc(sizeof(*xh));
xh->c = g_string_chunk_new(64);
@ -410,7 +408,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) {
else
url_suffix = g_string_chunk_insert(xh->c, url);
xh->tags_urls = NULL;
xh->fmt = m->conf.fmt;
xh->fmt = rtpe_config.fmt;
}
while (list) {
@ -429,7 +427,7 @@ void kill_calls_timer(GSList *list, struct callmaster *m) {
else
snprintf(url_buf, sizeof(url_buf), "%s", url_suffix);
switch (m->conf.fmt) {
switch (rtpe_config.fmt) {
case XF_SEMS:
for (csl = ca->monologues.head; csl; csl = csl->next) {
cm = csl->data;
@ -1343,7 +1341,7 @@ static void __tos_change(struct call *call, const struct sdp_ng_flags *flags) {
return;
if (!flags || flags->tos > 255)
new_tos = call->callmaster->conf.default_tos;
new_tos = rtpe_config.default_tos;
else
new_tos = flags->tos;
@ -2017,7 +2015,7 @@ static struct call *call_create(const str *callid, struct callmaster *m) {
call_str_cpy(c, &c->callid, callid);
c->created = rtpe_now;
c->dtls_cert = dtls_cert();
c->tos = m->conf.default_tos;
c->tos = rtpe_config.default_tos;
c->ssrc_hash = create_ssrc_hash();
return c;
@ -2363,7 +2361,7 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc
GList *i;
if (delete_delay < 0)
delete_delay = m->conf.delete_delay;
delete_delay = rtpe_config.delete_delay;
c = call_get(callid, m);
if (!c) {


+ 2
- 24
daemon/call.h View File

@ -61,11 +61,6 @@ enum transport_protocol_index {
__PROTO_LAST,
};
enum xmlrpc_format {
XF_SEMS = 0,
XF_CALLID,
};
enum call_stream_state {
CSS_UNKNOWN = 0,
CSS_SHUTDOWN,
@ -388,29 +383,12 @@ struct call {
};
struct callmaster_config {
/* everything below protected by config_lock */
rwlock_t config_lock;
int max_sessions;
unsigned int timeout;
unsigned int silent_timeout;
unsigned int final_timeout;
unsigned int delete_delay;
struct redis *redis;
struct redis *redis_write;
struct redis *redis_notify;
struct event_base *redis_notify_event_base;
GQueue *redis_subscribed_keyspaces;
struct event_base *redis_notify_event_base;
struct redisAsyncContext *redis_notify_async_context;
unsigned int redis_expires_secs;
char *b2b_url;
unsigned char default_tos;
unsigned char control_tos;
enum xmlrpc_format fmt;
endpoint_t graphite_ep;
int graphite_interval;
int redis_num_threads;
};
struct callmaster {


+ 10
- 9
daemon/call_interfaces.c View File

@ -24,6 +24,7 @@
#include "ssrc.h"
#include "tcp_listener.h"
#include "streambuf.h"
#include "main.h"
static pcre *info_re;
@ -396,12 +397,12 @@ str *call_query_udp(char **out, struct callmaster *m) {
rwlock_unlock_w(&c->master_lock);
rwlock_lock_r(&m->conf.config_lock);
rwlock_lock_r(&rtpe_config.config_lock);
ret = str_sprintf("%s %lld "UINT64F" "UINT64F" "UINT64F" "UINT64F"\n", out[RE_UDP_COOKIE],
(long long int) m->conf.silent_timeout - (rtpe_now.tv_sec - stats.last_packet),
(long long int) rtpe_config.silent_timeout - (rtpe_now.tv_sec - stats.last_packet),
atomic64_get_na(&stats.totals[0].packets), atomic64_get_na(&stats.totals[1].packets),
atomic64_get_na(&stats.totals[2].packets), atomic64_get_na(&stats.totals[3].packets));
rwlock_unlock_r(&m->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
goto out;
err:
@ -814,25 +815,25 @@ out:
const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output, const char* addr,
const endpoint_t *sin)
{
rwlock_lock_r(&m->conf.config_lock);
if (m->conf.max_sessions>=0) {
rwlock_lock_r(&rtpe_config.config_lock);
if (rtpe_config.max_sessions>=0) {
rwlock_lock_r(&rtpe_callhash_lock);
if (g_hash_table_size(rtpe_callhash) -
atomic64_get(&rtpe_stats.foreign_sessions) >= m->conf.max_sessions) {
atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions) {
rwlock_unlock_r(&rtpe_callhash_lock);
/* foreign calls can't get rejected
* total_rejected_sess applies only to "own" sessions */
atomic64_inc(&rtpe_totalstats.total_rejected_sess);
atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess);
ilog(LOG_ERROR, "Parallel session limit reached (%i)",m->conf.max_sessions);
ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions);
rwlock_unlock_r(&m->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
return "Parallel session limit reached";
}
rwlock_unlock_r(&rtpe_callhash_lock);
}
rwlock_unlock_r(&m->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
return call_offer_answer_ng(input, m, output, OP_OFFER, addr, sin);
}


+ 34
- 34
daemon/cli.c View File

@ -23,6 +23,7 @@
#include "tcp_listener.h"
#include "str.h"
#include "statistics.h"
#include "main.h"
#include "rtpengine_config.h"
@ -263,7 +264,7 @@ static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, stru
static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
/* don't lock anything while reading the value */
streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", m->conf.max_sessions);
streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions);
return ;
}
@ -287,14 +288,14 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, str
}
static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
rwlock_lock_r(&m->conf.config_lock);
rwlock_lock_r(&rtpe_config.config_lock);
/* don't lock anything while reading the value */
streambuf_printf(replybuffer, "TIMEOUT=%u\n", m->conf.timeout);
streambuf_printf(replybuffer, "SILENT_TIMEOUT=%u\n", m->conf.silent_timeout);
streambuf_printf(replybuffer, "FINAL_TIMEOUT=%u\n", m->conf.final_timeout);
streambuf_printf(replybuffer, "TIMEOUT=%u\n", rtpe_config.timeout);
streambuf_printf(replybuffer, "SILENT_TIMEOUT=%u\n", rtpe_config.silent_timeout);
streambuf_printf(replybuffer, "FINAL_TIMEOUT=%u\n", rtpe_config.final_timeout);
rwlock_unlock_r(&m->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
return ;
}
@ -525,22 +526,22 @@ static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struc
} else if (maxsessions_num < disabled) {
streambuf_printf(replybuffer, "Fail setting maxsessions to %ld; either positive or -1 values allowed\n", maxsessions_num);
} else if (maxsessions_num == disabled) {
rwlock_lock_w(&m->conf.config_lock);
m->conf.max_sessions = maxsessions_num;
rwlock_unlock_w(&m->conf.config_lock);
rwlock_lock_w(&rtpe_config.config_lock);
rtpe_config.max_sessions = maxsessions_num;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting maxsessions to %ld; disable feature\n", maxsessions_num);
} else {
rwlock_lock_w(&m->conf.config_lock);
m->conf.max_sessions = maxsessions_num;
rwlock_unlock_w(&m->conf.config_lock);
rwlock_lock_w(&rtpe_config.config_lock);
rtpe_config.max_sessions = maxsessions_num;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting maxsessions to %ld\n", maxsessions_num);
}
return;
}
static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, unsigned int *conf_timeout) {
unsigned long timeout_num;
static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, int *conf_timeout) {
long timeout_num;
char *endptr;
if (str_shift(instr, 1)) {
@ -548,31 +549,30 @@ static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct
return;
}
timeout_num = strtoul(instr->s, &endptr, 10);
timeout_num = strtol(instr->s, &endptr, 10);
if ((errno == ERANGE && (timeout_num == ULONG_MAX)) || (errno != 0 && timeout_num == 0)) {
if ((errno == ERANGE && (timeout_num == ULONG_MAX)) || (errno != 0 && timeout_num == 0) || timeout_num < 0 || timeout_num >= INT_MAX) {
streambuf_printf(replybuffer, "Fail setting timeout to %s; errno=%d\n", instr->s, errno);
return;
} else if (endptr == instr->s) {
streambuf_printf(replybuffer, "Fail setting timeout to %s; no digists found\n", instr->s);
return;
} else {
/* don't lock anything while writing the value - only this command modifies its value */
rwlock_lock_w(&m->conf.config_lock);
*conf_timeout = timeout_num;
rwlock_unlock_w(&m->conf.config_lock);
rwlock_lock_w(&rtpe_config.config_lock);
*conf_timeout = (int) timeout_num;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting timeout to %lu\n", timeout_num);
}
}
static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.timeout);
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.timeout);
}
static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.silent_timeout);
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.silent_timeout);
}
static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &m->conf.final_timeout);
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.final_timeout);
}
static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer) {
@ -681,15 +681,15 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu
} else if (endptr == instr->s) {
streambuf_printf(replybuffer, "Fail adding keyspace %s to redis notifications; no digists found\n", instr->s);
} else {
rwlock_lock_w(&m->conf.config_lock);
if (!g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) {
g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
rwlock_lock_w(&rtpe_config.config_lock);
if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) {
g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db);
streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db);
} else {
streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db);
}
rwlock_unlock_w(&m->conf.config_lock);
rwlock_unlock_w(&rtpe_config.config_lock);
}
}
@ -705,15 +705,15 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf
uint_keyspace_db = strtoul(instr->s, &endptr, 10);
rwlock_lock_w(&m->conf.config_lock);
rwlock_lock_w(&rtpe_config.config_lock);
if ((errno == ERANGE && (uint_keyspace_db == ULONG_MAX)) || (errno != 0 && uint_keyspace_db == 0)) {
streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; errono=%d\n", instr->s, errno);
} else if (endptr == instr->s) {
streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s);
} else if ((l = g_queue_find(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) {
} else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) {
// remove this keyspace
redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data);
g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data);
streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db);
// destroy foreign calls for this keyspace
@ -724,7 +724,7 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf
} else {
streambuf_printf(replybuffer, "Keyspace %lu is not among redis notifications.\n", uint_keyspace_db);
}
rwlock_unlock_w(&m->conf.config_lock);
rwlock_unlock_w(&rtpe_config.config_lock);
}
@ -733,11 +733,11 @@ static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streamb
streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n");
rwlock_lock_r(&m->conf.config_lock);
for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
rwlock_lock_r(&rtpe_config.config_lock);
for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
streambuf_printf(replybuffer, "%u ", GPOINTER_TO_UINT(l->data));
}
rwlock_unlock_r(&m->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "\n");
}


+ 5
- 4
daemon/graphite.c View File

@ -21,6 +21,7 @@
#include "graphite.h"
#include "socket.h"
#include "statistics.h"
#include "main.h"
struct timeval rtpe_latest_graphite_interval_start;
@ -334,13 +335,13 @@ void graphite_loop(void *d) {
return ;
}
if (cm->conf.graphite_interval <= 0) {
if (rtpe_config.graphite_interval <= 0) {
ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
cm->conf.graphite_interval=1;
rtpe_config.graphite_interval=1;
}
connect_to_graphite_server(&cm->conf.graphite_ep);
connect_to_graphite_server(&rtpe_config.graphite_ep);
while (!rtpe_shutdown)
graphite_loop_run(cm, &cm->conf.graphite_ep, cm->conf.graphite_interval); // time in seconds
graphite_loop_run(cm, &rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds
}

+ 57
- 80
daemon/main.c View File

@ -50,40 +50,36 @@ struct poller *rtpe_poller;
struct rtpengine_config rtpe_config = {
// non-zero defaults
.kernel_table = -1,
.max_sessions = -1,
.delete_delay = 30,
.redis_subscribed_keyspaces = G_QUEUE_INIT,
.redis_expires_secs = 86400,
};
static GQueue interfaces = G_QUEUE_INIT;
static GQueue keyspaces = G_QUEUE_INIT;
static endpoint_t tcp_listen_ep;
static endpoint_t udp_listen_ep;
static endpoint_t ng_listen_ep;
static endpoint_t cli_listen_ep;
static endpoint_t graphite_ep;
static endpoint_t redis_ep;
static endpoint_t redis_write_ep;
static endpoint_t homer_ep;
static int homer_protocol = SOCK_DGRAM;
static int homer_id = 2001;
static int tos;
static int control_tos;
static int table = -1;
static int no_fallback;
static unsigned int timeout;
static unsigned int silent_timeout;
static unsigned int final_timeout;
static unsigned int redis_expires = 86400;
static int port_min = 30000;
static int port_max = 40000;
static int max_sessions = -1;
static int redis_db = -1;
static int redis_write_db = -1;
static int redis_num_threads;
static int no_redis_required;
static char *redis_auth;
static char *redis_write_auth;
static char *b2b_url;
static enum xmlrpc_format xmlrpc_fmt = XF_SEMS;
static int num_threads;
static int delete_delay = 30;
static int graphite_interval = 0;
static char *spooldir;
static char *rec_method = "pcap";
static char *rec_format = "raw";
@ -266,7 +262,7 @@ static void options(int *argc, char ***argv) {
char *endptr;
GOptionEntry e[] = {
{ "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" },
{ "table", 't', 0, G_OPTION_ARG_INT, &rtpe_config.kernel_table, "Kernel table to use", "INT" },
{ "no-fallback",'F', 0, G_OPTION_ARG_NONE, &no_fallback, "Only start when kernel module is available", NULL },
{ "interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY,&if_a, "Local interface for RTP", "[NAME/]IP[!IP]"},
{ "subscribe-keyspace", 'k', 0, G_OPTION_ARG_STRING_ARRAY,&ks_a, "Subscription keyspace list", "INT INT ..."},
@ -275,29 +271,29 @@ static void options(int *argc, char ***argv) {
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" },
{ "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" },
{ "graphite-prefix",0, 0, G_OPTION_ARG_STRING, &graphite_prefix_s, "Prefix for graphite line", "STRING"},
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" },
{ "control-tos",0 , 0, G_OPTION_ARG_INT, &control_tos, "Default TOS value to set on control-ng", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
{ "final-timeout",'a',0,G_OPTION_ARG_INT, &final_timeout, "Call timeout", "SECS" },
{ "tos", 'T', 0, G_OPTION_ARG_INT, &rtpe_config.default_tos, "Default TOS value to set on streams", "INT" },
{ "control-tos",0 , 0, G_OPTION_ARG_INT, &rtpe_config.control_tos, "Default TOS value to set on control-ng", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &rtpe_config.timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &rtpe_config.silent_timeout,"RTP timeout for muted", "SECS" },
{ "final-timeout",'a',0,G_OPTION_ARG_INT, &rtpe_config.final_timeout, "Call timeout", "SECS" },
{ "port-min", 'm', 0, G_OPTION_ARG_INT, &port_min, "Lowest port to use for RTP", "INT" },
{ "port-max", 'M', 0, G_OPTION_ARG_INT, &port_max, "Highest port to use for RTP", "INT" },
{ "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "[PW@]IP:PORT/INT" },
{ "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" },
{ "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" },
{ "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" },
{ "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_num_threads, "Number of Redis restore threads", "INT" },
{ "redis-expires", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_expires_secs, "Expire time in seconds for redis keys", "INT" },
{ "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"},
{ "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"},
{ "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" },
{ "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &rtpe_config.fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads to create", "INT" },
{ "delete-delay", 'd', 0, G_OPTION_ARG_INT, &delete_delay, "Delay for deleting a session from memory.", "INT" },
{ "delete-delay", 'd', 0, G_OPTION_ARG_INT, &rtpe_config.delete_delay, "Delay for deleting a session from memory.", "INT" },
{ "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL },
{ "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL },
{ "max-sessions", 0, 0, G_OPTION_ARG_INT, &max_sessions, "Limit of maximum number of sessions", "INT" },
{ "max-sessions", 0, 0, G_OPTION_ARG_INT, &rtpe_config.max_sessions, "Limit of maximum number of sessions", "INT" },
{ "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46|HOSTNAME:PORT"},
{ "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" },
{ "homer-id", 0, 0, G_OPTION_ARG_STRING, &homer_id, "'Capture ID' to use within the HEP protocol", "INT" },
@ -337,7 +333,7 @@ static void options(int *argc, char ***argv) {
} else if (endptr == str_keyspace_db.s) {
ilog(LOG_ERR, "Fail adding keyspace %.*s to redis notifications; no digits found\n", str_keyspace_db.len, str_keyspace_db.s);
} else {
g_queue_push_tail(&keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
}
}
}
@ -359,7 +355,7 @@ static void options(int *argc, char ***argv) {
die("Invalid IP or port (--listen-cli)");
}
if (graphitep) {if (endpoint_parse_any_getaddrinfo_full(&graphite_ep, graphitep))
if (graphitep) {if (endpoint_parse_any_getaddrinfo_full(&rtpe_config.graphite_ep, graphitep))
die("Invalid IP or port (--graphite)");
}
@ -379,20 +375,20 @@ static void options(int *argc, char ***argv) {
die("Invalid protocol (--homer-protocol)");
}
if (tos < 0 || tos > 255)
if (rtpe_config.default_tos < 0 || rtpe_config.default_tos > 255)
die("Invalid TOS value");
if (control_tos < 0 || control_tos > 255)
if (rtpe_config.control_tos < 0 || rtpe_config.control_tos > 255)
die("Invalid control-ng TOS value");
if (timeout <= 0)
timeout = 60;
if (rtpe_config.timeout <= 0)
rtpe_config.timeout = 60;
if (silent_timeout <= 0)
silent_timeout = 3600;
if (rtpe_config.silent_timeout <= 0)
rtpe_config.silent_timeout = 3600;
if (final_timeout <= 0)
final_timeout = 0;
if (rtpe_config.final_timeout <= 0)
rtpe_config.final_timeout = 0;
if (redisps)
if (redis_ep_parse(&redis_ep, &redis_db, &redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps))
@ -403,7 +399,7 @@ static void options(int *argc, char ***argv) {
"RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write))
die("Invalid Redis endpoint [IP:PORT/INT] (--redis-write)");
if (xmlrpc_fmt > 1)
if (rtpe_config.fmt > 1)
die("Invalid XMLRPC format");
if ((log_level < LOG_EMERG) || (log_level > LOG_DEBUG))
@ -505,7 +501,6 @@ static void init_everything() {
static void create_everything(struct main_context *ctx) {
struct callmaster_config mc;
struct control_tcp *ct;
struct control_udp *cu;
struct control_ng *cn;
@ -514,9 +509,9 @@ static void create_everything(struct main_context *ctx) {
struct timeval redis_start, redis_stop;
double redis_diff = 0;
if (table < 0)
if (rtpe_config.kernel_table < 0)
goto no_kernel;
if (kernel_setup_table(table)) {
if (kernel_setup_table(rtpe_config.kernel_table)) {
if (no_fallback) {
ilog(LOG_CRIT, "Userspace fallback disallowed - exiting");
exit(-1);
@ -535,33 +530,19 @@ no_kernel:
dtls_timer(rtpe_poller);
ZERO(mc);
rwlock_init(&mc.config_lock);
if (max_sessions < -1) {
max_sessions = -1;
rwlock_init(&rtpe_config.config_lock);
if (rtpe_config.max_sessions < -1) {
rtpe_config.max_sessions = -1;
}
mc.max_sessions = max_sessions;
mc.timeout = timeout;
mc.silent_timeout = silent_timeout;
mc.final_timeout = final_timeout;
mc.delete_delay = delete_delay;
mc.default_tos = tos;
mc.control_tos = control_tos;
mc.b2b_url = b2b_url;
mc.fmt = xmlrpc_fmt;
mc.graphite_ep = graphite_ep;
mc.graphite_interval = graphite_interval;
mc.redis_subscribed_keyspaces = g_queue_copy(&keyspaces);
if (redis_num_threads < 1) {
if (rtpe_config.redis_num_threads < 1) {
#ifdef _SC_NPROCESSORS_ONLN
redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN );
rtpe_config.redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN );
#endif
if (redis_num_threads < 1) {
redis_num_threads = REDIS_RESTORE_NUM_THREADS;
if (rtpe_config.redis_num_threads < 1) {
rtpe_config.redis_num_threads = REDIS_RESTORE_NUM_THREADS;
}
}
mc.redis_num_threads = redis_num_threads;
ct = NULL;
if (tcp_listen_ep.port) {
@ -581,7 +562,7 @@ no_kernel:
cn = NULL;
if (ng_listen_ep.port) {
interfaces_exclude_port(ng_listen_ep.port);
cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, control_tos);
cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, rtpe_config.control_tos);
if (!cn)
die("Failed to open UDP control connection port");
}
@ -595,27 +576,23 @@ no_kernel:
}
if (!is_addr_unspecified(&redis_write_ep.address)) {
mc.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required);
if (!mc.redis_write)
ctx->m->conf.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required);
if (!ctx->m->conf.redis_write)
die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_write_ep));
}
if (!is_addr_unspecified(&redis_ep.address)) {
mc.redis = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
if (!mc.redis || !mc.redis_notify)
ctx->m->conf.redis = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
ctx->m->conf.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
if (!ctx->m->conf.redis || !ctx->m->conf.redis_notify)
die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_ep));
if (!mc.redis_write)
mc.redis_write = mc.redis;
if (!ctx->m->conf.redis_write)
ctx->m->conf.redis_write = ctx->m->conf.redis;
}
mc.redis_expires_secs = redis_expires;
ctx->m->conf = mc;
daemonize();
wpidfile();
@ -623,12 +600,12 @@ no_kernel:
rtcp_init(); // must come after Homer init
if (mc.redis) {
if (ctx->m->conf.redis) {
// start redis restore timer
gettimeofday(&redis_start, NULL);
// restore
if (redis_restore(ctx->m, mc.redis))
if (redis_restore(ctx->m, ctx->m->conf.redis))
die("Refusing to continue without working Redis database");
// stop redis restore timer
@ -641,7 +618,7 @@ no_kernel:
gettimeofday(&rtpe_latest_graphite_interval_start, NULL);
timeval_from_us(&tmp_tv, (long long) graphite_interval*1000000);
timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000);
set_graphite_interval_tv(&tmp_tv);
}
@ -663,7 +640,7 @@ int main(int argc, char **argv) {
if (!is_addr_unspecified(&redis_ep.address))
thread_create_detach(redis_notify_loop, ctx.m);
if (!is_addr_unspecified(&graphite_ep.address))
if (!is_addr_unspecified(&rtpe_config.graphite_ep.address))
thread_create_detach(graphite_loop, ctx.m);
thread_create_detach(ice_thread_run, NULL);


+ 36
- 0
daemon/main.h View File

@ -1,7 +1,43 @@
#ifndef _MAIN_H_
#define _MAIN_H_
#include "aux.h"
#include <glib.h>
#include "socket.h"
enum xmlrpc_format {
XF_SEMS = 0,
XF_CALLID,
};
struct rtpengine_config {
/* everything below protected by config_lock */
rwlock_t config_lock;
int kernel_table;
int max_sessions;
int timeout;
int silent_timeout;
int final_timeout;
int delete_delay;
GQueue redis_subscribed_keyspaces;
int redis_expires_secs;
char *b2b_url;
int default_tos;
int control_tos;
enum xmlrpc_format fmt;
endpoint_t graphite_ep;
int graphite_interval;
int redis_num_threads;
};
struct poller;
extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer?
extern struct rtpengine_config rtpe_config;
#endif

+ 7
- 6
daemon/redis.c View File

@ -30,6 +30,7 @@
#include "rtplib.h"
#include "str.h"
#include "ssrc.h"
#include "main.h"
INLINE redisReply *redis_expect(int type, redisReply *r) {
@ -543,11 +544,11 @@ static int redis_notify(struct callmaster *cm) {
}
// subscribe to the values in the configured keyspaces
rwlock_lock_r(&cm->conf.config_lock);
for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) {
rwlock_lock_r(&rtpe_config.config_lock);
for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data));
}
rwlock_unlock_r(&cm->conf.config_lock);
rwlock_unlock_r(&rtpe_config.config_lock);
// dispatch event base => thread blocks here
if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) {
@ -1649,9 +1650,9 @@ int redis_restore(struct callmaster *m, struct redis *r) {
ctx.m = m;
mutex_init(&ctx.r_m);
g_queue_init(&ctx.r_q);
for (i = 0; i < m->conf.redis_num_threads; i++)
for (i = 0; i < rtpe_config.redis_num_threads; i++)
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required));
gtp = g_thread_pool_new(restore_thread, &ctx, m->conf.redis_num_threads, TRUE, NULL);
gtp = g_thread_pool_new(restore_thread, &ctx, rtpe_config.redis_num_threads, TRUE, NULL);
for (i = 0; i < calls->elements; i++) {
call = calls->element[i];
@ -2073,7 +2074,7 @@ void redis_update_onekey(struct call *c, struct redis *r) {
rwlock_lock_r(&c->master_lock);
redis_expires_s = c->callmaster->conf.redis_expires_secs;
redis_expires_s = rtpe_config.redis_expires_secs;
c->redis_hosted_db = r->db;
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) {


+ 3
- 4
daemon/statistics.c View File

@ -1,6 +1,8 @@
#include "call.h"
#include "statistics.h"
#include "graphite.h"
#include "main.h"
struct totalstats rtpe_totalstats;
struct totalstats rtpe_totalstats_interval;
@ -108,7 +110,6 @@ void statistics_update_foreignown_inc(struct call* c) {
}
void statistics_update_oneway(struct call* c) {
struct callmaster *m;
struct packet_stream *ps = NULL, *ps2 = NULL;
struct call_monologue *ml;
struct call_media *md;
@ -117,8 +118,6 @@ void statistics_update_oneway(struct call* c) {
GList *l;
struct timeval tim_result_duration;
m = c->callmaster;
// --- for statistics getting one way stream or no relay at all
int total_nopacket_relayed_sess = 0;
@ -205,7 +204,7 @@ void statistics_update_oneway(struct call* c) {
timeval_totalstats_interval_call_duration_add(
&rtpe_totalstats_interval, &ml->started, &ml->terminated,
&rtpe_latest_graphite_interval_start,
m->conf.graphite_interval);
rtpe_config.graphite_interval);
}
if (ml->term_reason==FINAL_TIMEOUT) {


Loading…
Cancel
Save