Browse Source

implement call restore logic - needs testing

git.mgm/mediaproxy-ng/2.0
Richard Fuchs 15 years ago
parent
commit
9b036f8ccf
3 changed files with 113 additions and 17 deletions
  1. +59
    -11
      daemon/call.c
  2. +4
    -0
      daemon/call.h
  3. +50
    -6
      daemon/redis.c

+ 59
- 11
daemon/call.c View File

@ -8,6 +8,8 @@
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <hiredis.h>
#include <stdlib.h>
#include "call.h"
#include "poller.h"
@ -503,7 +505,7 @@ fail:
}
static void get_port_pair(struct peer *p) {
static void get_port_pair(struct peer *p, int wanted_port) {
struct call *c;
struct callmaster *m;
struct streamrelay *a, *b;
@ -516,6 +518,16 @@ static void get_port_pair(struct peer *p) {
assert(a->fd == -1 && b->fd == -1);
if (wanted_port > 0) {
if ((wanted_port & 1))
goto fail;
if (get_port(a, wanted_port))
goto fail;
if (get_port(a, wanted_port + 1))
goto fail;
return;
}
min = (m->port_min > 0 && m->port_min < 0xfff0) ? m->port_min : 1024;
max = (m->port_max > 0 && m->port_max > min && m->port_max < 0xfff0) ? m->port_max : 0;
@ -649,7 +661,7 @@ static void steal_peer(struct peer *p, struct streamrelay *r) {
}
static void callstream_init(struct callstream *s, struct call *ca) {
static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2) {
int i, j;
struct peer *p;
struct streamrelay *r;
@ -680,7 +692,7 @@ static void callstream_init(struct callstream *s, struct call *ca) {
r->last = po->now;
}
get_port_pair(p);
get_port_pair(p, (i == 0) ? port1 : port2);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
@ -720,7 +732,7 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int
if (!opmode) {
DBG("creating new callstream");
cs = malloc(sizeof(*cs));
callstream_init(cs, c);
callstream_init(cs, c, 0, 0);
p = &cs->peers[0];
}
else {
@ -730,12 +742,6 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int
break;
}
cs = l->data;
#if 0
if (cs->peers[1].filled) {
mylog(LOG_WARNING, "[%s] Got LOOKUP, but no incomplete callstreams found", c->callid);
break;
}
#endif
g_queue_delete_link(c->callstreams, l);
p = &cs->peers[1];
}
@ -904,7 +910,7 @@ static struct call *call_get_or_create(const char *callid, struct callmaster *m)
c = g_hash_table_lookup(m->callhash, callid);
if (!c) {
mylog(LOG_NOTICE, "[%s] Creating new call", callid);
mylog(LOG_NOTICE, "[%s] Creating new call", callid); /* XXX will spam syslog on recovery from DB */
c = malloc(sizeof(*c));
ZERO(*c);
c->callmaster = m;
@ -1115,3 +1121,45 @@ void calls_status(struct callmaster *m, struct control_stream *s) {
g_hash_table_foreach(m->callhash, call_status_iterator, s);
}
void call_restore(struct callmaster *m, redisReply **hash, GList *streams) {
struct call *c;
struct callstream *cs;
redisReply *rps[2], *rp;
int i, kernel;
struct peer *p;
c = call_get_or_create(hash[0]->str, m);
c->created = strtoll(hash[1]->str, NULL, 10);
strdupfree(&c->calling_agent, "UNKNOWN(recovered)");
strdupfree(&c->called_agent, "UNKNOWN(recovered)");
for (; streams; streams = streams->next) {
rps[0] = streams->data;
streams = streams->next;
rps[1] = streams->data;
cs = malloc(sizeof(*cs));
callstream_init(cs, c, atoi(rps[0]->element[2]->str), atoi(rps[1]->element[2]->str));
kernel = 0;
for (i = 0; i < 2; i++) {
p = &cs->peers[i];
rp = rps[i];
p->rtps[1].peer.ip = p->rtps[0].peer.ip = inet_addr(rp->element[0]->str);
p->rtps[0].peer.port = atoi(rp->element[1]->str);
p->rtps[1].peer.port = p->rtps[0].peer.port + 1;
strdupfree(&p->tag, rp->element[6]->str);
kernel = atoi(rp->element[3]->str);
p->filled = atoi(rp->element[4]->str);
p->confirmed = atoi(rp->element[5]->str);
}
if (kernel)
kernelize(cs);
}
}

+ 4
- 0
daemon/call.h View File

@ -5,6 +5,8 @@
#include <sys/types.h>
#include <glib.h>
#include <hiredis.h>
#include "ipt_MEDIAPROXY.h"
@ -102,6 +104,8 @@ char *call_delete_udp(const char **, struct callmaster *);
void calls_status(struct callmaster *, struct control_stream *);
void call_restore(struct callmaster *, redisReply **, GList *);
#endif

+ 50
- 6
daemon/redis.c View File

@ -170,8 +170,9 @@ static void redis_delete_uuid(char *uuid, struct callmaster *m) {
int redis_restore(struct callmaster *m) {
struct redis *r = m->redis;
redisReply *rp, *rp2, *rp3;
int i;
redisReply *rp, *rp2, *rp3, *rp4, *rp5, *rp6;
GQueue q = G_QUEUE_INIT;
int i, j, k, l;
rp = redisCommand(r->ctx, "SMEMBERS calls");
if (!rp || rp->type != REDIS_REPLY_ARRAY) {
@ -192,16 +193,59 @@ int redis_restore(struct callmaster *m) {
goto del2;
if (rp3->elements != 2)
goto del2;
if (rp3->element[0]->type != REDIS_REPLY_STRING)
goto del2;
if (rp3->element[1]->type != REDIS_REPLY_STRING)
for (j = 0; j < rp3->elements; j++) {
if (rp3->element[j]->type != REDIS_REPLY_STRING)
goto del2;
}
rp4 = redisCommand(r->ctx, "LRANGE %s-streams 0 -1", rp2->str);
if (!rp4)
goto del2;
if (rp4->type != REDIS_REPLY_ARRAY)
goto del3;
for (j = 0; j < rp4->elements; j++) {
rp5 = rp4->element[j];
if (rp5->type != REDIS_REPLY_STRING)
continue;
for (k = 0; k < 2; k++) {
rp6 = redisCommand(r->ctx, "HMGET %s:%i ip port localport kernel filled confirmed tag", rp5->str, k);
if (!rp6)
goto del4;
if (rp6->type != REDIS_REPLY_ARRAY)
goto del5;
if (rp6->elements != 7)
goto del5;
for (l = 0; l < rp6->elements; l++) {
if (rp6->element[l]->type != REDIS_REPLY_STRING)
goto del5;
}
g_queue_push_tail(&q, rp6);
}
}
call_restore(m, rp3->element, q.head);
if (q.head)
g_list_foreach(q.head, (GFunc) freeReplyObject, NULL);
g_queue_clear(&q);
freeReplyObject(rp4);
freeReplyObject(rp3);
continue;
del5:
freeReplyObject(rp6);
del4:
if (q.head)
g_list_foreach(q.head, (GFunc) freeReplyObject, NULL);
g_queue_clear(&q);
del3:
freeReplyObject(rp4);
del2:
freeReplyObject(rp3);
del:
mylog(LOG_WARNING, "Could not restore call with GUID %s from Redis DB due to incomplete data\n", rp2->str);
redis_delete_uuid(rp2->str, m);
}
@ -245,7 +289,7 @@ void redis_update(struct call *c) {
p = &cs->peers[i];
redisAppendCommand(r->ctx, "DEL %s:%i", uuid, i);
redisAppendCommand(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i last-rtp %i last-rtcp %i kernel %i filled %i confirmed %i", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->rtps[0].last, p->rtps[1].last, p->kernelized, p->filled, p->confirmed);
redisAppendCommand(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i kernel %i filled %i confirmed %i tag %s", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->kernelized, p->filled, p->confirmed, p->tag);
redisAppendCommand(r->ctx, "EXPIRE %s:%i 86400", uuid, i);
count += 3;
}


Loading…
Cancel
Save