From 10e5707628cd77b7a039c8a8f677ea503951721e Mon Sep 17 00:00:00 2001 From: lazedo Date: Thu, 9 Feb 2017 21:07:00 +0000 Subject: [PATCH] refactor presence role (#136) * refactor presence role * Update presence-role.cfg * Update presence_notify_sync-role.cfg * Update presence-role.cfg * Update registrar-role.cfg * Update presence_notify_sync-role.cfg * typo * remove trans * Update presence-role.cfg * Update presence-role.cfg * Update presence_notify_sync-role.cfg * Update presence-role.cfg * Update presence-role.cfg * Update presence-role.cfg * update presence * update local * update bindings * Update presence-role.cfg * bring back comments on defaults * remove PRESENCE_NAT role from definitions * outbound should be loaded before stun * Update presence-role.cfg * add a threshold for 408 and other errors * handle summary & detail queries * Update accounting-role.cfg * Update default.cfg * fix call-id in logging * Update fast-pickup-role.cfg --- kamailio/accounting-role.cfg | 6 +- kamailio/default.cfg | 45 +++--- kamailio/defs.cfg | 6 +- kamailio/fast-pickup-role.cfg | 11 +- kamailio/local.cfg | 1 - kamailio/presence-role.cfg | 188 +++++++++++++++++++++---- kamailio/presence_notify_sync-role.cfg | 53 +++++-- kamailio/presence_query-role.cfg | 86 ++++++++--- kamailio/registrar-role.cfg | 4 +- 9 files changed, 306 insertions(+), 94 deletions(-) diff --git a/kamailio/accounting-role.cfg b/kamailio/accounting-role.cfg index 99fded7..7c58cb5 100644 --- a/kamailio/accounting-role.cfg +++ b/kamailio/accounting-role.cfg @@ -1,8 +1,8 @@ ####### Flags ####### flags - FLAG_ACC: 7, - FLAG_ACCMISSED: 8, - FLAG_ACCFAILED: 9; + FLAG_ACC: 8, + FLAG_ACCMISSED: 9, + FLAG_ACCFAILED: 10; ######## Accounting module ######## loadmodule "acc.so" diff --git a/kamailio/default.cfg b/kamailio/default.cfg index 2f754f1..80ba6e8 100644 --- a/kamailio/default.cfg +++ b/kamailio/default.cfg @@ -63,17 +63,21 @@ tcp_wq_blk_size = 2100 tcp_wq_max = 10485760 ####### UDP Parameters ######### -udp4_raw = -1 +udp4_raw = 0 +#udp4_raw_mtu = 800 +# # pmtu_discovery = no +#udp_mtu = 800 +# #udp_mtu_try_proto = TCP ####### DNS Parameters ######### dns = no rev_dns = no dns_try_ipv6 = no use_dns_cache = on -dns_cache_del_nonexp = no +dns_cache_del_nonexp = yes dns_cache_flags = 1 dns_cache_gc_interval = 120 -dns_cache_init = 1 +dns_cache_init = 0 dns_cache_mem = 1000 dns_cache_negative_ttl = 60 dns_try_naptr = no @@ -86,12 +90,12 @@ disable_sctp = yes ####### Modules Section ######## mpath="/usr/lib64/kamailio/modules/:/usr/lib/x86_64-linux-gnu/kamailio/modules/" -######## Kamailio stun module ######## -loadmodule "stun.so" - ######## Kamailio outbound module ######## loadmodule "outbound.so" +######## Kamailio stun module ######## +loadmodule "stun.so" + ######## Kamailio path module ######## loadmodule "path.so" @@ -167,6 +171,8 @@ loadmodule "uac_redirect.so" loadmodule "db_text.so" modparam("db_text", "db_mode", 1) modparam("db_text", "emptystring", 1) +modparam("db_text", "default_connection", "KAZOO_DB_URL") + ####### Kazoo Integration module ########## loadmodule "kazoo.so" @@ -795,7 +801,6 @@ onsend_route { event_route[kazoo:mod-init] { - #!ifdef PRESENCE_ROLE ### use this simple form of binding a listener ### kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME"); @@ -814,7 +819,15 @@ event_route[kazoo:mod-init] ### ### - $var(payload) = '{ "exchange" : "dialoginfo" , "type" : "direct", "queue" : "BLF-QUEUE-MY_HOSTNAME", "routing" : "BLF-MY_HOSTNAME", "auto_delete" : 0, "durable" : 1, "no_ack" : 0, "wait_for_consumer_ack" : 1 }'; + #!ifdef PRESENCE_ROLE + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-dialog-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'dialog.*.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-presence-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'update.*.*', 'federate' : 1 }"; + kazoo_subscribe("$var(payload)"); + + $var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-mwi-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'mwi_updates.*', 'federate' : 1 }"; kazoo_subscribe("$var(payload)"); #!endif @@ -841,16 +854,9 @@ event_route[kazoo:mod-init] #!endif - #!ifdef PRESENCE_SYNC_ROLE - - $var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'PRESENCE-QUEUE-MY_HOSTNAME', 'routing' : 'sync', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }"; - kazoo_subscribe("$var(payload)"); - - #!endif - #!ifdef PRESENCE_QUERY_ROLE - $var(payload) = "{ 'exchange' : 'omnipresence' , 'type' : 'topic', 'routing' : 'presence.search_req.*', 'federate' : 1 }"; + $var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'presence-search-MY_HOSTNAME', 'exclusive' : 0, 'routing' : 'presence.search_req.*', 'federate' : 1 }"; kazoo_subscribe("$var(payload)"); #!endif @@ -869,13 +875,6 @@ event_route[kazoo:mod-init] #!endif - #!ifdef PRESENCE_BROADCAST_ROLE - - $var(payload) = "{ 'exchange' : 'omnipresence' , 'queue' : 'PRESENCE-BROADCAST-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'presence.update' }"; - kazoo_subscribe("$var(payload)"); - - #!endif - } event_route[kazoo:consumer-event] diff --git a/kamailio/defs.cfg b/kamailio/defs.cfg index 24ca823..1aaa2c0 100644 --- a/kamailio/defs.cfg +++ b/kamailio/defs.cfg @@ -3,8 +3,12 @@ ####### defs ######## +#!ifndef KAZOO_DATA_DIR +#!substdef "!KAZOO_DATA_DIR!/etc/kazoo/kamailio/dbtext!g" +#!endif + #!ifndef KAZOO_DB_URL -#!substdef "!KAZOO_DB_URL!text:///etc/kazoo/kamailio/dbtext!g" +#!substdef "!KAZOO_DB_URL!text://KAZOO_DATA_DIR!g" #!endif #!ifndef MAX_WHILE_LOOPS diff --git a/kamailio/fast-pickup-role.cfg b/kamailio/fast-pickup-role.cfg index 8e980a9..dd263b2 100644 --- a/kamailio/fast-pickup-role.cfg +++ b/kamailio/fast-pickup-role.cfg @@ -36,17 +36,22 @@ route[FAST_PICKUP_ATTEMPT] } if($var(replaced_call_id) != "none") { - xlog("L_INFO", "$ci|log|replaces call-id $var(replaced_call_id)\n"); + xlog("L_INFO", "$ci|log|request has replaces call-id $var(replaced_call_id)\n"); $var(amqp_payload_request) = '{"Event-Category" : "call_event" , "Event-Name" : "channel_status_req", "Call-ID" : "' + $var(replaced_call_id) + '", "Active-Only" : true }'; $var(amqp_routing_key) = "call.status_req." + $(var(replaced_call_id){kz.encode}); - sl_send_reply("100", "Attempting K query"); + sl_send_reply("100", "locating your call"); + xlog("L_INFO", "$ci|log|querying cluster for the location of call-id $var(replaced_call_id)\n"); if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request))) { $du = $(kzR{kz.json,Switch-URL}); if($du != $null) { - xlog("L_INFO", "$ci|log|call-id $var(replaced_call_id) found redirecting call to $du, courtesy of kazoo\n"); + xlog("L_INFO", "$ci|log|call-id $var(replaced_call_id) found redirecting call to $du\n"); route(EXTERNAL_TO_INTERNAL_RELAY); exit(); + } else { + remove_hf_re("^Replaces"); } + } else { + remove_hf_re("^Replaces"); } } diff --git a/kamailio/local.cfg b/kamailio/local.cfg index 7f34ca3..f28d2d6 100644 --- a/kamailio/local.cfg +++ b/kamailio/local.cfg @@ -28,7 +28,6 @@ debug = L_INFO # # #!trydef REGISTRAR_SYNC_ROLE # # #!trydef PRESENCE_SYNC_ROLE # # #!trydef PRESENCE_NOTIFY_SYNC_ROLE -# # #!trydef PRESENCE_BROADCAST_ROLE ################################################################################ ## SERVER INFORMATION diff --git a/kamailio/presence-role.cfg b/kamailio/presence-role.cfg index 577c7e4..dc8b597 100644 --- a/kamailio/presence-role.cfg +++ b/kamailio/presence-role.cfg @@ -4,6 +4,7 @@ #!trydef PRESENCE_MAX_EXPIRES 3600 modparam("htable", "htable", "p=>size=32;autoexpire=3600;") +modparam("htable", "htable", "first=>size=32;autoexpire=3600;initval =0;updateexpire=1") loadmodule "presence.so" loadmodule "presence_dialoginfo.so" @@ -26,58 +27,134 @@ modparam("presence", "min_expires", PRESENCE_MIN_EXPIRES) modparam("presence", "max_expires", PRESENCE_MAX_EXPIRES) modparam("presence", "sip_uri_match", 1) modparam("presence", "waitn_time", 1) -modparam("presence", "notifier_processes", 5) +modparam("presence", "notifier_processes", 10) modparam("presence", "db_url", "KAZOO_DB_URL") modparam("presence", "xavp_cfg", "pres") +modparam("presence", "local_log_level", 6) +modparam("presence", "startup_mode", 0) +modparam("presence", "force_delete", 1) +modparam("presence", "timeout_rm_subs", 0) +modparam("presence", "cseq_offset", 1) modparam("kazoo", "db_url", "KAZOO_DB_URL") modparam("kazoo", "pua_mode", 1) +#!ifdef NAT_TRAVERSAL_ROLE +#!ifndef NAT_TRAVERSAL_LOADED +#!trydef NAT_TRAVERSAL_LOADED +loadmodule "nat_traversal.so" +#!endif +modparam("nat_traversal", "keepalive_method", "OPTIONS") +modparam("nat_traversal", "keepalive_from", "sip:sipcheck@MY_HOSTNAME") +modparam("nat_traversal", "keepalive_state_file", "KAZOO_DATA_DIR/keep_alive_state") +modparam("nat_traversal", "keepalive_interval", 45) +#!endif + + +####### SQL OPS module ########## +#!ifndef SQLOPS_LOADED +loadmodule "sqlops.so" +#!trydef SQLOPS_LOADED +#!endif +modparam("sqlops","sqlcon", "exec=>KAZOO_DB_URL") + +route[PRESENCE_NAT] +{ + if (client_nat_test("3")) { + fix_contact(); + } + + nat_keepalive(); + force_rport(); + +} + ####### Presence Logic ######## route[HANDLE_SUBSCRIBE] { - if (is_method("SUBSCRIBE")) { - #!ifdef NAT_TRAVERSAL_ROLE - route(NAT_TEST_AND_CORRECT); - #!endif + if (!is_method("SUBSCRIBE")) { + return; + } + + #!ifdef NAT_TRAVERSAL_ROLE + route(PRESENCE_NAT); + #!endif + + if(has_totag()) { + route(HANDLE_RESUBSCRIBE); + } else { + route(HANDLE_NEW_SUBSCRIBE); + } + + exit; +} + +route[HANDLE_RESUBSCRIBE] +{ + loose_route(); + if(handle_subscribe()) { + if($subs(remote_cseq) < 5) { + $sht(first=>$subs(callid)) = $null; + $sht(first=>$subs(from_user)::$subs(to_user)::$subs(from_domain)::$subs(event)) = $null; + } + route(SUBSCRIBE_AMQP); + }; +} + + +route[HANDLE_NEW_SUBSCRIBE] +{ + if ($hdr(Event) == "dialog" + || $hdr(Event) == "presence" + || $hdr(Event) == "message-summary") { + if ($tU == $null) { xlog("L_INFO", "$ci|stop|ignoring subscribe with empty TO username from a $ua\n"); - sl_send_reply(400, "Missing TO username"); - exit; + send_reply(400, "Missing TO username"); + return; } if ($fU == $null) { xlog("L_INFO", "$ci|stop|ignoring subscribe with empty FROM username from a $ua\n"); - sl_send_reply(400, "Missing FROM username"); - exit; + send_reply(400, "Missing FROM username"); + return; } - if (!handle_subscribe()) { - xlog("L_INFO", "$ci|stop|unsupported subsribe\n"); - exit; + if($shtinc(first=>$ci) > 1) { + sql_query("exec", 'delete from active_watchers where callid = "$ci"'); + xlog("L_INFO", "$ci|subscribe|resetting $hdr(Event) subscription from $fU to $tU in realm $fd : $sqlrows(exec)\n"); + } else { + if($shtinc(first=>$fU::$tU::$fd::$hdr(Event)) > 1) { + sql_query("exec", 'delete from active_watchers where watcher_username="$fU" and to_user="$tU" and watcher_domain="$fd" and event="$hdr(Event)"'); + xlog("L_INFO", "$ci|subscribe|resetting $hdr(Event) subscription from $fU to $tU in realm $fd : $sqlrows(exec)\n"); + } } - $var(Expires) = $hdr(Expires); - if($var(Expires) < PRESENCE_MIN_EXPIRES) { - $var(Expires) = PRESENCE_MIN_EXPIRES; - } else if($var(Expires) > PRESENCE_MAX_EXPIRES) { - $var(Expires) = PRESENCE_MAX_EXPIRES; - } - - ##RabbitMQ - $var(fs_path) = "%3C" + $rz + "%3A" + $Ri + "%3A" + $Rp + "%3Btransport=" + $proto + "%3Blr%3Breceived=" + $si+":"+$sp+"%3E"; - $var(fs_contact) = "<" + $(ct{tobody.uri}) + ";fs_path=" + $var(fs_path) + ">"; - if($(ct{tobody.params}) != $null) { - $var(fs_contact) = $var(fs_contact) + ";" + $(ct{tobody.params}); + if (handle_subscribe()) { + route(SUBSCRIBE_AMQP); + xlog("L_INFO","$ci|end|new $hdr(Event) subscription from $fU to $tU in realm $fd : $sht(first=>$ci) : $sht(first=>$fU::$tU::$fd::$hdr(Event))\n"); + } else { + xlog("L_INFO", "$ci|stop|error $T_reply_code for new $hdr(Event) subscription from $fU to $tU in realm $fd\n"); } + } else { + xlog("L_INFO", "$ci|stop|unsupported subscription package $hdr(Event) from $fU to $tU in realm $fd\n"); + send_reply(489, "Bad Event"); + } +} - $var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common})" }'; +route[SUBSCRIBE_AMQP] +{ + $var(Expires) = $hdr(Expires); + if($var(Expires) < PRESENCE_MIN_EXPIRES) { + $var(Expires) = PRESENCE_MIN_EXPIRES; + } else if($var(Expires) > PRESENCE_MAX_EXPIRES) { + $var(Expires) = PRESENCE_MAX_EXPIRES; + } - kazoo_publish("dialoginfo_subs", "dialoginfo_subs", $var(amqp_payload_request)); + $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" }); + kazoo_publish("dialoginfo_subs", "dialoginfo_subs", $var(amqp_payload_request)); - exit; - } } route[HANDLE_PUBLISH] @@ -96,10 +173,57 @@ route[HANDLE_PUBLISH] } } +event_route[kazoo:consumer-event-presence-dialog-update] +{ + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State})\n"); + if(pres_has_subscribers("$(kzE{kz.json,From})", "dialog")) { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing dialog update for $(kzE{kz.json,From})\n"); + kazoo_pua_publish_dialoginfo($kzE); + pres_refresh_watchers("$(kzE{kz.json,From})", "dialog", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip dialog update for $(kzE{kz.json,From})\n"); + } + + + if(pres_has_subscribers("$(kzE{kz.json,From})", "presence")) { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing presence update for $(kzE{kz.json,From})\n"); + kazoo_pua_publish_presence($kzE); + pres_refresh_watchers("$(kzE{kz.json,From})", "presence", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $(kzE{kz.json,From})\n"); + } + +} + +event_route[kazoo:consumer-event-presence-mwi-update] +{ + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received message-summary update for $(kzE{kz.json,From})\n"); + if(pres_has_subscribers("$(kzE{kz.json,From})", "message-summary")) { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing message-summary update for $(kzE{kz.json,From})\n"); + kazoo_pua_publish_mwi($kzE); + pres_refresh_watchers("$(kzE{kz.json,From})", "message-summary", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip message-summary update for $(kzE{kz.json,From})\n"); + } +} + event_route[kazoo:consumer-event-presence-update] +{ + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received presence update for $(kzE{kz.json,From})\n"); + if(pres_has_subscribers("$(kzE{kz.json,From})", "presence")) { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing presence update for $(kzE{kz.json,From})\n"); + kazoo_pua_publish_presence($kzE); + pres_refresh_watchers("$(kzE{kz.json,From})", "presence", 1); + } else { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $(kzE{kz.json,From})\n"); + } + +} + +route[HANDLE_PRESENCE_UPDATE] { $var(call-id) = $(kzE{kz.json,Call-ID}); - if( $(kzE{kz.json,Event-Package}) == "dialog") { + if( $(kzE{kz.json,Event-Package}) == "dialog") { if($sht(p=>$var(call-id)) != $(kzE{kz.json,State}) || $(kzE{kz.json,Flush-Level}) != $null) { xlog("L_INFO", "$(kzE{kz.json,Target-Call-ID})|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) $kzE\n"); $sht(p=>$(kzE{kz.json,Call-ID})) = $(kzE{kz.json,State}); @@ -108,10 +232,14 @@ event_route[kazoo:consumer-event-presence-update] #!endif kazoo_pua_publish($kzE); pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1); + $var(Presence) = $(kzE{re.subst,/"Event-Package": "dialog"/"Event-Package": "presence"/g}); + xlog("L_INFO", "PRESENCE $var(Presence)"); + kazoo_pua_publish($var(Presence)); + pres_refresh_watchers("$(var(Presence){kz.json,From})", "$(var(Presence){kz.json,Event-Package})", 1); } else { xlog("L_INFO", "$var(call-id)|log|received duplicate $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) $kzE\n"); xlog("L_INFO", "$var(call-id)|log|payload $kzE\n"); - } + } } else { xlog("L_INFO", "$var(call-id)|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) $kzE\n"); kazoo_pua_publish($kzE); diff --git a/kamailio/presence_notify_sync-role.cfg b/kamailio/presence_notify_sync-role.cfg index 770d19f..1728e03 100644 --- a/kamailio/presence_notify_sync-role.cfg +++ b/kamailio/presence_notify_sync-role.cfg @@ -2,17 +2,35 @@ kazoo.presence_notify = 1 descr "enable/disable sending notify callback to omnip kazoo.presence_notify_timeout = 3000 descr "timeout in ms waiting for notify reply" kazoo.presence_notify_log_body = 0 descr "logs the body sent in the notification" kazoo.presence_notify_log_resp_body = 0 descr "logs the body received from notification" -kazoo.presence_notify_log_to_table = 0 descr "logs notify/reply to active_watchers_log table" -kazoo.presence_notify_log_to_amqp = 1 descr "logs notify/reply to amqp" +kazoo.presence_notify_log_to_table = 1 descr "logs notify/reply to active_watchers_log table" +kazoo.presence_notify_log_to_amqp = 0 descr "logs notify/reply to amqp" +kazoo.presence_notify_record_route = 0 descr "add record route header to notify msg sent" +kazoo.presence_notify_log_init_body = 0 descr "logs the body before its sent" +kazoo.presence_notify_force_send_socket = 0 descr "forces the send socket to the contact" + +######## Generic Hash Table container in shared memory ######## +modparam("htable", "htable", "notify=>size=16;autoexpire=3600;updateexpire=1;initval=0") #!trydef PRESENCE_NOTIFY_INIT +#!trydef MAX_NOTIFY_ERROR 5 + route[PRESENCE_LOCAL_NOTIFY] { if($rm != "NOTIFY") { return; } t_set_fr(@cfg_get.kazoo.presence_notify_timeout, @cfg_get.kazoo.presence_notify_timeout); - xlog("L_INFO", "$ci|log|init preparing notify to $du : $ruri"); + xlog("L_INFO", "$ci|log|init preparing $subs(event) notify to $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri) : $du\n"); + if(@cfg_get.kazoo.presence_notify_log_init_body == 1) { + xlog("L_INFO", "$ci|log|init|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n"); + } + if(@cfg_get.kazoo.presence_notify_force_send_socket == 1) { + $fs = $_s($(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port})); + xlog("L_INFO", "$ci|log|init|forcing socket to $fs, $(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port}) , $ct\n"); + } + if(@cfg_get.kazoo.presence_notify_record_route == 1) { + record_route(); + } } @@ -47,25 +65,34 @@ event_route[presence:notify-reply] $xavp(pres=>delete_subscription) = 0; if($notify_reply($rs) == 200) { - xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)"); + sht(notify=>$ci) = $null; + xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)\n"); } else if($notify_reply($rs) == 481 && $subs(reason) == "timeout") { - xlog("L_INFO","$ci|end|sent subscription $hdr(Subscription-State)"); + xlog("L_INFO","$ci|end|sent subscription $hdr(Subscription-State)\n"); } else { - if($rP != "UDP") + xlog("L_ERROR", "$ci|error|received $notify_reply($rs) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)\n"); + if($rP != "UDP") { $xavp(pres=>delete_subscription) = 1; - xlog("L_ERROR", "$ci|error|received $notify_reply($rs) $subs(reason) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)"); + xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri)\n"); + } else { + $var(shtinc) = $shtinc(notify=>$ci::count); + if($var(shtinc) > MAX_NOTIFY_ERROR) { + $xavp(pres=>delete_subscription) = 1; + xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri)\n"); + } + } } if(@cfg_get.kazoo.presence_notify_log_body == 1) - xlog("L_INFO", "$ci|log|sent|body #012$(mb{s.escape.common}{s.replace,\','})#012"); + xlog("L_INFO", "$ci|log|sent|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n"); if(@cfg_get.kazoo.presence_notify_log_resp_body == 1) - xlog("L_INFO", "$ci|log|resp|body #012$(notify_reply($mb){s.escape.common}{s.replace,\','})#012"); + xlog("L_INFO", "$ci|log|resp|body $(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})\n"); if(@cfg_get.kazoo.presence_notify_log_to_amqp == 1) { route(PRESENCE_NOTIFY_AMQP); } if(@cfg_get.kazoo.presence_notify_log_to_table == 1) { - $var(Query) = $_s(REPLACE active_watchers_log SET sent_msg="$(mb{s.escape.common}{s.replace,\','})", received_msg="$(notify_reply($mb){s.escape.common}{s.replace,\','})", time=$TS, result=$notify_reply($rs), user_agent="$(subs(user_agent){s.escape.common})", callid="$subs(callid)", to_user="$subs(to_user)", to_domain="$subs(to_domain)" where presentity_uri="$subs(uri)" watcher_username="$subs(watcher_username)" and watcher_domain="$subs(watcher_domain)" event="$subs(event)"); + $var(Query) = $_s(REPLACE active_watchers_log SET sent_msg="$(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})", received_msg="$(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})", time=$TS, result=$notify_reply($rs), user_agent="$(subs(user_agent){s.escape.common}{s.replace,\','}{s.replace,$$,})", callid="$subs(callid)", to_user="$subs(to_user)", to_domain="$subs(to_domain)" where presentity_uri="$subs(uri)" watcher_username="$subs(watcher_username)" and watcher_domain="$subs(watcher_domain)" event="$subs(event)"); mq_add("presence_last_notity", "$subs(watcher_username)@$subs(watcher_domain)", "$var(Query)"); } } @@ -77,7 +104,7 @@ route[PRESENCE_LOG_TIMER_ROUTE] while(mq_fetch("presence_last_notity") == 1 && $var(runloop) < MAX_WHILE_LOOPS) { # xlog("L_INFO", "Query : $mqv(presence_last_notity)"); if (sql_query("cb", "$mqv(presence_last_notity)") != 1) { - xlog("L_ERROR", "$ci|log|error updating active_watchers_log"); + xlog("L_ERROR", "$ci|log|error updating active_watchers_log\n"); } $var(runloop) = $var(runloop) + 1; } @@ -86,8 +113,8 @@ route[PRESENCE_LOG_TIMER_ROUTE] route[PRESENCE_NOTIFY_AMQP] { - $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','})", "Sequence" : $cs, "Reply" : $notify_reply($rs) }); + $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})", "Sequence" : $cs, "Reply" : $notify_reply($rs) }); $var(rk) = "notify." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode}); kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request)); - xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu"); + xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu\n"); } diff --git a/kamailio/presence_query-role.cfg b/kamailio/presence_query-role.cfg index 50309f5..325de55 100644 --- a/kamailio/presence_query-role.cfg +++ b/kamailio/presence_query-role.cfg @@ -3,53 +3,88 @@ ####### SQL OPS module ########## #!ifndef SQLOPS_LOADED loadmodule "sqlops.so" -modparam("sqlops","sqlcon", "cb=>KAZOO_DB_URL") #!trydef SQLOPS_LOADED #!endif +modparam("sqlops","sqlcon", "cb=>KAZOO_DB_URL") -event_route[kazoo:consumer-event-presence-search-req] +route[PRESENCE_SEARCH_SUMMARY] { - xlog("L_INFO", "received presence search_req $kzE\n"); + xlog("L_INFO", "processing search subscriptions for $(kzE{kz.json,Realm})\n"); $var(Queue) = $(kzE{kz.json,Server-ID}); $var(Event) = $(kzE{kz.json,Event-Package}); $var(Domain) = $(kzE{kz.json,Realm}); $var(Username) = $(kzE{kz.json,Username}); $var(Now) = $TS; $var(Items) = ""; - $var(Query) = $_s(select * from presentity where domain = "$var(Domain)"); + $var(Query) = $_s(select * from active_watchers where watcher_domain = "$var(Domain)"); if($var(Event) != "") { $var(Query) = $var(Query) + $_s( and event = "$var(Event)"); } if($var(Username) != "") { - $var(Query) = $var(Query) + $_s( and username = "$var(Username)"); + $var(Query) = $var(Query) + $_s( and watcher_username = "$var(Username)"); } - $var(Query) = $var(Query) + " order by username, event, id"; - xlog("L_INFO", "$ci| QUERY $var(Query)\n"); + $var(Query) = $var(Query) + " order by to_user, event, watcher_username, callid"; + xlog("L_DEBUG", "$ci| QUERY $var(Query)\n"); if (sql_xquery("cb", "$var(Query)", "ra") == 1) { + $var(Subs) = ""; + $var(Sep1) = ""; while($xavp(ra) != $null) { - $var(Username) = $xavp(ra=>username); + $var(Username) = $xavp(ra=>to_user); $var(Sep2)=""; $var(Evt)=""; - while($xavp(ra) != $null && $var(Username) == $xavp(ra=>username)) + while($xavp(ra) != $null && $var(Username) == $xavp(ra=>to_user)) { $var(Event) = $xavp(ra=>event); $var(Sep3)=""; $var(Sub)=""; - while($xavp(ra) != $null && $var(Username) == $xavp(ra=>username) && $var(Event) == $xavp(ra=>event)) + $var(Count) = 0; + while($xavp(ra) != $null && $var(Username) == $xavp(ra=>to_user) && $var(Event) == $xavp(ra=>event)) { - $var(Sub) = $var(Sub) + $_s($var(Sep3)"$xavp(ra=>id)" : {"etag" : "$xavp(ra=>etag)", "body" : "$(xavp(ra=>body){s.escape.common})" }); - $var(Sep3)=", "; + $var(Count) = $var(Count) + 1; pv_unset("$xavp(ra)"); } - $var(Evt) = $var(Evt) + $var(Sep2) + $_s("$var(Event)") + " : { " + $var(Sub) + "}"; + $var(Evt) = $var(Evt) + $var(Sep2) + $_s("$var(Event)" : $var(Count)); $var(Sep2)=", "; } - $var(Usr) = $_s("$var(Username)") + " : { " + $var(Evt) + " }"; - xlog("L_INFO", "$ci| RESULT \"Subscriptions\" : { $var(Usr) }\n"); - $var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_partial_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Usr) } }'; - kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + $var(Subs) = $var(Subs) + $var(Sep1) + $_s("$var(Username)") + " : { " + $var(Evt) + " }"; + $var(Sep1)=", "; + } + } + xlog("L_DEBUG", "$ci| RESULT \"Subscriptions\" : { $var(Subs) }\n"); + + $var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Subs) } }'; + kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + +} + +route[PRESENCE_SEARCH_DETAIL] +{ + xlog("L_INFO", "processing status for $(kzE{kz.json,Username}) in realm $(kzE{kz.json,Realm})\n"); + $var(Queue) = $(kzE{kz.json,Server-ID}); + $var(Event) = $(kzE{kz.json,Event-Package}); + $var(Domain) = $(kzE{kz.json,Realm}); + $var(Username) = $(kzE{kz.json,Username}); + $var(Now) = $TS; + $var(Items) = ""; + $var(Query) = $_s(select * from active_watchers_log where presentity_uri = "sip:$var(Username)@$var(Domain)"); + if($var(Event) != "") { + $var(Query) = $var(Query) + $_s( and event = "$var(Event)"); + } + $var(Query) = $var(Query) + " order by event, watcher_username, callid"; + xlog("L_DEBUG", "$ci| STATUS QUERY $var(Query)\n"); + + if (sql_xquery("cb", "$var(Query)", "ra") == 1) { + while($xavp(ra) != $null) { + $var(Event) = $xavp(ra=>event); + while($xavp(ra) != $null && $var(Event) == $xavp(ra=>event)) { + $var(Sub) = $_s("$var(Username)" : {"$xavp(ra=>event)" : { "$xavp(ra=>watcher_username)" : {"kamailio@MY_HOSTNAME" : {"call_id" : "$xavp(ra=>callid)", "time" : $xavp(ra=>time), "result" : $xavp(ra=>result), "sent" : "$(xavp(ra=>sent_msg){s.escape.common})", "received" : "$(xavp(ra=>received_msg){s.escape.common})", "User-Agent" : "$(xavp(ra=>user_agent){s.escape.common}{s.replace,\','}{s.replace,$$,})"}}}}); + xlog("L_DEBUG", "$ci| RESULT \"Subscriptions\" : { $var(Sub) }\n"); + $var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_partial_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Sub) } }'; + kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request)); + pv_unset("$xavp(ra)"); + } } } @@ -58,5 +93,20 @@ event_route[kazoo:consumer-event-presence-search-req] } -# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab + +event_route[kazoo:consumer-event-presence-search-req] +{ + switch($(kzE{kz.json,Search-Type})) { + case "summary": + route(PRESENCE_SEARCH_SUMMARY); + break; + case "detail": + route(PRESENCE_SEARCH_DETAIL); + break; + default: + xlog("L_INFO", "$ci|search type '$(kzE{kz.json,Search-Type})' not handled\n"); + } +} + +# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/registrar-role.cfg b/kamailio/registrar-role.cfg index 7f53031..d01852c 100644 --- a/kamailio/registrar-role.cfg +++ b/kamailio/registrar-role.cfg @@ -142,7 +142,7 @@ route[ATTEMPT_AUTHORIZATION] route[KAZOO_AUTHORIZATION] { $var(nonce) = $adn; - $var(amqp_payload_request) = $_s({"Event-Category" : "directory" , "Event-Name" : "authn_req", "Method" : "REGISTER", "Auth-Nonce" : "$adn", "Auth-Realm" : "$fd", "Auth-User" : "$fU", "From" : "$fu", "To" : "$tu", "Orig-IP" : "$si", "Orig-Port" : "$sp", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','})", "Contact" : "$(ct{s.escape.common}{s.replace,\','})", "Call-ID" : "$ci" }); + $var(amqp_payload_request) = $_s({"Event-Category" : "directory" , "Event-Name" : "authn_req", "Method" : "REGISTER", "Auth-Nonce" : "$adn", "Auth-Realm" : "$fd", "Auth-User" : "$fU", "From" : "$fu", "To" : "$tu", "Orig-IP" : "$si", "Orig-Port" : "$sp", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci" }); $var(amqp_routing_key) = "authn.req." + $(fd{kz.encode}); $avp(kz_timeout) = 2500; if (!t_newtran()) { @@ -280,7 +280,7 @@ route[SAVE_LOCATION] $var(ip) = "[" + $Ri + "]"; } - $var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Status" : "$var(Status)", "Event-Timestamp" : $TS, "Expires" : $(var(expires){s.int}), "First-Registration" : $var(new_reg), "Contact" : "$(ct{s.escape.common}{s.replace,\','})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','})" , "Custom-Channel-Vars" : $xavp(ulattrs=>custom_channel_vars), "Proxy-Path" : "sip:$var(ip)", "RUID" : "$xavp(ulrcd=>ruid)", "Source-IP": "$si", "Source-Port": "$sp" }); + $var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Status" : "$var(Status)", "Event-Timestamp" : $TS, "Expires" : $(var(expires){s.int}), "First-Registration" : $var(new_reg), "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" , "Custom-Channel-Vars" : $xavp(ulattrs=>custom_channel_vars), "Proxy-Path" : "sip:$var(ip)", "RUID" : "$xavp(ulrcd=>ruid)", "Source-IP": "$si", "Source-Port": "$sp" }); $var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $(fU{kz.encode}); kazoo_publish("registrar", $var(amqp_routing_key), $var(amqp_payload_request));