You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

194 lines
9.5 KiB

# Custom configuration variables of the "kazoo" group that control presence
# NOTIFY handling. The routes in this file read them through
# $sel(cfg_get.kazoo.<name>).

# Master switch: the presence:notify-reply event route returns immediately
# unless this is 1.
kazoo.presence_notify = 1 descr "enable/disable sending notify callback to omnipresence"
# Passed as both arguments of t_set_fr() in PRESENCE_LOCAL_NOTIFY.
kazoo.presence_notify_timeout = 3500 descr "timeout in ms waiting for notify reply"
# When 1, the notify-reply route logs the escaped message body ($mb).
kazoo.presence_notify_log_body = 0 descr "logs the body sent in the notification"
# When 1, the notify-reply route logs the escaped reply body ($notify_reply($mb)).
kazoo.presence_notify_log_resp_body = 0 descr "logs the body received from notification"
# When 1, the notify-reply route queues the KZQ_REPLACE_WATCHERS_LOG query
# (unless the subscription is being deleted).
kazoo.presence_notify_log_to_table = 1 descr "logs notify/reply to active_watchers_log table"
# When 1, the notify-reply route runs PRESENCE_NOTIFY_AMQP.
kazoo.presence_notify_log_to_amqp = 0 descr "logs notify/reply to amqp"
# When 1, PRESENCE_LOCAL_NOTIFY calls record_route().
kazoo.presence_notify_record_route = 1 descr "add record route header to notify msg sent"
# When 1, PRESENCE_LOCAL_NOTIFY logs the escaped body before sending.
kazoo.presence_notify_log_init_body = 0 descr "logs the body before its sent"
# When 1, PRESENCE_LOCAL_NOTIFY sets $fs from the protocol and the Contact
# header's host and port.
kazoo.presence_notify_force_send_socket = 1 descr "forces the send socket to the contact"
######## Generic Hash Table container in shared memory ########
# "notify" table: the presence:notify-reply event route keeps a per-Call-ID
# failure counter in it ($sht(notify=>$ci::count)) and resets it on a 200 reply.
# NOTE(review): autoexpire=3600 presumably drops entries after an hour and
# updateexpire=1 presumably restarts that clock on every update - confirm
# against the htable module documentation.
modparam("htable", "htable", "notify=>size=16;autoexpire=3600;updateexpire=1;initval=0")
# Fallback definitions; trydef leaves an existing definition untouched.
# PRESENCE_NOTIFY_INIT is not referenced in the visible part of this file.
#!trydef PRESENCE_NOTIFY_INIT
# Failure-counter threshold used by the notify-reply route: a UDP watcher is
# removed once its counter exceeds this value.
#!trydef MAX_NOTIFY_ERROR 5
# Prepares a presence NOTIFY before it leaves the server: applies the
# configured reply timeout, optionally logs the body, optionally pins the
# send socket and adds Record-Route, and (with NAT traversal enabled)
# resolves the R-URI alias when no destination URI is set yet.
route[PRESENCE_LOCAL_NOTIFY]
{
    # Any method other than NOTIFY is left untouched.
    if($rm != "NOTIFY")
        return;

    # The same configured value is used for both t_set_fr() timers.
    t_set_fr($sel(cfg_get.kazoo.presence_notify_timeout), $sel(cfg_get.kazoo.presence_notify_timeout));

    xlog("L_DEBUG", "$ci|log|init preparing $subs(event) notify to $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri) : $du\n");

    if($sel(cfg_get.kazoo.presence_notify_log_init_body) == 1)
        xlog("L_INFO", "$ci|log|init|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n");

    # Build the send socket as proto:host:port from the protocol and the
    # Contact header URI.
    if($sel(cfg_get.kazoo.presence_notify_force_send_socket) == 1) {
        $fs = $_s($(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port}));
        xlog("L_DEBUG", "$ci|log|init|forcing socket to $fs, $(pr{s.tolower}):$(hdr(Contact){nameaddr.uri}{uri.host}):$(hdr(Contact){nameaddr.uri}{uri.port}) , $ct\n");
    }

    if($sel(cfg_get.kazoo.presence_notify_record_route) == 1)
        record_route();

#!ifdef NAT_TRAVERSAL_ROLE
    # Only touch the alias when no destination URI has been set yet.
    if(!isdsturiset())
        handle_ruri_alias();
#!endif
}
# Queue of SQL statements waiting to be executed. It is filled with mq_add()
# by the notify-reply and PRESENCE_CLEANUP routes and drained by
# PRESENCE_LOG_TIMER_ROUTE. The name is spelled "notity" everywhere it is
# used in this file, so every reference must keep that spelling.
modparam("mqueue","mqueue", "name=presence_last_notity")
# notifytimer -> PRESENCE_LOG_TIMER_ROUTE.
# NOTE(review): the "u" suffix presumably means microseconds (500000u = 0.5 s)
# and mode=1 presumably requests a dedicated timer process - confirm against
# the rtimer module documentation.
modparam("rtimer", "timer", "name=notifytimer;interval=500000u;mode=1;")
modparam("rtimer", "exec", "timer=notifytimer;route=PRESENCE_LOG_TIMER_ROUTE")
# pres_cleanup -> PRESENCE_CLEANUP, interval=10 (presumably seconds - confirm).
modparam("rtimer", "timer", "name=pres_cleanup;interval=10;mode=1;")
modparam("rtimer", "exec", "timer=pres_cleanup;route=PRESENCE_CLEANUP")
# pres_publisher_cleanup -> PRESENCE_PUBLISHER_CLEANUP, interval=5
# (presumably seconds - confirm).
modparam("rtimer", "timer", "name=pres_publisher_cleanup;interval=5;mode=1;")
modparam("rtimer", "exec", "timer=pres_publisher_cleanup;route=PRESENCE_PUBLISHER_CLEANUP")
# Runs when a presence NOTIFY has been answered or has timed out. Decides
# whether the subscription is flagged for deletion
# ($xavp(pres=>delete_subscription), presumably consumed by the presence
# module - confirm), then performs the optional body / AMQP / table logging.
#
# Decision order:
#   1. $subs(reason) == "timeout"  -> flag for deletion
#   2. reply 200                   -> success, reset the failure counter
#   3. reply 481                   -> flag for deletion
#   4. any other reply             -> non-UDP watchers are flagged at once;
#                                     UDP watchers are flagged only after more
#                                     than MAX_NOTIFY_ERROR counted failures
#
# A former "481 && reason == timeout" branch placed after the 200 case could
# never execute, because case 1 already takes every timeout; it was removed
# as dead code without changing behaviour.
event_route[presence:notify-reply]
{
    if($sel(cfg_get.kazoo.presence_notify) != 1)
        return;

    $xavp(pres=>delete_subscription) = 0;

    if($subs(reason) == "timeout") {
        $xavp(pres=>delete_subscription) = 1;
        xlog("L_INFO", "$ci|end|deleting subscription $subs(pres_uri) for $subs(watcher_username)@$subs(watcher_domain) due to timeout\n");
    } else if($notify_reply($rs) == 200) {
        # Successful delivery: clear the per-Call-ID error state.
        $sht(notify=>$ci) = $null;
        $sht(notify=>$ci::count) = 0;
        xlog("L_INFO", "$ci|end|notified $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri)\n");
    } else if($notify_reply($rs) == 481) {
        $xavp(pres=>delete_subscription) = 1;
        xlog("L_INFO", "$ci|end|deleting subscription $subs(pres_uri) as $subs(watcher_username)@$subs(watcher_domain) replied with 481 (non existent)\n");
    } else {
        if($rP != "UDP") {
            $xavp(pres=>delete_subscription) = 1;
            xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri) with reply $notify_reply($rs)\n");
        } else {
            # Count this failure for the Call-ID.
            $var(shtinc) = $shtinc(notify=>$ci::count);
            if($var(shtinc) > MAX_NOTIFY_ERROR) {
                $xavp(pres=>delete_subscription) = 1;
                xlog("L_ERROR", "$ci|error|removing $rP watcher $subs(watcher_username)@$subs(watcher_domain) for $subs(pres_uri) with reply $notify_reply($rs)\n");
            } else {
                # The numeric log level drops by one with every counted
                # failure, so repeated failures are logged more severely.
                $var(level) = 4 - $var(shtinc);
                xlog("$var(level)", "$ci|error|received $notify_reply($rs) ($var(shtinc)/$def(MAX_NOTIFY_ERROR)) when notifying $subs(watcher_username)@$subs(watcher_domain) on behalf of $subs(pres_uri) with reply $notify_reply($rs)\n");
            }
        }
    }

    if($sel(cfg_get.kazoo.presence_notify_log_body) == 1)
        xlog("L_INFO", "$ci|log|sent|body $(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\n");

    if($sel(cfg_get.kazoo.presence_notify_log_resp_body) == 1)
        xlog("L_INFO", "$ci|log|resp|body $(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})\n");

    if($sel(cfg_get.kazoo.presence_notify_log_to_amqp) == 1) {
        route(PRESENCE_NOTIFY_AMQP);
    }

    if($sel(cfg_get.kazoo.presence_notify_log_to_table) == 1) {
        # Subscriptions that are about to be deleted are not logged; the
        # statement is queued and executed later by the timer route.
        if($xavp(pres=>delete_subscription) != 1 && $subs(reason) != "timeout") {
            $var(Query) = $_s(KZQ_REPLACE_WATCHERS_LOG);
            mq_add("presence_last_notity", "$subs(callid)", "$var(Query)");
        }
    }
}
# Timer-driven worker: drains the "presence_last_notity" queue and executes
# each queued SQL statement on the "cb" connection, processing fewer than
# MAX_WHILE_LOOPS items per run.
route[PRESENCE_LOG_TIMER_ROUTE]
{
    $var(runloop) = 1;

    # The loop budget is tested BEFORE mq_fetch(). mq_fetch() removes the
    # item from the queue, so with the operands the other way round the item
    # fetched on the final check was dequeued and then dropped without being
    # executed. This relies on the left-to-right short-circuit evaluation
    # of "&&".
    while($var(runloop) < MAX_WHILE_LOOPS && mq_fetch("presence_last_notity") == 1) {
        # The queue key is used as the log prefix for this item.
        $var(ci) = $mqk(presence_last_notity);
        xlog("L_DEBUG", "Query : $mqv(presence_last_notity)\n");

        $var(sqlres) = sql_query("cb", "$mqv(presence_last_notity)");
        xlog("L_DEBUG", "Query result : $var(sqlres)\n");

        if($var(sqlres) < 0) {
            xlog("L_ERROR", "$var(ci)|log|error running query : $mqv(presence_last_notity)\n");
        } else {
            $var(nrows) = $sqlrows(cb);
            xlog("L_DEBUG", "$var(ci)|log|end UPDATED $var(nrows)\n");
            if($var(nrows) == 0) {
                xlog("L_DEBUG", "$var(ci)|log|error no rows affected when running query\n");
            }
        }

        $var(runloop) = $var(runloop) + 1;
    }
}
# Publishes a JSON record of the NOTIFY and its reply to the "omnipresence"
# exchange. The routing key is notify.<encoded to_domain>.<encoded to_user>.
route[PRESENCE_NOTIFY_AMQP]
{
    # The payload is kept on one line: it is the literal content of $_s().
    $var(amqp_payload_request) = $_s({"Event-Category" : "presence", "Event-Name" : "notify", "Event-Package" : "$subs(event)", "Timestamp" : $TS, "Call-ID" : "$subs(callid)", "From" : "$fu", "To" : "$subs(to_user)@$subs(to_domain)", "Sent" : "$(TS{s.ftime,%Y-%m-%d %H:%M:%S})", "Body" : "Hostname : MY_HOSTNAME\r\nTimestamp : $(TS{s.ftime,%Y-%m-%d %H:%M:%S})\r\n$(mb{s.escape.common}{s.replace,\','}{s.replace,$$,})\r\nResponse\r\n$(notify_reply($mb){s.escape.common}{s.replace,\','}{s.replace,$$,})","Remote-CSeq" : $subs(remote_cseq), "Local-CSeq" : $subs(local_cseq), "Sequence" : $cs, "Version" : $subs(version), "Reply" : $notify_reply($rs) });

    $var(rk) = "notify." + $(subs(to_domain){kz.encode}) + "." + $(subs(to_user){kz.encode});

    kazoo_publish("omnipresence", "$var(rk)", $var(amqp_payload_request));

    xlog("L_INFO", "$ci|log|sent notify callback for event $subs(event) : $tu\n");
}
# Periodic housekeeping: queues four DELETE statements on the
# "presence_last_notity" queue (each under a fresh $uuid(g) key) for later
# execution by PRESENCE_LOG_TIMER_ROUTE.
#
# String literals in the SQL use single quotes. The previous "dialog" /
# "terminated" spelling is parsed by SQLite as an identifier first and only
# falls back to a string through a legacy compatibility quirk that can be
# disabled at build time.
route[PRESENCE_CLEANUP]
{
    # Watchers whose expiry passed more than 90 seconds ago.
    $var(Query) = $_s(DELETE FROM active_watchers WHERE expires > 0 AND datetime(expires, 'unixepoch') < datetime('now', '-90 seconds'););
    mq_add("presence_last_notity", "$uuid(g)", "$var(Query)");

    # Presentity rows that have expired.
    $var(Query) = $_s(DELETE FROM PRESENTITY WHERE expires > 0 AND datetime(expires, 'unixepoch') < datetime('now'););
    mq_add("presence_last_notity", "$uuid(g)", "$var(Query)");

    # Terminated dialog presentities received more than 5 minutes ago
    # (selected through "presentities", which is defined outside this file).
    $var(Query) = $_s(DELETE FROM PRESENTITY WHERE ID IN(select id from presentities where event = 'dialog' and state = 'terminated' and received < datetime('now', '-5 minutes')););
    mq_add("presence_last_notity", "$uuid(g)", "$var(Query)");

    # Log rows whose watcher no longer exists in active_watchers.
    $var(Query) = $_s(DELETE FROM ACTIVE_WATCHERS_LOG WHERE ID IN(select id from active_watchers_log a where not exists(select callid from active_watchers b where b.callid = a.callid and b.watcher_username = a.watcher_username and b.watcher_domain = a.watcher_domain)););
    mq_add("presence_last_notity", "$uuid(g)", "$var(Query)");
}
# Periodic worker over the tmp_probe table:
#   1. marks pending rows (action 0 -> 1),
#   2. refreshes the watchers of every marked presentity/event pair,
#   3. deletes the marked rows.
#
# The error logs use a fixed route tag as prefix. They previously
# interpolated $var(ci), which this route never assigns, and the two failure
# messages were identical, so the failing statement could not be identified.
route[PRESENCE_PUBLISHER_CLEANUP]
{
    xlog("L_DEBUG", "processing presence publisher cleanup\n");

    $var(sqlres) = sql_query("cb", "update tmp_probe set action = 1 where action = 0");
    if($var(sqlres) < 0) {
        xlog("L_ERROR", "presence_publisher_cleanup|log|error marking tmp_probe rows for processing\n");
        return;
    }

    # Only continue when the update actually marked some rows.
    $var(nrows) = $sqlrows(cb);
    if($var(nrows) > 0) {
        if (sql_xquery("cb", "select * from tmp_probe where action = 1", "cleanup_pres") == 1)
        {
            # Consume the result one row at a time: handle the current
            # cleanup_pres xavp, then unset it to expose the next one.
            while($xavp(cleanup_pres) != $null) {
                xlog("L_DEBUG", "processing $xavp(cleanup_pres=>event) notifies for $xavp(cleanup_pres=>presentity_uri)\n");
                pres_refresh_watchers("$xavp(cleanup_pres=>presentity_uri)", "$xavp(cleanup_pres=>event)", 1);
                pv_unset("$xavp(cleanup_pres)");
            }
        }

        $var(sqlres) = sql_query("cb", "delete from tmp_probe where action = 1");
        if($var(sqlres) < 0) {
            xlog("L_ERROR", "presence_publisher_cleanup|log|error deleting processed tmp_probe rows\n");
        } else {
            $var(nrows) = $sqlrows(cb);
            if($var(nrows) > 0) {
                xlog("L_DEBUG", "presence publisher cleanup processed $var(nrows) rows\n");
            }
        }
    }
}
# Deferred start-up step: resets tmp_probe rows from action 2 to action 0
# and reports how many presentity/event pairs were scheduled.
#
# The error log uses a fixed route tag and describes the statement that
# failed. It previously interpolated $var(ci), which this route never
# assigns, and reused a "cleaning" message from another route.
route[PRESENCE_DEFERRED_INIT]
{
    xlog("L_INFO", "processing presence deferred init\n");

    $var(sqlres) = sql_query("cb", "update tmp_probe set action = 0 where action = 2");
    if($var(sqlres) < 0) {
        xlog("L_ERROR", "presence_deferred_init|log|error scheduling tmp_probe updates\n");
        return;
    }

    $var(nrows) = $sqlrows(cb);
    if($var(nrows) > 0) {
        xlog("L_NOTICE", "scheduled update for $var(nrows) watched presentities/event\n");
    }
}