@ -5,20 +5,23 @@
#!endif
#!ifndef NODES_FUDGE_FACTOR
#!define NODES_FUDGE_FACTOR 3
#!define NODES_FUDGE_FACTOR 10
#!endif
modparam("htable", "htable", "nodes = >size=8;initval=0;autoexpire=18 0");
modparam("htable", "htable", "media = >size=8;initval=0;autoexpire=18 0");
modparam("htable", "htable", "nodes = >size=8;initval=0;autoexpire=6 0");
modparam("htable", "htable", "media = >size=8;initval=0;autoexpire=6 0");
####### TIMER module ##########
#!ifndef TIMER_LOADED
loadmodule "timer.so"
#!trydef TIMER_LOADED
####### R TIMER module ##########
#!ifndef R TIMER_LOADED
loadmodule "r timer.so"
#!trydef R TIMER_LOADED
#!endif
modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER = NODES_ADVERTISE_ROUTE,5000,slow,enable");
modparam("timer", "declare_timer", "NODE_TRACK_TIMER = NODE_TRACK_ROUTE,500,fast,enable");
modparam("timer", "declare_timer", "NODE_HEARTBEAT_TIMER = NODE_HEARTBEAT_ROUTE,500,fast,enable");
modparam("rtimer", "timer", "name = ta;interval=2;mode=2;")
modparam("rtimer", "timer", "name = retry;interval=5;mode=2;")
modparam("rtimer", "timer", "name = pub;interval=10;mode=1;")
modparam("rtimer", "exec", "timer = ta;route=NODE_HEARTBEAT_ROUTE")
modparam("rtimer", "exec", "timer = retry;route=NODE_TRACK_ROUTE")
modparam("rtimer", "exec", "timer = pub;route=NODES_ADVERTISE_ROUTE")
modparam("mqueue","mqueue", "name = node_track")
@ -29,15 +32,11 @@ modparam("mqueue","mqueue", "name=node_heartbeat")
route[NODES_ADVERTISE_ROUTE]
{
#!ifdef REGISTRAR_ROLE
$var(Registrar) = $_s("Registrar" : {"Registrations" : $(stat(registered_users){s.int})});
#!else
$var(Registrar) = "";
#!endif
route(LISTENER_STATUS);
#!ifdef DISPATCHER_ROLE
route(DISPATCHER_STATUS);
$var(Dispatcher) = $_s("Dispatcher" : {"Groups" : { $var(ds_groups_json) }});
$var(Dispatcher) = $_s(, "Dispatcher" : {"Groups" : { $var(ds_groups_json) }});
#!else
$var(Dispatcher) = "";
#!endif
@ -50,13 +49,19 @@ route[NODES_ADVERTISE_ROUTE]
route(COUNT_ALL_PRESENTITIES);
$var(Presentities) = $_s("Presentities" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
$var(Presence) = $_s("Presence" : {$var(Subscribers), $var(Subscriptions), $var(Presentities)});
$var(Presence) = $_s(, "Presence" : {$var(Subscribers), $var(Subscriptions), $var(Presentities)});
#!else
$var(Presence) = "";
#!endif
$var(Roles) = $_s("Roles" : {$var(Dispatcher), $var(Presence) , $var(Registrar)});
$var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Startup" : $Tb, "XWhApps" : {"kamailio" : {"Startup" : $Tb }}, $var(Roles)}';
#!ifdef REGISTRAR_ROLE
$var(Registrar) = $_s(, "Registrar" : {"Registrations" : $(stat(registered_users){s.int})});
#!else
$var(Registrar) = "";
#!endif
$var(Roles) = $_s("Roles" : {"Proxy" : $var(listeners) $var(Dispatcher) $var(Presence) $var(Registrar)});
$var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 15000, "Used-Memory" : $(stat(real_used_size){s.int}), "Startup" : $Tb, "WhApps" : {"kamailio" : {"Startup" : $Tb }}, $var(Roles)}';
kazoo_publish("nodes", "", $var(Payload));
}
@ -65,11 +70,13 @@ event_route[kazoo:consumer-event-nodes-advertise]
{
$var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
if($var(count) = = 0) {
xlog("L_INFO ", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat for reconnected node $(kzE{kz.json,Node})\n");
xlog("L_WARNING ", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat for reconnected node $(kzE{kz.json,Node})\n");
$var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
} else {
if($var(count) = = 1) {
xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
xlog("L_WARNING", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
} else {
xlog("L_DEBUG", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from existing node $(kzE{kz.json,Node})\n");
}
}
mq_add("node_heartbeat", "$(kzE{kz.json,Node})", "$kzE");
@ -79,13 +86,13 @@ event_route[htable:expired:nodes]
{
if($shtrecord(key) = ~ "::count$$") {
if($shtrecord(value) = = -1) {
xlog("L_INFO ", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
xlog("L_WARNING ", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
}
mq_add("node_track", "$shtrecord(key)", "");
return;
}
xlog("L_INFO ", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
xlog("L_WARNING ", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
}
route[NODE_TRACK_ROUTE]
@ -105,6 +112,7 @@ route[NODE_HEARTBEAT_ROUTE]
while(mq_fetch("node_heartbeat") = = 1 && $var(runloop) < MAX_WHILE_LOOPS) {
$var(Node) = $mqk(node_heartbeat);
$var(Payload) = $mqv(node_heartbeat);
xlog("L_DEBUG", "$(var(Payload){kz.json,Msg-ID})|nodes|processing hearbeat for node $var(Node)\n");
route(CHECK_MEDIA_SERVERS);
@ -130,21 +138,24 @@ route[CHECK_MEDIA_SERVERS]
$var(Count) = $cnt($avp(MediaKeys));
$var(Idx) = 0;
while( $var(Idx) < $var(Count) ) {
$var(MediaKey) = $(avp(MediaKeys)[$var(Idx)]{s.replace,$var(Dot),$var(Perc)});
$var(MediaName) = $(avp(MediaKeys)[$var(Idx)]);
$var(MediaKey) = $(var(MediaName){s.replace,$var(Dot),$var(Perc)});
avp_delete("$avp(ProfileKeys)/g");
if(kazoo_json_keys($var(Payload), "Media-Servers.$var(MediaKey).Interfaces", "$avp(ProfileKeys)") = = 1) {
$var(ProfileCount) = $cnt($avp(ProfileKeys));
$var(ProfileIdx) = 0;
while( $var(ProfileIdx) < $var(ProfileCount) ) {
$var(MediaRawUrl) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interfaces.$(avp(ProfileKeys)[$var(ProfileIdx)]).URL});
$var(MediaProfile) = $(avp(ProfileKeys)[$var(ProfileIdx)]);
$var(MediaRawUrl) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interfaces.$var(MediaProfile).URL});
$var(MediaUrl) = $(var(MediaRawUrl){re.subst,/^sip:(.*)@(.*)/sip:\2/});
if($shtinc(media = >$var(MediaUrl)::count) == 1) {
$sht(media = >$var(MediaUrl)::zone) = $var(Zone);
$shtex(media = >$var(MediaUrl)::zone) = 0;
route(MEDIA_SERVER_UP);
};
$shtex(media = >$var(MediaUrl)::count) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
$var(MediaExpire) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
xlog("L_DEBUG", "nodes|media|$var(Node) media expiration $var(MediaExpire) for $var(MediaUrl)\n");
$shtex(media = >$var(MediaUrl)::count) = $var(MediaExpire);
$var(ProfileIdx) = $var(ProfileIdx) + 1;
}
}
@ -153,7 +164,6 @@ route[CHECK_MEDIA_SERVERS]
};
}
event_route[htable:expired:media]
{
$var(MediaUrl) = $(shtrecord(key){re.subst,/(.*)::(.*)/\1/});
@ -163,7 +173,7 @@ event_route[htable:expired:media]
route[MEDIA_SERVER_UP]
{
xlog("L_INFO ", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");
xlog("L_WARNING ", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");
#!ifdef DISPATCHER_ROLE
route(DISPATCHER_CHECK_MEDIA_SERVER);
@ -177,7 +187,7 @@ route[MEDIA_SERVER_UP]
route[MEDIA_SERVER_DOWN]
{
xlog("L_INFO ", "htable|media|hearbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");
xlog("L_WARNING ", "htable|media|hearbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");
#!ifdef PRESENCE_ROLE
route(RESET_PUBLISHER);
@ -193,6 +203,31 @@ route[NODES_BINDINGS]
}
#!endif
route[LISTENER_STATUS]
{
jsonrpc_exec('{"jsonrpc": "2.0", "method": "corex.list_sockets", "id": 1}');
$var(count) = $(jsonrpl(body){kz.json.count,result});
$var(loop) = 0;
$var(sep) = "";
$var(listeners) = "";
while( $var(loop) < $var(count) ) {
$var(listener) = $(jsonrpl(body){kz.json,result[$var(loop)]});
$var(proto) = $(var(listener){kz.json,PROTO});
$var(address) = $(var(listener){kz.json,ADDRLIST.ADDR});
$var(port) = $(var(listener){kz.json,PORT});
$var(uri) = $_s($var(proto):$var(address):$var(port));
if($(var(listener){kz.json,ADVERTISE}) ! = "-") {
$var(advertise) = $_s( , "advertise" : "$(var(listener){kz.json,ADVERTISE})");
} else {
$var(advertise) = "";
}
$var(x) = $_s("$var(uri)" : {"proto" : "$var(proto)", "address" : "$var(address)", "port" : $var(port) $var(advertise) });
$var(listeners) = $_s($var(listeners)$var(sep)$var(x));
$var(loop) = $var(loop) + 1;
$var(sep) = " , ";
}
$var(listeners) = $_s({"Listeners" : { $var(listeners) }});
}
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab