From 318fe456876feb4c13f2397363188f0dece5f2b5 Mon Sep 17 00:00:00 2001 From: lazedo Date: Fri, 17 Feb 2017 19:05:43 +0000 Subject: [PATCH] Split bindings (#146) * split amqp bindings * add presence reset * use kazoo-bindings * fix clash with route name * fix syntax * create presentity_uri var * change default channels * allow kazoo module to be configured in local.cfg * allow single dialog blf to be configurable --- kamailio/default.cfg | 111 +++-------------------------- kamailio/defs.cfg | 19 +++++ kamailio/kazoo-bindings.cfg | 72 +++++++++++++++++++ kamailio/kazoo-custom-bindings.cfg | 5 ++ kamailio/message-role.cfg | 6 ++ kamailio/presence-role.cfg | 48 +++++++++++++ kamailio/presence_query-role.cfg | 9 ++- kamailio/registrar-role.cfg | 7 ++ kamailio/registrar-sync-role.cfg | 6 ++ 9 files changed, 180 insertions(+), 103 deletions(-) create mode 100644 kamailio/kazoo-bindings.cfg create mode 100644 kamailio/kazoo-custom-bindings.cfg diff --git a/kamailio/default.cfg b/kamailio/default.cfg index 5733297..aa4e5bf 100644 --- a/kamailio/default.cfg +++ b/kamailio/default.cfg @@ -177,7 +177,13 @@ modparam("db_text", "default_connection", "KAZOO_DB_URL") ####### Kazoo Integration module ########## loadmodule "kazoo.so" +modparam("kazoo", "amqp_query_timeout_avp", "$avp(kz_timeout)") modparam("kazoo", "node_hostname", "MY_HOSTNAME") +modparam("kazoo", "amqp_heartbeats", MY_AMQP_HEARTBEATS) +modparam("kazoo", "amqp_max_channels", MY_AMQP_MAX_CHANNELS) +modparam("kazoo", "amqp_consumer_processes", MY_AMQP_CONSUMER_PROCESSES) +modparam("kazoo", "amqp_consumer_workers", MY_AMQP_CONSUMER_WORKERS) +## amqp connections modparam("kazoo", "amqp_connection", "MY_AMQP_URL") #!ifdef MY_AMQP_SECONDARY_URL modparam("kazoo", "amqp_connection", "MY_AMQP_SECONDARY_URL") @@ -185,15 +191,9 @@ modparam("kazoo", "amqp_connection", "MY_AMQP_SECONDARY_URL") #!ifdef MY_AMQP_TERTIARY_URL modparam("kazoo", "amqp_connection", "MY_AMQP_TERTIARY_URL") #!endif -#!ifdef MY_AMQP_MAX_CHANNELS -modparam("kazoo", "amqp_max_channels", MY_AMQP_MAX_CHANNELS) -#!else -modparam("kazoo", "amqp_max_channels", 100) -#!endif -modparam("kazoo", "amqp_query_timeout_avp", "$avp(kz_timeout)"); -modparam("kazoo", "amqp_consumer_processes", 4); -modparam("kazoo", "amqp_consumer_workers", 16); -modparam("kazoo", "amqp_heartbeats", 5); + +###### kazoo bindings ###### +include_file "kazoo-bindings.cfg" ####### Role Configurations ########## #!ifdef DISPATCHER_ROLE @@ -820,99 +820,6 @@ onsend_route { xlog("L_INFO", "$ci|pass|$sndfrom(ip):$sndfrom(port) -> $sndto(ip):$sndto(port)\n"); } -event_route[kazoo:mod-init] -{ - -### use this simple form of binding a listener -### kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME"); -### -### or unleash the power of rabbit to kazoo-blf -### -### 'no_ack' : 1 => needs ack, -### 'wait_for_consumer_ack' -### : 1 => when it receives, it processses on the AMQP Worker ad after that it confirms -### : 0 => when it receives, it acks then processes in the AMQP Worker -### only works if no_ack : 0 -### -### Rabbit Policy for ha-mode -### pattern : ^BLF -### definition : ha-mode: all -### -### - - #!ifdef PRESENCE_ROLE - - $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-dialog-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'dialog.*.*', 'federate' : 1 }"; - kazoo_subscribe("$var(payload)"); - - $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-presence-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'update.*.*', 'federate' : 1 }"; - kazoo_subscribe("$var(payload)"); - - $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-mwi-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'mwi_updates.*', 'federate' : 1 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef MESSAGE_ROLE - - $var(key) = "kamailio@MY_HOSTNAME"; - $var(payload) = '{ "exchange" : "sms" , "type" : "topic", "queue" : "MSG-QUEUE-MY_HOSTNAME", "routing" : "message.route.' + $(var(key){kz.encode}) + '.*", "auto_delete" : 1, "durable" : 0, "no_ack" : 0, "wait_for_consumer_ack" : 1 }'; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef DISPATCHER_ROLE - - $var(payload) = "{ 'exchange' : 'registrar' , 'type' : 'topic', 'queue' : 'MSG-FLUSH-MY_HOSTNAME', 'routing' : 'registration.flush.*', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef REGISTRAR_SYNC_ROLE - - $var(payload) = "{ 'exchange' : 'registrar' , 'type' : 'topic', 'queue' : 'REGISTRAR-SYNC-QUEUE-MY_HOSTNAME', 'routing' : 'registration.sync', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef PRESENCE_QUERY_ROLE - - $var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'presence-search-MY_HOSTNAME', 'exclusive' : 0, 'routing' : 'presence.search_req.*', 'federate' : 1 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef NODES_SYNC_ROLE - - $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'NODES-MY_HOSTNAME', 'federate' : 1}"; - kazoo_subscribe("$var(payload)"); - - #!endif - - #!ifdef ACL_ROLE - - $var(payload) = "{ 'exchange' : 'frontier_acl' , 'type' : 'topic', 'queue' : 'FRONTIERACL-FLUSH-MY_HOSTNAME', 'routing' : 'flush', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - -} - -event_route[kazoo:consumer-event] -{ - xlog("L_INFO","unhandled AMQP event, payload: $kzE\n"); -} - -event_route[kazoo:consumer-event-connection-open] -{ - xlog("L_INFO","connection to $(kzE{kz.json,host}) opened\n"); -} - -event_route[kazoo:consumer-event-connection-closed] -{ - xlog("L_INFO","connection to $(kzE{kz.json,host}) closed\n"); -} - route[ADD_AUTHORIZATION_HEADERS] { if (!is_method("INVITE|MESSAGE|REFER")) { diff --git a/kamailio/defs.cfg b/kamailio/defs.cfg index 73ae9ea..cff24c3 100644 --- a/kamailio/defs.cfg +++ b/kamailio/defs.cfg @@ -31,5 +31,24 @@ #!substdef "!ANTIFLOOD_CACHE_PERIOD!600!g" #!endif +#!ifndef MY_AMQP_MAX_CHANNELS +#!define MY_AMQP_MAX_CHANNELS 25 +#!endif + +#!ifndef MY_AMQP_CONSUMER_PROCESSES +#!define MY_AMQP_CONSUMER_PROCESSES 4 +#!endif + +#!ifndef MY_AMQP_CONSUMER_WORKERS +#!define MY_AMQP_CONSUMER_WORKERS 16 +#!endif + +#!ifndef MY_AMQP_HEARTBEATS +#!define MY_AMQP_HEARTBEATS 5 +#!endif + +#!ifndef BLF_USE_SINGLE_DIALOG +#!define BLF_USE_SINGLE_DIALOG 1 +#!endif # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/kazoo-bindings.cfg b/kamailio/kazoo-bindings.cfg new file mode 100644 index 0000000..51f2d07 --- /dev/null +++ b/kamailio/kazoo-bindings.cfg @@ -0,0 +1,72 @@ +######## kazoo bindings ######## +### use this simple form of binding a listener +### kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME"); +### +### or unleash the power of rabbit to kazoo-blf +### +### 'no_ack' : 1 => needs ack, +### 'wait_for_consumer_ack' +### : 1 => when it receives, it processses on the AMQP Worker ad after that it confirms +### : 0 => when it receives, it acks then processes in the AMQP Worker +### only works if no_ack : 0 +### +### Rabbit Policy for ha-mode +### pattern : ^BLF +### definition : ha-mode: all +### +### + +include_file "kazoo-custom-bindings.cfg" + +event_route[kazoo:mod-init] +{ + + #!ifdef PRESENCE_ROLE + route(PRESENCE_BINDINGS); + #!endif + + #!ifdef PRESENCE_QUERY_ROLE + route(PRESENCE_QUERY_BINDINGS); + #!endif + + + #!ifdef MESSAGE_ROLE + route(MESSAGE_BINDINGS); + #!endif + + #!ifdef REGISTRAR_ROLE + route(REGISTRAR_BINDINGS); + #!endif + + #!ifdef REGISTRAR_SYNC_ROLE + route(REGISTRAR_SYNC_BINDINGS); + #!endif + + #!ifdef NODES_SYNC_ROLE + $var(payload) = "{ 'exchange' : 'nodes' , 'type' : 'fanout', 'queue' : 'NODES-MY_HOSTNAME', 'federate' : 1}"; + kazoo_subscribe("$var(payload)"); + #!endif + + #!ifdef ACL_ROLE + $var(payload) = "{ 'exchange' : 'frontier_acl' , 'type' : 'topic', 'queue' : 'FRONTIERACL-FLUSH-MY_HOSTNAME', 'routing' : 'flush', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; + kazoo_subscribe("$var(payload)"); + #!endif + +} + +event_route[kazoo:consumer-event] +{ + xlog("L_INFO","unhandled AMQP event, payload: $kzE\n"); +} + +event_route[kazoo:consumer-event-connection-open] +{ + xlog("L_INFO","connection to $(kzE{kz.json,host}) opened\n"); +} + +event_route[kazoo:consumer-event-connection-closed] +{ + xlog("L_INFO","connection to $(kzE{kz.json,host}) closed\n"); +} + +# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/kazoo-custom-bindings.cfg b/kamailio/kazoo-custom-bindings.cfg new file mode 100644 index 0000000..711b78d --- /dev/null +++ b/kamailio/kazoo-custom-bindings.cfg @@ -0,0 +1,5 @@ +######## kazoo custom bindings ######## + + + +# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/message-role.cfg b/kamailio/message-role.cfg index b47f743..bb9b81d 100644 --- a/kamailio/message-role.cfg +++ b/kamailio/message-role.cfg @@ -51,5 +51,11 @@ route[MESSAGE_REPLY] kazoo_publish($var(exchange), $var(RoutingKey), $var(Payload)); } +route[MESSAGE_BINDINGS] +{ + $var(key) = "kamailio@MY_HOSTNAME"; + $var(payload) = '{ "exchange" : "sms" , "type" : "topic", "queue" : "MSG-QUEUE-MY_HOSTNAME", "routing" : "message.route.' + $(var(key){kz.encode}) + '.*", "auto_delete" : 1, "durable" : 0, "no_ack" : 0, "wait_for_consumer_ack" : 1 }'; + kazoo_subscribe("$var(payload)"); +} # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/presence-role.cfg b/kamailio/presence-role.cfg index 11962ab..b4badae 100644 --- a/kamailio/presence-role.cfg +++ b/kamailio/presence-role.cfg @@ -12,6 +12,8 @@ loadmodule "presence_mwi.so" loadmodule "presence_xml.so" modparam("presence_dialoginfo", "force_dummy_dialog", 1) +modparam("presence_dialoginfo", "force_single_dialog", BLF_USE_SINGLE_DIALOG) + modparam("presence_xml", "force_dummy_presence", 1) modparam("presence_xml", "force_active", 1) modparam("presence_xml", "disable_winfo", 1) @@ -224,6 +226,34 @@ event_route[kazoo:consumer-event-presence-update] } +event_route[kazoo:consumer-event-presence-reset] +{ + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|received presence reset for $(kzE{kz.json,Username})@$(kzE{kz.json,Realm})\n"); + $var(presentity) = "sip:" + $(kzE{kz.json,Username}) + "@" + $(kzE{kz.json,Realm}); + sql_query("exec", 'delete from presentity where domain="$(kzE{kz.json,Realm})" and username = "$(kzE{kz.json,Username})"'); + if(pres_has_subscribers("$var(presentity)", "message-summary")) { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|notifying message-summary subscribers of $var(presentity)\n"); + pres_refresh_watchers("$var(presentity)", "message-summary", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|skip message-summary subscriber notification for $var(presentity))\n"); + } + + if(pres_has_subscribers("$var(presentity)", "presence")) { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|notifying presence subscribers of $var(presentity)\n"); + pres_refresh_watchers("$var(presentity)", "presence", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|skip presence subscriber notification for $var(presentity))\n"); + } + + if(pres_has_subscribers("$var(presentity)", "dialog")) { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|notifying dialog subscribers of $var(presentity)\n"); + pres_refresh_watchers("$var(presentity)", "dialog", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|skip dialog subscriber notification for $var(presentity))\n"); + } + +} + route[HANDLE_PRESENCE_UPDATE] { $var(call-id) = $(kzE{kz.json,Call-ID}); @@ -251,4 +281,22 @@ route[HANDLE_PRESENCE_UPDATE] } } +#!ifndef PRESENCE_CUSTOM_BINDINGS +route[PRESENCE_BINDINGS] +{ + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-dialog-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'dialog.*.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-presence-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'update.*.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-mwi-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'mwi_updates.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-reset-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'presence.reset.*.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + +} +#!endif + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/presence_query-role.cfg b/kamailio/presence_query-role.cfg index ca8e83a..c9826c5 100644 --- a/kamailio/presence_query-role.cfg +++ b/kamailio/presence_query-role.cfg @@ -66,9 +66,10 @@ route[PRESENCE_SEARCH_DETAIL] $var(Event) = $(kzE{kz.json,Event-Package}); $var(Domain) = $(kzE{kz.json,Realm}); $var(Username) = $(kzE{kz.json,Username}); + $var(presentity_uri) = "sip:" + $var(Username) + "@" + $var(Domain); $var(Now) = $TS; $var(Items) = ""; - $var(Query) = $_s(select * from active_watchers_log where presentity_uri = "sip:$var(Username)@$var(Domain)"); + $var(Query) = $_s(select * from active_watchers_log where presentity_uri = "$var(presentity_uri)"); if($var(Event) != "") { $var(Query) = $var(Query) + $_s( and event = "$var(Event)"); } @@ -109,4 +110,10 @@ event_route[kazoo:consumer-event-presence-search-req] } } +route[PRESENCE_QUERY_BINDINGS] +{ + $var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'presence-search-MY_HOSTNAME', 'exclusive' : 0, 'routing' : 'presence.search_req.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); +} + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/registrar-role.cfg b/kamailio/registrar-role.cfg index d01852c..1961fbc 100644 --- a/kamailio/registrar-role.cfg +++ b/kamailio/registrar-role.cfg @@ -308,4 +308,11 @@ event_route[kazoo:consumer-event-directory-reg-flush] #!endif } +route[REGISTRAR_BINDINGS] +{ + $var(payload) = "{ 'exchange' : 'registrar' , 'type' : 'topic', 'queue' : 'registrar-flush-MY_HOSTNAME', 'routing' : 'registration.flush.*', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; + kazoo_subscribe("$var(payload)"); + +} + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/registrar-sync-role.cfg b/kamailio/registrar-sync-role.cfg index 918f005..1803cd3 100644 --- a/kamailio/registrar-sync-role.cfg +++ b/kamailio/registrar-sync-role.cfg @@ -30,3 +30,9 @@ event_route[kazoo:consumer-event-directory-reg-sync] } } + +route[REGISTRAR_SYNC_BINDINGS] +{ + $var(payload) = "{ 'exchange' : 'registrar' , 'type' : 'topic', 'queue' : 'registra-sync-MY_HOSTNAME', 'routing' : 'registration.sync', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; + kazoo_subscribe("$var(payload)"); +}