Browse Source

Push the call and stream information into redis - WIP

git.mgm/mediaproxy-ng/2.0
Richard Fuchs 15 years ago
parent
commit
f4a773ba6c
6 changed files with 83 additions and 3 deletions
  1. +1
    -1
      daemon/Makefile
  2. +8
    -0
      daemon/aux.h
  3. +11
    -0
      daemon/call.c
  4. +1
    -0
      daemon/call.h
  5. +60
    -2
      daemon/redis.c
  6. +2
    -0
      daemon/redis.h

+ 1
- 1
daemon/Makefile View File

@ -7,7 +7,7 @@ CFLAGS+= -D_GNU_SOURCE
#CFLAGS+= -O2
CFLAGS+= -I$(LIBREDISDIR)
LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs`
LDFLAGS+= -L$(LIBREDISDIR) -lhiredis
LDFLAGS+= -L$(LIBREDISDIR) -lhiredis -luuid
SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c control_udp.c redis.c


+ 8
- 0
daemon/aux.h View File

@ -11,6 +11,7 @@
#include <glib.h>
#include <pcre.h>
#include <stdarg.h>
#include <uuid/uuid.h>
@ -44,5 +45,12 @@ void g_string_vprintf(GString *string, const gchar *format, va_list args);
#endif
static inline void uuid_str_generate(char *s) {
uuid_t uuid;
uuid_generate(uuid);
uuid_unparse(uuid, s);
}
#endif

+ 11
- 0
daemon/call.c View File

@ -16,6 +16,7 @@
#include "kernel.h"
#include "control.h"
#include "streambuf.h"
#include "redis.h"
@ -187,6 +188,8 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
if (pe2->confirmed && pe2->filled)
kernelize(cs);
redis_update(c);
}
skip:
@ -937,6 +940,8 @@ char *call_update_udp(const char **o, struct callmaster *m) {
g_queue_clear(&q);
redis_update(c);
return streams_print(c->callstreams, 1, 0, o[1], 1);
fail:
@ -972,6 +977,8 @@ char *call_lookup_udp(const char **o, struct callmaster *m) {
g_queue_clear(&q);
redis_update(c);
return streams_print(c->callstreams, 1, 1, o[1], 1);
fail:
@ -992,6 +999,8 @@ char *call_request(const char **o, struct callmaster *m) {
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0);
streams_free(s);
redis_update(c);
return streams_print(c->callstreams, num, 0, NULL, 0);
}
@ -1012,6 +1021,8 @@ char *call_lookup(const char **o, struct callmaster *m) {
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1);
streams_free(s);
redis_update(c);
return streams_print(c->callstreams, num, 1, NULL, 0);
}


+ 1
- 0
daemon/call.h View File

@ -60,6 +60,7 @@ struct call {
GQueue *callstreams;
char *callid;
char redis_uuid[37];
time_t created;
char *calling_agent;
char *called_agent;


+ 60
- 2
daemon/redis.c View File

@ -3,6 +3,7 @@
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include "redis.h"
#include "aux.h"
@ -19,6 +20,20 @@
static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) {
redisReply *rp;
rp = redisCommand(r->ctx, "TYPE %s%s", key, suffix ? : "");
if (!rp || rp->type != REDIS_REPLY_STATUS)
return -1;
if (strcmp(rp->str, type) && strcmp(rp->str, "none"))
redisCommandNR(r->ctx, "DEL %s%s", key, suffix ? : "");
return 0;
}
static int redis_connect(struct redis *r, int wait) {
struct timeval tv;
redisReply *rp;
@ -136,7 +151,7 @@ int redis_restore(struct callmaster *m) {
goto del2;
if (rp3->element[0]->type != REDIS_REPLY_STRING)
goto del2;
if (rp3->element[1]->type != REDIS_REPLY_INTEGER)
if (rp3->element[1]->type != REDIS_REPLY_STRING)
goto del2;
continue;
@ -144,7 +159,7 @@ int redis_restore(struct callmaster *m) {
del2:
freeReplyObject(rp3);
del:
redisCommandNR(r->ctx, "DEL %s", rp2->str);
redisCommandNR(r->ctx, "DEL %s %s-streams", rp2->str);
redisCommandNR(r->ctx, "SREM calls %s", rp2->str);
}
@ -155,3 +170,46 @@ del:
err:
return -1;
}
void redis_update(struct call *c) {
struct callmaster *cm = c->callmaster;
struct redis *r = cm->redis;
char uuid[37];
GList *l;
struct callstream *cs;
int i;
struct peer *p;
if (!r)
return;
if (!c->redis_uuid[0])
uuid_str_generate(c->redis_uuid);
redis_check_type(r, c->redis_uuid, NULL, "hash");
redisCommandNR(r->ctx, "HMSET %s callid %s created %i", c->redis_uuid, c->callid, c->created);
redisCommandNR(r->ctx, "DEL %s-streams-temp", c->redis_uuid);
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
uuid_str_generate(uuid);
for (i = 0; i < 2; i++) {
p = &cs->peers[i];
redisCommandNR(r->ctx, "DEL %s:%i", uuid, i);
redisCommandNR(r->ctx, "HMSET %s:%i ip " IPF " port %i localport %i last-rtp %i last-rtcp %i kernel %i filled %i confirmed %i", uuid, i, IPP(p->rtps[0].peer.ip), p->rtps[0].peer.port, p->rtps[0].localport, p->rtps[0].last, p->rtps[1].last, p->kernelized, p->filled, p->confirmed);
redisCommandNR(r->ctx, "EXPIRE %s:%i 86400", uuid, i);
}
redisCommandNR(r->ctx, "RPUSH %s-streams-temp %s", c->redis_uuid, uuid);
}
redisCommandNR(r->ctx, "RENAME %s-streams-temp %s-streams", c->redis_uuid, c->redis_uuid);
redisCommandNR(r->ctx, "EXPIRE %s-streams 86400", c->redis_uuid);
redisCommandNR(r->ctx, "EXPIRE %s 86400", c->redis_uuid);
redisCommandNR(r->ctx, "SADD calls %s", c->redis_uuid);
}

+ 2
- 0
daemon/redis.h View File

@ -11,6 +11,7 @@
struct callmaster;
struct call;
@ -27,6 +28,7 @@ struct redis {
struct redis *redis_new(u_int32_t, u_int16_t, int);
int redis_restore(struct callmaster *);
void redis_update(struct call *c);


Loading…
Cancel
Save