Browse Source

refactor presence role (#136)

* refactor presence role

* Update presence-role.cfg

* Update presence_notify_sync-role.cfg

* Update presence-role.cfg

* Update registrar-role.cfg

* Update presence_notify_sync-role.cfg

* typo

* remove trans

* Update presence-role.cfg

* Update presence-role.cfg

* Update presence_notify_sync-role.cfg

* Update presence-role.cfg

* Update presence-role.cfg

* Update presence-role.cfg

* update presence

* update local

* update bindings

* Update presence-role.cfg

* bring back comments on defaults

* remove PRESENCE_NAT role from definitions

* outbound should be loaded before stun

* Update presence-role.cfg

* add a threshold for 408 and other errors

* handle summary & detail queries

* Update accounting-role.cfg

* Update default.cfg

* fix call-id in logging

* Update fast-pickup-role.cfg
4.1
lazedo 9 years ago
committed by bitbashing
parent
commit
10e5707628
9 changed files with 306 additions and 94 deletions
  1. +3
    -3
      kamailio/accounting-role.cfg
  2. +22
    -23
      kamailio/default.cfg
  3. +5
    -1
      kamailio/defs.cfg
  4. +8
    -3
      kamailio/fast-pickup-role.cfg
  5. +0
    -1
      kamailio/local.cfg
  6. +158
    -30
      kamailio/presence-role.cfg
  7. +40
    -13
      kamailio/presence_notify_sync-role.cfg
  8. +68
    -18
      kamailio/presence_query-role.cfg
  9. +2
    -2
      kamailio/registrar-role.cfg

+ 3
- 3
kamailio/accounting-role.cfg View File

@ -1,8 +1,8 @@
####### Flags #######
flags
FLAG_ACC: 7,
FLAG_ACCMISSED: 8,
FLAG_ACCFAILED: 9;
FLAG_ACC: 8,
FLAG_ACCMISSED: 9,
FLAG_ACCFAILED: 10;
######## Accounting module ########
loadmodule "acc.so"


+ 22
- 23
kamailio/default.cfg View File

@ -63,17 +63,21 @@ tcp_wq_blk_size = 2100
tcp_wq_max = 10485760
####### UDP Parameters #########
udp4_raw = -1
udp4_raw = 0
#udp4_raw_mtu = 800
# # pmtu_discovery = no
#udp_mtu = 800
# #udp_mtu_try_proto = TCP
####### DNS Parameters #########
dns = no
rev_dns = no
dns_try_ipv6 = no
use_dns_cache = on
dns_cache_del_nonexp = no
dns_cache_del_nonexp = yes
dns_cache_flags = 1
dns_cache_gc_interval = 120
dns_cache_init = 1
dns_cache_init = 0
dns_cache_mem = 1000
dns_cache_negative_ttl = 60
dns_try_naptr = no
@ -86,12 +90,12 @@ disable_sctp = yes
####### Modules Section ########
mpath="/usr/lib64/kamailio/modules/:/usr/lib/x86_64-linux-gnu/kamailio/modules/"
######## Kamailio stun module ########
loadmodule "stun.so"
######## Kamailio outbound module ########
loadmodule "outbound.so"
######## Kamailio stun module ########
loadmodule "stun.so"
######## Kamailio path module ########
loadmodule "path.so"
@ -167,6 +171,8 @@ loadmodule "uac_redirect.so"
loadmodule "db_text.so"
modparam("db_text", "db_mode", 1)
modparam("db_text", "emptystring", 1)
modparam("db_text", "default_connection", "KAZOO_DB_URL")
####### Kazoo Integration module ##########
loadmodule "kazoo.so"
@ -795,7 +801,6 @@ onsend_route {
event_route[kazoo:mod-init]
{
#!ifdef PRESENCE_ROLE
### use this simple form of binding a listener
### kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
@ -814,7 +819,15 @@ event_route[kazoo:mod-init]
###
###
$var(payload) = '{ "exchange" : "dialoginfo" , "type" : "direct", "queue" : "BLF-QUEUE-MY_HOSTNAME", "routing" : "BLF-MY_HOSTNAME", "auto_delete" : 0, "durable" : 1, "no_ack" : 0, "wait_for_consumer_ack" : 1 }';
#!ifdef PRESENCE_ROLE
$var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-dialog-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'dialog.*.*', 'federate' : 1 }";
kazoo_subscribe("$var(payload)");
$var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-presence-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'update.*.*', 'federate' : 1 }";
kazoo_subscribe("$var(payload)");
$var(payload) = "{ 'exchange' : 'presence' , 'queue' : 'presence-mwi-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'mwi_updates.*', 'federate' : 1 }";
kazoo_subscribe("$var(payload)");
#!endif
@ -841,16 +854,9 @@ event_route[kazoo:mod-init]
#!endif
#!ifdef PRESENCE_SYNC_ROLE
$var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'PRESENCE-QUEUE-MY_HOSTNAME', 'routing' : 'sync', 'auto_delete' : 1, 'durable' : 0, 'no_ack' : 1, 'wait_for_consumer_ack' : 0 }";
kazoo_subscribe("$var(payload)");
#!endif
#!ifdef PRESENCE_QUERY_ROLE
$var(payload) = "{ 'exchange' : 'omnipresence' , 'type' : 'topic', 'routing' : 'presence.search_req.*', 'federate' : 1 }";
$var(payload) = "{ 'exchange' : 'presence' , 'type' : 'topic', 'queue' : 'presence-search-MY_HOSTNAME', 'exclusive' : 0, 'routing' : 'presence.search_req.*', 'federate' : 1 }";
kazoo_subscribe("$var(payload)");
#!endif
@ -869,13 +875,6 @@ event_route[kazoo:mod-init]
#!endif
#!ifdef PRESENCE_BROADCAST_ROLE
$var(payload) = "{ 'exchange' : 'omnipresence' , 'queue' : 'PRESENCE-BROADCAST-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'presence.update' }";
kazoo_subscribe("$var(payload)");
#!endif
}
event_route[kazoo:consumer-event]


+ 5
- 1
kamailio/defs.cfg View File

@ -3,8 +3,12 @@
####### defs ########
#!ifndef KAZOO_DATA_DIR
#!substdef "!KAZOO_DATA_DIR!/etc/kazoo/kamailio/dbtext!g"
#!endif
#!ifndef KAZOO_DB_URL
#!substdef "!KAZOO_DB_URL!text:///etc/kazoo/kamailio/dbtext!g"
#!substdef "!KAZOO_DB_URL!text://KAZOO_DATA_DIR!g"
#!endif
#!ifndef MAX_WHILE_LOOPS


+ 8
- 3
kamailio/fast-pickup-role.cfg View File

@ -36,17 +36,22 @@ route[FAST_PICKUP_ATTEMPT]
}
if($var(replaced_call_id) != "none") {
xlog("L_INFO", "$ci|log|replaces call-id $var(replaced_call_id)\n");
xlog("L_INFO", "$ci|log|request has replaces call-id $var(replaced_call_id)\n");
$var(amqp_payload_request) = '{"Event-Category" : "call_event" , "Event-Name" : "channel_status_req", "Call-ID" : "' + $var(replaced_call_id) + '", "Active-Only" : true }';
$var(amqp_routing_key) = "call.status_req." + $(var(replaced_call_id){kz.encode});
sl_send_reply("100", "Attempting K query");
sl_send_reply("100", "locating your call");
xlog("L_INFO", "$ci|log|querying cluster for the location of call-id $var(replaced_call_id)\n");
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request))) {
$du = $(kzR{kz.json,Switch-URL});
if($du != $null) {
xlog("L_INFO", "$ci|log|call-id $var(replaced_call_id) found redirecting call to $du, courtesy of kazoo\n");
xlog("L_INFO", "$ci|log|call-id $var(replaced_call_id) found redirecting call to $du\n");
route(EXTERNAL_TO_INTERNAL_RELAY);
exit();
} else {
remove_hf_re("^Replaces");
}
} else {
remove_hf_re("^Replaces");
}
}


+ 0
- 1
kamailio/local.cfg View File

@ -28,7 +28,6 @@ debug = L_INFO
# # #!trydef REGISTRAR_SYNC_ROLE
# # #!trydef PRESENCE_SYNC_ROLE
# # #!trydef PRESENCE_NOTIFY_SYNC_ROLE
# # #!trydef PRESENCE_BROADCAST_ROLE
################################################################################
## SERVER INFORMATION


+ 158
- 30
kamailio/presence-role.cfg View File

@ -4,6 +4,7 @@
#!trydef PRESENCE_MAX_EXPIRES 3600
modparam("htable", "htable", "p=>size=32;autoexpire=3600;")
modparam("htable", "htable", "first=>size=32;autoexpire=3600;initval =0;updateexpire=1")
loadmodule "presence.so"
loadmodule "presence_dialoginfo.so"
@ -26,58 +27,134 @@ modparam("presence", "min_expires", PRESENCE_MIN_EXPIRES)
modparam("presence", "max_expires", PRESENCE_MAX_EXPIRES)
modparam("presence", "sip_uri_match", 1)
modparam("presence", "waitn_time", 1)
modparam("presence", "notifier_processes", 5)
modparam("presence", "notifier_processes", 10)
modparam("presence", "db_url", "KAZOO_DB_URL")
modparam("presence", "xavp_cfg", "pres")
modparam("presence", "local_log_level", 6)
modparam("presence", "startup_mode", 0)
modparam("presence", "force_delete", 1)
modparam("presence", "timeout_rm_subs", 0)
modparam("presence", "cseq_offset", 1)
modparam("kazoo", "db_url", "KAZOO_DB_URL")
modparam("kazoo", "pua_mode", 1)
#!ifdef NAT_TRAVERSAL_ROLE
#!ifndef NAT_TRAVERSAL_LOADED
#!trydef NAT_TRAVERSAL_LOADED
loadmodule "nat_traversal.so"
#!endif
modparam("nat_traversal", "keepalive_method", "OPTIONS")
modparam("nat_traversal", "keepalive_from", "sip:sipcheck@MY_HOSTNAME")
modparam("nat_traversal", "keepalive_state_file", "KAZOO_DATA_DIR/keep_alive_state")
modparam("nat_traversal", "keepalive_interval", 45)
#!endif
####### SQL OPS module ##########
#!ifndef SQLOPS_LOADED
loadmodule "sqlops.so"
#!trydef SQLOPS_LOADED
#!endif
modparam("sqlops","sqlcon", "exec=>KAZOO_DB_URL")
route[PRESENCE_NAT]
{
if (client_nat_test("3")) {
fix_contact();
}
nat_keepalive();
force_rport();
}
####### Presence Logic ########
route[HANDLE_SUBSCRIBE]
{
if (is_method("SUBSCRIBE")) {
#!ifdef NAT_TRAVERSAL_ROLE
route(NAT_TEST_AND_CORRECT);
#!endif
if (!is_method("SUBSCRIBE")) {
return;
}
#!ifdef NAT_TRAVERSAL_ROLE
route(PRESENCE_NAT);
#!endif
if(has_totag()) {
route(HANDLE_RESUBSCRIBE);
} else {
route(HANDLE_NEW_SUBSCRIBE);
}
exit;
}
route[HANDLE_RESUBSCRIBE]
{
loose_route();
if(handle_subscribe()) {
if($subs(remote_cseq) < 5) {
$sht(first=>$subs(callid)) = $null;
$sht(first=>$subs(from_user)::$subs(to_user)::$subs(from_domain)::$subs(event)) = $null;
}
route(SUBSCRIBE_AMQP);
};
}
route[HANDLE_NEW_SUBSCRIBE]
{
if ($hdr(Event) == "dialog"
|| $hdr(Event) == "presence"
|| $hdr(Event) == "message-summary") {
if ($tU == $null) {
xlog("L_INFO", "$ci|stop|ignoring subscribe with empty TO username from a $ua\n");
sl_send_reply(400, "Missing TO username");
exit;
send_reply(400, "Missing TO username");
return;
}
if ($fU == $null) {
xlog("L_INFO", "$ci|stop|ignoring subscribe with empty FROM username from a $ua\n");
sl_send_reply(400, "Missing FROM username");
exit;
send_reply(400, "Missing FROM username");
return;
}
if (!handle_subscribe()) {
xlog("L_INFO", "$ci|stop|unsupported subsribe\n");
exit;
if($shtinc(first=>$ci) > 1) {
sql_query("exec", 'delete from active_watchers where callid = "$ci"');
xlog("L_INFO", "$ci|subscribe|resetting $hdr(Event) subscription from $fU to $tU in realm $fd : $sqlrows(exec)\n");
} else {
if($shtinc(first=>$fU::$tU::$fd::$hdr(Event)) > 1) {
sql_query("exec", 'delete from active_watchers where watcher_username="$fU" and to_user="$tU" and watcher_domain="$fd" and event="$hdr(Event)"');
xlog("L_INFO", "$ci|subscribe|resetting $hdr(Event) subscription from $fU to $tU in realm $fd : $sqlrows(exec)\n");
}
}
$var(Expires) = $hdr(Expires);
if($var(Expires) < PRESENCE_MIN_EXPIRES) {
$var(Expires) = PRESENCE_MIN_EXPIRES;
} else if($var(Expires) > PRESENCE_MAX_EXPIRES) {
$var(Expires) = PRESENCE_MAX_EXPIRES;
}
##RabbitMQ
$var(fs_path) = "%3C" + $rz + "%3A" + $Ri + "%3A" + $Rp + "%3Btransport=" + $proto + "%3Blr%3Breceived=" + $si+":"+$sp+"%3E";
$var(fs_contact) = "<" + $(ct{tobody.uri}) + ";fs_path=" + $var(fs_path) + ">";
if($(ct{tobody.params}) != $null) {
$var(fs_contact) = $var(fs_contact) + ";" + $(ct{tobody.params});
if (handle_subscribe()) {
route(SUBSCRIBE_AMQP);
xlog("L_INFO","$ci|end|new $hdr(Event) subscription from $fU to $tU in realm $fd : $sht(first=>$ci) : $sht(first=>$fU::$tU::$fd::$hdr(Event))\n");
} else {
xlog("L_INFO", "$ci|stop|error $T_reply_code for new $hdr(Event) subscription from $fU to $tU in realm $fd\n");
}
} else {
xlog("L_INFO", "$ci|stop|unsupported subscription package $hdr(Event) from $fU to $tU in realm $fd\n");
send_reply(489, "Bad Event");
}
}
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common})" }';
route[SUBSCRIBE_AMQP]
{
$var(Expires) = $hdr(Expires);
if($var(Expires) < PRESENCE_MIN_EXPIRES) {
$var(Expires) = PRESENCE_MIN_EXPIRES;
} else if($var(Expires) > PRESENCE_MAX_EXPIRES) {
$var(Expires) = PRESENCE_MAX_EXPIRES;
}
kazoo_publish("dialoginfo_subs", "dialoginfo_subs", $var(amqp_payload_request));
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "subscription", "Event-Package" : "$hdr(event)", "Expires" : "$var(Expires)", "Queue" : "BLF-MY_HOSTNAME", "Server-ID" : "BLF-MY_HOSTNAME" , "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "From" : "$fu", "User" : "$subs(uri)", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
kazoo_publish("dialoginfo_subs", "dialoginfo_subs", $var(amqp_payload_request));
exit;
}
}
route[HANDLE_PUBLISH]
@ -96,10 +173,57 @@ route[HANDLE_PUBLISH]
}
}
event_route[kazoo:consumer-event-presence-dialog-update]
{
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State})\n");
if(pres_has_subscribers("$(kzE{kz.json,From})", "dialog")) {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing dialog update for $(kzE{kz.json,From})\n");
kazoo_pua_publish_dialoginfo($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "dialog", 1);
} else {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip dialog update for $(kzE{kz.json,From})\n");
}
if(pres_has_subscribers("$(kzE{kz.json,From})", "presence")) {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing presence update for $(kzE{kz.json,From})\n");
kazoo_pua_publish_presence($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "presence", 1);
} else {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $(kzE{kz.json,From})\n");
}
}
event_route[kazoo:consumer-event-presence-mwi-update]
{
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received message-summary update for $(kzE{kz.json,From})\n");
if(pres_has_subscribers("$(kzE{kz.json,From})", "message-summary")) {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing message-summary update for $(kzE{kz.json,From})\n");
kazoo_pua_publish_mwi($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "message-summary", 1);
} else {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip message-summary update for $(kzE{kz.json,From})\n");
}
}
event_route[kazoo:consumer-event-presence-update]
{
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|received presence update for $(kzE{kz.json,From})\n");
if(pres_has_subscribers("$(kzE{kz.json,From})", "presence")) {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|publishing presence update for $(kzE{kz.json,From})\n");
kazoo_pua_publish_presence($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "presence", 1);
} else {
xlog("L_INFO", "$(kzE{kz.json,Call-ID})|log|skip presence update for $(kzE{kz.json,From})\n");
}
}
route[HANDLE_PRESENCE_UPDATE]
{
$var(call-id) = $(kzE{kz.json,Call-ID});
if( $(kzE{kz.json,Event-Package}) == "dialog") {
if( $(kzE{kz.json,Event-Package}) == "dialog") {
if($sht(p=>$var(call-id)) != $(kzE{kz.json,State}) || $(kzE{kz.json,Flush-Level}) != $null) {
xlog("L_INFO", "$(kzE{kz.json,Target-Call-ID})|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) $kzE\n");
$sht(p=>$(kzE{kz.json,Call-ID})) = $(kzE{kz.json,State});
@ -108,10 +232,14 @@ event_route[kazoo:consumer-event-presence-update]
#!endif
kazoo_pua_publish($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
$var(Presence) = $(kzE{re.subst,/"Event-Package": "dialog"/"Event-Package": "presence"/g});
xlog("L_INFO", "PRESENCE $var(Presence)");
kazoo_pua_publish($var(Presence));
pres_refresh_watchers("$(var(Presence){kz.json,From})", "$(var(Presence){kz.json,Event-Package})", 1);
} else {
xlog("L_INFO", "$var(call-id)|log|received duplicate $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) state $(kzE{kz.json,State}) $kzE\n");
xlog("L_INFO", "$var(call-id)|log|payload $kzE\n");
}
}
} else {
xlog("L_INFO", "$var(call-id)|log|received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From}) $kzE\n");
kazoo_pua_publish($kzE);


+ 40
- 13
kamailio/presence_notify_sync-role.cfg View File

@ -2,17 +2,35 @@ kazoo.presence_notify = 1 descr "enable/disable sending notify callback to omnip
kazoo.presence_notify_timeout = 3000 descr "timeout in ms waiting for notify reply"
kazoo.presence_notify_log_body = 0 descr "logs the body sent in the notification"
kazoo.presence_notify_log_resp_body = 0 descr "logs the body received from notification"
kazoo.presence_notify_log_to_table = 0 descr "logs notify/reply to active_watchers_log table"
kazoo.presence_notify_log_to_amqp = 1 descr "logs notify/reply to amqp"
kazoo.presence_notify_log_to_table = 1 descr "logs notify/reply to active_watchers_log table"
kazoo.presence_notify_log_to_amqp = 0 descr "logs notify/reply to amqp"
kazoo.presence_notify_record_route = 0 descr "add record route header to notify msg sent"
kazoo.presence_notify_log_init_body = 0 descr "logs the body before its sent"
kazoo.presence_notify_force_send_socket = 0 descr "forces the send socket to the contact"
######## Generic Hash Table container in shared memory ########
modparam("htable", "htable", "notify=>size=16;autoexpire=3600;updateexpire=1;initval=0")
#!trydef PRESENCE_NOTIFY_INIT
#!trydef MAX_NOTIFY_ERROR 5
route[PRESENCE_LOCAL_NOTIFY]
{
if($rm != "NOTIFY") {
return;
}
t_set_fr(@cfg_get.kazoo.presence_notify_timeout, @cfg_get.kazoo.presence_notify_timeout);
xlog("L_INFO", "$ci|log|init preparing notify to $du : $ruri");
xlog("L_INFO", "$ci|log|init preparing $subs(event) notify to $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri) : $du\n");
if(@cfg_get.kazoo.presence_notify_log_init_body == 1) {
xlog("L_INFO", "$ci|log|init|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n");
}
if(@cfg_get.kazoo.presence_notify_force_send_socket == 1) {
$fs = $_s($(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port}));
xlog("L_INFO", "$ci|log|init|forcing socket to $fs, $(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port}) , $ct\n");
}
if(@cfg_get.kazoo.presence_notify_record_route == 1) {
record_route();
}
}
@ -47,25 +65,34 @@ event_route[presence:notify-reply]
$xavp(pres=>delete_subscription) = 0;
if($notify_reply($rs) == 200) {
xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)");
sht(notify=>$ci) = $null;
xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)\n");
} else if($notify_reply($rs) == 481 && $subs(reason) == "timeout") {
xlog("L_INFO","$ci|end|sent subscription $hdr(Subscription-State)");
xlog("L_INFO","$ci|end|sent subscription $hdr(Subscription-State)\n");
} else {
if($rP != "UDP")
xlog("L_ERROR", "$ci|error|received $notify_reply($rs) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)\n");
if($rP != "UDP") {
$xavp(pres=>delete_subscription) = 1;
xlog("L_ERROR", "$ci|error|received $notify_reply($rs) $subs(reason) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)");
xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri)\n");
} else {
$var(shtinc) = $shtinc(notify=>$ci::count);
if($var(shtinc) > MAX_NOTIFY_ERROR) {
$xavp(pres=>delete_subscription) = 1;
xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri)\n");
}
}
}
if(@cfg_get.kazoo.presence_notify_log_body == 1)
xlog("L_INFO", "$ci|log|sent|body #012$(mb{s.escape.common}{s.replace,\','})#012");
xlog("L_INFO", "$ci|log|sent|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n");
if(@cfg_get.kazoo.presence_notify_log_resp_body == 1)
xlog("L_INFO", "$ci|log|resp|body #012$(notify_reply($mb){s.escape.common}{s.replace,\','})#012");
xlog("L_INFO", "$ci|log|resp|body $(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})\n");
if(@cfg_get.kazoo.presence_notify_log_to_amqp == 1) {
route(PRESENCE_NOTIFY_AMQP);
}
if(@cfg_get.kazoo.presence_notify_log_to_table == 1) {
$var(Query) = $_s(REPLACE active_watchers_log SET sent_msg="$(mb{s.escape.common}{s.replace,\','})", received_msg="$(notify_reply($mb){s.escape.common}{s.replace,\','})", time=$TS, result=$notify_reply($rs), user_agent="$(subs(user_agent){s.escape.common})", callid="$subs(callid)", to_user="$subs(to_user)", to_domain="$subs(to_domain)" where presentity_uri="$subs(uri)" watcher_username="$subs(watcher_username)" and watcher_domain="$subs(watcher_domain)" event="$subs(event)");
$var(Query) = $_s(REPLACE active_watchers_log SET sent_msg="$(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})", received_msg="$(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})", time=$TS, result=$notify_reply($rs), user_agent="$(subs(user_agent){s.escape.common}{s.replace,\','}{s.replace,$$,})", callid="$subs(callid)", to_user="$subs(to_user)", to_domain="$subs(to_domain)" where presentity_uri="$subs(uri)" watcher_username="$subs(watcher_username)" and watcher_domain="$subs(watcher_domain)" event="$subs(event)");
mq_add("presence_last_notity", "$subs(watcher_username)@$subs(watcher_domain)", "$var(Query)");
}
}
@ -77,7 +104,7 @@ route[PRESENCE_LOG_TIMER_ROUTE]
while(mq_fetch("presence_last_notity") == 1 && $var(runloop) < MAX_WHILE_LOOPS) {
# xlog("L_INFO", "Query : $mqv(presence_last_notity)");
if (sql_query("cb", "$mqv(presence_last_notity)") != 1) {
xlog("L_ERROR", "$ci|log|error updating active_watchers_log");
xlog("L_ERROR", "$ci|log|error updating active_watchers_log\n");
}
$var(runloop) = $var(runloop) + 1;
}
@ -86,8 +113,8 @@ route[PRESENCE_LOG_TIMER_ROUTE]
route[PRESENCE_NOTIFY_AMQP]
{
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','})", "Sequence" : $cs, "Reply" : $notify_reply($rs) });
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})", "Sequence" : $cs, "Reply" : $notify_reply($rs) });
$var(rk) = "notify." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode});
kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request));
xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu");
xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu\n");
}

+ 68
- 18
kamailio/presence_query-role.cfg View File

@ -3,53 +3,88 @@
####### SQL OPS module ##########
#!ifndef SQLOPS_LOADED
loadmodule "sqlops.so"
modparam("sqlops","sqlcon", "cb=>KAZOO_DB_URL")
#!trydef SQLOPS_LOADED
#!endif
modparam("sqlops","sqlcon", "cb=>KAZOO_DB_URL")
event_route[kazoo:consumer-event-presence-search-req]
route[PRESENCE_SEARCH_SUMMARY]
{
xlog("L_INFO", "received presence search_req $kzE\n");
xlog("L_INFO", "processing search subscriptions for $(kzE{kz.json,Realm})\n");
$var(Queue) = $(kzE{kz.json,Server-ID});
$var(Event) = $(kzE{kz.json,Event-Package});
$var(Domain) = $(kzE{kz.json,Realm});
$var(Username) = $(kzE{kz.json,Username});
$var(Now) = $TS;
$var(Items) = "";
$var(Query) = $_s(select * from presentity where domain = "$var(Domain)");
$var(Query) = $_s(select * from active_watchers where watcher_domain = "$var(Domain)");
if($var(Event) != "") {
$var(Query) = $var(Query) + $_s( and event = "$var(Event)");
}
if($var(Username) != "") {
$var(Query) = $var(Query) + $_s( and username = "$var(Username)");
$var(Query) = $var(Query) + $_s( and watcher_username = "$var(Username)");
}
$var(Query) = $var(Query) + " order by username, event, id";
xlog("L_INFO", "$ci| QUERY $var(Query)\n");
$var(Query) = $var(Query) + " order by to_user, event, watcher_username, callid";
xlog("L_DEBUG", "$ci| QUERY $var(Query)\n");
if (sql_xquery("cb", "$var(Query)", "ra") == 1)
{
$var(Subs) = "";
$var(Sep1) = "";
while($xavp(ra) != $null) {
$var(Username) = $xavp(ra=>username);
$var(Username) = $xavp(ra=>to_user);
$var(Sep2)="";
$var(Evt)="";
while($xavp(ra) != $null && $var(Username) == $xavp(ra=>username))
while($xavp(ra) != $null && $var(Username) == $xavp(ra=>to_user))
{
$var(Event) = $xavp(ra=>event);
$var(Sep3)="";
$var(Sub)="";
while($xavp(ra) != $null && $var(Username) == $xavp(ra=>username) && $var(Event) == $xavp(ra=>event))
$var(Count) = 0;
while($xavp(ra) != $null && $var(Username) == $xavp(ra=>to_user) && $var(Event) == $xavp(ra=>event))
{
$var(Sub) = $var(Sub) + $_s($var(Sep3)"$xavp(ra=>id)" : {"etag" : "$xavp(ra=>etag)", "body" : "$(xavp(ra=>body){s.escape.common})" });
$var(Sep3)=", ";
$var(Count) = $var(Count) + 1;
pv_unset("$xavp(ra)");
}
$var(Evt) = $var(Evt) + $var(Sep2) + $_s("$var(Event)") + " : { " + $var(Sub) + "}";
$var(Evt) = $var(Evt) + $var(Sep2) + $_s("$var(Event)" : $var(Count));
$var(Sep2)=", ";
}
$var(Usr) = $_s("$var(Username)") + " : { " + $var(Evt) + " }";
xlog("L_INFO", "$ci| RESULT \"Subscriptions\" : { $var(Usr) }\n");
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_partial_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Usr) } }';
kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request));
$var(Subs) = $var(Subs) + $var(Sep1) + $_s("$var(Username)") + " : { " + $var(Evt) + " }";
$var(Sep1)=", ";
}
}
xlog("L_DEBUG", "$ci| RESULT \"Subscriptions\" : { $var(Subs) }\n");
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Subs) } }';
kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request));
}
route[PRESENCE_SEARCH_DETAIL]
{
xlog("L_INFO", "processing status for $(kzE{kz.json,Username}) in realm $(kzE{kz.json,Realm})\n");
$var(Queue) = $(kzE{kz.json,Server-ID});
$var(Event) = $(kzE{kz.json,Event-Package});
$var(Domain) = $(kzE{kz.json,Realm});
$var(Username) = $(kzE{kz.json,Username});
$var(Now) = $TS;
$var(Items) = "";
$var(Query) = $_s(select * from active_watchers_log where presentity_uri = "sip:$var(Username)@$var(Domain)");
if($var(Event) != "") {
$var(Query) = $var(Query) + $_s( and event = "$var(Event)");
}
$var(Query) = $var(Query) + " order by event, watcher_username, callid";
xlog("L_DEBUG", "$ci| STATUS QUERY $var(Query)\n");
if (sql_xquery("cb", "$var(Query)", "ra") == 1) {
while($xavp(ra) != $null) {
$var(Event) = $xavp(ra=>event);
while($xavp(ra) != $null && $var(Event) == $xavp(ra=>event)) {
$var(Sub) = $_s("$var(Username)" : {"$xavp(ra=>event)" : { "$xavp(ra=>watcher_username)" : {"kamailio@MY_HOSTNAME" : {"call_id" : "$xavp(ra=>callid)", "time" : $xavp(ra=>time), "result" : $xavp(ra=>result), "sent" : "$(xavp(ra=>sent_msg){s.escape.common})", "received" : "$(xavp(ra=>received_msg){s.escape.common})", "User-Agent" : "$(xavp(ra=>user_agent){s.escape.common}{s.replace,\','}{s.replace,$$,})"}}}});
xlog("L_DEBUG", "$ci| RESULT \"Subscriptions\" : { $var(Sub) }\n");
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "search_partial_resp", "Msg-ID" : "$(kzE{kz.json,Msg-ID})", "Subscriptions" : { $var(Sub) } }';
kazoo_publish("targeted", "$var(Queue)", $var(amqp_payload_request));
pv_unset("$xavp(ra)");
}
}
}
@ -58,5 +93,20 @@ event_route[kazoo:consumer-event-presence-search-req]
}
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab
event_route[kazoo:consumer-event-presence-search-req]
{
switch($(kzE{kz.json,Search-Type})) {
case "summary":
route(PRESENCE_SEARCH_SUMMARY);
break;
case "detail":
route(PRESENCE_SEARCH_DETAIL);
break;
default:
xlog("L_INFO", "$ci|search type '$(kzE{kz.json,Search-Type})' not handled\n");
}
}
# vim: tabstop=4 softtabstop=4 shiftwidth=4 expandtab

+ 2
- 2
kamailio/registrar-role.cfg View File

@ -142,7 +142,7 @@ route[ATTEMPT_AUTHORIZATION]
route[KAZOO_AUTHORIZATION]
{
$var(nonce) = $adn;
$var(amqp_payload_request) = $_s({"Event-Category" : "directory" , "Event-Name" : "authn_req", "Method" : "REGISTER", "Auth-Nonce" : "$adn", "Auth-Realm" : "$fd", "Auth-User" : "$fU", "From" : "$fu", "To" : "$tu", "Orig-IP" : "$si", "Orig-Port" : "$sp", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','})", "Contact" : "$(ct{s.escape.common}{s.replace,\','})", "Call-ID" : "$ci" });
$var(amqp_payload_request) = $_s({"Event-Category" : "directory" , "Event-Name" : "authn_req", "Method" : "REGISTER", "Auth-Nonce" : "$adn", "Auth-Realm" : "$fd", "Auth-User" : "$fU", "From" : "$fu", "To" : "$tu", "Orig-IP" : "$si", "Orig-Port" : "$sp", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci" });
$var(amqp_routing_key) = "authn.req." + $(fd{kz.encode});
$avp(kz_timeout) = 2500;
if (!t_newtran()) {
@ -280,7 +280,7 @@ route[SAVE_LOCATION]
$var(ip) = "[" + $Ri + "]";
}
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Status" : "$var(Status)", "Event-Timestamp" : $TS, "Expires" : $(var(expires){s.int}), "First-Registration" : $var(new_reg), "Contact" : "$(ct{s.escape.common}{s.replace,\','})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','})" , "Custom-Channel-Vars" : $xavp(ulattrs=>custom_channel_vars), "Proxy-Path" : "sip:$var(ip)", "RUID" : "$xavp(ulrcd=>ruid)", "Source-IP": "$si", "Source-Port": "$sp" });
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Status" : "$var(Status)", "Event-Timestamp" : $TS, "Expires" : $(var(expires){s.int}), "First-Registration" : $var(new_reg), "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" , "Custom-Channel-Vars" : $xavp(ulattrs=>custom_channel_vars), "Proxy-Path" : "sip:$var(ip)", "RUID" : "$xavp(ulrcd=>ruid)", "Source-IP": "$si", "Source-Port": "$sp" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $(fU{kz.encode});
kazoo_publish("registrar", $var(amqp_routing_key), $var(amqp_payload_request));


Loading…
Cancel
Save