Browse Source

Backport 4.0 (#135)

* Update presence_notify_sync-role.cfg (#131)

* Create active_watchers_log (#132)

* Update default.cfg (#133)
4.0
lazedo 9 years ago
committed by bitbashing
parent
commit
51ec5f737a
3 changed files with 98 additions and 20 deletions
  1. +1
    -0
      kamailio/dbtext/active_watchers_log
  2. +18
    -5
      kamailio/default.cfg
  3. +79
    -15
      kamailio/presence_notify_sync-role.cfg

+ 1
- 0
kamailio/dbtext/active_watchers_log View File

@ -0,0 +1 @@
id(int,auto) presentity_uri(string) watcher_username(string,null) watcher_domain(string) to_user(string,null) to_domain(string) event(string) callid(string) time(int) result(int) sent_msg(blob) received_msg(blob) user_agent(string,null)

+ 18
- 5
kamailio/default.cfg View File

@ -21,6 +21,9 @@ mlock_pages = yes
phone2tel = 1
max_while_loops = MAX_WHILE_LOOPS
pv_buffer_size=65536
mem_join=1
####### Logging Parameters #########
debug = L_INFO
memdbg = 10
@ -94,6 +97,9 @@ loadmodule "path.so"
######## Kamailio control connector module ########
loadmodule "ctl.so"
loadmodule "cfg_rpc.so"
loadmodule "cfgutils.so"
loadmodule "corex.so"
######## Kamailio uuid module ########
loadmodule "uuid.so"
@ -166,11 +172,11 @@ modparam("db_text", "emptystring", 1)
loadmodule "kazoo.so"
modparam("kazoo", "node_hostname", "MY_HOSTNAME")
modparam("kazoo", "amqp_connection", "MY_AMQP_URL")
#!ifdef MY_AMQP_URL_SECONDARY
modparam("kazoo", "amqp_connection", "MY_AMQP_URL_SECONDARY")
#!ifdef MY_AMQP_SECONDARY_URL
modparam("kazoo", "amqp_connection", "MY_AMQP_SECONDARY_URL")
#!endif
#!ifdef MY_AMQP_URL_TERTIARY
modparam("kazoo", "amqp_connection", "MY_AMQP_URL_TERTIARY")
#!ifdef MY_AMQP_TERTIARY_URL
modparam("kazoo", "amqp_connection", "MY_AMQP_TERTIARY_URL")
#!endif
#!ifdef MY_AMQP_MAX_CHANNELS
modparam("kazoo", "amqp_max_channels", MY_AMQP_MAX_CHANNELS)
@ -863,6 +869,13 @@ event_route[kazoo:mod-init]
#!endif
#!ifdef PRESENCE_BROADCAST_ROLE
$var(payload) = "{ 'exchange' : 'omnipresence' , 'queue' : 'PRESENCE-BROADCAST-MY_HOSTNAME', 'exclusive' : 0 ,'type' : 'topic', 'routing' : 'presence.update' }";
kazoo_subscribe("$var(payload)");
#!endif
}
event_route[kazoo:consumer-event]
@ -917,7 +930,7 @@ route[ADD_AUTHORIZATION_HEADERS]
}
}
#!ifdef PRESENCE_NOTIFY_SYNC_ROLE
#!ifdef PRESENCE_NOTIFY_INIT
event_route[tm:local-request]
{
route(PRESENCE_LOCAL_NOTIFY);


+ 79
- 15
kamailio/presence_notify_sync-role.cfg View File

@ -1,29 +1,93 @@
######## Presence notify sync ########
## send notify sync back to kazoo
kazoo.presence_notify = 1 descr "enable/disable sending notify callback to omnipresence"
kazoo.presence_notify_timeout = 200 descr "timeout in ms waiting for notify reply"
kazoo.presence_notify_log_body = 0 descr "logs the body sent in the notification"
kazoo.presence_notify_log_resp_body = 0 descr "logs the body received from notification"
kazoo.presence_notify_log_to_table = 0 descr "logs notify/reply to active_watchers_log table"
kazoo.presence_notify_log_to_amqp = 1 descr "logs notify/reply to amqp"
#trydef PRESENCE_NOTIFY_INIT
route[PRESENCE_LOCAL_NOTIFY]
{
if($rm != "NOTIFY") {
return;
}
$avp(notify_body) = $rb;
$avp(notify_event) = $hdr(Event);
t_on_reply("PRESENCE_NOTIFY_REPLY");
t_on_failure("PRESENCE_NOTIFY_FAULT");
t_set_fr(@cfg_get.kazoo.presence_notify_timeout, @cfg_get.kazoo.presence_notify_timeout);
xlog("L_INFO", "$ci|log|init preparing notify to $du : $ruri");
}
onreply_route[PRESENCE_NOTIFY_REPLY]
####### SQL OPS module ##########
#!ifndef SQLOPS_LOADED
loadmodule "sqlops.so"
modparam("sqlops","sqlcon", "cb=>KAZOO_DB_URL")
#!trydef SQLOPS_LOADED
#!endif
####### MQUEUE module ##########
#!ifndef MQUEUE_LOADED
loadmodule "mqueue.so"
#!trydef MQUEUE_LOADED
#!endif
modparam("mqueue","mqueue", "name=presence_last_notity")
#!ifndef TIMER_LOADED
loadmodule "timer.so"
#!trydef TIMER_LOADED
#!endif
modparam("timer", "declare_timer", "PRESENCE_LOG_TIMER=PRESENCE_LOG_TIMER_ROUTE,500,fast,enable");
event_route[presence:notify-reply]
{
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$avp(notify_event)", "Call-ID" : "$ci", "From" : "$fu", "To" : "$tu", "Body" : "$(avp(notify_body){s.escape.common})", "Sequence" : $cs, "Reply" : $T_reply_code }';
$var(rk) = "notify." + $(td{kz.encode}) + "." + $(tU{kz.encode});
kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request));
xlog("L_INFO", "$ci|log|sent notify callback for event $avp(notify_event) : $tu\n");
if(@cfg_get.kazoo.presence_notify != 1)
return;
$xavp(pres=>delete_subscription) = 0;
if($notify_reply($rs) == 200) {
xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)");
} else if($notify_reply($rs) == 481 && $subs(reason) == "timeout") {
xlog("L_INFO","$ci|end|sent subscription $hdr(Subscription-State)");
} else {
if($rP != "UDP")
$xavp(pres=>delete_subscription) = 1;
xlog("L_ERROR", "$ci|error|received $notify_reply($rs) $subs(reason) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)");
}
if(@cfg_get.kazoo.presence_notify_log_body == 1)
xlog("L_INFO", "$ci|log|sent|body #012$(mb{s.escape.common}{s.replace,\','})#012");
if(@cfg_get.kazoo.presence_notify_log_resp_body == 1)
xlog("L_INFO", "$ci|log|resp|body #012$(notify_reply($mb){s.escape.common}{s.replace,\','})#012");
if(@cfg_get.kazoo.presence_notify_log_to_amqp == 1) {
route(PRESENCE_NOTIFY_AMQP);
}
if(@cfg_get.kazoo.presence_notify_log_to_table == 1) {
$var(Query) = $_s(REPLACE active_watchers_log SET sent_msg="$(mb{s.escape.common}{s.replace,\','})", received_msg="$(notify_reply($mb){s.escape.common}{s.replace,\','})", time=$TS, result=$notify_reply($rs), user_agent="$(subs(user_agent){s.escape.common})", callid="$subs(callid)", to_user="$subs(to_user)", to_domain="$subs(to_domain)" where presentity_uri="$subs(uri)" watcher_username="$subs(watcher_username)" and watcher_domain="$subs(watcher_domain)" event="$subs(event)");
mq_add("presence_last_notity", "$subs(watcher_username)@$subs(watcher_domain)", "$var(Query)");
}
}
route[PRESENCE_LOG_TIMER_ROUTE]
{
$var(runloop) = 1;
while(mq_fetch("presence_last_notity") == 1 && $var(runloop) < MAX_WHILE_LOOPS) {
# xlog("L_INFO", "Query : $mqv(presence_last_notity)");
if (sql_query("cb", "$mqv(presence_last_notity)") != 1) {
xlog("L_ERROR", "$ci|log|error updating active_watchers_log");
}
$var(runloop) = $var(runloop) + 1;
}
}
onreply_route[PRESENCE_NOTIFY_FAULT]
route[PRESENCE_NOTIFY_AMQP]
{
$var(amqp_payload_request) = '{"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$avp(notify_event)", "Call-ID" : "$ci", "From" : "$fu", "To" : "$tu", "Body" : "$(avp(notify_body){s.escape.common})", "Sequence" : $cs, "Reply" : $T_reply_code }';
$var(rk) = "notify." + $(td{kz.encode}) + "." + $(tU{kz.encode});
$var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','})", "Sequence" : $cs, "Reply" : $notify_reply($rs) });
$var(rk) = "notify." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode});
kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request));
xlog("L_INFO", "$ci|log|sent notify callback error $T_reply_code for event $avp(notify_event) : $tu\n");
xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu");
}

Loading…
Cancel
Save