diff --git a/daemon/call.c b/daemon/call.c index 3603ec640..8f10fdf1e 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -128,8 +128,8 @@ GHashTable *rtpe_callhash; static void __monologue_destroy(struct call_monologue *monologue); static int monologue_destroy(struct call_monologue *ml); -static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, - struct timeval *interval_start, struct timeval *interval_duration); +static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start, + struct timeval *interval_duration); /* called with call->master_lock held in R */ static int call_timer_delete_monologues(struct call *c) { @@ -171,7 +171,7 @@ out: -/* called with callmaster->hashlock held */ +/* called with hashlock held */ static void call_timer_iterator(gpointer data, gpointer user_data) { struct call *c = data; struct iterator_helper *hlp = user_data; @@ -383,19 +383,18 @@ fault: g_slice_free1(sizeof(*xh), xh); } -void kill_calls_timer(GSList *list, struct callmaster *m) { +void kill_calls_timer(GSList *list, const char *url) { struct call *ca; GList *csl; struct call_monologue *cm; - const char *url, *url_prefix, *url_suffix; + const char *url_prefix, *url_suffix; struct xmlrpc_helper *xh = NULL; char url_buf[128]; if (!list) return; - /* if m is NULL, it's the scheduled deletions, otherwise it's the timeouts */ - url = m ? rtpe_config.b2b_url : NULL; + /* if url is NULL, it's the scheduled deletions, otherwise it's the timeouts */ if (url) { xh = g_slice_alloc(sizeof(*xh)); xh->c = g_string_chunk_new(64); @@ -467,8 +466,7 @@ destroy: atomic64_add(&ps->stats.x, d); \ atomic64_add(&rtpe_statsps.x, d); \ } while (0) -static void callmaster_timer(void *ptr) { - struct callmaster *m = ptr; +static void call_timer(void *ptr) { struct iterator_helper hlp; GList *i, *l, *calls = NULL; struct rtpengine_list_entry *ke; @@ -588,7 +586,7 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); if (update) { - redis_update_onekey(ps->call, m->conf.redis_write); + redis_update_onekey(ps->call, rtpe_redis_write); } next: @@ -606,28 +604,20 @@ next: g_hash_table_destroy(hlp.addr_sfd); kill_calls_timer(hlp.del_scheduled, NULL); - kill_calls_timer(hlp.del_timeout, m); + kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url); } #undef DS -struct callmaster *callmaster_new() { - struct callmaster *c; - - c = obj_alloc0("callmaster", sizeof(*c), NULL); - +int call_init() { rtpe_callhash = g_hash_table_new(str_hash, str_equal); if (!rtpe_callhash) - goto fail; + return -1; rwlock_init(&rtpe_callhash_lock); - poller_add_timer(rtpe_poller, callmaster_timer, &c->obj); - - return c; + poller_add_timer(rtpe_poller, call_timer, NULL); -fail: - obj_put(c); - return NULL; + return 0; } @@ -1716,9 +1706,8 @@ out: return rtp_pt; /* may be NULL */ } -void add_total_calls_duration_in_interval(struct callmaster *cm, - struct timeval *interval_tv) { - struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval(cm, +void add_total_calls_duration_in_interval(struct timeval *interval_tv) { + struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval( &rtpe_latest_graphite_interval_start, interval_tv); mutex_lock(&rtpe_totalstats_interval.total_calls_duration_lock); @@ -1728,8 +1717,9 @@ void add_total_calls_duration_in_interval(struct callmaster *cm, mutex_unlock(&rtpe_totalstats_interval.total_calls_duration_lock); } -static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, - struct timeval *interval_start, struct timeval *interval_duration) { +static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start, + struct timeval *interval_duration) +{ GHashTableIter iter; gpointer key, value; struct timeval call_duration, res = {0}; @@ -1757,7 +1747,6 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { - struct callmaster *m; struct packet_stream *ps=0; struct stream_fd *sfd; GList *l; @@ -1771,8 +1760,6 @@ void call_destroy(struct call *c) { return; } - m = c->callmaster; - rwlock_lock_w(&rtpe_callhash_lock); ret = (g_hash_table_lookup(rtpe_callhash, &c->callid) == c); if (ret) @@ -1789,7 +1776,7 @@ void call_destroy(struct call *c) { statistics_update_foreignown_dec(c); if (IS_OWN_CALL(c)) { - redis_delete(c, m->conf.redis_write); + redis_delete(c, rtpe_redis_write); } rwlock_lock_w(&c->master_lock); @@ -2001,12 +1988,11 @@ static void __call_free(void *p) { assert(c->stream_fds.head == NULL); } -static struct call *call_create(const str *callid, struct callmaster *m) { +static struct call *call_create(const str *callid) { struct call *c; ilog(LOG_NOTICE, "Creating new call"); c = obj_alloc0("call", sizeof(*c), __call_free); - c->callmaster = m; mutex_init(&c->buffer_lock); call_buffer_init(&c->buffer); rwlock_init(&c->master_lock); @@ -2022,7 +2008,7 @@ static struct call *call_create(const str *callid, struct callmaster *m) { } /* returns call with master_lock held in W */ -struct call *call_get_or_create(const str *callid, struct callmaster *m, enum call_type type) { +struct call *call_get_or_create(const str *callid, enum call_type type) { struct call *c; restart: @@ -2031,7 +2017,7 @@ restart: if (!c) { rwlock_unlock_r(&rtpe_callhash_lock); /* completely new call-id, create call */ - c = call_create(callid, m); + c = call_create(callid); rwlock_lock_w(&rtpe_callhash_lock); if (g_hash_table_lookup(rtpe_callhash, callid)) { /* preempted */ @@ -2060,7 +2046,7 @@ restart: } /* returns call with master_lock held in W, or NULL if not found */ -struct call *call_get(const str *callid, struct callmaster *m) { +struct call *call_get(const str *callid) { struct call *ret; rwlock_lock_r(&rtpe_callhash_lock); @@ -2079,10 +2065,10 @@ struct call *call_get(const str *callid, struct callmaster *m) { } /* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */ -struct call *call_get_opmode(const str *callid, struct callmaster *m, enum call_opmode opmode) { +struct call *call_get_opmode(const str *callid, enum call_opmode opmode) { if (opmode == OP_OFFER) - return call_get_or_create(callid, m, CT_OWN_CALL); - return call_get(callid, m); + return call_get_or_create(callid, CT_OWN_CALL); + return call_get(callid); } /* must be called with call->master_lock held in W */ @@ -2351,7 +2337,7 @@ struct call_monologue *call_get_mono_dialogue(struct call *call, const str *from } -int call_delete_branch(struct callmaster *m, const str *callid, const str *branch, +int call_delete_branch(const str *callid, const str *branch, const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay) { struct call *c; @@ -2363,7 +2349,7 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc if (delete_delay < 0) delete_delay = rtpe_config.delete_delay; - c = call_get(callid, m); + c = call_get(callid); if (!c) { ilog(LOG_INFO, "Call-ID to delete not found"); goto err; @@ -2461,14 +2447,14 @@ out: } -static void callmaster_get_all_calls_interator(void *key, void *val, void *ptr) { +static void call_get_all_calls_interator(void *key, void *val, void *ptr) { GQueue *q = ptr; g_queue_push_tail(q, obj_get_o(val)); } -void callmaster_get_all_calls(struct callmaster *m, GQueue *q) { +void call_get_all_calls(GQueue *q) { rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_foreach(rtpe_callhash, callmaster_get_all_calls_interator, q); + g_hash_table_foreach(rtpe_callhash, call_get_all_calls_interator, q); rwlock_unlock_r(&rtpe_callhash_lock); } diff --git a/daemon/call.h b/daemon/call.h index b11ad5452..8a8cfe105 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -350,8 +350,6 @@ struct call_monologue { struct call { struct obj obj; - struct callmaster *callmaster; /* RO */ - mutex_t buffer_lock; call_buffer_t buffer; @@ -382,20 +380,6 @@ struct call { struct recording *recording; }; -struct callmaster_config { - struct redis *redis; - struct redis *redis_write; - struct redis *redis_notify; - - struct event_base *redis_notify_event_base; - struct redisAsyncContext *redis_notify_async_context; -}; - -struct callmaster { - struct obj obj; - - struct callmaster_config conf; -}; extern rwlock_t rtpe_callhash_lock; @@ -405,25 +389,22 @@ extern struct stats rtpe_statsps; /* per second stats, running timer */ extern struct stats rtpe_stats; /* copied from statsps once a second */ -struct callmaster *callmaster_new(void); -void callmaster_get_all_calls(struct callmaster *m, GQueue *q); +int call_init(void); +void call_get_all_calls(GQueue *q); -//void calls_dump_redis(struct callmaster *); -//void calls_dump_redis_read(struct callmaster *); -//void calls_dump_redis_write(struct callmaster *); struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); -struct call *call_get_or_create(const str *callid, struct callmaster *m, enum call_type); -struct call *call_get_opmode(const str *callid, struct callmaster *m, enum call_opmode opmode); +struct call *call_get_or_create(const str *callid, enum call_type); +struct call *call_get_opmode(const str *callid, enum call_opmode opmode); struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag, const str *viabranch); -struct call *call_get(const str *callid, struct callmaster *m); +struct call *call_get(const str *callid); int monologue_offer_answer(struct call_monologue *monologue, GQueue *streams, const struct sdp_ng_flags *flags); -int call_delete_branch(struct callmaster *m, const str *callid, const str *branch, +int call_delete_branch(const str *callid, const str *branch, const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay); void call_destroy(struct call *); enum call_stream_state call_stream_state_machine(struct packet_stream *); @@ -434,7 +415,7 @@ int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address int *len, const struct local_intf *ifa, int keep_unspec); const struct transport_protocol *transport_protocol(const str *s); -void add_total_calls_duration_in_interval(struct callmaster *cm, struct timeval *interval_tv); +void add_total_calls_duration_in_interval(struct timeval *interval_tv); void __payload_type_free(void *p); void __rtp_stats_update(GHashTable *dst, GHashTable *src); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 6eecb0fc0..fce3be8f4 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -142,7 +142,7 @@ fail: return -1; } -static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_opmode opmode, const char* addr, +static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { struct call *c; @@ -159,7 +159,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o if (opmode == OP_ANSWER) str_swap(&fromtag, &totag); - c = call_get_opmode(&callid, m, opmode); + c = call_get_opmode(&callid, opmode); if (!c) { ilog(LOG_WARNING, "["STR_FORMAT"] Got UDP LOOKUP for unknown call-id", STR_FMT(&callid)); @@ -195,7 +195,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - redis_update_onekey(c, m->conf.redis_write); + redis_update_onekey(c, rtpe_redis_write); gettimeofday(&(monologue->started), NULL); @@ -219,11 +219,11 @@ out: return ret; } -str *call_update_udp(char **out, struct callmaster *m, const char* addr, const endpoint_t *sin) { - return call_update_lookup_udp(out, m, OP_OFFER, addr, sin); +str *call_update_udp(char **out, const char* addr, const endpoint_t *sin) { + return call_update_lookup_udp(out, OP_OFFER, addr, sin); } -str *call_lookup_udp(char **out, struct callmaster *m) { - return call_update_lookup_udp(out, m, OP_ANSWER, NULL, NULL); +str *call_lookup_udp(char **out) { + return call_update_lookup_udp(out, OP_ANSWER, NULL, NULL); } @@ -235,7 +235,7 @@ static int info_parse_func(char **a, void **ret, void *p) { return -1; } -static void info_parse(const char *s, GHashTable *ih, struct callmaster *m) { +static void info_parse(const char *s, GHashTable *ih) { pcre_multi_match(info_re, info_ree, s, 2, info_parse_func, ih, NULL); } @@ -273,7 +273,7 @@ fail: } -static void streams_parse(const char *s, struct callmaster *m, GQueue *q) { +static void streams_parse(const char *s, GQueue *q) { int i; i = 0; pcre_multi_match(streams_re, streams_ree, s, 3, streams_parse_func, &i, q); @@ -298,7 +298,7 @@ static void streams_free(GQueue *q) { -static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_opmode opmode) { +static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) { struct call *c; struct call_monologue *monologue; GQueue s = G_QUEUE_INIT; @@ -307,14 +307,14 @@ static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_ str_init(&callid, out[RE_TCP_RL_CALLID]); infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); - c = call_get_opmode(&callid, m, opmode); + c = call_get_opmode(&callid, opmode); if (!c) { ilog(LOG_WARNING, "["STR_FORMAT"] Got LOOKUP for unknown call-id", STR_FMT(&callid)); goto out; } - info_parse(out[RE_TCP_RL_INFO], infohash, m); - streams_parse(out[RE_TCP_RL_STREAMS], m, &s); + info_parse(out[RE_TCP_RL_INFO], infohash); + streams_parse(out[RE_TCP_RL_STREAMS], &s); str_init(&fromtag, g_hash_table_lookup(infohash, "fromtag")); if (!fromtag.s) { ilog(LOG_WARNING, "No from-tag in message"); @@ -343,7 +343,7 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - redis_update_onekey(c, m->conf.redis_write); + redis_update_onekey(c, rtpe_redis_write); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -353,14 +353,14 @@ out: return ret; } -str *call_request_tcp(char **out, struct callmaster *m) { - return call_request_lookup_tcp(out, m, OP_OFFER); +str *call_request_tcp(char **out) { + return call_request_lookup_tcp(out, OP_OFFER); } -str *call_lookup_tcp(char **out, struct callmaster *m) { - return call_request_lookup_tcp(out, m, OP_ANSWER); +str *call_lookup_tcp(char **out) { + return call_request_lookup_tcp(out, OP_ANSWER); } -str *call_delete_udp(char **out, struct callmaster *m) { +str *call_delete_udp(char **out) { str callid, branch, fromtag, totag; __C_DBG("got delete for callid '%s' and viabranch '%s'", @@ -371,12 +371,12 @@ str *call_delete_udp(char **out, struct callmaster *m) { str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]); str_init(&totag, out[RE_UDP_DQ_TOTAG]); - if (call_delete_branch(m, &callid, &branch, &fromtag, &totag, NULL, -1)) + if (call_delete_branch(&callid, &branch, &fromtag, &totag, NULL, -1)) return str_sprintf("%s E8\n", out[RE_UDP_COOKIE]); return str_sprintf("%s 0\n", out[RE_UDP_COOKIE]); } -str *call_query_udp(char **out, struct callmaster *m) { +str *call_query_udp(char **out) { struct call *c; str *ret, callid, fromtag, totag; struct call_stats stats; @@ -387,7 +387,7 @@ str *call_query_udp(char **out, struct callmaster *m) { str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]); str_init(&totag, out[RE_UDP_DQ_TOTAG]); - c = call_get_opmode(&callid, m, OP_OTHER); + c = call_get_opmode(&callid, OP_OTHER); if (!c) { ilog(LOG_INFO, "["STR_FORMAT"] Call-ID to query not found", STR_FMT(&callid)); goto err; @@ -417,11 +417,11 @@ out: return ret; } -void call_delete_tcp(char **out, struct callmaster *m) { +void call_delete_tcp(char **out) { str callid; str_init(&callid, out[RE_TCP_D_CALLID]); - call_delete_branch(m, &callid, NULL, NULL, NULL, NULL, -1); + call_delete_branch(&callid, NULL, NULL, NULL, NULL, -1); } static void call_status_iterator(struct call *c, struct streambuf_stream *s) { @@ -430,10 +430,8 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) { // struct peer *p; // struct streamrelay *r1, *r2; // struct streamrelay *rx1, *rx2; -// struct callmaster *m; // char addr1[64], addr2[64], addr3[64]; -// m = c->callmaster; // mutex_lock(&c->master_lock); streambuf_printf(s->outbuf, "session "STR_FORMAT" - - - - %lli\n", @@ -445,11 +443,11 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) { // mutex_unlock(&c->master_lock); } -void calls_status_tcp(struct callmaster *m, struct streambuf_stream *s) { +void calls_status_tcp(struct streambuf_stream *s) { GQueue q = G_QUEUE_INIT; struct call *c; - callmaster_get_all_calls(m, &q); + call_get_all_calls(&q); streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n", g_queue_get_length(&q), @@ -661,7 +659,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu bencode_dictionary_get_str(input, "metadata", &out->metadata); } -static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster *m, +static const char *call_offer_answer_ng(bencode_item_t *input, bencode_item_t *output, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { @@ -702,7 +700,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster goto out; /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ - call = call_get(&callid, m); + call = call_get(&callid); /* Failover scenario because of timeout on offer response: siprouter tries * to establish session with another rtpengine2 even though rtpengine1 @@ -715,12 +713,12 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster rwlock_unlock_w(&call->master_lock); call_destroy(call); obj_put(call); - call = call_get_or_create(&callid, m, CT_OWN_CALL); + call = call_get_or_create(&callid, CT_OWN_CALL); } } else { /* call == NULL, should create call */ - call = call_get_or_create(&callid, m, CT_OWN_CALL); + call = call_get_or_create(&callid, CT_OWN_CALL); } } @@ -783,7 +781,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster rwlock_unlock_w(&call->master_lock); if (!flags.no_redis_update) { - redis_update_onekey(call,m->conf.redis_write); + redis_update_onekey(call, rtpe_redis_write); } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); } @@ -812,7 +810,7 @@ out: return errstr; } -const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output, const char* addr, +const char *call_offer_ng(bencode_item_t *input, bencode_item_t *output, const char* addr, const endpoint_t *sin) { rwlock_lock_r(&rtpe_config.config_lock); @@ -834,14 +832,14 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i } rwlock_unlock_r(&rtpe_config.config_lock); - return call_offer_answer_ng(input, m, output, OP_OFFER, addr, sin); + return call_offer_answer_ng(input, output, OP_OFFER, addr, sin); } -const char *call_answer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { - return call_offer_answer_ng(input, m, output, OP_ANSWER, NULL, NULL); +const char *call_answer_ng(bencode_item_t *input, bencode_item_t *output) { + return call_offer_answer_ng(input, output, OP_ANSWER, NULL, NULL); } -const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { +const char *call_delete_ng(bencode_item_t *input, bencode_item_t *output) { str fromtag, totag, viabranch, callid; bencode_item_t *flags, *it; int fatal = 0, delete_delay; @@ -871,7 +869,7 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_ } } - if (call_delete_branch(m, &callid, &viabranch, &fromtag, &totag, output, delete_delay)) { + if (call_delete_branch(&callid, &viabranch, &fromtag, &totag, output, delete_delay)) { if (fatal) return "Call-ID not found or tags didn't match"; bencode_dictionary_add_string(output, "warning", "Call-ID not found or tags didn't match"); @@ -1145,7 +1143,7 @@ stats: ng_stats(bencode_dictionary_add_dictionary(dict, "RTCP"), &totals->totals[1], NULL); } -static void ng_list_calls( struct callmaster *m, bencode_item_t *output, long long int limit) { +static void ng_list_calls(bencode_item_t *output, long long int limit) { GHashTableIter iter; gpointer key, value; @@ -1161,13 +1159,13 @@ static void ng_list_calls( struct callmaster *m, bencode_item_t *output, long lo -const char *call_query_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { +const char *call_query_ng(bencode_item_t *input, bencode_item_t *output) { str callid, fromtag, totag; struct call *call; if (!bencode_dictionary_get_str(input, "call-id", &callid)) return "No call-id in message"; - call = call_get_opmode(&callid, m, OP_OTHER); + call = call_get_opmode(&callid, OP_OTHER); if (!call) return "Unknown call-id"; bencode_dictionary_get_str(input, "from-tag", &fromtag); @@ -1181,7 +1179,7 @@ const char *call_query_ng(bencode_item_t *input, struct callmaster *m, bencode_i } -const char *call_list_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { +const char *call_list_ng(bencode_item_t *input, bencode_item_t *output) { bencode_item_t *calls = NULL; long long int limit; @@ -1192,13 +1190,13 @@ const char *call_list_ng(bencode_item_t *input, struct callmaster *m, bencode_it } calls = bencode_dictionary_add_list(output, "calls"); - ng_list_calls(m, calls, limit); + ng_list_calls(calls, limit); return NULL; } -const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { +const char *call_start_recording_ng(bencode_item_t *input, bencode_item_t *output) { str callid; struct call *call; str metadata; @@ -1206,7 +1204,7 @@ const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m, if (!bencode_dictionary_get_str(input, "call-id", &callid)) return "No call-id in message"; bencode_dictionary_get_str(input, "metadata", &metadata); - call = call_get_opmode(&callid, m, OP_OTHER); + call = call_get_opmode(&callid, OP_OTHER); if (!call) return "Unknown call-id"; @@ -1218,13 +1216,13 @@ const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m, return NULL; } -const char *call_stop_recording_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { +const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output) { str callid; struct call *call; if (!bencode_dictionary_get_str(input, "call-id", &callid)) return "No call-id in message"; - call = call_get_opmode(&callid, m, OP_OTHER); + call = call_get_opmode(&callid, OP_OTHER); if (!call) return "Unknown call-id"; diff --git a/daemon/call_interfaces.h b/daemon/call_interfaces.h index 997329022..1cff0b1ce 100644 --- a/daemon/call_interfaces.h +++ b/daemon/call_interfaces.h @@ -13,7 +13,6 @@ struct call; struct call_stats; -struct callmaster; struct streambuf_stream; struct sockaddr_in6; @@ -67,24 +66,24 @@ extern int trust_address_def; extern int dtls_passive_def; -str *call_request_tcp(char **, struct callmaster *); -str *call_lookup_tcp(char **, struct callmaster *); -void call_delete_tcp(char **, struct callmaster *); -void calls_status_tcp(struct callmaster *, struct streambuf_stream *); +str *call_request_tcp(char **); +str *call_lookup_tcp(char **); +void call_delete_tcp(char **); +void calls_status_tcp(struct streambuf_stream *); -str *call_update_udp(char **, struct callmaster *, const char*, const endpoint_t *); -str *call_lookup_udp(char **, struct callmaster *); -str *call_delete_udp(char **, struct callmaster *); -str *call_query_udp(char **, struct callmaster *); +str *call_update_udp(char **, const char*, const endpoint_t *); +str *call_lookup_udp(char **); +str *call_delete_udp(char **); +str *call_query_udp(char **); -const char *call_offer_ng(bencode_item_t *, struct callmaster *, bencode_item_t *, const char*, +const char *call_offer_ng(bencode_item_t *, bencode_item_t *, const char*, const endpoint_t *); -const char *call_answer_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); -const char *call_delete_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); -const char *call_query_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); -const char *call_list_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); -const char *call_start_recording_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); -const char *call_stop_recording_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); +const char *call_answer_ng(bencode_item_t *, bencode_item_t *); +const char *call_delete_ng(bencode_item_t *, bencode_item_t *); +const char *call_query_ng(bencode_item_t *, bencode_item_t *); +const char *call_list_ng(bencode_item_t *, bencode_item_t *); +const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *); +const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *); void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output, struct call_stats *totals); diff --git a/daemon/cli.c b/daemon/cli.c index b6a82397f..2678add3a 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -28,33 +28,33 @@ #include "rtpengine_config.h" -typedef void (*cli_handler_func)(str *, struct callmaster *, struct streambuf *); +typedef void (*cli_handler_func)(str *, struct streambuf *); typedef struct { const char *cmd; cli_handler_func handler; } cli_handler_t; -static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_terminate(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streambuf *replybuffer); - -static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_set_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer); - -static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); -static void cli_incoming_list_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer); +static void cli_incoming_list(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set(str *instr, struct streambuf *replybuffer); +static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer); +static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer); +static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer); +static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer); + +static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer); +static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer); + +static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer); static const cli_handler_t cli_top_handlers[] = { { "list", cli_incoming_list }, @@ -86,7 +86,7 @@ static const cli_handler_t cli_list_handlers[] = { }; -static void cli_handler_do(const cli_handler_t *handlers, str *instr, struct callmaster *m, +static void cli_handler_do(const cli_handler_t *handlers, str *instr, struct streambuf *replybuffer) { const cli_handler_t *h; @@ -94,14 +94,14 @@ static void cli_handler_do(const cli_handler_t *handlers, str *instr, struct cal for (h = handlers; h->cmd; h++) { if (str_shift_cmp(instr, h->cmd)) continue; - h->handler(instr, m, replybuffer); + h->handler(instr, replybuffer); return; } streambuf_printf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", instr->s); } -static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign_call, unsigned int uint_keyspace_db) { +static void destroy_own_foreign_calls(unsigned int foreign_call, unsigned int uint_keyspace_db) { struct call *c = NULL; struct call_monologue *ml = NULL; GQueue call_list = G_QUEUE_INIT; @@ -155,19 +155,19 @@ static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign } } -static void destroy_all_foreign_calls(struct callmaster *m) { - destroy_own_foreign_calls(m, CT_FOREIGN_CALL, UNDEFINED); +static void destroy_all_foreign_calls(void) { + destroy_own_foreign_calls(CT_FOREIGN_CALL, UNDEFINED); } -static void destroy_all_own_calls(struct callmaster *m) { - destroy_own_foreign_calls(m, CT_OWN_CALL, UNDEFINED); +static void destroy_all_own_calls(void) { + destroy_own_foreign_calls(CT_OWN_CALL, UNDEFINED); } -static void destroy_keyspace_foreign_calls(struct callmaster *m, unsigned int uint_keyspace_db) { - destroy_own_foreign_calls(m, CT_FOREIGN_CALL, uint_keyspace_db); +static void destroy_keyspace_foreign_calls(unsigned int uint_keyspace_db) { + destroy_own_foreign_calls(CT_FOREIGN_CALL, uint_keyspace_db); } -static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer) { struct timeval avg, calls_dur_iv; u_int64_t num_sessions, min_sess_iv, max_sess_iv; struct request_time offer_iv, answer_iv, delete_iv; @@ -254,7 +254,7 @@ static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct st g_list_free(list); } -static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer) { rwlock_lock_r(&rtpe_callhash_lock); streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&rtpe_stats.foreign_sessions)); streambuf_printf(replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&rtpe_stats.foreign_sessions)); @@ -262,14 +262,14 @@ static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, stru rwlock_unlock_r(&rtpe_callhash_lock); } -static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer) { /* don't lock anything while reading the value */ streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions); return ; } -static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer) { struct rlimit rlim; pid_t pid = getpid(); @@ -287,7 +287,7 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, str return ; } -static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer) { rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ @@ -300,7 +300,7 @@ static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct s return ; } -static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) { struct call* c=0; struct call_monologue *ml; struct call_media *md; @@ -316,7 +316,7 @@ static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct st return; } - c = call_get(instr, m); + c = call_get(instr); if (!c) { streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s); @@ -400,7 +400,7 @@ static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct st obj_put(c); } -static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer) { GHashTableIter iter; gpointer key, value; str *ptrkey; @@ -466,13 +466,13 @@ static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct } } else { // list session for callid - cli_incoming_list_callid(instr, m, replybuffer); + cli_incoming_list_callid(instr, replybuffer); } return; } -static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer) { unsigned long open_files_num; pid_t pid; char *endptr; @@ -505,7 +505,7 @@ static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, stru } } -static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer) { long maxsessions_num; int disabled = -1; char *endptr; @@ -540,7 +540,7 @@ static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struc return; } -static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, int *conf_timeout) { +static void cli_incoming_set_gentimeout(str *instr, struct streambuf *replybuffer, int *conf_timeout) { long timeout_num; char *endptr; @@ -565,35 +565,35 @@ static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct } } -static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.timeout); +static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer) { + cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.timeout); } -static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.silent_timeout); +static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer) { + cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.silent_timeout); } -static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.final_timeout); +static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer) { + cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.final_timeout); } -static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list(str *instr, struct streambuf *replybuffer) { if (str_shift(instr, 1)) { streambuf_printf(replybuffer, "%s\n", "More parameters required."); return; } - cli_handler_do(cli_list_handlers, instr, m, replybuffer); + cli_handler_do(cli_list_handlers, instr, replybuffer); } -static void cli_incoming_set(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_set(str *instr, struct streambuf *replybuffer) { if (str_shift(instr, 1)) { streambuf_printf(replybuffer, "%s\n", "More parameters required."); return; } - cli_handler_do(cli_set_handlers, instr, m, replybuffer); + cli_handler_do(cli_set_handlers, instr, replybuffer); } -static void cli_incoming_terminate(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { struct call* c=0; struct call_monologue *ml; GList *i; @@ -606,10 +606,10 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre // --- terminate all calls if (!str_memcmp(instr,"all")) { // destroy own calls - destroy_all_own_calls(m); + destroy_all_own_calls(); // destroy foreign calls - destroy_all_foreign_calls(m); + destroy_all_foreign_calls(); // update cli ilog(LOG_INFO,"All calls terminated by operator."); @@ -620,7 +620,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre // --- terminate own calls } else if (!str_memcmp(instr,"own")) { // destroy own calls - destroy_all_own_calls(m); + destroy_all_own_calls(); // update cli ilog(LOG_INFO,"All own calls terminated by operator."); @@ -631,7 +631,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre // --- terminate foreign calls } else if (!str_memcmp(instr,"foreign")) { // destroy foreign calls - destroy_all_foreign_calls(m); + destroy_all_foreign_calls(); // update cli ilog(LOG_INFO,"All foreign calls terminated by operator."); @@ -641,7 +641,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre } // --- terminate a dedicated call id - c = call_get(instr, m); + c = call_get(instr); if (!c) { streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s); @@ -665,7 +665,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre obj_put(c); } -static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer) { unsigned long uint_keyspace_db; char *endptr; @@ -684,7 +684,7 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu rwlock_lock_w(&rtpe_config.config_lock); if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); - redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db); + redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, uint_keyspace_db); streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); } else { streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); @@ -693,7 +693,7 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu } } -static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer) { GList *l; unsigned long uint_keyspace_db; char *endptr; @@ -712,12 +712,12 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); } else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { // remove this keyspace - redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); + redis_notify_subscribe_action(UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data); streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); // destroy foreign calls for this keyspace - destroy_keyspace_foreign_calls(m, uint_keyspace_db); + destroy_keyspace_foreign_calls(uint_keyspace_db); // update cli streambuf_printf(replybuffer, "Successfully removed all foreign calls for keyspace %lu.\n", uint_keyspace_db); @@ -728,7 +728,7 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf } -static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer) { GList *l; streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n"); @@ -747,7 +747,6 @@ static void cli_incoming(struct streambuf_stream *s) { } static void cli_stream_readable(struct streambuf_stream *s) { - struct cli *cli = (void *) s->parent; static const int MAXINPUT = 1024; char *inbuf; str instr; @@ -764,17 +763,17 @@ static void cli_stream_readable(struct streambuf_stream *s) { ilog(LOG_INFO, "Got CLI command:%s",inbuf); str_init(&instr, inbuf); - cli_handler_do(cli_top_handlers, &instr, cli->callmaster, s->outbuf); + cli_handler_do(cli_top_handlers, &instr, s->outbuf); free(inbuf); streambuf_stream_shutdown(s); log_info_clear(); } -struct cli *cli_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { +struct cli *cli_new(struct poller *p, endpoint_t *ep) { struct cli *c; - if (!p || !m) + if (!p) return NULL; c = obj_alloc0("cli", sizeof(*c), NULL); @@ -801,7 +800,6 @@ struct cli *cli_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { } c->poller = p; - c->callmaster = m; obj_put(c); return c; @@ -812,10 +810,10 @@ fail: return NULL; } -static void cli_incoming_list_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer) { streambuf_printf(replybuffer, "%i\n", g_atomic_int_get(&log_level)); } -static void cli_incoming_set_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer) { +static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer) { int nl; if (str_shift(instr, 1)) { diff --git a/daemon/cli.h b/daemon/cli.h index 5ce263d4b..7f6264d55 100644 --- a/daemon/cli.h +++ b/daemon/cli.h @@ -8,12 +8,11 @@ struct cli { struct obj obj; - struct callmaster *callmaster; struct poller *poller; struct streambuf_listener listeners[2]; }; -struct cli *cli_new(struct poller *p, endpoint_t *, struct callmaster *m); +struct cli *cli_new(struct poller *p, endpoint_t *); #endif /* CLI_UDP_H_ */ diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 49508c473..3babbe7c4 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -183,7 +183,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin // start offer timer gettimeofday(&offer_start, NULL); - errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); + errstr = call_offer_ng(dict, resp, addr, sin); g_atomic_int_inc(&cur->offer); // stop offer timer @@ -197,7 +197,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin // start answer timer gettimeofday(&answer_start, NULL); - errstr = call_answer_ng(dict, c->callmaster, resp); + errstr = call_answer_ng(dict, resp); g_atomic_int_inc(&cur->answer); // stop answer timer @@ -211,7 +211,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin // start delete timer gettimeofday(&delete_start, NULL); - errstr = call_delete_ng(dict, c->callmaster, resp); + errstr = call_delete_ng(dict, resp); g_atomic_int_inc(&cur->delete); // stop delete timer @@ -222,19 +222,19 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin ilog(LOG_INFO, "delete time = %llu.%06llu sec", (unsigned long long)delete_stop.tv_sec, (unsigned long long)delete_stop.tv_usec); } else if (!str_cmp(&cmd, "query")) { - errstr = call_query_ng(dict, c->callmaster, resp); + errstr = call_query_ng(dict, resp); g_atomic_int_inc(&cur->query); } else if (!str_cmp(&cmd, "list")) { - errstr = call_list_ng(dict, c->callmaster, resp); + errstr = call_list_ng(dict, resp); g_atomic_int_inc(&cur->list); } else if (!str_cmp(&cmd, "start recording")) { - errstr = call_start_recording_ng(dict, c->callmaster, resp); + errstr = call_start_recording_ng(dict, resp); g_atomic_int_inc(&cur->start_recording); } else if (!str_cmp(&cmd, "stop recording")) { - errstr = call_stop_recording_ng(dict, c->callmaster, resp); + errstr = call_stop_recording_ng(dict, resp); g_atomic_int_inc(&cur->stop_recording); } else @@ -309,15 +309,14 @@ out: -struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, struct callmaster *m, unsigned char tos) { +struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) { struct control_ng *c; - if (!p || !m) + if (!p) return NULL; c = obj_alloc0("control_ng", sizeof(*c), NULL); - c->callmaster = m; cookie_cache_init(&c->cookie_cache); if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj)) diff --git a/daemon/control_ng.h b/daemon/control_ng.h index 8f22bb58c..e5f2d0429 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -8,7 +8,6 @@ struct poller; -struct callmaster; struct control_ng_stats { sockaddr_t proxy; @@ -25,12 +24,11 @@ struct control_ng_stats { struct control_ng { struct obj obj; - struct callmaster *callmaster; struct cookie_cache cookie_cache; socket_t udp_listeners[2]; }; -struct control_ng *control_ng_new(struct poller *, endpoint_t *, struct callmaster *, unsigned char); +struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char); void control_ng_init(void); extern mutex_t rtpe_cngs_lock; diff --git a/daemon/control_tcp.c b/daemon/control_tcp.c index c5a893fdb..35af4ce45 100644 --- a/daemon/control_tcp.c +++ b/daemon/control_tcp.c @@ -32,7 +32,6 @@ struct control_tcp { pcre_extra *parse_ree; struct poller *poller; - struct callmaster *callmaster; }; @@ -91,13 +90,13 @@ static int control_stream_parse(struct streambuf_stream *s, char *line) { if (!strcmp(out[RE_TCP_RL_CMD], "request")) - output = call_request_tcp(out, c->callmaster); + output = call_request_tcp(out); else if (!strcmp(out[RE_TCP_RL_CMD], "lookup")) - output = call_lookup_tcp(out, c->callmaster); + output = call_lookup_tcp(out); else if (!strcmp(out[RE_TCP_D_CMD], "delete")) - call_delete_tcp(out, c->callmaster); + call_delete_tcp(out); else if (!strcmp(out[RE_TCP_DIV_CMD], "status")) - calls_status_tcp(c->callmaster, s); + calls_status_tcp(s); else if (!strcmp(out[RE_TCP_DIV_CMD], "build") || !strcmp(out[RE_TCP_DIV_CMD], "version")) streambuf_printf(s->outbuf, "Version: %s\n", RTPENGINE_VERSION); else if (!strcmp(out[RE_TCP_DIV_CMD], "controls")) @@ -155,15 +154,13 @@ static void control_incoming(struct streambuf_stream *s) { } -struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { +struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) { struct control_tcp *c; const char *errptr; int erroff; if (!p) return NULL; - if (!m) - return NULL; c = obj_alloc0("control", sizeof(*c), NULL); @@ -195,7 +192,6 @@ struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct cal c->parse_ree = pcre_study(c->parse_re, 0, &errptr); c->poller = p; - c->callmaster = m; obj_put(c); return c; diff --git a/daemon/control_tcp.h b/daemon/control_tcp.h index 9ca94c969..0b5c36268 100644 --- a/daemon/control_tcp.h +++ b/daemon/control_tcp.h @@ -30,13 +30,12 @@ #define RE_TCP_DIV_CMD 14 struct poller; -struct callmaster; struct control_tcp; struct streambuf_stream; -struct control_tcp *control_tcp_new(struct poller *, endpoint_t *, struct callmaster *); +struct control_tcp *control_tcp_new(struct poller *, endpoint_t *); diff --git a/daemon/control_udp.c b/daemon/control_udp.c index c64a3e6c4..ee34c69f8 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -86,13 +86,13 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si log_info_c_string(out[RE_UDP_DQ_CALLID]); if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U') - reply = call_update_udp(out, u->callmaster, addr, sin); + reply = call_update_udp(out, addr, sin); else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L') - reply = call_lookup_udp(out, u->callmaster); + reply = call_lookup_udp(out); else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'D') - reply = call_delete_udp(out, u->callmaster); + reply = call_delete_udp(out); else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'Q') - reply = call_query_udp(out, u->callmaster); + reply = call_query_udp(out); else if (chrtoupper(out[RE_UDP_V_CMD][0]) == 'V') { iovlen = 2; @@ -134,17 +134,16 @@ out: log_info_clear(); } -struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { +struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) { struct control_udp *c; const char *errptr; int erroff; - if (!p || !m) + if (!p) return NULL; c = obj_alloc0("control_udp", sizeof(*c), NULL); - c->callmaster = m; c->parse_re = pcre_compile( /* cookie cmd flags callid viabranch:5 */ "^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \ diff --git a/daemon/control_udp.h b/daemon/control_udp.h index f30a45c14..420b9fb62 100644 --- a/daemon/control_udp.h +++ b/daemon/control_udp.h @@ -39,7 +39,6 @@ #define RE_UDP_V_PARMS 20 struct poller; -struct callmaster; @@ -48,7 +47,6 @@ struct callmaster; struct control_udp { struct obj obj; - struct callmaster *callmaster; struct cookie_cache cookie_cache; socket_t udp_listeners[2]; @@ -61,7 +59,7 @@ struct control_udp { -struct control_udp *control_udp_new(struct poller *, endpoint_t *, struct callmaster *); +struct control_udp *control_udp_new(struct poller *, endpoint_t *); diff --git a/daemon/graphite.c b/daemon/graphite.c index af6141a5a..39e2343d3 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -85,16 +85,10 @@ int connect_to_graphite_server(const endpoint_t *graphite_ep) { return 0; } -int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) { +int send_graphite_data(struct totalstats *sent_data) { int rc=0; - // sanity checks - if (!cm) { - ilog(LOG_ERROR, "NULL callmaster when trying to send data"); - return -1; - } - if (graphite_sock.fd < 0) { ilog(LOG_ERROR,"Graphite socket is not connected."); return -1; @@ -248,17 +242,11 @@ static inline void copy_with_lock(struct totalstats *ts_dst, struct totalstats * mutex_unlock(ts_lock); } -void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds) { +void graphite_loop_run(endpoint_t *graphite_ep, int seconds) { int rc=0; struct pollfd wfds[1]; - // sanity checks - if (!cm) { - ilog(LOG_ERROR, "NULL callmaster"); - return ; - } - if (!graphite_ep) { ilog(LOG_ERROR, "NULL graphite_ep"); return ; @@ -311,9 +299,9 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon } if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) { - add_total_calls_duration_in_interval(cm, &graphite_interval_tv); + add_total_calls_duration_in_interval(&graphite_interval_tv); - rc = send_graphite_data(cm, &graphite_stats); + rc = send_graphite_data(&graphite_stats); gettimeofday(&rtpe_latest_graphite_interval_start, NULL); if (rc < 0) { ilog(LOG_ERROR,"Sending graphite data failed."); @@ -327,14 +315,6 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon } void graphite_loop(void *d) { - struct callmaster *cm = d; - - // sanity checks - if (!cm) { - ilog(LOG_ERROR, "NULL callmaster"); - return ; - } - if (rtpe_config.graphite_interval <= 0) { ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); rtpe_config.graphite_interval=1; @@ -343,5 +323,5 @@ void graphite_loop(void *d) { connect_to_graphite_server(&rtpe_config.graphite_ep); while (!rtpe_shutdown) - graphite_loop_run(cm, &rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds + graphite_loop_run(&rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds } diff --git a/daemon/graphite.h b/daemon/graphite.h index d83ccbc2d..84a6647d9 100644 --- a/daemon/graphite.h +++ b/daemon/graphite.h @@ -19,8 +19,8 @@ enum connection_state { extern struct timeval rtpe_latest_graphite_interval_start; int connect_to_graphite_server(const endpoint_t *ep); -int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data); -void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds); +int send_graphite_data(struct totalstats *sent_data); +void graphite_loop_run(endpoint_t *graphite_ep, int seconds); void set_prefix(char* prefix); void graphite_loop(void *d); void set_latest_graphite_interval_start(struct timeval *tv); diff --git a/daemon/main.c b/daemon/main.c index 8d4797312..5633402ff 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -41,11 +41,6 @@ -struct main_context { - struct callmaster *m; -}; - - struct poller *rtpe_poller; @@ -497,10 +492,12 @@ static void init_everything() { if (call_interfaces_init()) abort(); statistics_init(); + if (call_init()) + abort(); } -static void create_everything(struct main_context *ctx) { +static void create_everything(void) { struct control_tcp *ct; struct control_udp *cu; struct control_ng *cn; @@ -524,10 +521,6 @@ no_kernel: if (!rtpe_poller) die("poller creation failed"); - ctx->m = callmaster_new(); - if (!ctx->m) - die("callmaster creation failed"); - dtls_timer(rtpe_poller); rwlock_init(&rtpe_config.config_lock); @@ -546,7 +539,7 @@ no_kernel: ct = NULL; if (tcp_listen_ep.port) { - ct = control_tcp_new(rtpe_poller, &tcp_listen_ep, ctx->m); + ct = control_tcp_new(rtpe_poller, &tcp_listen_ep); if (!ct) die("Failed to open TCP control connection port"); } @@ -554,7 +547,7 @@ no_kernel: cu = NULL; if (udp_listen_ep.port) { interfaces_exclude_port(udp_listen_ep.port); - cu = control_udp_new(rtpe_poller, &udp_listen_ep, ctx->m); + cu = control_udp_new(rtpe_poller, &udp_listen_ep); if (!cu) die("Failed to open UDP control connection port"); } @@ -562,7 +555,7 @@ no_kernel: cn = NULL; if (ng_listen_ep.port) { interfaces_exclude_port(ng_listen_ep.port); - cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, rtpe_config.control_tos); + cn = control_ng_new(rtpe_poller, &ng_listen_ep, rtpe_config.control_tos); if (!cn) die("Failed to open UDP control connection port"); } @@ -570,27 +563,27 @@ no_kernel: cl = NULL; if (cli_listen_ep.port) { interfaces_exclude_port(cli_listen_ep.port); - cl = cli_new(rtpe_poller, &cli_listen_ep, ctx->m); + cl = cli_new(rtpe_poller, &cli_listen_ep); if (!cl) die("Failed to open UDP CLI connection port"); } if (!is_addr_unspecified(&redis_write_ep.address)) { - ctx->m->conf.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); - if (!ctx->m->conf.redis_write) + rtpe_redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); + if (!rtpe_redis_write) die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&redis_write_ep)); } if (!is_addr_unspecified(&redis_ep.address)) { - ctx->m->conf.redis = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); - ctx->m->conf.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); - if (!ctx->m->conf.redis || !ctx->m->conf.redis_notify) + rtpe_redis = redis_new(&redis_ep, redis_db, redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + rtpe_redis_notify = redis_new(&redis_ep, redis_db, redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + if (!rtpe_redis || !rtpe_redis_notify) die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.", endpoint_print_buf(&redis_ep)); - if (!ctx->m->conf.redis_write) - ctx->m->conf.redis_write = ctx->m->conf.redis; + if (!rtpe_redis_write) + rtpe_redis_write = rtpe_redis; } daemonize(); @@ -600,12 +593,12 @@ no_kernel: rtcp_init(); // must come after Homer init - if (ctx->m->conf.redis) { + if (rtpe_redis) { // start redis restore timer gettimeofday(&redis_start, NULL); // restore - if (redis_restore(ctx->m, ctx->m->conf.redis)) + if (redis_restore(rtpe_redis)) die("Refusing to continue without working Redis database"); // stop redis restore timer @@ -624,13 +617,12 @@ no_kernel: int main(int argc, char **argv) { - struct main_context ctx; int idx=0; early_init(); options(&argc, &argv); init_everything(); - create_everything(&ctx); + create_everything(); ilog(LOG_INFO, "Startup complete, version %s", RTPENGINE_VERSION); @@ -638,10 +630,10 @@ int main(int argc, char **argv) { thread_create_detach(poller_timer_loop, rtpe_poller); if (!is_addr_unspecified(&redis_ep.address)) - thread_create_detach(redis_notify_loop, ctx.m); + thread_create_detach(redis_notify_loop, NULL); if (!is_addr_unspecified(&rtpe_config.graphite_ep.address)) - thread_create_detach(graphite_loop, ctx.m); + thread_create_detach(graphite_loop, NULL); thread_create_detach(ice_thread_run, NULL); @@ -663,7 +655,7 @@ int main(int argc, char **argv) { } if (!is_addr_unspecified(&redis_ep.address)) - redis_notify_event_base_action(ctx.m, EVENT_BASE_LOOPBREAK); + redis_notify_event_base_action(EVENT_BASE_LOOPBREAK); threads_join_all(1); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a7be8dbaa..60b5042bf 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1596,7 +1596,7 @@ out: ca = sfd->call ? : NULL; if (ca && update) { - redis_update_onekey(ca, ca->callmaster->conf.redis_write); + redis_update_onekey(ca, rtpe_redis_write); } done: log_info_clear(); diff --git a/daemon/redis.c b/daemon/redis.c index 5649e099e..5eeecb5e7 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -32,6 +32,14 @@ #include "ssrc.h" #include "main.h" +struct redis *rtpe_redis; +struct redis *rtpe_redis_write; +struct redis *rtpe_redis_notify; + +struct event_base *rtpe_redis_notify_event_base; +struct redisAsyncContext *rtpe_redis_notify_async_context; + + INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -71,7 +79,7 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define REDIS_FMT(x) (x)->len, (x)->str static int redis_check_conn(struct redis *r); -static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); +static void json_restore_call(struct redis *r, const str *id, enum call_type type); static void redis_pipe(struct redis *r, const char *fmt, ...) { va_list ap; @@ -260,25 +268,17 @@ err: void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) { - - struct callmaster *cm = privdata; struct redis *r = 0; struct call *c = NULL; str callid; str keyspace_id; - // sanity checks - if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on on_redis_notification"); - return; - } - - if (!cm->conf.redis_notify) { + if (!rtpe_redis_notify) { rlog(LOG_ERROR, "A redis notification has been received but no redis_notify database found"); return; } - r = cm->conf.redis_notify; + r = rtpe_redis_notify; mutex_lock(&r->lock); @@ -323,7 +323,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) } if (strncmp(rr->element[3]->str,"set",3)==0) { - c = call_get(&callid, cm); + c = call_get(&callid); if (c) { rwlock_unlock_w(&c->master_lock); if (IS_FOREIGN_CALL(c)) @@ -333,11 +333,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; } } - json_restore_call(r, cm, &callid, CT_FOREIGN_CALL); + json_restore_call(r, &callid, CT_FOREIGN_CALL); } if (strncmp(rr->element[3]->str,"del",3)==0) { - c = call_get(&callid, cm); + c = call_get(&callid); if (!c) { rlog(LOG_NOTICE, "Redis-Notifier: DEL did not find call with callid: %s\n", rr->element[2]->str); goto err; @@ -376,36 +376,30 @@ void redis_async_context_disconnect(const redisAsyncContext *redis_notify_async_ } } -int redis_async_context_alloc(struct callmaster *cm) { +int redis_async_context_alloc() { struct redis *r = 0; - // sanity checks - if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on context free"); - return -1; - } - - if (!cm->conf.redis_notify) { + if (!rtpe_redis_notify) { rlog(LOG_INFO, "redis_notify database is NULL."); return -1; } // get redis_notify database - r = cm->conf.redis_notify; + r = rtpe_redis_notify; rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint)); // alloc async context - cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); - if (!cm->conf.redis_notify_async_context) { + rtpe_redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); + if (!rtpe_redis_notify_async_context) { rlog(LOG_ERROR, "redis_notify_async_context can't create new"); return -1; } - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", cm->conf.redis_notify_async_context->errstr); + if (rtpe_redis_notify_async_context->err) { + rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", rtpe_redis_notify_async_context->errstr); return -1; } - if (redisAsyncSetDisconnectCallback(cm->conf.redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) { + if (redisAsyncSetDisconnectCallback(rtpe_redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) { rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback"); return -1; } @@ -413,14 +407,8 @@ int redis_async_context_alloc(struct callmaster *cm) { return 0; } -int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action action) { - // sanity checks - if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on event base action %d", action); - return -1; - } - - if (!cm->conf.redis_notify_event_base && action!=EVENT_BASE_ALLOC) { +int redis_notify_event_base_action(enum event_base_action action) { + if (!rtpe_redis_notify_event_base && action!=EVENT_BASE_ALLOC) { rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action); return -1; } @@ -428,8 +416,8 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action // exec event base action switch (action) { case EVENT_BASE_ALLOC: - cm->conf.redis_notify_event_base = event_base_new(); - if (!cm->conf.redis_notify_event_base) { + rtpe_redis_notify_event_base = event_base_new(); + if (!rtpe_redis_notify_event_base) { rlog(LOG_ERROR, "Fail alloc redis_notify_event_base"); return -1; } else { @@ -438,12 +426,12 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action break; case EVENT_BASE_FREE: - event_base_free(cm->conf.redis_notify_event_base); + event_base_free(rtpe_redis_notify_event_base); rlog(LOG_DEBUG, "Success free redis_notify_event_base"); break; case EVENT_BASE_LOOPBREAK: - if (event_base_loopbreak(cm->conf.redis_notify_event_base)) { + if (event_base_loopbreak(rtpe_redis_notify_event_base)) { rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base"); return -1; } else { @@ -459,38 +447,32 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action return 0; } -int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace) { - // sanity checks - if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on subscribe action"); - return -1; - } - - if (!cm->conf.redis_notify_async_context) { +int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) { + if (!rtpe_redis_notify_async_context) { rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action"); return -1; } - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", cm->conf.redis_notify_async_context->errstr); + if (rtpe_redis_notify_async_context->err) { + rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", rtpe_redis_notify_async_context->errstr); return -1; } switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); return -1; } @@ -503,39 +485,33 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return 0; } -static int redis_notify(struct callmaster *cm) { +static int redis_notify() { struct redis *r = 0; GList *l; - // sanity checks - if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on redis_notify()"); - return -1; - } - - if (!cm->conf.redis_notify) { + if (!rtpe_redis_notify) { rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()"); return -1; } - if (!cm->conf.redis_notify_async_context) { + if (!rtpe_redis_notify_async_context) { rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()"); return -1; } - if (!cm->conf.redis_notify_event_base) { + if (!rtpe_redis_notify_event_base) { rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()"); return -1; } // get redis_notify database - r = cm->conf.redis_notify; + r = rtpe_redis_notify; rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); // attach event base - if (redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base) == REDIS_ERR) { - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", cm->conf.redis_notify_async_context->errstr); + if (redisLibeventAttach(rtpe_redis_notify_async_context, rtpe_redis_notify_event_base) == REDIS_ERR) { + if (rtpe_redis_notify_async_context->err) { + rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", rtpe_redis_notify_async_context->errstr); } else { rlog(LOG_ERROR, "redis_notify_async_context can't attach event base"); @@ -546,12 +522,12 @@ static int redis_notify(struct callmaster *cm) { // subscribe to the values in the configured keyspaces rwlock_lock_r(&rtpe_config.config_lock); for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { - redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); + redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); } rwlock_unlock_r(&rtpe_config.config_lock); // dispatch event base => thread blocks here - if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) { + if (event_base_dispatch(rtpe_redis_notify_event_base) < 0) { rlog(LOG_ERROR, "Fail event_base_dispatch()"); return -1; } @@ -562,16 +538,9 @@ static int redis_notify(struct callmaster *cm) { void redis_notify_loop(void *d) { int seconds = 1, redis_notify_return = 0; time_t next_run = rtpe_now.tv_sec; - struct callmaster *cm = (struct callmaster *)d; struct redis *r; - // sanity checks - if (!cm) { - ilog(LOG_ERROR, "NULL callmaster"); - return ; - } - - r = cm->conf.redis_notify; + r = rtpe_redis_notify; if (!r) { rlog(LOG_ERROR, "Don't use Redis notifications. See --redis-notifications parameter."); return ; @@ -584,18 +553,18 @@ void redis_notify_loop(void *d) { } // alloc redis async context - if (redis_async_context_alloc(cm) < 0) { + if (redis_async_context_alloc() < 0) { return ; } // alloc event base - if (redis_notify_event_base_action(cm, EVENT_BASE_ALLOC) < 0) { + if (redis_notify_event_base_action(EVENT_BASE_ALLOC) < 0) { return ; } // initial redis_notify if (redis_check_conn(r) == REDIS_STATE_CONNECTED) { - redis_notify_return = redis_notify(cm); + redis_notify_return = redis_notify(); } // loop redis_notify => in case of lost connection @@ -610,23 +579,23 @@ void redis_notify_loop(void *d) { if (redis_check_conn(r) == REDIS_STATE_RECONNECTED || redis_notify_return < 0) { // alloc new redis async context upon redis breakdown - if (redis_async_context_alloc(cm) < 0) { + if (redis_async_context_alloc() < 0) { continue; } // prepare notifications - redis_notify_return = redis_notify(cm); + redis_notify_return = redis_notify(); } } // unsubscribe notifications - redis_notify_subscribe_action(cm, UNSUBSCRIBE_ALL, 0); + redis_notify_subscribe_action(UNSUBSCRIBE_ALL, 0); // free async context - redisAsyncDisconnect(cm->conf.redis_notify_async_context); + redisAsyncDisconnect(rtpe_redis_notify_async_context); // free event base - redis_notify_event_base_action(cm, EVENT_BASE_FREE); + redis_notify_event_base_action(EVENT_BASE_FREE); } struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { @@ -1436,7 +1405,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) { return 0; } -static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { +static void json_restore_call(struct redis *r, const str *callid, enum call_type type) { redisReply* rr_jsonStr; struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; @@ -1462,7 +1431,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (!root_reader) goto err1; - c = call_get_or_create(callid, m, type); + c = call_get_or_create(callid, type); err = "failed to create call struct"; if (!c) goto err1; @@ -1582,9 +1551,9 @@ err1: if (c) call_destroy(c); else { - mutex_lock(&m->conf.redis_write->lock); - redisCommandNR(m->conf.redis_write->ctx, "DEL " PB, STR(callid)); - mutex_unlock(&m->conf.redis_write->lock); + mutex_lock(&rtpe_redis_write->lock); + redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid)); + mutex_unlock(&rtpe_redis_write->lock); } } if (c) @@ -1592,7 +1561,6 @@ err1: } struct thread_ctx { - struct callmaster *m; GQueue r_q; mutex_t r_m; }; @@ -1610,14 +1578,14 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); + json_restore_call(r, &callid, CT_OWN_CALL); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); mutex_unlock(&ctx->r_m); } -int redis_restore(struct callmaster *m, struct redis *r) { +int redis_restore(struct redis *r) { redisReply *calls = NULL, *call; int i, ret = -1; GThreadPool *gtp; @@ -1647,7 +1615,6 @@ int redis_restore(struct callmaster *m, struct redis *r) { goto err; } - ctx.m = m; mutex_init(&ctx.r_m); g_queue_init(&ctx.r_q); for (i = 0; i < rtpe_config.redis_num_threads; i++) diff --git a/daemon/redis.h b/daemon/redis.h index 1932ea8d3..1b9f8fccd 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -43,7 +43,6 @@ enum subscribe_action { UNSUBSCRIBE_ALL, }; -struct callmaster; struct call; @@ -74,6 +73,15 @@ struct redis_list { }; +extern struct redis *rtpe_redis; +extern struct redis *rtpe_redis_write; +extern struct redis *rtpe_redis_notify; + +extern struct event_base *rtpe_redis_notify_event_base; +extern struct redisAsyncContext *rtpe_redis_notify_async_context; + + + #if !GLIB_CHECK_VERSION(2,40,0) INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) { gboolean ret = TRUE; @@ -93,13 +101,13 @@ void redis_notify_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); -int redis_restore(struct callmaster *, struct redis *); +int redis_restore(struct redis *); void redis_update(struct call *, struct redis *); void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); -int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); -int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace); +int redis_notify_event_base_action(enum event_base_action); +int redis_notify_subscribe_action(enum subscribe_action action, int keyspace);