diff --git a/daemon/Makefile b/daemon/Makefile index d8e9e8a80..16399407c 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -5,6 +5,7 @@ CFLAGS+= `pkg-config --cflags glib-2.0` CFLAGS+= `pkg-config --cflags gthread-2.0` CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` +CFLAGS+= `pkg-config --cflags libevent` CFLAGS+= `pcre-config --cflags` CFLAGS+= -I../kernel-module/ CFLAGS+= -D_GNU_SOURCE @@ -48,6 +49,7 @@ LDFLAGS+= `pkg-config --libs zlib` LDFLAGS+= `pkg-config --libs libpcre` LDFLAGS+= `pkg-config --libs libcrypto` LDFLAGS+= `pkg-config --libs openssl` +LDFLAGS+= `pkg-config --libs libevent` LDFLAGS+= `pcre-config --libs` LDFLAGS+= `xmlrpc-c-config client --libs` LDFLAGS+= -lhiredis diff --git a/daemon/call.c b/daemon/call.c index b895d1c5c..62b47c61f 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1069,9 +1069,9 @@ out: if (ca && update) { if (sfd->call->callmaster->conf.redis_write) { - redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE); + redis_update(ca, sfd->call->callmaster->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER); } else if (sfd->call->callmaster->conf.redis) { - redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE); + redis_update(ca, sfd->call->callmaster->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); } } done: @@ -1079,9 +1079,6 @@ done: } - - - /* called with call->master_lock held in R */ static int call_timer_delete_monologues(struct call *c) { GSList *i; @@ -1515,9 +1512,9 @@ static void callmaster_timer(void *ptr) { if (update) { if (m->conf.redis_write) { - redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE); + redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER); } else if (m->conf.redis) { - redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE); + redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); } } @@ -2847,10 +2844,10 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { - struct callmaster *m = c->callmaster; + struct callmaster *m; struct packet_stream *ps=0, *ps2=0; struct stream_fd *sfd; - struct poller *p = m->poller; + struct poller *p; GSList *l; int ret; struct call_monologue *ml; @@ -2866,6 +2863,13 @@ void call_destroy(struct call *c) { int found = 0; const struct rtp_payload_type *rtp_pt; + if (!c) { + return; + } + + m = c->callmaster; + p = m->poller; + rwlock_lock_w(&m->hashlock); ret = g_hash_table_remove(m->callhash, &c->callid); rwlock_unlock_w(&m->hashlock); @@ -3794,9 +3798,9 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) { struct callmaster *m = c->callmaster; if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER); } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); } } diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index fa2783359..154d76663 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -190,9 +190,9 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o rwlock_unlock_w(&c->master_lock); if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode); } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode); } gettimeofday(&(monologue->started), NULL); @@ -346,9 +346,9 @@ out2: streams_free(&s); if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE); + redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode); } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE); + redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode); } ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); @@ -725,9 +725,9 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster rwlock_unlock_w(&call->master_lock); if (m->conf.redis_write) { - redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE); + redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE, opmode); } else if (m->conf.redis) { - redis_update(call, m->conf.redis, MASTER_REDIS_ROLE); + redis_update(call, m->conf.redis, MASTER_REDIS_ROLE, opmode); } obj_put(call); diff --git a/daemon/main.c b/daemon/main.c index b31260ab0..28173c633 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -637,6 +637,9 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(poller_timer_loop, ctx.p); + // TODO: if redis notify enabled. + thread_create_detach(redis_notify, ctx.m); + if (graphite_ip) thread_create_detach(graphite_loop, ctx.m); thread_create_detach(ice_thread_run, NULL); diff --git a/daemon/redis.c b/daemon/redis.c index 1f091db4d..b3797860a 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -5,6 +5,7 @@ #include #include +#include #include "redis.h" #include "compat.h" #include "aux.h" @@ -13,11 +14,9 @@ #include "str.h" #include "crypto.h" #include "dtls.h" - - - - - +#include "hiredis/hiredis.h" +#include "hiredis/async.h" +#include "hiredis/adapters/libevent.h" INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -203,7 +202,83 @@ err: return -1; } +int str_cut(char *str, int begin, int len) { + int l = strlen(str); + + if (len < 0) len = l - begin; + if (begin + len > l) len = l - begin; + memmove(str + begin, str + begin + len, l - len + 1); + + return len; +} + +static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id); + +void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { + + struct callmaster *cm = privdata; + struct redis *r = cm->conf.redis; + struct call* c; + str callid; + + redisReply *rr = (redisReply*)reply; + + if (reply == NULL || rr->type != REDIS_REPLY_ARRAY) + return; + + for (int j = 0; j < rr->elements; j++) { + rlog(LOG_INFO, "Redis-Notify: %u) %s\n", j, rr->element[j]->str); + } + + if (rr->elements != 4) + return; + + char *pch = strstr(rr->element[2]->str, "notifier-"); + if (pch == NULL) { + rlog(LOG_ERROR,"Redis-Notifier: The substring 'notifier-' has not been found in the redis notification !\n"); + return; + } + + pch += strlen("notifier-"); + str_cut(rr->element[2]->str,0,pch-rr->element[2]->str); + rr->element[2]->len = strlen(rr->element[2]->str); + rlog(LOG_INFO,"Redis-Notifier: Processing call with callid:%s\n",rr->element[2]->str); + + str_init(&callid,rr->element[2]->str); + + c = g_hash_table_lookup(cm->callhash, &callid); + + if (strncmp(rr->element[3]->str,"sadd",4)==0) { + if (c) { + rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str); + return; + } + redis_restore_call(r, cm, rr->element[2]); + } + + if (strncmp(rr->element[3]->str,"del",3)==0) { + call_destroy(c); + } + +} + +void redis_notify(void *d) { + struct callmaster *cm = d; + struct redis *r = cm->conf.redis; + struct event_base *base = event_base_new(); + + redisAsyncContext *c = redisAsyncConnect(r->host, r->port); + if (c->err) { + printf("error: %s\n", c->errstr); + return; + } + + redisLibeventAttach(c, base); + // redisAsyncCommand(c, onRedisNotification, d, "SUBSCRIBE testtopic"); + redisAsyncCommand(c, onRedisNotification, d, "psubscribe __key*__:notifier-*"); + event_base_dispatch(base); +} struct redis *redis_new(u_int32_t ip, u_int16_t port, int db, int role) { struct redis *r; @@ -270,6 +345,7 @@ static void redis_delete_call(struct call *c, struct redis *r) { struct endpoint_map *em; char *mono_key, *media_key, *em_key; + redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid)); @@ -1128,7 +1204,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi /* must be called lock-free */ -void redis_update(struct call *c, struct redis *r, int role) { +void redis_update(struct call *c, struct redis *r, int role, enum call_opmode opmode) { GSList *l, *n; GList *pt_list, *pt_iter; GList *k, *m; @@ -1152,6 +1228,7 @@ void redis_update(struct call *c, struct redis *r, int role) { rwlock_lock_r(&c->master_lock); + redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid)); @@ -1306,6 +1383,9 @@ void redis_update(struct call *c, struct redis *r, int role) { redis_pipe(r, "EXPIRE sfds-"PB" 86400", STR(&c->callid)); redis_pipe(r, "EXPIRE streams-"PB" 86400", STR(&c->callid)); redis_pipe(r, "SADD calls "PB"", STR(&c->callid)); + if (opmode==OP_ANSWER) { + redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); + } redis_consume(r); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index 92c0c87e4..f7f626b16 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -9,6 +9,7 @@ #include #include #include +#include "call.h" #define MASTER_REDIS_ROLE 0 @@ -75,11 +76,12 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #define REDIS_FMT(x) (x)->len, (x)->str +void redis_notify(void *d); struct redis *redis_new(u_int32_t, u_int16_t, int, int); int redis_restore(struct callmaster *, struct redis *, int); -void redis_update(struct call *, struct redis *, int); +void redis_update(struct call *, struct redis *, int, enum call_opmode); void redis_delete(struct call *, struct redis *, int); void redis_wipe(struct redis *, int); diff --git a/debian/control b/debian/control index 50379329c..c9323dba1 100644 --- a/debian/control +++ b/debian/control @@ -8,6 +8,8 @@ Build-Depends: debhelper (>= 5), libcurl3-openssl-dev | libcurl3-gnutls-dev, libglib2.0-dev (>= 2.30), libhiredis-dev, + libevent-2.0-5, + libevent-dev, libpcre3-dev, libssl-dev (>= 1.0.1), libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07), diff --git a/debian/rules b/debian/rules index 6ea7b7e4a..86066cad2 100755 --- a/debian/rules +++ b/debian/rules @@ -84,6 +84,7 @@ install: build %: @echo "--- Building: $@" + dh_shlibdeps --dpkg-shlibdeps-params=--ignore-missing-info dh_installdirs -p$@ -P$(b)/$@ dh_link -p$@ -P$(b)/$@ dh_installdocs -p$@ -P$(b)/$@ debian/README.md.gz debian/README.html.gz