diff --git a/daemon/Makefile b/daemon/Makefile index 6f0d30afa..766d76049 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -7,7 +7,7 @@ CFLAGS+= -D_GNU_SOURCE #CFLAGS+= -O2 CFLAGS+= -I$(LIBREDISDIR) LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs` -LDFLAGS+= -L$(LIBREDISDIR) -lhiredis +LDFLAGS+= -L$(LIBREDISDIR) -lhiredis -luuid SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c control_udp.c redis.c diff --git a/daemon/aux.h b/daemon/aux.h index 3c104ebfd..c64b803cd 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -11,6 +11,7 @@ #include #include #include +#include @@ -44,5 +45,12 @@ void g_string_vprintf(GString *string, const gchar *format, va_list args); #endif +static inline void uuid_str_generate(char *s) { + uuid_t uuid; + uuid_generate(uuid); + uuid_unparse(uuid, s); +} + + #endif diff --git a/daemon/call.c b/daemon/call.c index 7f003bc0b..56c5b89b6 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -16,6 +16,7 @@ #include "kernel.h" #include "control.h" #include "streambuf.h" +#include "redis.h" @@ -187,6 +188,8 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ if (pe2->confirmed && pe2->filled) kernelize(cs); + + redis_update(c); } skip: @@ -937,6 +940,8 @@ char *call_update_udp(const char **o, struct callmaster *m) { g_queue_clear(&q); + redis_update(c); + return streams_print(c->callstreams, 1, 0, o[1], 1); fail: @@ -972,6 +977,8 @@ char *call_lookup_udp(const char **o, struct callmaster *m) { g_queue_clear(&q); + redis_update(c); + return streams_print(c->callstreams, 1, 1, o[1], 1); fail: @@ -992,6 +999,8 @@ char *call_request(const char **o, struct callmaster *m) { num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0); streams_free(s); + redis_update(c); + return streams_print(c->callstreams, num, 0, NULL, 0); } @@ -1012,6 +1021,8 @@ char *call_lookup(const char **o, struct callmaster *m) { num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1); streams_free(s); + redis_update(c); + return streams_print(c->callstreams, num, 1, NULL, 0); } diff --git a/daemon/call.h b/daemon/call.h index b273cda11..9118fa0b2 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -60,6 +60,7 @@ struct call { GQueue *callstreams; char *callid; + char redis_uuid[37]; time_t created; char *calling_agent; char *called_agent; diff --git a/daemon/redis.c b/daemon/redis.c index cae5eb57f..6a28284fb 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "redis.h" #include "aux.h" @@ -19,6 +20,20 @@ +static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) { + redisReply *rp; + + rp = redisCommand(r->ctx, "TYPE %s%s", key, suffix ? : ""); + if (!rp || rp->type != REDIS_REPLY_STATUS) + return -1; + if (strcmp(rp->str, type) && strcmp(rp->str, "none")) + redisCommandNR(r->ctx, "DEL %s%s", key, suffix ? : ""); + return 0; +} + + + + static int redis_connect(struct redis *r, int wait) { struct timeval tv; redisReply *rp; @@ -136,7 +151,7 @@ int redis_restore(struct callmaster *m) { goto del2; if (rp3->element[0]->type != REDIS_REPLY_STRING) goto del2; - if (rp3->element[1]->type != REDIS_REPLY_INTEGER) + if (rp3->element[1]->type != REDIS_REPLY_STRING) goto del2; continue; @@ -144,7 +159,7 @@ int redis_restore(struct callmaster *m) { del2: freeReplyObject(rp3); del: - redisCommandNR(r->ctx, "DEL %s", rp2->str); + redisCommandNR(r->ctx, "DEL %s %s-streams", rp2->str); redisCommandNR(r->ctx, "SREM calls %s", rp2->str); } @@ -155,3 +170,46 @@ del: err: return -1; } + + + + +void redis_update(struct call *c) { + struct callmaster *cm = c->callmaster; + struct redis *r = cm->redis; + char uuid[37]; + GList *l; + struct callstream *cs; + int i; + struct peer *p; + + if (!r) + return; + + if (!c->redis_uuid[0]) + uuid_str_generate(c->redis_uuid); + + redis_check_type(r, c->redis_uuid, NULL, "hash"); + redisCommandNR(r->ctx, "HMSET %s callid %s created %i", c->redis_uuid, c->callid, c->created); + redisCommandNR(r->ctx, "DEL %s-streams-temp", c->redis_uuid); + + for (l = c->callstreams->head; l; l = l->next) { + cs = l->data; + uuid_str_generate(uuid); + + for (i = 0; i < 2; i++) { + p = &cs->peers[i]; + + redisCommandNR(r->ctx, "DEL %s:%i", uuid, i); + redisCommandNR(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i last-rtp %i last-rtcp %i kernel %i filled %i confirmed %i", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->rtps[0].last, p->rtps[1].last, p->kernelized, p->filled, p->confirmed); + redisCommandNR(r->ctx, "EXPIRE %s:%i 86400", uuid, i); + } + + redisCommandNR(r->ctx, "RPUSH %s-streams-temp %s", c->redis_uuid, uuid); + } + + redisCommandNR(r->ctx, "RENAME %s-streams-temp %s-streams", c->redis_uuid, c->redis_uuid); + redisCommandNR(r->ctx, "EXPIRE %s-streams 86400", c->redis_uuid); + redisCommandNR(r->ctx, "EXPIRE %s 86400", c->redis_uuid); + redisCommandNR(r->ctx, "SADD calls %s", c->redis_uuid); +} diff --git a/daemon/redis.h b/daemon/redis.h index f06c1f871..b223998ab 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -11,6 +11,7 @@ struct callmaster; +struct call; @@ -27,6 +28,7 @@ struct redis { struct redis *redis_new(u_int32_t, u_int16_t, int); int redis_restore(struct callmaster *); +void redis_update(struct call *c);