@ -76,11 +76,49 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...)
# define REDIS_FMT(x) (int) (x)->len, (x)->str
/ / To protect against a restore race condition : Keyspace notifications are set up
/ / before existing calls are restored ( restore_thread ) . Therefore the following
/ / scenario is possible :
/ / NOTIF THREAD : receives SET , creates call
/ / RESTORE THREAD : executes KEYS *
/ / NOTIF THREAD : receives another SET :
/ / NOTIF THREAD : does call_destroy ( ) , which :
/ / adds ports to late - release list
/ / RESTORE THREAD : comes across call ID , does GET
/ / RESTORE THREAD : creates new call
/ / RESTORE THREAD : wants to allocate ports , but they ' re still in use
/ / NOTIF THREAD : now does release_closed_sockets ( )
static mutex_t redis_ports_release_lock = MUTEX_STATIC_INIT ;
static cond_t redis_ports_release_cond = COND_STATIC_INIT ;
static int redis_ports_release_balance = 0 ; / / negative = releasers , positive = allocators
static int redis_check_conn ( struct redis * r ) ;
static void json_restore_call ( struct redis * r , const str * id , bool foreign ) ;
static int redis_connect ( struct redis * r , int wait ) ;
static int json_build_ssrc ( struct call_monologue * ml , JsonReader * root_reader ) ;
/ / mutually exclusive multi - A multi - B lock
/ / careful with deadlocks against redis - > lock
static void redis_ports_release_push ( bool inc ) {
LOCK ( & redis_ports_release_lock ) ;
if ( inc ) {
while ( redis_ports_release_balance < 0 )
cond_wait ( & redis_ports_release_cond , & redis_ports_release_lock ) ;
}
else {
while ( redis_ports_release_balance > 0 )
cond_wait ( & redis_ports_release_cond , & redis_ports_release_lock ) ;
}
redis_ports_release_balance + = ( inc ? 1 : - 1 ) ;
}
static void redis_ports_release_pop ( bool inc ) {
LOCK ( & redis_ports_release_lock ) ;
redis_ports_release_balance - = ( inc ? 1 : - 1 ) ;
if ( redis_ports_release_balance = = 0 )
cond_broadcast ( & redis_ports_release_cond ) ;
}
static void redis_pipe ( struct redis * r , const char * fmt , . . . ) {
va_list ap ;
@ -375,8 +413,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
rwlock_unlock_w ( & c - > master_lock ) ;
if ( IS_FOREIGN_CALL ( c ) ) {
c - > redis_hosted_db = rtpe_redis_write - > db ; / / don ' t delete from foreign DB
/ / redis_notify - > lock is held
redis_ports_release_push ( true ) ;
call_destroy ( c ) ;
release_closed_sockets ( ) ;
redis_ports_release_pop ( true ) ;
}
else {
rlog ( LOG_WARN , " Redis-Notifier: Ignoring SET received for OWN call: " STR_FORMAT " \n " , STR_FMT ( & callid ) ) ;
@ -404,7 +445,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
rlog ( LOG_WARN , " Redis-Notifier: Ignoring DEL received for an OWN call: " STR_FORMAT " \n " , STR_FMT ( & callid ) ) ;
goto err ;
}
/ / redis_notify - > lock is held
redis_ports_release_push ( true ) ;
call_destroy ( c ) ;
release_closed_sockets ( ) ;
redis_ports_release_pop ( true ) ;
}
err :
@ -1898,6 +1943,9 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
rr_jsonStr = redis_get ( r , REDIS_REPLY_STRING , " GET " PB , STR ( callid ) ) ;
mutex_unlock ( & r - > lock ) ;
bool must_release_pop = true ;
redis_ports_release_push ( false ) ;
err = " could not retrieve JSON data from redis " ;
if ( ! rr_jsonStr )
goto err1 ;
@ -1911,6 +1959,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
if ( ! root_reader )
goto err1 ;
c = call_get_or_create ( callid , foreign , false ) ;
err = " failed to create call struct " ;
if ( ! c )
@ -2049,8 +2098,12 @@ err1:
STR_FMT_M ( callid ) ,
err ) ;
mutex_unlock ( & r - > lock ) ;
if ( c )
if ( c )
call_destroy ( c ) ;
release_closed_sockets ( ) ;
if ( must_release_pop ) / / avoid deadlock with redis_notify - > lock below
redis_ports_release_pop ( false ) ;
must_release_pop = false ;
mutex_lock ( & rtpe_redis_write - > lock ) ;
redisCommandNR ( rtpe_redis_write - > ctx , " DEL " PB , STR ( callid ) ) ;
@ -2064,6 +2117,9 @@ err1:
}
if ( c )
obj_put ( c ) ;
release_closed_sockets ( ) ;
if ( must_release_pop )
redis_ports_release_pop ( false ) ;
log_info_reset ( ) ;
}