Browse Source

Implemented redis-notification base feature.

Currently, every rtpengine will subscribe to redis-keyspace notification
so it will receive a notification when an call is inserted. If the call
is not already handeled by the rtpengine, the call will be restored.

The reason for this is to have in-place redundancy. Imagine you have
multiple rtpengines running, eachone will have all calls of the others.
When one rtpengine fails somehow, infrastructure guys use BGP in order to
'move' the IP address from one rtpengine to another. Thisone can handle
the new calls instantly since they're already recovered by
redis-notification feature.

Next step is internally identify those calls in order to prevent some
timers to delete the calls where no RTP flows. Second will be
something we call 'partitioning'. It means that the subscription
to a redis notify will only be for the keyspace a dedicated rtpengine
writes to. This leads to the point that you can make redundancy groups
(partitions) of the rtpengines.
pull/225/head
Frederic-Philippe Metz 10 years ago
parent
commit
541e2999a9
8 changed files with 118 additions and 24 deletions
  1. +2
    -0
      daemon/Makefile
  2. +15
    -11
      daemon/call.c
  3. +6
    -6
      daemon/call_interfaces.c
  4. +3
    -0
      daemon/main.c
  5. +86
    -6
      daemon/redis.c
  6. +3
    -1
      daemon/redis.h
  7. +2
    -0
      debian/control
  8. +1
    -0
      debian/rules

+ 2
- 0
daemon/Makefile View File

@ -5,6 +5,7 @@ CFLAGS+= `pkg-config --cflags glib-2.0`
CFLAGS+= `pkg-config --cflags gthread-2.0`
CFLAGS+= `pkg-config --cflags zlib`
CFLAGS+= `pkg-config --cflags openssl`
CFLAGS+= `pkg-config --cflags libevent`
CFLAGS+= `pcre-config --cflags`
CFLAGS+= -I../kernel-module/
CFLAGS+= -D_GNU_SOURCE
@ -48,6 +49,7 @@ LDFLAGS+= `pkg-config --libs zlib`
LDFLAGS+= `pkg-config --libs libpcre`
LDFLAGS+= `pkg-config --libs libcrypto`
LDFLAGS+= `pkg-config --libs openssl`
LDFLAGS+= `pkg-config --libs libevent`
LDFLAGS+= `pcre-config --libs`
LDFLAGS+= `xmlrpc-c-config client --libs`
LDFLAGS+= -lhiredis


+ 15
- 11
daemon/call.c View File

@ -1069,9 +1069,9 @@ out:
if (ca && update) {
if (sfd->call->callmaster->conf.redis_write) {
redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE);
redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER);
} else if (sfd->call->callmaster->conf.redis) {
redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE);
redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE, OP_OTHER);
}
}
done:
@ -1079,9 +1079,6 @@ done:
}
/* called with call->master_lock held in R */
static int call_timer_delete_monologues(struct call *c) {
GSList *i;
@ -1515,9 +1512,9 @@ static void callmaster_timer(void *ptr) {
if (update) {
if (m->conf.redis_write) {
redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE);
redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER);
} else if (m->conf.redis) {
redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE);
redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER);
}
}
@ -2847,10 +2844,10 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
/* called lock-free, but must hold a reference to the call */
void call_destroy(struct call *c) {
struct callmaster *m = c->callmaster;
struct callmaster *m;
struct packet_stream *ps=0, *ps2=0;
struct stream_fd *sfd;
struct poller *p = m->poller;
struct poller *p;
GSList *l;
int ret;
struct call_monologue *ml;
@ -2866,6 +2863,13 @@ void call_destroy(struct call *c) {
int found = 0;
const struct rtp_payload_type *rtp_pt;
if (!c) {
return;
}
m = c->callmaster;
p = m->poller;
rwlock_lock_w(&m->hashlock);
ret = g_hash_table_remove(m->callhash, &c->callid);
rwlock_unlock_w(&m->hashlock);
@ -3794,9 +3798,9 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) {
struct callmaster *m = c->callmaster;
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER);
}
}


+ 6
- 6
daemon/call_interfaces.c View File

@ -190,9 +190,9 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
rwlock_unlock_w(&c->master_lock);
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode);
}
gettimeofday(&(monologue->started), NULL);
@ -346,9 +346,9 @@ out2:
streams_free(&s);
if (m->conf.redis_write) {
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE);
redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode);
} else if (m->conf.redis) {
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE);
redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode);
}
ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret));
@ -725,9 +725,9 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
rwlock_unlock_w(&call->master_lock);
if (m->conf.redis_write) {
redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE);
redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE, opmode);
} else if (m->conf.redis) {
redis_update(call, m->conf.redis, MASTER_REDIS_ROLE);
redis_update(call, m->conf.redis, MASTER_REDIS_ROLE, opmode);
}
obj_put(call);


+ 3
- 0
daemon/main.c View File

@ -637,6 +637,9 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL);
thread_create_detach(poller_timer_loop, ctx.p);
// TODO: if redis notify enabled.
thread_create_detach(redis_notify, ctx.m);
if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m);
thread_create_detach(ice_thread_run, NULL);


+ 86
- 6
daemon/redis.c View File

@ -5,6 +5,7 @@
#include <unistd.h>
#include <glib.h>
#include <glib.h>
#include "redis.h"
#include "compat.h"
#include "aux.h"
@ -13,11 +14,9 @@
#include "str.h"
#include "crypto.h"
#include "dtls.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
#include "hiredis/adapters/libevent.h"
INLINE redisReply *redis_expect(int type, redisReply *r) {
if (!r)
@ -203,7 +202,83 @@ err:
return -1;
}
int str_cut(char *str, int begin, int len) {
int l = strlen(str);
if (len < 0) len = l - begin;
if (begin + len > l) len = l - begin;
memmove(str + begin, str + begin + len, l - len + 1);
return len;
}
static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id);
void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
struct callmaster *cm = privdata;
struct redis *r = cm->conf.redis;
struct call* c;
str callid;
redisReply *rr = (redisReply*)reply;
if (reply == NULL || rr->type != REDIS_REPLY_ARRAY)
return;
for (int j = 0; j < rr->elements; j++) {
rlog(LOG_INFO, "Redis-Notify: %u) %s\n", j, rr->element[j]->str);
}
if (rr->elements != 4)
return;
char *pch = strstr(rr->element[2]->str, "notifier-");
if (pch == NULL) {
rlog(LOG_ERROR,"Redis-Notifier: The substring 'notifier-' has not been found in the redis notification !\n");
return;
}
pch += strlen("notifier-");
str_cut(rr->element[2]->str,0,pch-rr->element[2]->str);
rr->element[2]->len = strlen(rr->element[2]->str);
rlog(LOG_INFO,"Redis-Notifier: Processing call with callid:%s\n",rr->element[2]->str);
str_init(&callid,rr->element[2]->str);
c = g_hash_table_lookup(cm->callhash, &callid);
if (strncmp(rr->element[3]->str,"sadd",4)==0) {
if (c) {
rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str);
return;
}
redis_restore_call(r, cm, rr->element[2]);
}
if (strncmp(rr->element[3]->str,"del",3)==0) {
call_destroy(c);
}
}
void redis_notify(void *d) {
struct callmaster *cm = d;
struct redis *r = cm->conf.redis;
struct event_base *base = event_base_new();
redisAsyncContext *c = redisAsyncConnect(r->host, r->port);
if (c->err) {
printf("error: %s\n", c->errstr);
return;
}
redisLibeventAttach(c, base);
// redisAsyncCommand(c, onRedisNotification, d, "SUBSCRIBE testtopic");
redisAsyncCommand(c, onRedisNotification, d, "psubscribe __key*__:notifier-*");
event_base_dispatch(base);
}
struct redis *redis_new(u_int32_t ip, u_int16_t port, int db, int role) {
struct redis *r;
@ -270,6 +345,7 @@ static void redis_delete_call(struct call *c, struct redis *r) {
struct endpoint_map *em;
char *mono_key, *media_key, *em_key;
redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid));
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"",
STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid));
@ -1128,7 +1204,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi
/* must be called lock-free */
void redis_update(struct call *c, struct redis *r, int role) {
void redis_update(struct call *c, struct redis *r, int role, enum call_opmode opmode) {
GSList *l, *n;
GList *pt_list, *pt_iter;
GList *k, *m;
@ -1152,6 +1228,7 @@ void redis_update(struct call *c, struct redis *r, int role) {
rwlock_lock_r(&c->master_lock);
redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid));
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid),
STR(&c->callid), STR(&c->callid));
@ -1306,6 +1383,9 @@ void redis_update(struct call *c, struct redis *r, int role) {
redis_pipe(r, "EXPIRE sfds-"PB" 86400", STR(&c->callid));
redis_pipe(r, "EXPIRE streams-"PB" 86400", STR(&c->callid));
redis_pipe(r, "SADD calls "PB"", STR(&c->callid));
if (opmode==OP_ANSWER) {
redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid));
}
redis_consume(r);
mutex_unlock(&r->lock);


+ 3
- 1
daemon/redis.h View File

@ -9,6 +9,7 @@
#include <glib.h>
#include <sys/types.h>
#include <hiredis/hiredis.h>
#include "call.h"
#define MASTER_REDIS_ROLE 0
@ -75,11 +76,12 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
#define REDIS_FMT(x) (x)->len, (x)->str
void redis_notify(void *d);
struct redis *redis_new(u_int32_t, u_int16_t, int, int);
int redis_restore(struct callmaster *, struct redis *, int);
void redis_update(struct call *, struct redis *, int);
void redis_update(struct call *, struct redis *, int, enum call_opmode);
void redis_delete(struct call *, struct redis *, int);
void redis_wipe(struct redis *, int);


+ 2
- 0
debian/control View File

@ -8,6 +8,8 @@ Build-Depends: debhelper (>= 5),
libcurl3-openssl-dev | libcurl3-gnutls-dev,
libglib2.0-dev (>= 2.30),
libhiredis-dev,
libevent-2.0-5,
libevent-dev,
libpcre3-dev,
libssl-dev (>= 1.0.1),
libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07),


+ 1
- 0
debian/rules View File

@ -84,6 +84,7 @@ install: build
%:
@echo "--- Building: $@"
dh_shlibdeps --dpkg-shlibdeps-params=--ignore-missing-info
dh_installdirs -p$@ -P$(b)/$@
dh_link -p$@ -P$(b)/$@
dh_installdocs -p$@ -P$(b)/$@ debian/README.md.gz debian/README.html.gz


Loading…
Cancel
Save