######## Nodes role - pushes info to kazoo ########
|
|
|
|
# Tunables for the nodes role. Each value is only applied when the including
# configuration has not already defined the same name.

# NOTE(review): NODES_EXPIRE is not referenced in the visible part of this
# file - confirm whether another file consumes it.
#!trydef NODES_EXPIRE 10

# Added on top of (Expires / 1000) when the heartbeat routes below set the
# expiry of the nodes/media hash table entries.
#!trydef NODES_FUDGE_FACTOR 3
|
|
|
|
# Shared hash tables used by this role. Both start missing items at 0
# (initval) and drop entries after 180 (autoexpire) unless a route sets an
# explicit expiry on the entry.

# nodes: per-node heartbeat counter ("<node>::count") and last payload.
modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180");

# media: per-media-server counter ("<url>::count") and zone ("<url>::zone").
modparam("htable", "htable", "media=>size=8;initval=0;autoexpire=180");
|
|
|
|
####### TIMER module ##########
|
|
# Load the timer module once; TIMER_LOADED marks that it has been loaded so
# a second guarded block does not load it again.
#!ifndef TIMER_LOADED
loadmodule "timer.so"
#!define TIMER_LOADED
#!endif

# Periodic advertisement of this proxy (interval matches the "Expires"
# value published by NODES_ADVERTISE_ROUTE).
modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER=NODES_ADVERTISE_ROUTE,5000,slow,enable");

# Queue drainers: both run every 500 on the fast timer.
modparam("timer", "declare_timer", "NODE_TRACK_TIMER=NODE_TRACK_ROUTE,500,fast,enable");
modparam("timer", "declare_timer", "NODE_HEARTBEAT_TIMER=NODE_HEARTBEAT_ROUTE,500,fast,enable");

# Queues filled by the event routes and consumed by the timer routes above.
modparam("mqueue", "mqueue", "name=node_track");
modparam("mqueue", "mqueue", "name=node_heartbeat");
|
|
|
|
|
|
####### NODES Logic ########
|
|
# Builds the advertise payload for this proxy and publishes it to the
# "nodes" exchange. Runs from NODES_ADVERTISE_TIMER.
#
# Every enabled role contributes one JSON member to the "Roles" object; a
# disabled role leaves its fragment variable empty.
route[NODES_ADVERTISE_ROUTE]
{
#!ifdef REGISTRAR_ROLE
    $var(Registrar) = $_s("Registrar" : {"Registrations" : $(stat(registered_users){s.int})});
#!else
    $var(Registrar) = "";
#!endif

#!ifdef DISPATCHER_ROLE
    # DISPATCHER_SUMMARY is defined elsewhere; this route reads
    # $var(ds_groups_json) right after calling it.
    route(DISPATCHER_SUMMARY);
    $var(Dispatcher) = $_s("Dispatcher" : {"Groups" : { $var(ds_groups_json) }});
#!else
    $var(Dispatcher) = "";
#!endif

#!ifdef PRESENCE_ROLE
    # NOTE(review): both counters are read from $xavp(watchers=>...), once
    # after each COUNT_* route - confirm COUNT_ALL_PRESENTITIES really
    # refills that same xavp.
    route(COUNT_ALL_SUBSCRIBERS);
    $var(Subscriptions) = $_s("Subscriptions" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
    route(COUNT_ALL_PRESENTITIES);
    $var(Presentities) = $_s("Presentities" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
    $var(Presence) = $_s("Presence" : {$var(Subscriptions), $var(Presentities)});
#!else
    $var(Presence) = "";
#!endif

    # Join only the non-empty fragments. Concatenating them unconditionally
    # left dangling commas ("Roles" : {, , }) whenever a role was disabled,
    # which is not valid JSON.
    $var(RoleList) = $var(Dispatcher);

    if($var(Presence) != "") {
        if($var(RoleList) == "") {
            $var(RoleList) = $var(Presence);
        } else {
            $var(RoleList) = $var(RoleList) + ", " + $var(Presence);
        }
    }

    if($var(Registrar) != "") {
        if($var(RoleList) == "") {
            $var(RoleList) = $var(Registrar);
        } else {
            $var(RoleList) = $var(RoleList) + ", " + $var(Registrar);
        }
    }

    $var(Roles) = $_s("Roles" : {$var(RoleList)});

    $var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Startup" : $Tb, "XWhApps" : {"kamailio" : {"Startup" : $Tb }}, $var(Roles)}';
    kazoo_publish("nodes", "", $var(Payload));
}
|
|
|
|
|
|
# Handles an advertise event received from another node: bumps the node's
# "<node>::count" entry in the nodes table and queues the raw event for
# NODE_HEARTBEAT_ROUTE.
event_route[kazoo:consumer-event-nodes-advertise]
{
    $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);

    if($var(count) == 0) {
        # The counter was -1 (set by NODE_TRACK_ROUTE after an expiry), so
        # this node is coming back; bump once more to leave the 0 state.
        xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat for reconnected node $(kzE{kz.json,Node})\n");
        $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
    } else if($var(count) == 1) {
        # No previous entry existed (table initval is 0): first heartbeat.
        xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
    }

    # Key is the node name, value is the complete event payload.
    mq_add("node_heartbeat", "$(kzE{kz.json,Node})", "$kzE");
}
|
|
|
|
# Fires when an entry of the nodes table expires.
#  - "<node>::count" entries are queued on node_track so that
#    NODE_TRACK_ROUTE can mark the node as unreachable.
#  - any other key is the node's payload entry; its expiry is only logged.
event_route[htable:expired:nodes]
{
    if($shtrecord(key) =~ "::count$$") {
        # A value of -1 means the node was already marked unreachable on a
        # previous expiry.
        if($shtrecord(value) == -1) {
            xlog("L_INFO", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
        }
        mq_add("node_track", "$shtrecord(key)", "");
    } else {
        xlog("L_INFO", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
    }
}
|
|
|
|
# Drains the node_track queue (filled by event_route[htable:expired:nodes])
# and stores -1 under every queued key in the nodes table, marking the node
# as unreachable. Runs from NODE_TRACK_TIMER.
#
# MAX_WHILE_LOOPS is defined outside this file and bounds the work done in
# one timer run.
route[NODE_TRACK_ROUTE]
{
    $var(runloop) = 1;

    # The loop budget is tested BEFORE mq_fetch: with the operands the other
    # way round, the run that hits the limit still pops one more item from
    # the queue and then drops it without processing it.
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_track") == 1) {
        $var(Key) = $mqk(node_track);
        $sht(nodes=>$var(Key)) = -1;
        $var(runloop) = $var(runloop) + 1;
    }
}
|
|
|
|
|
|
# Drains the node_heartbeat queue (filled by the advertise event route).
# For every queued heartbeat it checks the advertised media servers, stores
# the payload under the node name in the nodes table and sets that entry's
# expiry to (Expires / 1000) + NODES_FUDGE_FACTOR.
# Runs from NODE_HEARTBEAT_TIMER.
#
# MAX_WHILE_LOOPS is defined outside this file and bounds the work done in
# one timer run.
route[NODE_HEARTBEAT_ROUTE]
{
    $var(runloop) = 1;

    # The loop budget is tested BEFORE mq_fetch: with the operands the other
    # way round, the run that hits the limit still pops one more heartbeat
    # from the queue and then drops it without processing it.
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_heartbeat") == 1) {
        # $var(Node) and $var(Payload) are read by CHECK_MEDIA_SERVERS and
        # the routes it calls.
        $var(Node) = $mqk(node_heartbeat);
        $var(Payload) = $mqv(node_heartbeat);

        route(CHECK_MEDIA_SERVERS);

        $sht(nodes=>$var(Node)) = $var(Payload);
        $shtex(nodes=>$var(Node)) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;

        $var(runloop) = $var(runloop) + 1;
    }
}
|
|
|
|
# Walks the "Media-Servers" object of the heartbeat in $var(Payload) and
# refreshes one "<url>::count" entry in the media table per advertised
# interface URL. The first sighting of a URL also records its zone and
# triggers MEDIA_SERVER_UP.
#
# Input : $var(Payload) (set by NODE_HEARTBEAT_ROUTE).
# Output: $var(Media), $var(Zone), $var(MediaUrl) for the called routes.
route[CHECK_MEDIA_SERVERS]
{
    # Nothing to do for heartbeats that carry no media servers.
    if($(var(Payload){kz.json,Media-Servers}) == "") {
        return;
    }

    $var(Media) = $(var(Payload){kz.json,Media-Servers});
    $var(Zone) = $(var(Payload){kz.json,AMQP-Broker-Zone});

    # Operands for the s.replace below.
    $var(Dot) = ".";
    $var(Perc) = "%";

    avp_delete("$avp(MediaKeys)/g");
    if(kazoo_json_keys($var(Payload), "Media-Servers", "$avp(MediaKeys)") == 1) {
        $var(Count) = $cnt($avp(MediaKeys));
        $var(Idx) = 0;

        while( $var(Idx) < $var(Count) ) {
            # Dots in the server key are swapped for "%" before the key is
            # embedded in a dotted JSON path (presumably so they are not
            # taken as path separators - confirm against the kazoo module).
            $var(MediaKey) = $(avp(MediaKeys)[$var(Idx)]{s.replace,$var(Dot),$var(Perc)});

            avp_delete("$avp(ProfileKeys)/g");
            if(kazoo_json_keys($var(Payload), "Media-Servers.$var(MediaKey).Interfaces", "$avp(ProfileKeys)") == 1) {
                $var(ProfileCount) = $cnt($avp(ProfileKeys));
                $var(ProfileIdx) = 0;

                while( $var(ProfileIdx) < $var(ProfileCount) ) {
                    $var(MediaRawUrl) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interfaces.$(avp(ProfileKeys)[$var(ProfileIdx)]).URL});

                    # Strip the user part: sip:user@host -> sip:host
                    $var(MediaUrl) = $(var(MediaRawUrl){re.subst,/^sip:(.*)@(.*)/sip:\2/});

                    # Counter reaching 1 means this URL had no live entry.
                    if($shtinc(media=>$var(MediaUrl)::count) == 1) {
                        $sht(media=>$var(MediaUrl)::zone) = $var(Zone);
                        # presumably 0 keeps the zone entry from expiring - confirm
                        $shtex(media=>$var(MediaUrl)::zone) = 0;
                        route(MEDIA_SERVER_UP);
                    }

                    # Every sighting pushes the counter's expiry forward.
                    $shtex(media=>$var(MediaUrl)::count) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;

                    $var(ProfileIdx) = $var(ProfileIdx) + 1;
                }
            }

            $var(Idx) = $var(Idx) + 1;
        }
    }
}
|
|
|
|
|
|
# Fires when an entry of the media table expires: recovers the media server
# URL from the expired key (the part before "::"), looks up the zone stored
# for it and hands over to MEDIA_SERVER_DOWN.
event_route[htable:expired:media]
{
    $var(MediaUrl) = $(shtrecord(key){re.subst,/(.*)::(.*)/\1/});
    $var(Zone) = $sht(media=>$var(MediaUrl)::zone);

    route(MEDIA_SERVER_DOWN);
}
|
|
|
|
# Called by CHECK_MEDIA_SERVERS the first time a media server URL is seen.
# Logs the event and notifies the optional roles that are compiled in.
# Reads $var(Node), $var(MediaUrl) and $var(Zone) set by the callers.
route[MEDIA_SERVER_UP]
{
    xlog("L_INFO", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef DISPATCHER_ROLE
    route(DISPATCHER_CHECK_MEDIA_SERVER);
#!endif

#!ifdef FAST_PICKUP_ROLE
    route(FAST_PICKUP_START);
#!endif
}
|
|
|
|
# Called by event_route[htable:expired:media] when a media server entry has
# expired. Logs the event and, with the presence role compiled in, calls
# RESET_PUBLISHER (defined elsewhere).
# Reads $var(MediaUrl) and $var(Zone) set by the caller.
route[MEDIA_SERVER_DOWN]
{
    xlog("L_INFO", "htable|media|hearbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef PRESENCE_ROLE
    route(RESET_PUBLISHER);
#!endif
}
|
|
|
|
# Default AMQP binding for this role; define NODES_CUSTOM_BINDINGS to
# provide a different route[NODES_BINDINGS] elsewhere.
#!ifndef NODES_CUSTOM_BINDINGS
# Subscribes to the "nodes" fanout exchange through a per-host queue.
# NOTE(review): MY_HOSTNAME is presumably substituted by a define made
# outside this file - confirm.
route[NODES_BINDINGS]
{
    $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'nodes-MY_HOSTNAME', 'exclusive' : 0, 'federate' : 1}";
    kazoo_subscribe("$var(payload)");
}
#!endif
|
|
|
|
|
|
|
|
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab
|