From 9b036f8ccfd3b7f2574896ce7e236cac1d792729 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 25 May 2011 19:47:13 +0000 Subject: [PATCH] implement call restore logic - needs testing --- daemon/call.c | 70 ++++++++++++++++++++++++++++++++++++++++++-------- daemon/call.h | 4 +++ daemon/redis.c | 56 +++++++++++++++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 17 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index e69370f73..3e21c2fbc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include "call.h" #include "poller.h" @@ -503,7 +505,7 @@ fail: } -static void get_port_pair(struct peer *p) { +static void get_port_pair(struct peer *p, int wanted_port) { struct call *c; struct callmaster *m; struct streamrelay *a, *b; @@ -516,6 +518,16 @@ static void get_port_pair(struct peer *p) { assert(a->fd == -1 && b->fd == -1); + if (wanted_port > 0) { + if ((wanted_port & 1)) + goto fail; + if (get_port(a, wanted_port)) + goto fail; + if (get_port(a, wanted_port + 1)) + goto fail; + return; + } + min = (m->port_min > 0 && m->port_min < 0xfff0) ? m->port_min : 1024; max = (m->port_max > 0 && m->port_max > min && m->port_max < 0xfff0) ? m->port_max : 0; @@ -649,7 +661,7 @@ static void steal_peer(struct peer *p, struct streamrelay *r) { } -static void callstream_init(struct callstream *s, struct call *ca) { +static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2) { int i, j; struct peer *p; struct streamrelay *r; @@ -680,7 +692,7 @@ static void callstream_init(struct callstream *s, struct call *ca) { r->last = po->now; } - get_port_pair(p); + get_port_pair(p, (i == 0) ? port1 : port2); for (j = 0; j < 2; j++) { r = &p->rtps[j]; @@ -720,7 +732,7 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int if (!opmode) { DBG("creating new callstream"); cs = malloc(sizeof(*cs)); - callstream_init(cs, c); + callstream_init(cs, c, 0, 0); p = &cs->peers[0]; } else { @@ -730,12 +742,6 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int break; } cs = l->data; -#if 0 - if (cs->peers[1].filled) { - mylog(LOG_WARNING, "[%s] Got LOOKUP, but no incomplete callstreams found", c->callid); - break; - } -#endif g_queue_delete_link(c->callstreams, l); p = &cs->peers[1]; } @@ -904,7 +910,7 @@ static struct call *call_get_or_create(const char *callid, struct callmaster *m) c = g_hash_table_lookup(m->callhash, callid); if (!c) { - mylog(LOG_NOTICE, "[%s] Creating new call", callid); + mylog(LOG_NOTICE, "[%s] Creating new call", callid); /* XXX will spam syslog on recovery from DB */ c = malloc(sizeof(*c)); ZERO(*c); c->callmaster = m; @@ -1115,3 +1121,45 @@ void calls_status(struct callmaster *m, struct control_stream *s) { g_hash_table_foreach(m->callhash, call_status_iterator, s); } + + + + +void call_restore(struct callmaster *m, redisReply **hash, GList *streams) { + struct call *c; + struct callstream *cs; + redisReply *rps[2], *rp; + int i, kernel; + struct peer *p; + + c = call_get_or_create(hash[0]->str, m); + c->created = strtoll(hash[1]->str, NULL, 10); + strdupfree(&c->calling_agent, "UNKNOWN(recovered)"); + strdupfree(&c->called_agent, "UNKNOWN(recovered)"); + + for (; streams; streams = streams->next) { + rps[0] = streams->data; + streams = streams->next; + rps[1] = streams->data; + + cs = malloc(sizeof(*cs)); + callstream_init(cs, c, atoi(rps[0]->element[2]->str), atoi(rps[1]->element[2]->str)); + kernel = 0; + + for (i = 0; i < 2; i++) { + p = &cs->peers[i]; + rp = rps[i]; + + p->rtps[1].peer.ip = p->rtps[0].peer.ip = inet_addr(rp->element[0]->str); + p->rtps[0].peer.port = atoi(rp->element[1]->str); + p->rtps[1].peer.port = p->rtps[0].peer.port + 1; + strdupfree(&p->tag, rp->element[6]->str); + kernel = atoi(rp->element[3]->str); + p->filled = atoi(rp->element[4]->str); + p->confirmed = atoi(rp->element[5]->str); + } + + if (kernel) + kernelize(cs); + } +} diff --git a/daemon/call.h b/daemon/call.h index 9118fa0b2..dfe4c3550 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -5,6 +5,8 @@ #include +#include +#include #include "ipt_MEDIAPROXY.h" @@ -102,6 +104,8 @@ char *call_delete_udp(const char **, struct callmaster *); void calls_status(struct callmaster *, struct control_stream *); +void call_restore(struct callmaster *, redisReply **, GList *); + #endif diff --git a/daemon/redis.c b/daemon/redis.c index dd49c9e1a..51bf70def 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -170,8 +170,9 @@ static void redis_delete_uuid(char *uuid, struct callmaster *m) { int redis_restore(struct callmaster *m) { struct redis *r = m->redis; - redisReply *rp, *rp2, *rp3; - int i; + redisReply *rp, *rp2, *rp3, *rp4, *rp5, *rp6; + GQueue q = G_QUEUE_INIT; + int i, j, k, l; rp = redisCommand(r->ctx, "SMEMBERS calls"); if (!rp || rp->type != REDIS_REPLY_ARRAY) { @@ -192,16 +193,59 @@ int redis_restore(struct callmaster *m) { goto del2; if (rp3->elements != 2) goto del2; - if (rp3->element[0]->type != REDIS_REPLY_STRING) - goto del2; - if (rp3->element[1]->type != REDIS_REPLY_STRING) + for (j = 0; j < rp3->elements; j++) { + if (rp3->element[j]->type != REDIS_REPLY_STRING) + goto del2; + } + + rp4 = redisCommand(r->ctx, "LRANGE %s-streams 0 -1", rp2->str); + if (!rp4) goto del2; + if (rp4->type != REDIS_REPLY_ARRAY) + goto del3; + + for (j = 0; j < rp4->elements; j++) { + rp5 = rp4->element[j]; + if (rp5->type != REDIS_REPLY_STRING) + continue; + for (k = 0; k < 2; k++) { + rp6 = redisCommand(r->ctx, "HMGET %s:%i ip port localport kernel filled confirmed tag", rp5->str, k); + if (!rp6) + goto del4; + if (rp6->type != REDIS_REPLY_ARRAY) + goto del5; + if (rp6->elements != 7) + goto del5; + for (l = 0; l < rp6->elements; l++) { + if (rp6->element[l]->type != REDIS_REPLY_STRING) + goto del5; + } + g_queue_push_tail(&q, rp6); + } + } + + call_restore(m, rp3->element, q.head); + + if (q.head) + g_list_foreach(q.head, (GFunc) freeReplyObject, NULL); + g_queue_clear(&q); + freeReplyObject(rp4); + freeReplyObject(rp3); continue; +del5: + freeReplyObject(rp6); +del4: + if (q.head) + g_list_foreach(q.head, (GFunc) freeReplyObject, NULL); + g_queue_clear(&q); +del3: + freeReplyObject(rp4); del2: freeReplyObject(rp3); del: + mylog(LOG_WARNING, "Could not restore call with GUID %s from Redis DB due to incomplete data\n", rp2->str); redis_delete_uuid(rp2->str, m); } @@ -245,7 +289,7 @@ void redis_update(struct call *c) { p = &cs->peers[i]; redisAppendCommand(r->ctx, "DEL %s:%i", uuid, i); - redisAppendCommand(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i last-rtp %i last-rtcp %i kernel %i filled %i confirmed %i", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->rtps[0].last, p->rtps[1].last, p->kernelized, p->filled, p->confirmed); + redisAppendCommand(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i kernel %i filled %i confirmed %i tag %s", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->kernelized, p->filled, p->confirmed, p->tag); redisAppendCommand(r->ctx, "EXPIRE %s:%i 86400", uuid, i); count += 3; }