From c4e9b9044ecf671670681639bec526f1cddb4894 Mon Sep 17 00:00:00 2001 From: lazedo Date: Tue, 4 Apr 2017 22:07:09 +0100 Subject: [PATCH] Kazoo 5414 (#156) * add defaults for zone * lowercase queue name for nodes * first step at node synchronization * fix expires sent to amqp * move bindings * add node expiration logic --- kamailio/default.cfg | 1 + kamailio/defs.cfg | 4 +++ kamailio/kazoo-bindings.cfg | 5 ++- kamailio/nodes-role.cfg | 68 +++++++++++++++++++++++++++++++++++-- kamailio/presence-role.cfg | 10 +----- 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/kamailio/default.cfg b/kamailio/default.cfg index 48ab73d..905d283 100644 --- a/kamailio/default.cfg +++ b/kamailio/default.cfg @@ -177,6 +177,7 @@ modparam("db_text", "default_connection", "KAZOO_DB_URL") ####### Kazoo Integration module ########## loadmodule "kazoo.so" +modparam("kazoo", "amqp_primary_zone", MY_AMQP_ZONE) modparam("kazoo", "amqp_query_timeout_avp", "$avp(kz_timeout)") modparam("kazoo", "node_hostname", "MY_HOSTNAME") modparam("kazoo", "amqp_heartbeats", MY_AMQP_HEARTBEATS) diff --git a/kamailio/defs.cfg b/kamailio/defs.cfg index cff24c3..da6d4db 100644 --- a/kamailio/defs.cfg +++ b/kamailio/defs.cfg @@ -51,4 +51,8 @@ #!define BLF_USE_SINGLE_DIALOG 1 #!endif +#ifndef MY_AMQP_ZONE +#!define MY_AMQP_ZONE "local" +#!endif + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/kazoo-bindings.cfg b/kamailio/kazoo-bindings.cfg index 51f2d07..f41caf5 100644 --- a/kamailio/kazoo-bindings.cfg +++ b/kamailio/kazoo-bindings.cfg @@ -42,9 +42,8 @@ event_route[kazoo:mod-init] route(REGISTRAR_SYNC_BINDINGS); #!endif - #!ifdef NODES_SYNC_ROLE - $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'NODES-MY_HOSTNAME', 'federate' : 1}"; - kazoo_subscribe("$var(payload)"); + #!ifdef NODES_ROLE + route(NODES_BINDINGS); #!endif #!ifdef ACL_ROLE diff --git a/kamailio/nodes-role.cfg b/kamailio/nodes-role.cfg index ff32b33..fbc9356 100644 --- a/kamailio/nodes-role.cfg +++ b/kamailio/nodes-role.cfg @@ -1,15 +1,79 @@ ######## Nodes role - pushes info to kazoo ######## + +#!ifndef NODES_EXPIRE +#!define NODES_EXPIRE 10 +#!endif + +#!ifndef NODES_FUDGE_FACTOR +#!define NODES_FUDGE_FACTOR 3 +#!endif + +modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180"); + +####### TIMER module ########## #!ifndef TIMER_LOADED loadmodule "timer.so" #!trydef TIMER_LOADED #!endif -modparam("timer", "declare_timer", "NODES_TIMER=NODES_TIMER_ROUTE,5000,slow,enable"); +modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER=NODES_ADVERTISE_ROUTE,5000,slow,enable"); +modparam("timer", "declare_timer", "NODE_TRACK_TIMER=NODE_TRACK_ROUTE,500,fast,enable"); + +####### MQUEUE module ########## +#!ifndef MQUEUE_LOADED +loadmodule "mqueue.so" +#!trydef MQUEUE_LOADED +#!endif +modparam("mqueue","mqueue", "name=node_track") + ####### NODES Logic ######## -route[NODES_TIMER_ROUTE] +route[NODES_ADVERTISE_ROUTE] { $var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Registrations" : $(stat(registered_users){s.int}), "WhApps" : {"kamailio" : {"Startup" : $Tb }} }'; kazoo_publish("nodes", "", $var(Payload)); } + +event_route[kazoo:consumer-event-nodes-advertise] +{ + if($shtinc(nodes=>$(kzE{kz.json,Node})::count) == 1) { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n"); + } + $sht(nodes=>$(kzE{kz.json,Node})) = $kzE; + $shtex(nodes=>$(kzE{kz.json,Node})) = ($(kzE{kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR; + +} + +event_route[htable:expired:nodes] +{ + if($shtrecord(key) =~ "::count$$") { + if($shtrecord(value) == 0) { + xlog("L_INFO", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n"); + } + mq_add("node_track", "$shtrecord(key)", ""); + return; + } + + xlog("L_INFO", "htable|nodes|hearbeat expired for node $shtrecord(key)\n"); +} + +route[NODE_TRACK_ROUTE] +{ + $var(runloop) = 1; + while(mq_fetch("node_track") == 1 && $var(runloop) < MAX_WHILE_LOOPS) { + $var(Key) = $mqk(node_track); + $sht(nodes=>$var(Key)) = 0; + $var(runloop) = $var(runloop) + 1; + } +} + +#!ifndef NODES_CUSTOM_BINDINGS +route[NODES_BINDINGS] +{ + $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'nodes-MY_HOSTNAME', 'exclusive' : 0, 'federate' : 1}"; + kazoo_subscribe("$var(payload)"); +} +#!endif + + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/presence-role.cfg b/kamailio/presence-role.cfg index 1a302e2..0cb15ba 100644 --- a/kamailio/presence-role.cfg +++ b/kamailio/presence-role.cfg @@ -152,17 +152,9 @@ route[HANDLE_NEW_SUBSCRIBE] route[SUBSCRIBE_AMQP] { - $var(Expires) = $hdr(Expires); - if($var(Expires) < PRESENCE_MIN_EXPIRES) { - $var(Expires) = PRESENCE_MIN_EXPIRES; - } else if($var(Expires) > PRESENCE_MAX_EXPIRES) { - $var(Expires) = PRESENCE_MAX_EXPIRES; - } - $var(rk) = "subscribe." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode}); - $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" }); + $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : $sub(expires), "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" }); kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request)); - } route[HANDLE_PUBLISH]