Browse Source

Kazoo 5414 (#156)

* add defaults for zone

* lowercase queue name for nodes

* first step at node synchronization

* fix expires sent to amqp

* move bindings

* add node expiration logic
KAZOO-5650
lazedo 9 years ago
committed by bitbashing
parent
commit
c4e9b9044e
5 changed files with 74 additions and 14 deletions
  1. +1
    -0
      kamailio/default.cfg
  2. +4
    -0
      kamailio/defs.cfg
  3. +2
    -3
      kamailio/kazoo-bindings.cfg
  4. +66
    -2
      kamailio/nodes-role.cfg
  5. +1
    -9
      kamailio/presence-role.cfg

+ 1
- 0
kamailio/default.cfg View File

@ -177,6 +177,7 @@ modparam("db_text", "default_connection", "KAZOO_DB_URL")
####### Kazoo Integration module ########## ####### Kazoo Integration module ##########
loadmodule "kazoo.so" loadmodule "kazoo.so"
modparam("kazoo", "amqp_primary_zone", MY_AMQP_ZONE)
modparam("kazoo", "amqp_query_timeout_avp", "$avp(kz_timeout)") modparam("kazoo", "amqp_query_timeout_avp", "$avp(kz_timeout)")
modparam("kazoo", "node_hostname", "MY_HOSTNAME") modparam("kazoo", "node_hostname", "MY_HOSTNAME")
modparam("kazoo", "amqp_heartbeats", MY_AMQP_HEARTBEATS) modparam("kazoo", "amqp_heartbeats", MY_AMQP_HEARTBEATS)


+ 4
- 0
kamailio/defs.cfg View File

@ -51,4 +51,8 @@
#!define BLF_USE_SINGLE_DIALOG 1 #!define BLF_USE_SINGLE_DIALOG 1
#!endif #!endif
#ifndef MY_AMQP_ZONE
#!define MY_AMQP_ZONE "local"
#!endif
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab

+ 2
- 3
kamailio/kazoo-bindings.cfg View File

@ -42,9 +42,8 @@ event_route[kazoo:mod-init]
route(REGISTRAR_SYNC_BINDINGS); route(REGISTRAR_SYNC_BINDINGS);
#!endif #!endif
#!ifdef NODES_SYNC_ROLE
$var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'NODES-MY_HOSTNAME', 'federate' : 1}";
kazoo_subscribe("$var(payload)");
#!ifdef NODES_ROLE
route(NODES_BINDINGS);
#!endif #!endif
#!ifdef ACL_ROLE #!ifdef ACL_ROLE


+ 66
- 2
kamailio/nodes-role.cfg View File

@ -1,15 +1,79 @@
######## Nodes role - pushes info to kazoo ######## ######## Nodes role - pushes info to kazoo ########
#!ifndef NODES_EXPIRE
#!define NODES_EXPIRE 10
#!endif
#!ifndef NODES_FUDGE_FACTOR
#!define NODES_FUDGE_FACTOR 3
#!endif
modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180");
####### TIMER module ##########
#!ifndef TIMER_LOADED #!ifndef TIMER_LOADED
loadmodule "timer.so" loadmodule "timer.so"
#!trydef TIMER_LOADED #!trydef TIMER_LOADED
#!endif #!endif
modparam("timer", "declare_timer", "NODES_TIMER=NODES_TIMER_ROUTE,5000,slow,enable");
modparam("timer", "declare_timer", "NODES_ADVERTISE_TIMER=NODES_ADVERTISE_ROUTE,5000,slow,enable");
modparam("timer", "declare_timer", "NODE_TRACK_TIMER=NODE_TRACK_ROUTE,500,fast,enable");
####### MQUEUE module ##########
#!ifndef MQUEUE_LOADED
loadmodule "mqueue.so"
#!trydef MQUEUE_LOADED
#!endif
modparam("mqueue","mqueue", "name=node_track")
####### NODES Logic ######## ####### NODES Logic ########
route[NODES_TIMER_ROUTE]
route[NODES_ADVERTISE_ROUTE]
{ {
$var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Registrations" : $(stat(registered_users){s.int}), "WhApps" : {"kamailio" : {"Startup" : $Tb }} }'; $var(Payload) = '{"Event-Category" : "nodes", "Event-Name" : "advertise", "Expires" : 5000, "Used-Memory" : $(stat(real_used_size){s.int}), "Registrations" : $(stat(registered_users){s.int}), "WhApps" : {"kamailio" : {"Startup" : $Tb }} }';
kazoo_publish("nodes", "", $var(Payload)); kazoo_publish("nodes", "", $var(Payload));
} }
event_route[kazoo:consumer-event-nodes-advertise]
{
if($shtinc(nodes=>$(kzE{kz.json,Node})::count) == 1) {
xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|nodes|hearbeat from new node $(kzE{kz.json,Node})\n");
}
$sht(nodes=>$(kzE{kz.json,Node})) = $kzE;
$shtex(nodes=>$(kzE{kz.json,Node})) = ($(kzE{kz.json,Expires}{s.int}) / 1000) + NODES_FUDGE_FACTOR;
}
event_route[htable:expired:nodes]
{
if($shtrecord(key) =~ "::count$$") {
if($shtrecord(value) == 0) {
xlog("L_INFO", "htable|nodes|node $(shtrecord(key){s.rm,::count}) is still unreachable\n");
}
mq_add("node_track", "$shtrecord(key)", "");
return;
}
xlog("L_INFO", "htable|nodes|hearbeat expired for node $shtrecord(key)\n");
}
route[NODE_TRACK_ROUTE]
{
$var(runloop) = 1;
while(mq_fetch("node_track") == 1 && $var(runloop) < MAX_WHILE_LOOPS) {
$var(Key) = $mqk(node_track);
$sht(nodes=>$var(Key)) = 0;
$var(runloop) = $var(runloop) + 1;
}
}
#!ifndef NODES_CUSTOM_BINDINGS
route[NODES_BINDINGS]
{
$var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'nodes-MY_HOSTNAME', 'exclusive' : 0, 'federate' : 1}";
kazoo_subscribe("$var(payload)");
}
#!endif
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab

+ 1
- 9
kamailio/presence-role.cfg View File

@ -152,17 +152,9 @@ route[HANDLE_NEW_SUBSCRIBE]
route[SUBSCRIBE_AMQP] route[SUBSCRIBE_AMQP]
{ {
$var(Expires) = $hdr(Expires);
if($var(Expires) < PRESENCE_MIN_EXPIRES) {
$var(Expires) = PRESENCE_MIN_EXPIRES;
} else if($var(Expires) > PRESENCE_MAX_EXPIRES) {
$var(Expires) = PRESENCE_MAX_EXPIRES;
}
$var(rk) = "subscribe." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode}); $var(rk) = "subscribe." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode});
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : $sub(expires), "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request)); kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request));
} }
route[HANDLE_PUBLISH] route[HANDLE_PUBLISH]


Loading…
Cancel
Save