Browse Source

TT#119502 correctly restore calls from both Redis instances

Change-Id: I713d7e8ba0a7d14f5ef9016d33619df91ce6ec32
pull/1252/head
Richard Fuchs 5 years ago
parent
commit
a8d5076065
7 changed files with 68 additions and 21 deletions
  1. +9
    -4
      daemon/call.c
  2. +8
    -0
      daemon/cli.c
  3. +13
    -2
      daemon/main.c
  4. +2
    -0
      daemon/media_socket.c
  5. +34
    -14
      daemon/redis.c
  6. +1
    -0
      include/call.h
  7. +1
    -1
      include/redis.h

+ 9
- 4
daemon/call.c View File

@ -174,6 +174,10 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
if (!c->streams.head)
goto drop;
// ignore media timeout if call was recently taken over
if (c->foreign_media && rtpe_now.tv_sec - c->last_signal <= rtpe_config.timeout)
goto out;
for (it = c->streams.head; it; it = it->next) {
ps = it->data;
@ -644,6 +648,9 @@ static void call_timer(void *ptr) {
update = 0;
if (diff_packets)
sfd->call->foreign_media = 0;
sink = packet_stream_sink(ps);
if (!ke->target.non_forwarding && diff_packets) {
@ -2119,7 +2126,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
monologue = other_ml->active_dialogue;
call = monologue->call;
call->last_signal = rtpe_now.tv_sec;
call->last_signal = MAX(call->last_signal, rtpe_now.tv_sec);
call->deleted = 0;
__C_DBG("this="STR_FORMAT" other="STR_FORMAT, STR_FMT(&monologue->tag), STR_FMT(&other_ml->tag));
@ -2524,9 +2531,7 @@ void call_destroy(struct call *c) {
statistics_update_ip46_inc_dec(c, CMC_DECREMENT);
statistics_update_foreignown_dec(c);
if (IS_OWN_CALL(c)) {
redis_delete(c, rtpe_redis_write);
}
redis_delete(c, rtpe_redis_write);
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */


+ 8
- 0
daemon/cli.c View File

@ -1080,7 +1080,15 @@ static void cli_incoming_active_standby(struct cli_writer *cw, int foreign) {
g_hash_table_iter_init(&iter, rtpe_callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
struct call *c = value;
rwlock_lock_w(&c->master_lock);
call_make_own_foreign(c, foreign);
c->last_signal = MAX(c->last_signal, rtpe_now.tv_sec);
if (!foreign) {
c->foreign_media = 1; // ignore timeout until we have media
c->last_signal++; // we are authoritative now
}
rwlock_unlock_w(&c->master_lock);
redis_update_onekey(c, rtpe_redis_write);
}
rwlock_unlock_r(&rtpe_callhash_lock);


+ 13
- 2
daemon/main.c View File

@ -1050,8 +1050,19 @@ no_kernel:
gettimeofday(&redis_start, NULL);
// restore
if (redis_restore(rtpe_redis))
die("Refusing to continue without working Redis database");
if (rtpe_redis_notify) {
// active-active mode: the main DB has our own calls, while
// the "notify" DB has the "foreign" calls. "foreign" DB goes
// first as the "owned" DB can do a stray update back to Redis
if (redis_restore(rtpe_redis_notify, 1))
ilog(LOG_WARN, "Unable to restore calls from the active-active peer");
if (redis_restore(rtpe_redis_write, 0))
die("Refusing to continue without working Redis database");
}
else {
if (redis_restore(rtpe_redis, 0))
die("Refusing to continue without working Redis database");
}
// stop redis restore timer
gettimeofday(&redis_stop, NULL);


+ 2
- 0
daemon/media_socket.c View File

@ -2008,6 +2008,8 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (!phc->mp.stream->selected_sfd)
goto out;
phc->mp.call->foreign_media = 0;
if (phc->mp.call->drop_traffic) {
goto drop;
}


+ 34
- 14
daemon/redis.c View File

@ -438,7 +438,7 @@ void redis_notify_async_context_disconnect(const redisAsyncContext *redis_notify
redis_notify_async_context->err);
}
} else if (status == REDIS_OK) {
rlog(LOG_ERROR, "redis_notify_async_context_disconnect initiated by user");
rlog(LOG_NOTICE, "redis_notify_async_context_disconnect initiated by user");
} else {
rlog(LOG_ERROR, "redis_notify_async_context_disconnect invalid status code %d", status);
}
@ -1719,6 +1719,7 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) {
struct redis_list tags, sfds, streams, medias, maps;
struct call *c = NULL;
str s, id, meta;
time_t last_signal;
const char *err = 0;
int i;
@ -1744,13 +1745,25 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) {
if (!c)
goto err1;
err = "call already exists";
if (c->last_signal)
goto err2;
err = "'call' data incomplete";
if (json_get_hash(&call, "json", -1, root_reader))
goto err2;
err = "missing 'last signal' timestamp";
if (redis_hash_get_time_t(&last_signal, &call, "last_signal"))
goto err3;
if (c->last_signal) {
err = NULL;
// is the call we're loading newer than the one we have?
if (last_signal > c->last_signal) {
// switch ownership
call_make_own_foreign(c, foreign);
c->last_signal = last_signal;
}
goto err3; // no error, just bail
}
err = "'tags' incomplete";
if (json_get_list_hash(&tags, "tag", &call, "num_tags", root_reader))
goto err3;
@ -1770,9 +1783,7 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) {
err = "missing 'created' timestamp";
if (redis_hash_get_timeval(&c->created, &call, "created"))
goto err8;
err = "missing 'last signal' timestamp";
if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal"))
goto err8;
c->last_signal = last_signal;
if (redis_hash_get_int(&i, &call, "tos"))
c->tos = 184;
else
@ -1863,10 +1874,15 @@ err1:
err);
if (c)
call_destroy(c);
else {
mutex_lock(&rtpe_redis_write->lock);
redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid));
mutex_unlock(&rtpe_redis_write->lock);
mutex_lock(&rtpe_redis_write->lock);
redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid));
mutex_unlock(&rtpe_redis_write->lock);
if (rtpe_redis_notify) {
mutex_lock(&rtpe_redis_notify->lock);
redisCommandNR(rtpe_redis_notify->ctx, "DEL " PB, STR(callid));
mutex_unlock(&rtpe_redis_notify->lock);
}
}
if (c)
@ -1876,6 +1892,7 @@ err1:
// Context shared by the restore worker threads: restore_thread() reads it
// through its ctx pointer and redis_restore() initialises the fields.
struct thread_ctx {
// queue of Redis connections created with redis_new(); a worker pops one
// from the head, uses it for json_restore_call(), and pushes it back on the tail
GQueue r_q;
// mutex held around every pop/push on r_q
mutex_t r_m;
// "foreign" argument of redis_restore(), handed on to json_restore_call()
int foreign;
};
static void restore_thread(void *call_p, void *ctx_p) {
@ -1891,14 +1908,14 @@ static void restore_thread(void *call_p, void *ctx_p) {
r = g_queue_pop_head(&ctx->r_q);
mutex_unlock(&ctx->r_m);
json_restore_call(r, &callid, 0);
json_restore_call(r, &callid, ctx->foreign);
mutex_lock(&ctx->r_m);
g_queue_push_tail(&ctx->r_q, r);
mutex_unlock(&ctx->r_m);
}
int redis_restore(struct redis *r) {
int redis_restore(struct redis *r, int foreign) {
redisReply *calls = NULL, *call;
int i, ret = -1;
GThreadPool *gtp;
@ -1931,6 +1948,7 @@ int redis_restore(struct redis *r) {
mutex_init(&ctx.r_m);
g_queue_init(&ctx.r_q);
ctx.foreign = foreign;
for (i = 0; i < rtpe_config.redis_num_threads; i++)
g_queue_push_tail(&ctx.r_q,
redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required));
@ -2401,6 +2419,8 @@ void redis_update_onekey(struct call *c, struct redis *r) {
if (!r)
return;
if (c->foreign_call)
return;
mutex_lock(&r->lock);
// coverity[sleep : FALSE]


+ 1
- 0
include/call.h View File

@ -435,6 +435,7 @@ struct call {
unsigned int rec_forwarding:1;
unsigned int drop_traffic:1;
unsigned int foreign_call:1; // created_via_redis_notify call
unsigned int foreign_media:1; // for calls taken over, tracks whether we have media
unsigned int disable_jb:1;
unsigned int debug:1;
};


+ 1
- 1
include/redis.h View File

@ -107,7 +107,7 @@ void redis_delete_async_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int);
void redis_close(struct redis *r);
int redis_restore(struct redis *);
int redis_restore(struct redis *, int foreign);
void redis_update(struct call *, struct redis *);
void redis_update_onekey(struct call *c, struct redis *r);
void redis_delete(struct call *, struct redis *);


Loading…
Cancel
Save