######## Nodes role - pushes info to kazoo ########
#
# Overview of what this file does:
#   * periodically publishes an "advertise" event describing this proxy
#     (listeners plus optional role statistics) on the "nodes" exchange;
#   * consumes "advertise" events from other nodes, tracking each node and
#     the media servers it reports in the "nodes" and "media" hash tables;
#   * reacts to hash-table expiry to flag unreachable nodes and media
#     servers that stopped being advertised.
#
# Routes referenced here but not defined in this file (DISPATCHER_STATUS,
# COUNT_ALL_SUBSCRIBERS, COUNT_ALL_PRESENTITIES, DISPATCHER_CHECK_MEDIA_SERVER,
# FAST_PICKUP_START, RESET_PUBLISHER) and the MAX_WHILE_LOOPS define are
# expected to come from other config files.

# NOTE(review): NODES_EXPIRE is not referenced anywhere in this file; the
# publish interval (10) and the advertised "Expires" (15000) are hard-coded.
#!trydef NODES_EXPIRE 10
# Added to the advertised "Expires" value (after dividing it by 1000) when
# computing hash-table entry lifetimes for nodes and media servers.
#!trydef NODES_FUDGE_EXPIRE 45

# Hash tables. initval=0 matters: the first $shtinc on an unseen key
# yields 1, which the logic below uses to detect new nodes/media servers.
modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=60");
modparam("htable", "htable", "media=>size=8;initval=0;autoexpire=60");

# Timers and the routes they execute.
modparam("rtimer", "timer", "name=ta;interval=2;mode=2;")
modparam("rtimer", "timer", "name=retry;interval=5;mode=2;")
modparam("rtimer", "timer", "name=pub;interval=10;mode=1;")
modparam("rtimer", "exec", "timer=ta;route=NODE_HEARTBEAT_ROUTE")
modparam("rtimer", "exec", "timer=retry;route=NODE_TRACK_ROUTE")
modparam("rtimer", "exec", "timer=pub;route=NODES_ADVERTISE_ROUTE")

# Queues that hand work from event routes over to the timer routes above.
modparam("mqueue","mqueue", "name=node_track")
modparam("mqueue","mqueue", "name=node_heartbeat")

####### NODES Logic ########

# Builds the advertise JSON payload and publishes it on the "nodes" exchange.
# Executed by the "pub" timer.
#
# Each optional role fragment starts with a leading comma so it can simply be
# appended after $var(listeners); when the role is disabled the fragment is
# the empty string. Because $var(listeners) is itself a complete
# {"Listeners" : {...}} object, the role fragments end up as siblings of
# "Proxy" inside "Roles".
route[NODES_ADVERTISE_ROUTE]
{
    # Sets $var(listeners).
    route(LISTENER_STATUS);

#!ifdef DISPATCHER_ROLE
    # Presumably fills $var(ds_groups_json) - defined elsewhere, TODO confirm.
    route(DISPATCHER_STATUS);
    $var(Dispatcher) = $_s(, "Dispatcher" : {"Groups" : { $var(ds_groups_json) }});
#!else
    $var(Dispatcher) = "";
#!endif

#!ifdef PRESENCE_ROLE
    # Presumably fills $var(Subscriptions) and $var(Subscribers) with the
    # inner JSON members - defined elsewhere, TODO confirm.
    route(COUNT_ALL_SUBSCRIBERS);
    $var(Subscriptions) = $_s("Subscriptions" : { $var(Subscriptions) });
    $var(Subscribers) = $_s("Subscribers" : { $var(Subscribers) });
    # Presumably fills $xavp(watchers=>...) - defined elsewhere, TODO confirm.
    route(COUNT_ALL_PRESENTITIES);
    $var(Presentities) = $_s("Presentities" : {"message-summary" : $xavp(watchers=>message-summary), "dialog" : $xavp(watchers=>dialog), "presence" : $xavp(watchers=>presence)});
    $var(Presence) = $_s(, "Presence" : {$var(Subscribers), $var(Subscriptions), $var(Presentities)});
#!else
    $var(Presence) = "";
#!endif

#!ifdef REGISTRAR_ROLE
    $var(Registrar) = $_s(, "Registrar" : {"Registrations" : $(stat(registered_users){s.int})});
#!else
    $var(Registrar) = "";
#!endif

    $var(Roles) = $_s("Roles" : {"Proxy" : $var(listeners) $var(Dispatcher) $var(Presence) $var(Registrar)});
    $var(Payload) = $_s({"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 15000, "Used-Memory" : $(stat(real_used_size){s.int}), "Startup" : $Tb, "WhApps" : {"kamailio" : {"Startup" : $Tb }}, $var(Roles)});
    kazoo_publish("nodes", "", $var(Payload));
}

# Handles an advertise event received from a node.
#
# The per-node counter "<Node>::count" in the nodes table classifies the
# heartbeat by the value $shtinc returns:
#   0  -> the counter had been set to -1 by NODE_TRACK_ROUTE, i.e. the node
#         was flagged after its counter expired; it is incremented a second
#         time so it ends at 1;
#   1  -> first heartbeat seen for this key (table initval is 0);
#   >1 -> heartbeat from an already known node.
# The event is then queued on "node_heartbeat" (key = node name, value = the
# whole event) for NODE_HEARTBEAT_ROUTE to process.
event_route[kazoo:consumer-event-nodes-advertise]
{
    $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
    if($var(count) == 0) {
        xlog("L_WARNING", "$(kzE{kz.json,Msg-ID})|nodes|heartbeat for reconnected node $(kzE{kz.json,Node})\n");
        $var(count) = $shtinc(nodes=>$(kzE{kz.json,Node})::count);
    } else {
        if($var(count) == 1) {
            xlog("L_WARNING", "$(kzE{kz.json,Msg-ID})|nodes|heartbeat from new node $(kzE{kz.json,Node})\n");
        } else {
            xlog("L_DEBUG", "$(kzE{kz.json,Msg-ID})|nodes|heartbeat from existing node $(kzE{kz.json,Node})\n");
        }
    }
    mq_add("node_heartbeat", "$(kzE{kz.json,Node})", "$kzE");
}

# Runs when an entry of the nodes table expires.
#
# Two kinds of keys live in that table:
#   "<Node>::count" - the heartbeat counter. On expiry the key is queued on
#                     "node_track" so NODE_TRACK_ROUTE re-creates it with the
#                     value -1. If the expired value already was -1 the node
#                     is reported as still unreachable.
#   "<Node>"        - the last payload stored by NODE_HEARTBEAT_ROUTE; its
#                     expiry is only logged.
event_route[htable:expired:nodes]
{
    if($shtrecord(key) =~ "::count$$") {
        if($shtrecord(value) == -1) {
            xlog("L_WARNING", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
        }
        mq_add("node_track", "$shtrecord(key)", "");
        return;
    }
    xlog("L_WARNING", "htable|nodes|heartbeat expired for node $shtrecord(key)\n");
}

# Drains the "node_track" queue and sets every queued key in the nodes table
# to -1. Executed by the "retry" timer.
#
# At most MAX_WHILE_LOOPS - 1 items are handled per run. The cap is tested
# BEFORE mq_fetch on purpose: mq_fetch removes the item from the queue, so
# checking the cap afterwards would dequeue one item and then discard it
# without processing whenever the cap is reached. With this order the item
# stays queued for the next timer run.
route[NODE_TRACK_ROUTE]
{
    $var(runloop) = 1;
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_track") == 1) {
        $var(Key) = $mqk(node_track);
        $sht(nodes=>$var(Key)) = -1;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Drains the "node_heartbeat" queue. Executed by the "ta" timer.
#
# For every queued heartbeat: process the media servers it reports, store the
# payload under the node name and set that entry's lifetime to
# Expires / 1000 + NODES_FUDGE_EXPIRE (Expires is presumably in
# milliseconds - TODO confirm).
#
# At most MAX_WHILE_LOOPS - 1 items are handled per run. As in
# NODE_TRACK_ROUTE, the cap is tested before mq_fetch so a heartbeat is never
# dequeued and then dropped when the cap is reached.
#
# Sets $var(Node) and $var(Payload), which CHECK_MEDIA_SERVERS and the routes
# it calls read.
route[NODE_HEARTBEAT_ROUTE]
{
    $var(runloop) = 1;
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("node_heartbeat") == 1) {
        $var(Node) = $mqk(node_heartbeat);
        $var(Payload) = $mqv(node_heartbeat);
        xlog("L_DEBUG", "$(var(Payload){kz.json,Msg-ID})|nodes|processing heartbeat for node $var(Node)\n");
        route(CHECK_MEDIA_SERVERS);
        $sht(nodes=>$var(Node)) = $var(Payload);
        $shtex(nodes=>$var(Node)) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_EXPIRE;
        $var(runloop) = $var(runloop) + 1;
    }
}

# Registers/refreshes every media server interface listed under
# "Media-Servers" in $var(Payload).
#
# Reads:  $var(Payload), $var(Node) (for logging).
# Writes: $var(MediaUrl) and $var(Zone), consumed by MEDIA_SERVER_UP.
#
# For each media server and each of its interface profiles:
#   * the interface URL is reduced to "sip:<host part>" by dropping the user
#     part;
#   * "<url>::count" in the media table is incremented; a result of 1 means
#     the URL was not present (initval=0), so the zone is stored under
#     "<url>::zone" and MEDIA_SERVER_UP is called;
#   * the lifetime of "<url>::count" is reset to
#     Expires / 1000 + NODES_FUDGE_EXPIRE.
route[CHECK_MEDIA_SERVERS]
{
    # Nothing to do when the node advertises no media servers.
    if($(var(Payload){kz.json,Media-Servers}) == "") {
        return;
    }

    $var(Media) = $(var(Payload){kz.json,Media-Servers});
    $var(Zone) = $(var(Payload){kz.json,AMQP-Broker-Zone});
    # Held in variables so they can be passed as s.replace arguments.
    $var(Dot) = ".";
    $var(Perc) = "%";
    # Start from an empty key list.
    avp_delete("$avp(MediaKeys)/g");
    if(kazoo_json_keys($var(Payload), "Media-Servers", "$avp(MediaKeys)") == 1) {
        $var(Count) = $cnt($avp(MediaKeys));
        $var(Idx) = 0;
        while( $var(Idx) < $var(Count) ) {
            $var(MediaName) = $(avp(MediaKeys)[$var(Idx)]);
            # Dots in the server name are replaced by "%" before the name is
            # embedded in a JSON path (presumably because "." separates path
            # segments there - TODO confirm).
            $var(MediaKey) = $(var(MediaName){s.replace,$var(Dot),$var(Perc)});
            avp_delete("$avp(ProfileKeys)/g");
            if(kazoo_json_keys($var(Payload), "Media-Servers.$var(MediaKey).Interfaces", "$avp(ProfileKeys)") == 1) {
                $var(ProfileCount) = $cnt($avp(ProfileKeys));
                $var(ProfileIdx) = 0;
                while( $var(ProfileIdx) < $var(ProfileCount) ) {
                    $var(MediaProfile) = $(avp(ProfileKeys)[$var(ProfileIdx)]);
                    $var(MediaRawUrl) = $(var(Payload){kz.json,Media-Servers.$var(MediaKey).Interfaces.$var(MediaProfile).URL});
                    # sip:user@host -> sip:host
                    $var(MediaUrl) = $(var(MediaRawUrl){re.subst,/^sip:(.*)@(.*)/sip:\2/});
                    if($shtinc(media=>$var(MediaUrl)::count) == 1) {
                        $sht(media=>$var(MediaUrl)::zone) = $var(Zone);
                        # Expire value 0 presumably exempts the zone entry from
                        # auto-expiry so it is still readable when "::count"
                        # expires - TODO confirm.
                        $shtex(media=>$var(MediaUrl)::zone) = 0;
                        route(MEDIA_SERVER_UP);
                    };
                    $var(MediaExpire) = ($(var(Payload){kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_EXPIRE;
                    xlog("L_DEBUG", "nodes|media|$var(Node) media expiration $var(MediaExpire) for $var(MediaUrl)\n");
                    $shtex(media=>$var(MediaUrl)::count) = $var(MediaExpire);
                    $var(ProfileIdx) = $var(ProfileIdx) + 1;
                }
            }
            $var(Idx) = $var(Idx) + 1;
        }
    };
}

# Runs when an entry of the media table expires: derives the media URL by
# stripping the "::<suffix>" part of the expired key, looks up the stored
# zone and calls MEDIA_SERVER_DOWN with $var(MediaUrl) and $var(Zone) set.
event_route[htable:expired:media]
{
    $var(MediaUrl) = $(shtrecord(key){re.subst,/(.*)::(.*)/\1/});
    $var(Zone) = $sht(media=>$var(MediaUrl)::zone);
    route(MEDIA_SERVER_DOWN);
}

# Called when a media server URL is seen for the first time.
# Reads $var(Node), $var(MediaUrl) and $var(Zone); the role-specific routes
# invoked here are defined elsewhere.
route[MEDIA_SERVER_UP]
{
    xlog("L_WARNING", "nodes|media|$var(Node) reported new media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef DISPATCHER_ROLE
    route(DISPATCHER_CHECK_MEDIA_SERVER);
#!endif

#!ifdef FAST_PICKUP_ROLE
    route(FAST_PICKUP_START);
#!endif
}

# Called when a media server's entry expired without being refreshed.
# Reads $var(MediaUrl) and $var(Zone); RESET_PUBLISHER is defined elsewhere.
route[MEDIA_SERVER_DOWN]
{
    xlog("L_WARNING", "htable|media|heartbeat expired for media server $var(MediaUrl) in zone $var(Zone)\n");

#!ifdef PRESENCE_ROLE
    route(RESET_PUBLISHER);
#!endif
}

# Default subscription to the "nodes" fanout exchange. Define
# NODES_CUSTOM_BINDINGS to supply a different NODES_BINDINGS route.
# MY_HOSTNAME is presumably replaced by a define set elsewhere - TODO confirm.
#!ifndef NODES_CUSTOM_BINDINGS
route[NODES_BINDINGS]
{
    $var(payload) = $_s({"exchange" : "nodes" ,
        "type" : "fanout", "queue" : "nodes-MY_HOSTNAME", "exclusive":0, "federate":1 });
    kazoo_subscribe("$var(payload)");
}
#!endif

# Builds $var(listeners): a JSON object {"Listeners" : {...}} with one member
# per socket returned by the corex.list_sockets RPC, keyed by
# "<proto>:<address>:<port>".
route[LISTENER_STATUS]
{
    jsonrpc_exec('{"jsonrpc": "2.0", "method": "corex.list_sockets", "id": 1}');
    $var(count) = $(jsonrpl(body){kz.json.count,result});
    $var(loop) = 0;
    # Empty for the first element, " , " afterwards.
    $var(sep) = "";
    $var(listeners) = "";
    while( $var(loop) < $var(count) ) {
        $var(listener) = $(jsonrpl(body){kz.json,result[$var(loop)]});
        $var(proto) = $(var(listener){kz.json,PROTO});
        $var(address) = $(var(listener){kz.json,ADDRLIST.ADDR});
        $var(port) = $(var(listener){kz.json,PORT});
        # Sockets on the websocket ports are reported as ws/wss instead of the
        # protocol returned by the RPC. WS_PORT / WSS_PORT are presumably
        # replaced by defines set elsewhere - TODO confirm.
        if($var(port) == "WS_PORT") {
            $var(proto) = "ws";
        }
        if($var(port) == "WSS_PORT") {
            $var(proto) = "wss";
        }
        $var(uri) = $_s($var(proto):$var(address):$var(port));
        # The RPC reports "-" when the socket has no advertise address.
        if($(var(listener){kz.json,ADVERTISE}) != "-") {
            $var(advertise) = $_s( , "advertise" : "$(var(listener){kz.json,ADVERTISE})");
        } else {
            $var(advertise) = "";
        }
        $var(x) = $_s("$var(uri)" : {"proto" : "$var(proto)", "address" : "$var(address)", "port" : $var(port) $var(advertise) });
        $var(listeners) = $_s($var(listeners)$var(sep)$var(x));
        $var(loop) = $var(loop) + 1;
        $var(sep) = " , ";
    }
    $var(listeners) = $_s({"Listeners" : { $var(listeners) }});
}

# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab