Browse Source

Merge branch 'dev-fmetz' into dev-fmetz-redis-notification

pull/225/head
Frederic-Philippe Metz 10 years ago
parent
commit
6247462211
5 changed files with 88 additions and 61 deletions
  1. +1
    -0
      README.md
  2. +77
    -58
      daemon/graphite.c
  3. +8
    -2
      daemon/graphite.h
  4. +1
    -0
      el/README.el.md
  5. +1
    -1
      el/rtpengine.spec

+ 1
- 0
README.md View File

@ -115,6 +115,7 @@ There's 3 parts to rtpengine, which can be found in the respective subdirectorie
- *PCRE* library - *PCRE* library
- *libcurl* - *libcurl*
- *XMLRPC-C* version 1.16.08 or higher - *XMLRPC-C* version 1.16.08 or higher
- *hiredis* library
The `Makefile` contains a few Debian-specific flags, which may have to removed for compilation to The `Makefile` contains a few Debian-specific flags, which may have to removed for compilation to
be successful. This will not affect operation in any way. be successful. This will not affect operation in any way.


+ 77
- 58
daemon/graphite.c View File

@ -21,9 +21,7 @@
#include "socket.h" #include "socket.h"
static socket_t graphite_sock; static socket_t graphite_sock;
static const endpoint_t *graphite_ep;
static int connectinprogress=0;
static struct callmaster* cm=0;
static int connection_state = STATE_DISCONNECTED;
//struct totalstats totalstats_prev; //struct totalstats totalstats_prev;
static time_t next_run; static time_t next_run;
// HEAD: static time_t g_now, next_run; // HEAD: static time_t g_now, next_run;
@ -39,13 +37,17 @@ void set_prefix(char* prefix) {
graphite_prefix = prefix; graphite_prefix = prefix;
} }
int connect_to_graphite_server(const endpoint_t *ep) {
graphite_ep = ep;
int connect_to_graphite_server(const endpoint_t *graphite_ep) {
int rc; int rc;
ilog(LOG_INFO, "Connecting to graphite server %s", endpoint_print_buf(ep));
if (!graphite_ep) {
ilog(LOG_ERROR, "NULL graphite_ep");
return -1;
}
rc = connect_socket_nb(&graphite_sock, SOCK_STREAM, ep);
ilog(LOG_INFO, "Connecting to graphite server %s", endpoint_print_buf(graphite_ep));
rc = connect_socket_nb(&graphite_sock, SOCK_STREAM, graphite_ep);
if (rc == -1) { if (rc == -1) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite."); ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.");
return -1; return -1;
@ -55,29 +57,27 @@ int connect_to_graphite_server(const endpoint_t *ep) {
else { else {
/* EINPROGRESS */ /* EINPROGRESS */
ilog(LOG_INFO, "Connection to graphite is in progress."); ilog(LOG_INFO, "Connection to graphite is in progress.");
connectinprogress = 1;
connection_state = STATE_IN_PROGRESS;
} }
return 0; return 0;
} }
int send_graphite_data(struct totalstats *sent_data) {
int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) {
int rc=0; int rc=0;
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster when trying to send data");
return -1;
}
if (graphite_sock.fd < 0) { if (graphite_sock.fd < 0) {
ilog(LOG_ERROR,"Graphite socket is not connected."); ilog(LOG_ERROR,"Graphite socket is not connected.");
return -1; return -1;
} }
// format hostname "." totals.subkey SPACE value SPACE timestamp
char hostname[256];
rc = gethostname(hostname,256);
if (rc<0) {
ilog(LOG_ERROR, "Could not retrieve host name information.");
goto error;
}
char data_to_send[8192]; char data_to_send[8192];
char* ptr = data_to_send; char* ptr = data_to_send;
@ -118,34 +118,34 @@ int send_graphite_data(struct totalstats *sent_data) {
mutex_unlock(&cm->totalstats_interval.managed_sess_lock); mutex_unlock(&cm->totalstats_interval.managed_sess_lock);
rwlock_unlock_r(&cm->hashlock); rwlock_unlock_r(&cm->hashlock);
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr, "%s.totals.call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts->total_calls_duration_interval.tv_sec,(unsigned long long)ts->total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts->total_average_call_dur.tv_sec,(unsigned long long)ts->total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts->total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess_min "UINT64F" %llu\n",hostname, ts->managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess_max "UINT64F" %llu\n",hostname, ts->managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.regular_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.relayed_errors "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.relayed_packets "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.silent_timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.timeout_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.reject_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts->total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr, "call_dur %llu.%06llu %llu\n",(unsigned long long)ts->total_calls_duration_interval.tv_sec,(unsigned long long)ts->total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"average_call_dur %llu.%06llu %llu\n",(unsigned long long)ts->total_average_call_dur.tv_sec,(unsigned long long)ts->total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"forced_term_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"managed_sess "UINT64F" %llu\n", ts->total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"managed_sess_min "UINT64F" %llu\n", ts->managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"managed_sess_max "UINT64F" %llu\n", ts->managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"nopacket_relayed_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"oneway_stream_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"regular_term_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_regular_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"relayed_errors "UINT64F" %llu\n", atomic64_get_na(&ts->total_relayed_errors),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"relayed_packets "UINT64F" %llu\n", atomic64_get_na(&ts->total_relayed_packets),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"silent_timeout_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_silent_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"timeout_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_timeout_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"reject_sess "UINT64F" %llu\n", atomic64_get_na(&ts->total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%llu.%06llu at time %llu\n", ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%llu.%06llu at time %llu\n",
(unsigned long long) ts->managed_sess_min, (unsigned long long) ts->managed_sess_min,
@ -172,7 +172,7 @@ static inline void copy_with_lock(struct totalstats *ts_dst, struct totalstats *
mutex_unlock(ts_lock); mutex_unlock(ts_lock);
} }
void graphite_loop_run(struct callmaster* callmaster, int seconds) {
void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds) {
int rc=0; int rc=0;
fd_set wfds; fd_set wfds;
@ -181,7 +181,18 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
int optval=0; int optval=0;
socklen_t optlen=sizeof(optval); socklen_t optlen=sizeof(optval);
if (connectinprogress && graphite_sock.fd >= 0) {
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
if (!graphite_ep) {
ilog(LOG_ERROR, "NULL graphite_ep");
return ;
}
if (connection_state == STATE_IN_PROGRESS && graphite_sock.fd >= 0) {
FD_SET(graphite_sock.fd,&wfds); FD_SET(graphite_sock.fd,&wfds);
tv.tv_sec = 0; tv.tv_sec = 0;
tv.tv_usec = 1000000; tv.tv_usec = 1000000;
@ -190,6 +201,7 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
if ((rc == -1) && (errno == EINTR)) { if ((rc == -1) && (errno == EINTR)) {
ilog(LOG_ERROR,"Error on the socket."); ilog(LOG_ERROR,"Error on the socket.");
close_socket(&graphite_sock); close_socket(&graphite_sock);
connection_state = STATE_DISCONNECTED;
return; return;
} else if (rc==0) { } else if (rc==0) {
// timeout // timeout
@ -197,7 +209,8 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
} else { } else {
if (!FD_ISSET(graphite_sock.fd,&wfds)) { if (!FD_ISSET(graphite_sock.fd,&wfds)) {
ilog(LOG_WARN,"fd active but not the graphite fd."); ilog(LOG_WARN,"fd active but not the graphite fd.");
close_socket(&graphite_sock);
close_socket(&graphite_sock);
connection_state = STATE_DISCONNECTED;
return; return;
} }
rc = getsockopt(graphite_sock.fd, SOL_SOCKET, SO_ERROR, &optval, &optlen); rc = getsockopt(graphite_sock.fd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
@ -205,10 +218,11 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
if (optval != 0) { if (optval != 0) {
ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd, strerror(optval)); ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd, strerror(optval));
close_socket(&graphite_sock); close_socket(&graphite_sock);
connection_state = STATE_DISCONNECTED;
return; return;
} }
ilog(LOG_INFO, "Graphite server connected."); ilog(LOG_INFO, "Graphite server connected.");
connectinprogress=0;
connection_state = STATE_CONNECTED;
next_run=0; // fake next run to skip sleep after reconnect next_run=0; // fake next run to skip sleep after reconnect
} }
} }
@ -221,20 +235,19 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
next_run = g_now.tv_sec + seconds; next_run = g_now.tv_sec + seconds;
if (!cm)
cm = callmaster;
if (graphite_sock.fd < 0 && !connectinprogress) {
if (graphite_sock.fd < 0 && connection_state == STATE_DISCONNECTED) {
rc = connect_to_graphite_server(graphite_ep); rc = connect_to_graphite_server(graphite_ep);
} }
if (graphite_sock.fd >= 0 && !connectinprogress) {
if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) {
add_total_calls_duration_in_interval(cm, &graphite_interval_tv); add_total_calls_duration_in_interval(cm, &graphite_interval_tv);
rc = send_graphite_data(&graphite_stats);
rc = send_graphite_data(cm, &graphite_stats);
gettimeofday(&cm->latest_graphite_interval_start, NULL); gettimeofday(&cm->latest_graphite_interval_start, NULL);
if (rc<0) {
if (rc < 0) {
ilog(LOG_ERROR,"Sending graphite data failed."); ilog(LOG_ERROR,"Sending graphite data failed.");
close_socket(&graphite_sock);
connection_state = STATE_DISCONNECTED;
} }
copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval.total_average_lock); copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval.total_average_lock);
@ -245,7 +258,13 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
void graphite_loop(void *d) { void graphite_loop(void *d) {
struct callmaster *cm = d; struct callmaster *cm = d;
if (!cm->conf.graphite_interval) {
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
if (cm->conf.graphite_interval <= 0) {
ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
cm->conf.graphite_interval=1; cm->conf.graphite_interval=1;
} }
@ -253,5 +272,5 @@ void graphite_loop(void *d) {
connect_to_graphite_server(&cm->conf.graphite_ep); connect_to_graphite_server(&cm->conf.graphite_ep);
while (!g_shutdown) while (!g_shutdown)
graphite_loop_run(cm,cm->conf.graphite_interval); // time in seconds
graphite_loop_run(cm, &cm->conf.graphite_ep, cm->conf.graphite_interval); // time in seconds
} }

+ 8
- 2
daemon/graphite.h View File

@ -10,9 +10,15 @@
#include "call.h" #include "call.h"
enum connection_state {
STATE_DISCONNECTED = 0,
STATE_IN_PROGRESS,
STATE_CONNECTED,
};
int connect_to_graphite_server(const endpoint_t *ep); int connect_to_graphite_server(const endpoint_t *ep);
int send_graphite_data();
void graphite_loop_run(struct callmaster* cm, int seconds);
int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data);
void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds);
void set_prefix(char* prefix); void set_prefix(char* prefix);
void graphite_loop(void *d); void graphite_loop(void *d);
void set_latest_graphite_interval_start(struct timeval *tv); void set_latest_graphite_interval_start(struct timeval *tv);


+ 1
- 0
el/README.el.md View File

@ -70,6 +70,7 @@ respective subdirectories.
- *make* - *make*
- *pkgconfig* - *pkgconfig*
- *glib2-devel* - *glib2-devel*
- *hiredis-devel*
- *libcurl-devel* - *libcurl-devel*
- *openssl-devel* - *openssl-devel*
- *pcre-devel* - *pcre-devel*


+ 1
- 1
el/rtpengine.spec View File

@ -11,7 +11,7 @@ Conflicts: %{name}-kernel < %{version}-%{release}
BuildRequires: gcc make pkgconfig redhat-rpm-config BuildRequires: gcc make pkgconfig redhat-rpm-config
BuildRequires: glib2-devel libcurl-devel openssl-devel pcre-devel BuildRequires: glib2-devel libcurl-devel openssl-devel pcre-devel
BuildRequires: xmlrpc-c-devel zlib-devel
BuildRequires: xmlrpc-c-devel zlib-devel hiredis-devel
Requires: nc Requires: nc
# Remain compat with other installations # Remain compat with other installations
Provides: ngcp-rtpengine = %{version}-%{release} Provides: ngcp-rtpengine = %{version}-%{release}


Loading…
Cancel
Save