######## Nodes role - pushes info to kazoo ########
#
# Publishes a periodic "nodes/advertise" event describing this proxy,
# consumes the advertisements of other nodes, and keeps two hash tables
# up to date:
#   nodes - last advertise payload per node plus a "<node>::count" counter
#   media - "<url>::count" / "<url>::zone" entries for media servers that
#           the advertisements list under "Media-Servers"

# NOTE(review): NODES_EXPIRE is not referenced anywhere in this file; it may
# be used by another role file - confirm before removing.
#!ifndef NODES_EXPIRE
#!define NODES_EXPIRE 10
#!endif

# Added to the advertised expiry when htable expirations are (re)armed.
#!ifndef NODES_FUDGE_FACTOR
#!define NODES_FUDGE_FACTOR 3
#!endif

modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180");
modparam("htable", "htable", "media=>size=8;initval=0;autoexpire=180");

####### TIMER module ##########
#!ifndef TIMER_LOADED
loadmodule "timer.so"
#!trydef TIMER_LOADED
#!endif

modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER=NODES_ADVERTISE_ROUTE,5000,slow,enable");
modparam("timer", "declare_timer", "NODE_TRACK_TIMER=NODE_TRACK_ROUTE,500,fast,enable");
modparam("timer", "declare_timer", "NODE_HEARTBEAT_TIMER=NODE_HEARTBEAT_ROUTE,500,fast,enable");

# Queues that hand work from the event routes over to the timer routes.
modparam("mqueue","mqueue", "name=node_track")
modparam("mqueue","mqueue", "name=node_heartbeat")

####### NODES Logic ########

# Builds the advertise payload for this node and publishes it on the
# "nodes" exchange. Each enabled role contributes one JSON member to the
# "Roles" object; disabled roles contribute nothing.
route[NODES_ADVERTISE_ROUTE]
{
#!ifdef REGISTRAR_ROLE
    $var(Registrar) = $_s("Registrar" : {"Registrations" : $(stat(registered_users){s.int})});
#!else
    $var(Registrar) = "";
#!endif

#!ifdef DISPATCHER_ROLE
    route(DISPATCHER_SUMMARY);
    $var(Dispatcher) = $_s("Dispatcher" : {"Groups" : { $var(ds_groups_json) }});
#!else
    $var(Dispatcher) = "";
#!endif

#!ifdef PRESENCE_ROLE
    route(COUNT_ALL_SUBSCRIBERS);
    $var(Subscriptions) = $_s("Subscriptions" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
    # NOTE(review): the presentity counters are read from the same
    # "watchers" xavp as the subscriber counters above - confirm that
    # COUNT_ALL_PRESENTITIES really stores its results there.
    route(COUNT_ALL_PRESENTITIES);
    $var(Presentities) = $_s("Presentities" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
    $var(Presence) = $_s("Presence" : {$var(Subscriptions), $var(Presentities)});
#!else
    $var(Presence) = "";
#!endif

    # Join only the non-empty role fragments. Concatenating them
    # unconditionally leaves a leading or doubled comma ("{, , ...}")
    # whenever a role is disabled, which is not valid JSON.
    $var(RoleList) = "";
    $var(RoleSep) = "";
    if($var(Dispatcher) != "") {
        $var(RoleList) = $var(RoleList) + $var(RoleSep) + $var(Dispatcher);
        $var(RoleSep) = ", ";
    }
    if($var(Presence) != "") {
        $var(RoleList) = $var(RoleList) + $var(RoleSep) + $var(Presence);
        $var(RoleSep) = ", ";
    }
    if($var(Registrar) != "") {
        $var(RoleList) = $var(RoleList) + $var(RoleSep) + $var(Registrar);
        $var(RoleSep) = ", ";
    }
    $var(Roles) = $_s("Roles" : {$var(RoleList)});

    $var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Startup" : $Tb, "XWhApps" : {"kamailio" : {"Startup" : $Tb }}, $var(Roles)}';
    kazoo_publish("nodes", "", $var(Payload));
}

# Handles an advertise event from a node: bumps the node's "::count"
# counter, logs first-seen and reconnected nodes, and queues the raw event
# for NODE_HEARTBEAT_ROUTE.
event_route[kazoo:consumer-event-nodes-advertise]
{
    $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
    if($var(count) == 0) {
        # The counter was -1 (set by NODE_TRACK_ROUTE after an expiry), so
        # this node is coming back; increment once more to reach 1.
        xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat for reconnected node $(kzE{kz.json,Node})\n");
        $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
    } else if($var(count) == 1) {
        xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
    }
    mq_add("node_heartbeat", "$(kzE{kz.json,Node})", "$kzE");
}

# Runs when an entry of the "nodes" table expires. An expired "::count"
# key is queued for NODE_TRACK_ROUTE; any other key is only logged as an
# expired heartbeat.
event_route[htable:expired:nodes]
{
    if($shtrecord(key) =~ "::count$$") {
        if($shtrecord(value) == -1) {
            xlog("L_INFO", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
        }
        mq_add("node_track", "$shtrecord(key)", "");
        return;
    }

    xlog("L_INFO", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
}

# Drains the node_track queue and stores -1 under every queued key, so
# that the next advertise from that node increments the counter to 0 and
# is reported as a reconnect.
route[NODE_TRACK_ROUTE]
{
    $var(runloop) = 1;
    while(mq_fetch("node_track") == 1 && $var(runloop) < MAX_WHILE_LOOPS) {
        $var(Key) = $mqk(node_track);
        $sht(nodes=>$var(Key)) = -1;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Drains the node_heartbeat queue. For each queued advertise it processes
# the advertised media servers, stores the payload under the node name and
# sets that entry's expiry to the payload's Expires / 1000 plus
# NODES_FUDGE_FACTOR.
route[NODE_HEARTBEAT_ROUTE]
{
    $var(runloop) = 1;
    while(mq_fetch("node_heartbeat") == 1 && $var(runloop) < MAX_WHILE_LOOPS) {
        $var(Node) = $mqk(node_heartbeat);
        $var(Payload) = $mqv(node_heartbeat);
        route(CHECK_MEDIA_SERVERS);
        $sht(nodes=>$var(Node)) = $var(Payload);
        $shtex(nodes=>$var(Node)) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Walks "Media-Servers" -> <server> -> "Interfaces" -> <profile> -> "URL"
# in $var(Payload). Every URL gets its "<url>::count" counter bumped and
# its expiry refreshed; a URL whose counter becomes 1 additionally has its
# zone recorded and MEDIA_SERVER_UP invoked.
# Reads $var(Payload); sets $var(Zone) and $var(MediaUrl), which
# MEDIA_SERVER_UP uses in its log line.
route[CHECK_MEDIA_SERVERS]
{
    if($(var(Payload){kz.json,Media-Servers}) == "") {
        return;
    }

    $var(Media) = $(var(Payload){kz.json,Media-Servers});
    $var(Zone) = $(var(Payload){kz.json,AMQP-Broker-Zone});
    $var(Dot) = ".";
    $var(Perc) = "%";

    avp_delete("$avp(MediaKeys)/g");
    if(kazoo_json_keys($var(Payload), "Media-Servers", "$avp(MediaKeys)") == 1) {
        $var(Count) = $cnt($avp(MediaKeys));
        $var(Idx) = 0;
        while( $var(Idx) < $var(Count) ) {
            # Dots in the server key are replaced with "%" before the key
            # is embedded in a dotted JSON path - presumably so the key is
            # not split into several path segments; confirm against the
            # kazoo module.
            $var(MediaKey) = $(avp(MediaKeys)[$var(Idx)]{s.replace,$var(Dot),$var(Perc)});

            avp_delete("$avp(ProfileKeys)/g");
            if(kazoo_json_keys($var(Payload), "Media-Servers.$var(MediaKey).Interfaces", "$avp(ProfileKeys)") == 1) {
                $var(ProfileCount) = $cnt($avp(ProfileKeys));
                $var(ProfileIdx) = 0;
                while( $var(ProfileIdx) < $var(ProfileCount) ) {
                    $var(MediaRawUrl) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interfaces.$(avp(ProfileKeys)[$var(ProfileIdx)]).URL});
                    # Drop the user part: "sip:user@host" becomes "sip:host".
                    $var(MediaUrl) = $(var(MediaRawUrl){re.subst,/^sip:(.*)@(.*)/sip:\2/});
                    if($shtinc(media=>$var(MediaUrl)::count) == 1) {
                        # Counter just became 1: remember the zone, with
                        # its expiry set to 0, and announce the server.
                        $sht(media=>$var(MediaUrl)::zone) = $var(Zone);
                        $shtex(media=>$var(MediaUrl)::zone) = 0;
                        route(MEDIA_SERVER_UP);
                    }
                    $shtex(media=>$var(MediaUrl)::count) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
                    $var(ProfileIdx) = $var(ProfileIdx) + 1;
                }
            }

            $var(Idx) = $var(Idx) + 1;
        }
    }
}

# Runs when an entry of the "media" table expires: derives the media URL
# from the part of the key before "::", looks up its stored zone and
# invokes MEDIA_SERVER_DOWN.
# NOTE(review): the "<url>::zone" entry is not removed here - confirm
# whether that is intentional.
event_route[htable:expired:media]
{
    $var(MediaUrl) = $(shtrecord(key){re.subst,/(.*)::(.*)/\1/});
    $var(Zone) = $sht(media=>$var(MediaUrl)::zone);
    route(MEDIA_SERVER_DOWN);
}

# Logs a newly reported media server and notifies the roles that are
# compiled in. Expects $var(Node), $var(MediaUrl) and $var(Zone) to be set
# by the caller.
route[MEDIA_SERVER_UP]
{
    xlog("L_INFO", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef DISPATCHER_ROLE
    route(DISPATCHER_CHECK_MEDIA_SERVER);
#!endif

#!ifdef FAST_PICKUP_ROLE
    route(FAST_PICKUP_START);
#!endif
}

# Logs an expired media server and, when the presence role is compiled in,
# invokes RESET_PUBLISHER. Expects $var(MediaUrl) and $var(Zone) to be set
# by the caller.
route[MEDIA_SERVER_DOWN]
{
    xlog("L_INFO", "htable|media|hearbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef PRESENCE_ROLE
    route(RESET_PUBLISHER);
#!endif
}

# Default AMQP binding for the "nodes" exchange; define
# NODES_CUSTOM_BINDINGS to supply a different NODES_BINDINGS route.
#!ifndef NODES_CUSTOM_BINDINGS
route[NODES_BINDINGS]
{
    $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'nodes-MY_HOSTNAME', 'exclusive' : 0, 'federate' : 1}";
    kazoo_subscribe("$var(payload)");
}
#!endif

# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab