From 382f8521d0926fc10eadb18611c959469500f521 Mon Sep 17 00:00:00 2001 From: lazedo Date: Tue, 12 Mar 2019 22:13:50 +0000 Subject: [PATCH] add a lock for db ops since we have several workers --- kamailio/keepalive.cfg | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/kamailio/keepalive.cfg b/kamailio/keepalive.cfg index faf52a2..159b3fb 100644 --- a/kamailio/keepalive.cfg +++ b/kamailio/keepalive.cfg @@ -17,7 +17,7 @@ kazoo.keepalive_timeout = KEEPALIVE_TIMEOUT descr "timeout in ms for keepalive t modparam("rtimer", "timer", "name=keepalive_timer;interval=1;mode=KEEPALIVE_S_TIMERS;") modparam("rtimer", "exec", "timer=keepalive_timer;route=KEEPALIVE_TIMER") -modparam("rtimer", "timer", "name=keepalive_db_timer;interval=250000u;mode=1;") +modparam("rtimer", "timer", "name=keepalive_db_timer;interval=1;mode=1;") modparam("rtimer", "exec", "timer=keepalive_db_timer;route=KEEPALIVE_DB_TIMER") modparam("mqueue","mqueue", "name=keepalive_db_queue") @@ -28,6 +28,7 @@ route[KEEPALIVE_DB_TIMER] while(mq_fetch("keepalive_db_queue") == 1 && $var(runloop) < MAX_WHILE_LOOPS) { $var(ci) = $mqk(keepalive_db_queue); xlog("L_DEBUG", "Query : $mqv(keepalive_db_queue)\n"); + lock("keepalive"); $var(sqlres) = sql_query("cb", "$mqv(keepalive_db_queue)"); xlog("L_DEBUG", "Query result : $var(sqlres)\n"); if($var(sqlres) < 0) { @@ -39,24 +40,28 @@ route[KEEPALIVE_DB_TIMER] xlog("L_DEBUG", "$var(ci)|log|error no rows affected when running query\n"); } } + unlock("keepalive"); $var(runloop) = $var(runloop) + 1; } } route[KEEPALIVE_CLEANUP] { + lock("keepalive"); $var(Query) = $_s(update location set expires = last_modified where id in(select a.id from location a inner join (select cast(substr(contact, 1, instr(contact,";")-1) as varchar(32)) contact from keepalive where slot = $rtimer_worker AND failed > $def(KEEPALIVE_FAILED_THRESHOLD)) b on substr(a.contact, 1, instr(a.contact,";")-1) = b.contact)); sql_query("cb", "$var(Query)"); $var(Query) = $_s(DELETE FROM active_watchers where id in(select a.id from keepalive a inner join active_watchers b on a.contact=b.contact where slot = $rtimer_worker and failed > $def(KEEPALIVE_FAILED_THRESHOLD))); sql_query("cb", "$var(Query)"); $var(Query) = $_s(DELETE FROM keepalive where slot = $rtimer_worker and failed > $def(KEEPALIVE_FAILED_THRESHOLD)); sql_query("cb", "$var(Query)"); + unlock("keepalive"); } route[KEEPALIVE_TIMER] { route(KEEPALIVE_CLEANUP); + lock("keepalive"); $var(Query) = $_s(UPDATE keepalive SET selected = 1 WHERE slot = $rtimer_worker AND selected = 0 AND time_sent < datetime('now', '-$def(KEEPALIVE_INTERVAL) seconds')); $var(sqlres) = sql_query("cb", "$var(Query)"); if($var(sqlres) < 0) { @@ -65,7 +70,9 @@ route[KEEPALIVE_TIMER] $var(nrows) = $sqlrows(cb); xlog("L_DEBUG", "$rtimer_worker|log|selected $var(nrows) endpoints to ping\n"); } + unlock("keepalive"); + lock("keepalive"); $var(Query) = $_s(SELECT id, contact, sockinfo from keepalive WHERE slot = $rtimer_worker AND selected = 1); xlog("L_DEBUG", "$rtimer_worker|timer|SQL => $var(Query)\n"); if (sql_xquery("cb", "$var(Query)", "ra") == 1) @@ -79,12 +86,16 @@ route[KEEPALIVE_TIMER] } } } + unlock("keepalive"); + lock("keepalive"); $var(Query) = $_s(UPDATE keepalive SET selected = 2 WHERE slot = $rtimer_worker AND selected = 1); $var(sqlres) = sql_query("cb", "$var(Query)"); if($var(sqlres) < 0) { xlog("L_ERROR", "$rtimer_worker|log|error running query : $var(Query)\n"); } + unlock("keepalive"); + }