From 7d4380b2f61309fd13ff5762b28c21f7d600d668 Mon Sep 17 00:00:00 2001 From: bitbashing Date: Mon, 2 Nov 2020 15:01:11 -0800 Subject: [PATCH] Adding a new AMQP registrar search API (cherry picked from commit b58e7c3a3635bdda60463870ec3b2eeeb3e77296) --- kamailio/db_queries_kazoo.cfg | 3 ++ kamailio/db_queries_mysql.cfg | 2 + kamailio/db_queries_postgres.cfg | 2 + kamailio/registrar-query.cfg | 82 ++++++++++++++++++++++++++++++++ kamailio/registrar-role.cfg | 15 ++++++ 5 files changed, 104 insertions(+) create mode 100644 kamailio/registrar-query.cfg diff --git a/kamailio/db_queries_kazoo.cfg b/kamailio/db_queries_kazoo.cfg index f3de127..2df3a20 100644 --- a/kamailio/db_queries_kazoo.cfg +++ b/kamailio/db_queries_kazoo.cfg @@ -12,6 +12,9 @@ #!substdef "!KZQ_PRESENCE_SEARCH_SUMMARY!select * from active_watchers where to_domain = \"\$var(Domain)\"!g" #!substdef "!KZQ_PRESENCE_SEARCH_DETAIL!select a.*, b.time, b.result, b.sent_msg, b.received_msg from active_watchers a left outer join active_watchers_log b on a.presentity_uri = b.presentity_uri and a.event = b.event and a.callid = b.callid where a.presentity_uri = \"\$var(presentity_uri)\" !g" +#!substdef "!KZQ_REGISTRAR_SEARCH_SUMMARY!select username,domain from location where domain = \"\$var(Domain)\"!g" +#!substdef "!KZQ_REGISTRAR_SEARCH_DETAIL!select * from location where domain = \"\$var(Domain)\"!g" + #!substdef "!KZQ_HAS_PRESENTITY!select count(*) as count from presentity where username = \"\$subs(to_user)\" and domain = \"\$subs(to_domain)\" and event = \"\$subs(event)\"!g" #!substdef "!KZQ_REPLACE_WATCHERS_LOG!REPLACE INTO active_watchers_log (presentity_uri, watcher_username, watcher_domain, event, callid, to_user, to_domain, user_agent, time, result, sent_msg, received_msg) VALUES (\"\$subs(uri)\", \"\$subs(watcher_username)\", \"\$subs(watcher_domain)\", \"\$subs(event)\",\"\$subs(callid)\",\"\$subs(to_user)\",\"\$subs(to_domain)\", '\$(subs(user_agent){s.escape.common}{s.replace,\\\',''}{s.replace,\$\$,})', \$TS, \$notify_reply(\$rs), '\$(mb{s.escape.common}{s.replace,\\\',''}{s.replace,\$\$,})', '\$(notify_reply(\$mb){s.escape.common}{s.replace,\\\',''}{s.replace,\$\$,})')!g" diff --git a/kamailio/db_queries_mysql.cfg b/kamailio/db_queries_mysql.cfg index 7385add..d9141c5 100644 --- a/kamailio/db_queries_mysql.cfg +++ b/kamailio/db_queries_mysql.cfg @@ -11,5 +11,7 @@ #!substdef "!KZQ_RESET_PUBLISHER_UPDATE!update active_watchers set expires = \$TS where id in (select b.id from presentity a inner join active_watchers b on a.username = b.to_user and a.domain = b.to_domain and a.event = b.event where a.sender = \"\$var(MediaUrl)\")!g" #!substdef "!KZQ_PRESENCE_SEARCH_DETAIL!select * from active_watchers_log where presentity_uri = \"\$var(presentity_uri)\"!g" #!substdef "!KZQ_PRESENCE_SEARCH_SUMMARY!select * from active_watchers where watcher_domain = \"\$var(Domain)\"!g" +#!substdef "!KZQ_REGISTRAR_SEARCH_SUMMARY!select username,domain from location where domain = \"\$var(Domain)\"!g" +#!substdef "!KZQ_REGISTRAR_SEARCH_DETAIL!select * from location where domain = \"\$var(Domain)\"!g" #!substdef "!KZQ_HAS_PRESENTITY!select count(*) as count from presentity where username = \"\$subs(to_user)\" and domain = \"\$subs(to_domain)\" and event = \"\$subs(event)\"!g" #!substdef "!KZQ_PRESENCE_RESET!delete from presentity where sender = \"\$var(MediaUrl)\"!g" diff --git a/kamailio/db_queries_postgres.cfg b/kamailio/db_queries_postgres.cfg index cbceb30..712b9a3 100644 --- a/kamailio/db_queries_postgres.cfg +++ b/kamailio/db_queries_postgres.cfg @@ -11,4 +11,6 @@ #!substdef "!KZQ_RESET_PUBLISHER_UPDATE!update active_watchers set expires = \$TS where id in (select b.id from presentity a inner join active_watchers b on a.username = b.to_user and a.domain = b.to_domain and a.event = b.event where a.sender = '\$var(MediaUrl)')!g" #!substdef "!KZQ_PRESENCE_SEARCH_DETAIL!select * from active_watchers_log where presentity_uri = '\$var(presentity_uri)'!g" #!substdef "!KZQ_PRESENCE_SEARCH_SUMMARY!select * from active_watchers where watcher_domain = '\$var(Domain)'!g" +#!substdef "!KZQ_REGISTRAR_SEARCH_SUMMARY!select username,domain from location where domain = \"\$var(Domain)\"!g" +#!substdef "!KZQ_REGISTRAR_SEARCH_DETAIL!select * from location where domain = \"\$var(Domain)\"!g" #!substdef "!KZQ_PRESENCE_RESET!delete from presentity where sender = '\$var(MediaUrl)'!g" diff --git a/kamailio/registrar-query.cfg b/kamailio/registrar-query.cfg new file mode 100644 index 0000000..4cfde98 --- /dev/null +++ b/kamailio/registrar-query.cfg @@ -0,0 +1,82 @@ +######## Registrar query server module ######## + +#!trydef KZ_REGISTRAR_QUERY_REPLY_ZONES 0 +kazoo.registrar_query_reply_zones = KZ_REGISTRAR_QUERY_REPLY_ZONES descr "0 - all, 1 - local, 2 - remote" + +route[REGISTRAR_SEARCH_SUMMARY] +{ + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|query|processing registrar summary query for $(kzE{kz.json,Realm})\n"); + $var(Queue) = $(kzE{kz.json,Server-ID}); + $var(Domain) = $(kzE{kz.json,Realm}); + $var(Username) = $(kzE{kz.json,Username}); + $var(Query) = $_s(KZQ_REGISTRAR_SEARCH_SUMMARY); + if($var(Username) != "") { + $var(Query) = $var(Query) + $_s( and username = "$var(Username)"); + } + + if (sql_xquery("cb", "$var(Query)", "ra") == 1) + { + $var(Registrations) = ""; + $var(Sep1) = ""; + while($xavp(ra) != $null) { + $var(Registration) = $_s("$(xavp(ra=>username))@$(xavp(ra=>domain))"); + $var(Registrations) = $var(Registrations) + $var(Sep1) + $var(Registration); + $var(Sep1)=", "; + + pv_unset("$xavp(ra)"); + } + } + + $var(amqp_payload_request) = $_s({"Event-Category" : "registration", "Event-Name" : "search_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Registrations" : [ $var(Registrations) ] }); + kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + +} + +route[REGISTRAR_SEARCH_DETAIL] +{ + xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|query|processing registrar query detail for $(kzE{kz.json,Username}) in realm $(kzE{kz.json,Realm})\n"); + $var(Queue) = $(kzE{kz.json,Server-ID}); + $var(Msg-ID) = $(kzE{kz.json,Msg-ID}); + $var(Domain) = $(kzE{kz.json,Realm}); + $var(Username) = $(kzE{kz.json,Username}); + $var(Query) = $_s(KZQ_REGISTRAR_SEARCH_DETAIL); + if($var(Username) != "") { + $var(Query) = $var(Query) + $_s( and username = "$var(Username)"); + } + + if (sql_xquery("cb", "$var(Query)", "ra") == 1) + { + while($xavp(ra) != $null) { + $var(Registration) = $_s({"Contact":"$(xavp(ra=>contact))", "Received":"$(xavp(ra=>received))", "Path":"$(xavp(ra=>path))", "Expires":$(xavp(ra=>expires)), "Call-ID":"$(xavp(ra=>callid))", "CSeq":"$(xavp(ra=>cseq))", "Last-Modified":"$(xavp(ra=>last_modified))", "User-Agent":"$(xavp(ra=>user_agent){s.escape.common}{s.replace,\','}{s.replace,$$,})", "Socket":"$(xavp(ra=>socket))"}); + pv_unset("$xavp(ra)"); + + $var(amqp_payload_request) = '{"Event-Category" : "registration", "Event-Name" : "search_partial_resp", "Msg-ID" : "$var(Msg-ID)", "Registrations" : [ $var(Registration) ] }'; + kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + } + } + + $var(amqp_payload_request) = '{"Event-Category" : "registration", "Event-Name" : "search_resp", "Msg-ID" : "$var(Msg-ID)", "Registrations":[] }'; + kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + +} + +event_route[kazoo:consumer-event-registration-search-req] +{ + $var(Zone) = $(kzE{kz.json,AMQP-Broker-Zone}); + if( ($var(Zone) == "MY_AMQP_ZONE" && $sel(cfg_get.kazoo.registrar_query_reply_zones) != 2) || + ($var(Zone) != "MY_AMQP_ZONE" && $sel(cfg_get.kazoo.registrar_query_reply_zones) != 1)) { + + switch($(kzE{kz.json,Search-Type})) { + case "summary": + route(REGISTRAR_SEARCH_SUMMARY); + break; + case "detail": + route(REGISTRAR_SEARCH_DETAIL); + break; + default: + xlog("L_INFO", "$ci|search type '$(kzE{kz.json,Search-Type})' not handled\n"); + } + } +} + +# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/registrar-role.cfg b/kamailio/registrar-role.cfg index 1e359ff..6853408 100644 --- a/kamailio/registrar-role.cfg +++ b/kamailio/registrar-role.cfg @@ -146,6 +146,8 @@ kazoo.registrar_keepalive_udp_only = KZ_REGISTRAR_KEEPALIVE_UDP_ONLY descr "shou kazoo.registrar_send_100 = REGISTRAR_SEND_100 descr "should we send 100 reply while doing directory search" kazoo.registrar_publish_reg_once = KZ_REGISTRAR_PUBLISH_REG_ONCE descr "should publish only new registrations" +#!include_file "registrar-query.cfg" + ####### Registrar Logic ######## route[REGISTRAR_NAT_FLAGS] @@ -500,12 +502,25 @@ route[REGISTRAR_BINDINGS] #!endif + route(REGISTRAR_API_BINDINGS); + #!ifdef REGISTRAR_SYNC_ROLE route(REGISTRAR_SYNC_BINDINGS); #!endif } +route[REGISTRAR_API_BINDINGS] +{ + #!import_file "registrar-api-custom-bindings.cfg" + + #!ifndef REGISTRAR_API_CUSTOM_BINDINGS + $var(payload) = $_s({"name": "registrar-api", "exchange": "registrar", "type": "topic", "queue": "registrar-api-MY_HOSTNAME", "routing": ["registration.search_req.*"], "exclusive": false, "federate": true }); + kazoo_subscribe("$var(payload)"); + #!endif + +} + route[REGISTRAR_BOUNDS] { if((int)@contact.expires) {