######## Nodes role - pushes info to kazoo ########
#
# Periodically advertises this Kamailio instance on the "nodes" exchange and
# consumes the advertisements of other nodes. Received advertisements are
# recorded in the "nodes" hash table, and the media servers they list are
# recorded in the "media" hash table. Expiry of those records drives the
# "node unreachable" / "media server down" handling below.

# NOTE(review): NODES_EXPIRE is defined here but not referenced anywhere in
# this file; confirm whether another included file relies on it.
#!ifndef NODES_EXPIRE
#!define NODES_EXPIRE 10
#!endif

# Extra seconds added on top of a node's advertised "Expires" before its
# record in the hash tables is allowed to expire.
#!ifndef NODES_FUDGE_FACTOR
#!define NODES_FUDGE_FACTOR 3
#!endif

# nodes: "<node>::count" heartbeat counters and "<node>" last payload.
# media: "<url>::count" heartbeat counters and "<url>::zone" zone name.
modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180");
modparam("htable", "htable", "media=>size=8;initval=0;autoexpire=180");

####### TIMER module ##########
#!ifndef TIMER_LOADED
loadmodule "timer.so"
#!trydef TIMER_LOADED
#!endif

# One timer to advertise this node, two to drain the mqueues declared below.
modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER=NODES_ADVERTISE_ROUTE,5000,slow,enable");
modparam("timer", "declare_timer", "NODE_TRACK_TIMER=NODE_TRACK_ROUTE,500,fast,enable");
modparam("timer", "declare_timer", "NODE_HEARTBEAT_TIMER=NODE_HEARTBEAT_ROUTE,500,fast,enable");

####### MQUEUE module ##########
#!ifndef MQUEUE_LOADED
loadmodule "mqueue.so"
#!trydef MQUEUE_LOADED
#!endif

# node_track:     keys of expired "<node>::count" records to be reset to 0.
# node_heartbeat: received advertisements (key = node name, value = payload).
modparam("mqueue","mqueue", "name=node_track")
modparam("mqueue","mqueue", "name=node_heartbeat")

####### NODES Logic ########

# Timer route: build this node's advertisement and publish it on the
# "nodes" exchange with an empty routing key.
route[NODES_ADVERTISE_ROUTE]
{
    $var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Registrations" : $(stat(registered_users){s.int}), "WhApps" : {"kamailio" : {"Startup" : $Tb }} }';
    kazoo_publish("nodes", "", $var(Payload));
}

# Consumer event for a received "nodes/advertise" message.
# Bumps the per-node heartbeat counter (logging the first heartbeat seen from
# a node) and queues the whole event for NODE_HEARTBEAT_ROUTE.
event_route[kazoo:consumer-event-nodes-advertise]
{
    if($shtinc(nodes=>$(kzE{kz.json,Node})::count) == 1) {
        xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
    }
    mq_add("node_heartbeat", "$(kzE{kz.json,Node})", "$kzE");
}

# Fired when a record of the "nodes" table expires.
# For "<node>::count" records the key is queued on node_track so that
# NODE_TRACK_ROUTE re-creates it with value 0; a record that expires while
# already 0 is reported as "still unreachable". Any other key is the node's
# payload record and is only logged.
event_route[htable:expired:nodes]
{
    if($shtrecord(key) =~ "::count$$") {
        if($shtrecord(value) == 0) {
            xlog("L_INFO", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
        }
        mq_add("node_track", "$shtrecord(key)", "");
        return;
    }
    xlog("L_INFO", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
}

# Timer route: drain the node_track queue, resetting each queued
# "<node>::count" key in the "nodes" table to 0.
route[NODE_TRACK_ROUTE]
{
    $var(runloop) = 1;
    # The iteration cap is tested BEFORE mq_fetch(): fetching first would pop
    # an item off the queue on the pass where the cap is reached and then
    # drop it without processing. Items left in the queue are handled on the
    # next timer tick.
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_track") == 1) {
        $var(Key) = $mqk(node_track);
        $sht(nodes=>$var(Key)) = 0;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Timer route: drain the node_heartbeat queue. For every queued
# advertisement, process its media servers, store the payload under the node
# name and set that record's expiry to (Expires / 1000) + NODES_FUDGE_FACTOR.
route[NODE_HEARTBEAT_ROUTE]
{
    $var(runloop) = 1;
    # Cap tested first for the same reason as in NODE_TRACK_ROUTE: never
    # fetch a heartbeat that is not going to be processed in this run.
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_heartbeat") == 1) {
        $var(Node) = $mqk(node_heartbeat);
        $var(Payload) = $mqv(node_heartbeat);
        route(CHECK_MEDIA_SERVERS);
        $sht(nodes=>$var(Node)) = $var(Payload);
        $shtex(nodes=>$var(Node)) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Registers the media servers listed in $var(Payload) ("Media-Servers").
# Reads:  $var(Payload) (set by NODE_HEARTBEAT_ROUTE).
# Writes: media=>"<url>::count" and media=>"<url>::zone"; calls
#         MEDIA_SERVER_UP the first time a URL is counted.
route[CHECK_MEDIA_SERVERS]
{
    # Nothing to do when the advertisement carries no media servers.
    if($(var(Payload){kz.json,Media-Servers}) == "") {
        return;
    }

    $var(Media) = $(var(Payload){kz.json,Media-Servers});
    $var(Zone) = $(var(Payload){kz.json,AMQP-Broker-Zone});
    $var(Dot) = ".";
    $var(Perc) = "%";
    avp_delete("$avp(MediaKeys)");
    if(kazoo_json_keys($var(Payload), "Media-Servers", "$avp(MediaKeys)") == 1) {
        $var(Count) = $cnt($avp(MediaKeys));
        $var(Idx) = 0;
        while( $var(Idx) < $var(Count) ) {
            # "." in the key is replaced by "%" before it is used in the JSON
            # path below - presumably because "." is the path separator of
            # the kz.json transformation; confirm against the kazoo module.
            $var(MediaKey) = $(avp(MediaKeys)[$var(Idx)]{s.replace,$var(Dot),$var(Perc)});
            $var(MediaUrl1) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interface.URL});
            if($var(MediaUrl1) != "") {
                # Drop the user part: "sip:user@host" becomes "sip:host".
                $var(MediaUrl) = $(var(MediaUrl1){re.subst,/^sip:(.*)@(.*)/sip:\2/});
                if($shtinc(media=>$var(MediaUrl)::count) == 1) {
                    $sht(media=>$var(MediaUrl)::zone) = $var(Zone);
                    # NOTE(review): expiry 0 presumably keeps the zone record
                    # from auto-expiring so htable:expired:media can still
                    # read it - confirm against the htable documentation.
                    $shtex(media=>$var(MediaUrl)::zone) = 0;
                    route(MEDIA_SERVER_UP);
                };
                $shtex(media=>$var(MediaUrl)::count) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
            }
            $var(Idx) = $var(Idx) + 1;
        }
    };
}

# Fired when a record of the "media" table expires: derive the media URL
# from the key (text before "::"), look up its stored zone and run the
# MEDIA_SERVER_DOWN handling.
event_route[htable:expired:media]
{
    $var(MediaUrl) = $(shtrecord(key){re.subst,/(.*)::(.*)/\1/});
    $var(Zone) = $sht(media=>$var(MediaUrl)::zone);
    route(MEDIA_SERVER_DOWN);
}

# Called from CHECK_MEDIA_SERVERS for a newly seen media server.
# Uses $var(Node), $var(MediaUrl) and $var(Zone) set by the callers.
route[MEDIA_SERVER_UP]
{
    xlog("L_INFO", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef DISPATCHER_ROLE
    route(DISPATCHER_CHECK_MEDIA_SERVER);
#!endif

#!ifdef FAST_PICKUP_ROLE
    route(FAST_PICKUP_START);
#!endif
}

# Called from htable:expired:media when a media server's heartbeat record
# has expired. Uses $var(MediaUrl) and $var(Zone) set by the caller.
route[MEDIA_SERVER_DOWN]
{
    xlog("L_INFO", "htable|media|hearbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef PRESENCE_ROLE
    route(RESET_PUBLISHER);
#!endif
}

# Default AMQP binding for this role: subscribe to the fanout "nodes"
# exchange. Define NODES_CUSTOM_BINDINGS to supply a replacement elsewhere.
#!ifndef NODES_CUSTOM_BINDINGS
route[NODES_BINDINGS]
{
    $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'nodes-MY_HOSTNAME', 'exclusive' : 0, 'federate' : 1}";
    kazoo_subscribe("$var(payload)");
}
#!endif

# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab