From ca531312a1d2e9378cc2661e1dfd3bea54c14a72 Mon Sep 17 00:00:00 2001 From: lazedo Date: Tue, 17 Sep 2019 15:28:25 +0000 Subject: [PATCH] handle returned messages --- kamailio/kazoo-bindings.cfg | 5 +++ kamailio/registrar-role.cfg | 5 +-- kamailio/trusted.cfg | 64 +++++++++++++++++++------------------ 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/kamailio/kazoo-bindings.cfg b/kamailio/kazoo-bindings.cfg index 319417f..a81d516 100644 --- a/kamailio/kazoo-bindings.cfg +++ b/kamailio/kazoo-bindings.cfg @@ -155,4 +155,9 @@ event_route[kazoo:consumer-event-connection-listener-unavailable] xlog("L_DEBUG","amqp zone $(kzE{kz.json,zone}) is unavailable\n"); } +event_route[kazoo:consumer-event-amqp-error-message-returned] +{ + xlog("L_WARNING","$(kzE{kz.json,Msg-ID})|amqp|message $(kzE{kz.json,Original-Event-Category}):$(kzE{kz.json,Original-Event-Name}) was returned by broker $(kzE{kz.json,Error-Code}) $(kzE{kz.json,Error-Reason})\n"); +} + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/registrar-role.cfg b/kamailio/registrar-role.cfg index de4523f..e991ab2 100644 --- a/kamailio/registrar-role.cfg +++ b/kamailio/registrar-role.cfg @@ -421,9 +421,10 @@ route[SAVE_LOCATION] $var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Status" : "$var(Status)", "Event-Timestamp" : $TS, "Expires" : $(var(expires){s.int}), "First-Registration" : $var(new_reg), "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" , "Custom-Channel-Vars" : $xavp(ulattrs=>custom_channel_vars), "Proxy-Path" : "$var(proxy_path)", "Proxy-Protocol" : "$var(proto)", "Proxy-IP" : "$var(AdvIP)", "Proxy-Port" : "$RAp", "Source-IP": "$si", "Source-Port": "$sp" }); $var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $(fU{kz.encode}); - kazoo_publish("registrar", $var(amqp_routing_key), $var(amqp_payload_request)); + if(kazoo_publish("registrar", $var(amqp_routing_key), $var(amqp_payload_request), "$def(REGISTRAR_AMQP_FLAGS)") == 1) { + xlog("L_INFO", "$ci|end|successful $(var(Status){s.tolower}) with contact : $ct : $var(expires)\n"); + } - xlog("L_INFO", "$ci|end|successful $(var(Status){s.tolower}) with contact : $ct : $var(expires)\n"); #!ifdef PUSHER_ROLE route(PUSHER_ON_REGISTRATION); diff --git a/kamailio/trusted.cfg b/kamailio/trusted.cfg index 8e5e39b..8021b52 100644 --- a/kamailio/trusted.cfg +++ b/kamailio/trusted.cfg @@ -4,7 +4,7 @@ modparam("permissions", "db_url", "KAZOO_DB_URL") modparam("permissions", "db_mode", KZ_PERMISSIONS_CACHE) modparam("permissions", "peer_tag_avp", "$avp(trusted_x_header)") -modparam("rtimer", "timer", "name=trusted_reload;interval=5;mode=1;") +modparam("rtimer", "timer", "name=trusted_reload;interval=20;mode=1;") modparam("rtimer", "exec", "timer=trusted_reload;route=TRUSTED_RELOAD") modparam("rtimer", "exec", "timer=trusted_reload;route=TRUSTED_QUERY") @@ -14,43 +14,26 @@ modparam("pv", "shvset", "trusted_query=i:1") route[TRUSTED_LOAD] { - if (!t_newtran()) { - xlog("L_ERROR", "trusted|log|failed to create transaction to query for acl\n"); - return; - } - $shv(trusted_query) = 0; $var(amqp_payload_request) = $_s({"Event-Category" : "trusted" , "Event-Name" : "query"}); $var(amqp_routing_key) = "trusted.query"; xlog("L_DEBUG", "$ci|amqp|publishing to acl => $var(amqp_routing_key) : $var(amqp_payload_request)\n"); - if(kazoo_async_query("trusted", $var(amqp_routing_key), $var(amqp_payload_request), "KZ_ACL_REPLY", "KZ_ACL_TIMEOUT", "$def(TRUSTED_AMQP_FLAGS)") != 1) { - xlog("L_WARNING", "$ci|log|failed to send trusted query\n"); + if(kazoo_query("trusted", $var(amqp_routing_key), $var(amqp_payload_request), "$def(TRUSTED_AMQP_FLAGS)") != 1) { + if($(kzR{kz.json,Event-Name}) == "message_returned" ) { + xlog("L_WARNING", "trusted|query|message was returned by broker $(kzR{kz.json,Error-Code}) $(kzR{kz.json,Error-Reason})\n"); + } else { + xlog("L_WARNING", "trusted|query|failed $kzR\n"); + } $shv(trusted_query) = 1; + return; } -} - -failure_route[KZ_ACL_TIMEOUT] -{ - if($(kzR{kz.json,Event-Name}) == "message_returned" ) { - xlog("L_WARNING", "$ci|amqp|message was returned by broker $(kzR{kz.json,Error-Code}) $(kzR{kz.json,Error-Reason})\n"); - } else { - xlog("L_WARNING", "$ci|end|failed $T_reply_code $T_reply_reason [$T(id_index):$T(id_label)] querying trusted\n"); - } - $shv(trusted_query) = 1; - # this is needed because of async query that creates a transaction - t_drop(); -} -onreply_route[KZ_ACL_REPLY] -{ - xlog("L_DEBUG", "trusted|query|got reply\n"); + xlog("L_DEBUG", "trusted|query|got reply $kzR\n"); avp_delete("$avp(TrustedKeys)/g"); if(kazoo_json_keys($kzR, "Trusted", "$avp(TrustedKeys)") != 1) { xlog("L_WARNING", "trusted|reply|no keys for Trusted\n"); - # this is needed because of async query that creates a transaction - t_drop(); return; } sql_query("exec", "delete from address"); @@ -62,13 +45,34 @@ onreply_route[KZ_ACL_REPLY] $var(Key) = $(var(KeyName){s.replace,.,%}); $var(token) = $(kzR{kz.json,Trusted.$var(Key).token}); $var(cidr_count) = $(kzR{kz.json.count, Trusted.$var(Key).cidrs}); + + ## ports + $var(port_count) = $(kzR{kz.json.count, Trusted.$var(Key).ports}); + $(avp(ports)[*]) = $null; + if($var(port_count) == 0) { + $avp(ports) = 0; + $var(port_count) = 1; + } else { + $var(portIdx) = 0; + while($var(portIdx) < $var(port_count)) { + $avp(ports) = $(kzR{kz.json, Trusted.$var(Key).ports[$var(portIdx)}); + $var(portIdx) = $var(portIdx) + 1; + } + } + $var(cidr_idx) = 0; while($var(cidr_idx) < $var(cidr_count)) { $var(cidr) = $(kzR{kz.json,Trusted.$var(Key).cidrs[$var(cidr_idx)]}); $var(ip) = $(var(cidr){s.select,0,/}); $var(mask) = $(var(cidr){s.select,1,/}); - $var(sql) = $_s(insert into address(ip_addr, mask, tag) values("$var(ip)", $var(mask), "$var(token)")); - sql_query("exec", "$var(sql)"); + + $var(portIdx) = 0; + while($var(portIdx) < $var(port_count)) { + $var(sql) = $_s(insert into address(ip_addr, mask, port, tag) values("$var(ip)", $var(mask), $(avp(ports)[$var(portIdx)]), "$var(token)")); + sql_query("exec", "$var(sql)"); + $var(portIdx) = $var(portIdx) + 1; + } + $var(cidr_idx) = $var(cidr_idx) + 1; $var(total) = $var(total) + 1; } @@ -78,14 +82,12 @@ onreply_route[KZ_ACL_REPLY] xlog("L_NOTICE", "trusted|query|loaded $var(total) entries into address table\n"); $shv(trusted_reload) = 1; - # this is needed because of async query that creates a transaction - t_drop(); } route[RELOAD_TRUSTED] { jsonrpc_exec('{"jsonrpc": "2.0", "method": "permissions.addressReload"}'); - xlog("L_INFO", "trusted|reload|$(jsonrpl(body){kz.json,result})\n"); + xlog("L_NOTICE", "trusted|reload|$(jsonrpl(body){kz.json,result})\n"); } route[TRUSTED_RELOAD]