diff --git a/kamailio/db_queries_kazoo.cfg b/kamailio/db_queries_kazoo.cfg index 89018b7..7eba570 100644 --- a/kamailio/db_queries_kazoo.cfg +++ b/kamailio/db_queries_kazoo.cfg @@ -1,7 +1,7 @@ ####### Database queries ######## #!substdef "!KZQ_CHECK_MEDIA_SERVER_INSERT!insert into dispatcher (setid, destination) select \$var(SetId), \"\$var(MediaUrl)\" where not exists(select * from dispatcher where destination = \"\$var(MediaUrl)\")!g" #!substdef "!KZQ_COUNT_ALL_SUBSCRIBERS!select a.event, count(distinct watcher_username || \"@\" || watcher_domain) count_unique, count(*) count from event_list a, active_watchers b where b.event = a.event group by a.event!g" -#!substdef "!KZQ_COUNT_PRESENTITIES!select event, (select count(*) from presentity b where username = \"\$(kzE{kz.json,From}{uri.user})\" and domain = \"\$(kzE{kz.json,From}{uri.domain})\" and b.event = a.event) count from event_list a!g" +#!substdef "!KZQ_COUNT_PRESENTITIES!select event, (select count(*) from presentity b where username = \"\$(var(payload){kz.json,From}{uri.user})\" and domain = \"\$(var(payload){kz.json,From}{uri.domain})\" and b.event = a.event) count from event_list a!g" #!substdef "!KZQ_COUNT_SUBSCRIBERS!select event, (select count(*) from active_watchers b where presentity_uri = \"\$var(presentity)\" and b.event = a.event) count from event_list a!g" #!substdef "!KZQ_EVENT_PRESENCE_RESET_DELETE!delete from presentity where domain=\"\$(kzE{kz.json,Realm})\" and username = \"\$(kzE{kz.json,Username})\"!g" #!substdef "!KZQ_HANDLE_NEW_SUBSCRIBE_DELETE1!delete from active_watchers where callid = \"\$ci\"!g" @@ -11,3 +11,4 @@ #!substdef "!KZQ_RESET_PUBLISHER_UPDATE!update active_watchers set expires = \$TS where id in (select b.id from presentity a inner join active_watchers b on a.username = b.to_user and a.domain = b.to_domain and a.event = b.event where a.sender = \"\$var(MediaUrl)\")!g" #!substdef "!KZQ_PRESENCE_SEARCH_DETAIL!select * from active_watchers_log where presentity_uri = \"\$var(presentity_uri)\"!g" #!substdef "!KZQ_PRESENCE_SEARCH_SUMMARY!select * from active_watchers where watcher_domain = \"\$var(Domain)\"!g" +#!substdef "!KZQ_HAS_PRESENTITY!select count(*) as count from presentity where username = \"\$subs(to_user)\" and domain = \"\$subs(to_domain)\" and event = \"\$subs(event)\"!g" diff --git a/kamailio/defs.cfg b/kamailio/defs.cfg index 5b38bc5..35cecc1 100644 --- a/kamailio/defs.cfg +++ b/kamailio/defs.cfg @@ -83,4 +83,20 @@ #!endif #!endif +#!ifndef MEDIA_SERVERS_HASH_SIZE +#!substdef "!MEDIA_SERVERS_HASH_SIZE!256!g" +#!endif + +#!ifndef KZ_PRESENCE_AMQP_PUBLISH +#!define KZ_PRESENCE_AMQP_PUBLISH 0 +#!endif + +#!ifndef KZ_PRESENCE_REQUEST_RESUBSCRIBE_PROBE +#!define KZ_PRESENCE_REQUEST_RESUBSCRIBE_PROBE 0 +#!endif + +#!ifndef KZ_PRESENCE_REQUEST_PROBE +#!define KZ_PRESENCE_REQUEST_PROBE 1 +#!endif + # vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab diff --git a/kamailio/nodes-role.cfg b/kamailio/nodes-role.cfg index fc3de7b..3fb5e29 100644 --- a/kamailio/nodes-role.cfg +++ b/kamailio/nodes-role.cfg @@ -5,7 +5,7 @@ #!endif #!ifndef NODES_FUDGE_FACTOR -#!define NODES_FUDGE_FACTOR 3 +#!define NODES_FUDGE_FACTOR 10 #!endif modparam("htable", "htable", "nodes=>size=8;initval=0;autoexpire=180;"); diff --git a/kamailio/presence-reset.cfg b/kamailio/presence-reset.cfg index 7a8193d..bad71a8 100644 --- a/kamailio/presence-reset.cfg +++ b/kamailio/presence-reset.cfg @@ -40,6 +40,7 @@ route[RESET_PUBLISHER] route[RESET_ALL] { + xlog("L_INFO", "$var(Msg-ID)|reset|received presence reset ALL\n"); sql_query("exec", "delete from presentity"); $var(presentities) = $sqlrows(exec); xlog("L_INFO", "$(kzE{kz.json,Msg-ID})|reset|removed $var(presentities) presentities\n"); diff --git a/kamailio/presence-role.cfg b/kamailio/presence-role.cfg index 9393ff3..81af925 100644 --- a/kamailio/presence-role.cfg +++ b/kamailio/presence-role.cfg @@ -53,7 +53,9 @@ modparam("nat_traversal", "keepalive_state_file", "KAZOO_DATA_DIR/keep_alive_sta modparam("nat_traversal", "keepalive_interval", 45) #!endif -kazoo.presence_sync_amqp = 0 descr "sync subscriptions to amqp" +kazoo.presence_sync_amqp = KZ_PRESENCE_AMQP_PUBLISH descr "sync subscriptions to amqp" +kazoo.presence_request_probe = KZ_PRESENCE_REQUEST_PROBE descr "request probe for new subscriptions" +kazoo.presence_request_resubscribe_probe = KZ_PRESENCE_REQUEST_RESUBSCRIBE_PROBE descr "request probe for resubscriptions" #!ifdef FAST_PICKUP_ROLE #!include_file "fast-pickup-role.cfg" @@ -158,8 +160,9 @@ route[HANDLE_NEW_SUBSCRIBE] } if (handle_subscribe()) { - route(SUBSCRIBE_AMQP); xlog("L_INFO","$ci|end|new $hdr(Event) subscription from $fU to $tU in realm $fd : $sht(first=>$ci) : $sht(first=>$fU::$tU::$fd::$hdr(Event))\n"); + route(SUBSCRIBE_AMQP); + route(REQUEST_PROBE); } else { xlog("L_INFO", "$ci|stop|error $T_reply_code for new $hdr(Event) subscription from $fU to $tU in realm $fd\n"); } @@ -178,6 +181,37 @@ route[SUBSCRIBE_AMQP] } } +route[REQUEST_PROBE] +{ + if( (@cfg_get.kazoo.presence_request_probe == 1 && (!has_totag())) + || (@cfg_get.kazoo.presence_request_resubscribe_probe == 1 && has_totag()) ) { + if( route(HAS_PRESENTITY) == 0) { + if($hdr(event) == "message-summary") { + $var(mwi) = $tU; + route(REQUEST_MWI); + } else { + if($tU =~ "\*98") { + $var(mwi) = $(tU{s.substr,3,0}); + route(REQUEST_MWI); + } else { + xlog("L_INFO", "$ci|sub|requesting $hdr(Event) probe for $subs(to_user) in realm $subs(to_domain)\n"); + $var(rk) = "probes." + $hdr(Event); + $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "probe", "Event-Package" : "$hdr(event)", "Username" : "$tU", "Realm" : "$fd", "Call-ID" : "$ci"}); + kazoo_publish("presence", "$var(rk)", $var(amqp_payload_request)); + } + } + } + } +} + +route[REQUEST_MWI] +{ + xlog("L_INFO", "$ci|sub|requesting mwi probe for $var(mwi) in realm $subs(to_domain)\n"); + $var(rk) = "mwi_queries." + $(subs(to_domain){kz.encode}); + $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "mwi_query", "Username" : "$var(mwi)", "Realm" : "$fd", "Call-ID" : "$ci"}); + kazoo_publish("presence", "$var(rk)", $var(amqp_payload_request)); +} + route[HANDLE_PUBLISH] { if (is_method("PUBLISH")) { @@ -194,6 +228,20 @@ route[HANDLE_PUBLISH] } } +route[HAS_PRESENTITY] +{ + $var(Query) = $_s(KZQ_HAS_PRESENTITY); + $var(res) = 0; + if (sql_xquery("cb", "$var(Query)", "subs") == 1) + { + if($xavp(subs) != $null) { + $var(res) = $xavp(subs=>count); + pv_unset("$xavp(subs)"); + } + } + return $var(res); +} + route[COUNT_PRESENTITIES] { $var(Query) = $_s(KZQ_COUNT_PRESENTITIES); @@ -261,32 +309,12 @@ event_route[kazoo:consumer-event-presence-dialog-update] #!ifdef FAST_PICKUP_ROLE route(FAST_PICKUP_INIT); #!endif - + $var(presentity) = $(kzE{kz.json,From}); - - if($(kzE{kz.json,State}) == "terminated") { - route(COUNT_PRESENTITIES); - } else { - route(COUNT_SUBSCRIBERS); - } + $var(payload) = $kzE; + + route(PRESENCE_UPDATE); - if($xavp(watchers=>dialog) > 0) { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing $(kzE{kz.json,From}) dialog update for $xavp(watchers=>dialog) watchers\n"); - kazoo_pua_publish_dialoginfo($var(JObj)); - pres_refresh_watchers("$(kzE{kz.json,From})", "dialog", 1); - } else { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip dialog update for $(kzE{kz.json,From})\n"); - } - - - if($xavp(watchers=>presence) > 0) { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing $(kzE{kz.json,From}) presence update for $xavp(watchers=>presence) watchers\n"); - kazoo_pua_publish_presence($kzE); - pres_refresh_watchers("$(kzE{kz.json,From})", "presence", 1); - } else { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $(kzE{kz.json,From})\n"); - } - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|finished processing $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) from $(kzE{kz.json,Switch-URI}) at $(kzE{kz.json,AMQP-Received})/$var(Now)/$TS\n"); } @@ -301,40 +329,82 @@ event_route[kazoo:consumer-event-presence-mwi-update] kazoo_pua_publish_mwi($kzE); pres_refresh_watchers("$(kzE{kz.json,From})", "message-summary", 1); } else { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip message-summary update for $(kzE{kz.json,From})\n"); + xlog("L_DEBUG", "$(kzE{kz.json,Call-ID})|log|skip message-summary update for $(kzE{kz.json,From})\n"); } + + route(MWI_AS_PRESENCE); + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|finished processing message-summary update for $(kzE{kz.json,From}) light should be on ? $(kzE{kz.json,Messages-Waiting}) at $(kzE{kz.json,AMQP-Received})/$var(Now)/$TS\n"); + } event_route[kazoo:consumer-event-presence-update] { xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received presence update for $(kzE{kz.json,Presence-ID}) : $kzE\n"); - + $var(JObj) = $kzE; $var(presentity) = $_s(sip:$(kzE{kz.json,Presence-ID})); + $var(payload) = $kzE; + route(PRESENCE_UPDATE); - if($(kzE{kz.json,State}) == "terminated") { + xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|finished processing $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) from $(kzE{kz.json,Switch-URI}) at $(kzE{kz.json,AMQP-Received})/$var(Now)/$TS\n"); +} + +route[PRESENCE_UPDATE] +{ + if($(var(payload){kz.json,State}) == "terminated") { route(COUNT_PRESENTITIES); } else { route(COUNT_SUBSCRIBERS); } - + if($xavp(watchers=>dialog) > 0) { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing $var(presentity) dialog update for $xavp(watchers=>dialog) watchers\n"); - kazoo_pua_publish_dialoginfo($kzE); + xlog("L_INFO", "$(var(payload){kz.json,Call-ID})|log|publishing $var(presentity) dialog update for $xavp(watchers=>dialog) watchers\n"); + kazoo_pua_publish_dialoginfo($var(JObj)); pres_refresh_watchers("$var(presentity)", "dialog", 1); } else { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip dialog update for $var(presentity)\n"); + xlog("L_DEBUG", "$(var(payload){kz.json,Call-ID})|log|skip dialog update for $var(presentity)\n"); } - + if($xavp(watchers=>presence) > 0) { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing $var(presentity) presence update for $xavp(watchers=>presence) watchers\n"); - kazoo_pua_publish_presence($kzE); + xlog("L_INFO", "$(var(payload){kz.json,Call-ID})|log|publishing $var(presentity) presence update for $xavp(watchers=>presence) watchers\n"); + kazoo_pua_publish_presence($var(payload)); pres_refresh_watchers("$var(presentity)", "presence", 1); } else { - xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $var(presentity)\n"); + xlog("L_DEBUG", "$(kzE{kz.json,Call-ID})|log|skip presence update for $var(presentity)\n"); } } +#!define MWI_PRESENCE_BODY $(kzE{re.subst,/"Messages-Waiting"\s*\:\s*"[^"]*"/"State" : "$var(State)"/} \ +{re.subst,/"From"\s*\:\s*"[^"]*"/"From" : "$var(presentity)"/} \ +{re.subst,/"From-User"\s*\:\s*"[^"]*"/"From-User" : "$var(user)"/} \ +{re.subst,/"To"\s*\:\s*"[^"]*"/"To" : "$var(presentity)"/} \ +{re.subst,/"To-User"\s*\:\s*"[^"]*"/"To-User" : "$var(user)"/} \ +{re.subst,/"Messages-New"\s*\:\s*[^,]*/"Direction" : "initiator"/} \ +{re.subst,/"Event-Name"\s*\:\s*"[^"]*"/"Event-Name" : "presence"/}); + + +route[MWI_AS_PRESENCE] +{ + if( $(kzE{kz.json,Extended-Presence-ID}) == "" ) { + return; + } + + $var(realm) = $(kzE{kz.json,From-Realm}); + $var(user) = $(kzE{kz.json,Extended-Presence-ID}); + $var(presentity) = $_s(sip:$var(user)@$var(realm)); + if( $(kzE{kz.json,Messages-Waiting}) == "yes" ) { + $var(State) = "confirmed"; + } else { + $var(State) = "terminated"; + } + + $var(payload) = MWI_PRESENCE_BODY + $var(JObj) = $var(payload); + + route(PRESENCE_UPDATE); + +} + #!include_file "presence-reset.cfg" route[PRESENCE_BINDINGS] @@ -342,13 +412,14 @@ route[PRESENCE_BINDINGS] #!import_file "presence-custom-bindings.cfg" #!ifndef PRESENCE_CUSTOM_BINDINGS - $var(payload) = "{ 'exchange' : 'presence', 'type' : 'topic', 'queue' : 'presence-dialog-MY_HOSTNAME', 'routing' : 'dialog.*.*', 'exclusive' : 0, 'federate' : 1 }"; + $var(payload) = $_s({ "exchange" : "presence", "type" : "topic", "queue" : "presence-dialog-MY_HOSTNAME", "routing" : "dialog.*.*", "exclusive" : 0, "federate" : 1 }); kazoo_subscribe("$var(payload)"); + - $var(payload) = "{ 'exchange' : 'presence', 'type' : 'topic', 'queue' : 'presence-presence-MY_HOSTNAME', 'routing' : 'update.*.*', 'exclusive' : 0, 'federate' : 1 }"; + $var(payload) = $_s({ "exchange" : "presence", "type" : "topic", "queue" : "presence-presence-MY_HOSTNAME", "routing" : "update.*.*", "exclusive" : 0, "federate" : 1 }); kazoo_subscribe("$var(payload)"); - $var(payload) = "{ 'exchange' : 'presence', 'type' : 'topic', 'queue' : 'presence-mwi-MY_HOSTNAME', 'routing' : 'mwi_updates.*', 'exclusive' : 0 , 'federate' : 1 }"; + $var(payload) = $_s({ "exchange" : "presence", "type" : "topic", "queue" : "presence-mwi-MY_HOSTNAME", "routing" : "mwi_updates.*.*", "exclusive" : 0 , "federate" : 1 }); kazoo_subscribe("$var(payload)"); #!endif diff --git a/kamailio/registrar-role.cfg b/kamailio/registrar-role.cfg index 83df339..8dffeb0 100644 --- a/kamailio/registrar-role.cfg +++ b/kamailio/registrar-role.cfg @@ -254,7 +254,7 @@ route[SAVE_LOCATION] } } - if(@contact.expires) { + if(@hf_value_exists.contact.expires == "1") { $var(expires) = @contact.expires; } else { if(is_present_hf("Expires")) {