diff --git a/README.md b/README.md index 5c464a78e..a68884ff7 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ There's 3 parts to rtpengine, which can be found in the respective subdirectorie - *PCRE* library - *libcurl* - *XMLRPC-C* version 1.16.08 or higher + - *hiredis* library The `Makefile` contains a few Debian-specific flags, which may have to removed for compilation to be successful. This will not affect operation in any way. diff --git a/daemon/graphite.c b/daemon/graphite.c index 91ba8361b..66bf28add 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -21,9 +21,7 @@ #include "socket.h" static socket_t graphite_sock; -static const endpoint_t *graphite_ep; -static int connectinprogress=0; -static struct callmaster* cm=0; +static int connection_state = STATE_DISCONNECTED; //struct totalstats totalstats_prev; static time_t next_run; // HEAD: static time_t g_now, next_run; @@ -39,13 +37,17 @@ void set_prefix(char* prefix) { graphite_prefix = prefix; } -int connect_to_graphite_server(const endpoint_t *ep) { - graphite_ep = ep; +int connect_to_graphite_server(const endpoint_t *graphite_ep) { int rc; - ilog(LOG_INFO, "Connecting to graphite server %s", endpoint_print_buf(ep)); + if (!graphite_ep) { + ilog(LOG_ERROR, "NULL graphite_ep"); + return -1; + } - rc = connect_socket_nb(&graphite_sock, SOCK_STREAM, ep); + ilog(LOG_INFO, "Connecting to graphite server %s", endpoint_print_buf(graphite_ep)); + + rc = connect_socket_nb(&graphite_sock, SOCK_STREAM, graphite_ep); if (rc == -1) { ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite."); return -1; @@ -55,29 +57,27 @@ int connect_to_graphite_server(const endpoint_t *ep) { else { /* EINPROGRESS */ ilog(LOG_INFO, "Connection to graphite is in progress."); - connectinprogress = 1; + connection_state = STATE_IN_PROGRESS; } return 0; } -int send_graphite_data(struct totalstats *sent_data) { +int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) { int rc=0; + // sanity checks + if (!cm) { + ilog(LOG_ERROR, "NULL callmaster when trying to send data"); + return -1; + } + if (graphite_sock.fd < 0) { ilog(LOG_ERROR,"Graphite socket is not connected."); return -1; } - // format hostname "." totals.subkey SPACE value SPACE timestamp - char hostname[256]; - rc = gethostname(hostname,256); - if (rc<0) { - ilog(LOG_ERROR, "Could not retrieve host name information."); - goto error; - } - char data_to_send[8192]; char* ptr = data_to_send; @@ -118,34 +118,34 @@ int send_graphite_data(struct totalstats *sent_data) { mutex_unlock(&cm->totalstats_interval.managed_sess_lock); rwlock_unlock_r(&cm->hashlock); - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr, "%s.totals.call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts->total_calls_duration_interval.tv_sec,(unsigned long long)ts->total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts->total_average_call_dur.tv_sec,(unsigned long long)ts->total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts->total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.managed_sess_min "UINT64F" %llu\n",hostname, ts->managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.managed_sess_max "UINT64F" %llu\n",hostname, ts->managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; - if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.reject_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr, "call_dur %llu.%06llu %llu\n",(unsigned long long)ts->total_calls_duration_interval.tv_sec,(unsigned long long)ts->total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"average_call_dur %llu.%06llu %llu\n",(unsigned long long)ts->total_average_call_dur.tv_sec,(unsigned long long)ts->total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"forced_term_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"managed_sess "UINT64F" %llu\n", ts->total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"managed_sess_min "UINT64F" %llu\n", ts->managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"managed_sess_max "UINT64F" %llu\n", ts->managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"nopacket_relayed_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"oneway_stream_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"regular_term_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"relayed_errors "UINT64F" %llu\n", atomic64_get_na(&ts->total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"relayed_packets "UINT64F" %llu\n", atomic64_get_na(&ts->total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"silent_timeout_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"timeout_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"reject_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc; ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%llu.%06llu at time %llu\n", (unsigned long long) ts->managed_sess_min, @@ -172,7 +172,7 @@ static inline void copy_with_lock(struct totalstats *ts_dst, struct totalstats * mutex_unlock(ts_lock); } -void graphite_loop_run(struct callmaster* callmaster, int seconds) { +void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds) { int rc=0; fd_set wfds; @@ -181,7 +181,18 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { int optval=0; socklen_t optlen=sizeof(optval); - if (connectinprogress && graphite_sock.fd >= 0) { + // sanity checks + if (!cm) { + ilog(LOG_ERROR, "NULL callmaster"); + return ; + } + + if (!graphite_ep) { + ilog(LOG_ERROR, "NULL graphite_ep"); + return ; + } + + if (connection_state == STATE_IN_PROGRESS && graphite_sock.fd >= 0) { FD_SET(graphite_sock.fd,&wfds); tv.tv_sec = 0; tv.tv_usec = 1000000; @@ -190,6 +201,7 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { if ((rc == -1) && (errno == EINTR)) { ilog(LOG_ERROR,"Error on the socket."); close_socket(&graphite_sock); + connection_state = STATE_DISCONNECTED; return; } else if (rc==0) { // timeout @@ -197,7 +209,8 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { } else { if (!FD_ISSET(graphite_sock.fd,&wfds)) { ilog(LOG_WARN,"fd active but not the graphite fd."); - close_socket(&graphite_sock); + close_socket(&graphite_sock); + connection_state = STATE_DISCONNECTED; return; } rc = getsockopt(graphite_sock.fd, SOL_SOCKET, SO_ERROR, &optval, &optlen); @@ -205,10 +218,11 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { if (optval != 0) { ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd, strerror(optval)); close_socket(&graphite_sock); + connection_state = STATE_DISCONNECTED; return; } ilog(LOG_INFO, "Graphite server connected."); - connectinprogress=0; + connection_state = STATE_CONNECTED; next_run=0; // fake next run to skip sleep after reconnect } } @@ -221,20 +235,19 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { next_run = g_now.tv_sec + seconds; - if (!cm) - cm = callmaster; - - if (graphite_sock.fd < 0 && !connectinprogress) { + if (graphite_sock.fd < 0 && connection_state == STATE_DISCONNECTED) { rc = connect_to_graphite_server(graphite_ep); } - if (graphite_sock.fd >= 0 && !connectinprogress) { + if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) { add_total_calls_duration_in_interval(cm, &graphite_interval_tv); - rc = send_graphite_data(&graphite_stats); + rc = send_graphite_data(cm, &graphite_stats); gettimeofday(&cm->latest_graphite_interval_start, NULL); - if (rc<0) { + if (rc < 0) { ilog(LOG_ERROR,"Sending graphite data failed."); + close_socket(&graphite_sock); + connection_state = STATE_DISCONNECTED; } copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval.total_average_lock); @@ -245,7 +258,13 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { void graphite_loop(void *d) { struct callmaster *cm = d; - if (!cm->conf.graphite_interval) { + // sanity checks + if (!cm) { + ilog(LOG_ERROR, "NULL callmaster"); + return ; + } + + if (cm->conf.graphite_interval <= 0) { ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); cm->conf.graphite_interval=1; } @@ -253,5 +272,5 @@ void graphite_loop(void *d) { connect_to_graphite_server(&cm->conf.graphite_ep); while (!g_shutdown) - graphite_loop_run(cm,cm->conf.graphite_interval); // time in seconds + graphite_loop_run(cm, &cm->conf.graphite_ep, cm->conf.graphite_interval); // time in seconds } diff --git a/daemon/graphite.h b/daemon/graphite.h index e6948aa43..9ffd34c70 100644 --- a/daemon/graphite.h +++ b/daemon/graphite.h @@ -10,9 +10,15 @@ #include "call.h" +enum connection_state { + STATE_DISCONNECTED = 0, + STATE_IN_PROGRESS, + STATE_CONNECTED, +}; + int connect_to_graphite_server(const endpoint_t *ep); -int send_graphite_data(); -void graphite_loop_run(struct callmaster* cm, int seconds); +int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data); +void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds); void set_prefix(char* prefix); void graphite_loop(void *d); void set_latest_graphite_interval_start(struct timeval *tv); diff --git a/el/README.el.md b/el/README.el.md index 67f75ea5d..1ddb43c59 100644 --- a/el/README.el.md +++ b/el/README.el.md @@ -70,6 +70,7 @@ respective subdirectories. - *make* - *pkgconfig* - *glib2-devel* + - *hiredis-devel* - *libcurl-devel* - *openssl-devel* - *pcre-devel* diff --git a/el/rtpengine.spec b/el/rtpengine.spec index 493f6a038..d76233ab5 100644 --- a/el/rtpengine.spec +++ b/el/rtpengine.spec @@ -11,7 +11,7 @@ Conflicts: %{name}-kernel < %{version}-%{release} BuildRequires: gcc make pkgconfig redhat-rpm-config BuildRequires: glib2-devel libcurl-devel openssl-devel pcre-devel -BuildRequires: xmlrpc-c-devel zlib-devel +BuildRequires: xmlrpc-c-devel zlib-devel hiredis-devel Requires: nc # Remain compat with other installations Provides: ngcp-rtpengine = %{version}-%{release}