Browse Source

TT#28300 add cleanup of poller related data

Change-Id: I64a38869ce3120d066fc818e3c76941a1c8186b7
changes/10/40810/4
Richard Fuchs 6 years ago
parent
commit
1589c29e28
14 changed files with 135 additions and 21 deletions
  1. +7
    -1
      daemon/cli.c
  2. +22
    -1
      daemon/control_ng.c
  3. +9
    -1
      daemon/control_tcp.c
  4. +11
    -1
      daemon/control_udp.c
  5. +7
    -0
      daemon/cookie_cache.c
  6. +19
    -12
      daemon/main.c
  7. +25
    -0
      daemon/poller.c
  8. +21
    -3
      daemon/tcp_listener.c
  9. +7
    -2
      daemon/udp_listener.c
  10. +1
    -0
      include/control_ng.h
  11. +1
    -0
      include/cookie_cache.h
  12. +3
    -0
      include/obj.h
  13. +1
    -0
      include/poller.h
  14. +1
    -0
      include/tcp_listener.h

+ 7
- 1
daemon/cli.c View File

@ -1082,13 +1082,19 @@ static void cli_stream_readable(struct streambuf_stream *s) {
log_info_clear();
}
static void cli_free(void *p) {
struct cli *c = p;
streambuf_listener_shutdown(&c->listeners[0]);
streambuf_listener_shutdown(&c->listeners[1]);
}
struct cli *cli_new(struct poller *p, endpoint_t *ep) {
struct cli *c;
if (!p)
return NULL;
c = obj_alloc0("cli", sizeof(*c), NULL);
c = obj_alloc0("cli", sizeof(*c), cli_free);
if (streambuf_listener_init(&c->listeners[0], p, ep,
cli_incoming, cli_stream_readable,


+ 22
- 1
daemon/control_ng.c View File

@ -359,6 +359,26 @@ out:
}
void control_ng_free(void *p) {
struct control_ng *c = p;
// XXX this should go elsewhere
if (rtpe_cngs_hash) {
GList *ll = g_hash_table_get_values(rtpe_cngs_hash);
for (GList *l = ll; l; l = l->next) {
struct control_ng_stats *s = l->data;
g_slice_free1(sizeof(*s), s);
}
g_list_free(ll);
g_hash_table_destroy(rtpe_cngs_hash);
rtpe_cngs_hash = NULL;
}
poller_del_item(c->poller, c->udp_listeners[0].fd);
poller_del_item(c->poller, c->udp_listeners[1].fd);
close_socket(&c->udp_listeners[0]);
close_socket(&c->udp_listeners[1]);
cookie_cache_cleanup(&c->cookie_cache);
}
struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) {
struct control_ng *c;
@ -366,11 +386,12 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha
if (!p)
return NULL;
c = obj_alloc0("control_ng", sizeof(*c), NULL);
c = obj_alloc0("control_ng", sizeof(*c), control_ng_free);
cookie_cache_init(&c->cookie_cache);
c->udp_listeners[0].fd = -1;
c->udp_listeners[1].fd = -1;
c->poller = p;
if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj))
goto fail2;


+ 9
- 1
daemon/control_tcp.c View File

@ -154,6 +154,14 @@ static void control_incoming(struct streambuf_stream *s) {
}
static void control_tcp_free(void *p) {
struct control_tcp *c = p;
streambuf_listener_shutdown(&c->listeners[0]);
streambuf_listener_shutdown(&c->listeners[1]);
pcre_free(c->parse_re);
pcre_free_study(c->parse_ree);
}
struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) {
struct control_tcp *c;
const char *errptr;
@ -162,7 +170,7 @@ struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) {
if (!p)
return NULL;
c = obj_alloc0("control", sizeof(*c), NULL);
c = obj_alloc0("control", sizeof(*c), control_tcp_free);
if (streambuf_listener_init(&c->listeners[0], p, ep,
control_incoming, control_stream_readable,


+ 11
- 1
daemon/control_udp.c View File

@ -134,6 +134,16 @@ out:
log_info_clear();
}
void control_udp_free(void *p) {
struct control_udp *u = p;
pcre_free_study(u->parse_ree);
pcre_free(u->parse_re);
pcre_free(u->fallback_re);
close_socket(&u->udp_listeners[0]);
close_socket(&u->udp_listeners[1]);
cookie_cache_cleanup(&u->cookie_cache);
}
struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) {
struct control_udp *c;
const char *errptr;
@ -142,7 +152,7 @@ struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) {
if (!p)
return NULL;
c = obj_alloc0("control_udp", sizeof(*c), NULL);
c = obj_alloc0("control_udp", sizeof(*c), control_udp_free);
c->parse_re = pcre_compile(
/* cookie cmd flags callid viabranch:5 */


+ 7
- 0
daemon/cookie_cache.c View File

@ -76,3 +76,10 @@ void cookie_cache_remove(struct cookie_cache *c, const str *s) {
cond_broadcast(&c->cond);
mutex_unlock(&c->lock);
}
void cookie_cache_cleanup(struct cookie_cache *c) {
g_hash_table_destroy(c->current.cookies);
g_hash_table_destroy(c->old.cookies);
g_string_chunk_free(c->current.chunks);
g_string_chunk_free(c->old.chunks);
}

+ 19
- 12
daemon/main.c View File

@ -51,6 +51,10 @@
struct poller *rtpe_poller;
struct rtpengine_config initial_rtpe_config;
static struct control_tcp *rtpe_tcp;
static struct control_udp *rtpe_udp;
static struct cli *rtpe_cli;
struct rtpengine_config rtpe_config = {
// non-zero defaults
.kernel_table = -1,
@ -745,9 +749,6 @@ static void init_everything(void) {
static void create_everything(void) {
struct control_tcp *ct;
struct control_udp *cu;
struct cli *cl;
struct timeval tmp_tv;
struct timeval redis_start, redis_stop;
double redis_diff = 0;
@ -786,18 +787,18 @@ no_kernel:
}
}
ct = NULL;
rtpe_tcp = NULL;
if (rtpe_config.tcp_listen_ep.port) {
ct = control_tcp_new(rtpe_poller, &rtpe_config.tcp_listen_ep);
if (!ct)
rtpe_tcp = control_tcp_new(rtpe_poller, &rtpe_config.tcp_listen_ep);
if (!rtpe_tcp)
die("Failed to open TCP control connection port");
}
cu = NULL;
rtpe_udp = NULL;
if (rtpe_config.udp_listen_ep.port) {
interfaces_exclude_port(rtpe_config.udp_listen_ep.port);
cu = control_udp_new(rtpe_poller, &rtpe_config.udp_listen_ep);
if (!cu)
rtpe_udp = control_udp_new(rtpe_poller, &rtpe_config.udp_listen_ep);
if (!rtpe_udp)
die("Failed to open UDP control connection port");
}
@ -809,11 +810,11 @@ no_kernel:
die("Failed to open UDP control connection port");
}
cl = NULL;
rtpe_cli = NULL;
if (rtpe_config.cli_listen_ep.port) {
interfaces_exclude_port(rtpe_config.cli_listen_ep.port);
cl = cli_new(rtpe_poller, &rtpe_config.cli_listen_ep);
if (!cl)
rtpe_cli = cli_new(rtpe_poller, &rtpe_config.cli_listen_ep);
if (!rtpe_cli)
die("Failed to open UDP CLI connection port");
}
@ -947,5 +948,11 @@ int main(int argc, char **argv) {
call_free();
interfaces_free();
obj_release(rtpe_cli);
obj_release(rtpe_udp);
obj_release(rtpe_tcp);
obj_release(rtpe_control_ng);
poller_free(&rtpe_poller);
return 0;
}

+ 25
- 0
daemon/poller.c View File

@ -66,6 +66,31 @@ struct poller *poller_new(void) {
return p;
}
static void __ti_put(void *p) {
struct timer_item *ti = p;
obj_put(ti);
}
void poller_free(struct poller **pp) {
struct poller *p = *pp;
for (unsigned int i = 0; i < p->items_size; i++) {
struct poller_item_int *ip = p->items[i];
if (!ip)
continue;
p->items[i] = NULL;
obj_put(ip);
}
g_slist_free_full(p->timers, __ti_put);
g_slist_free_full(p->timers_add, __ti_put);
g_slist_free_full(p->timers_del, __ti_put);
if (p->fd != -1)
close(p->fd);
p->fd = -1;
if (p->items)
free(p->items);
free(p);
*pp = NULL;
}
static int epoll_events(struct poller_item *it, struct poller_item_int *ii) {
if (!it)


+ 21
- 3
daemon/tcp_listener.c View File

@ -55,13 +55,18 @@ static void tcp_listener_closed(int fd, void *p, uintptr_t u) {
abort();
}
static void __tlc_free(void *p) {
struct tcp_listener_callback *cb = p;
obj_put_o(cb->p);
}
int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
tcp_listener_callback_t func, struct obj *obj)
{
struct poller_item i;
struct tcp_listener_callback *cb;
cb = obj_alloc("tcp_listener_callback", sizeof(*cb), NULL);
cb = obj_alloc("tcp_listener_callback", sizeof(*cb), __tlc_free);
cb->func = func;
cb->p = obj_get_o(obj);
cb->ul = sock;
@ -79,11 +84,11 @@ int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
if (poller_add_item(p, &i))
goto fail;
obj_put(cb);
return 0;
fail:
close_socket(sock);
obj_put_o(obj);
obj_put(cb);
return -1;
}
@ -199,6 +204,11 @@ fail:
obj_put(s);
}
static void __sb_free(void *p) {
struct streambuf_callback *cb = p;
obj_put_o(cb->parent);
}
int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep,
streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func,
@ -214,7 +224,7 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
mutex_init(&listener->lock);
listener->streams = g_hash_table_new(g_direct_hash, g_direct_equal);
cb = obj_alloc("streambuf_callback", sizeof(*cb), NULL);
cb = obj_alloc("streambuf_callback", sizeof(*cb), __sb_free);
cb->newconn_func = newconn_func;
cb->newdata_func = newdata_func;
cb->closed_func = closed_func;
@ -225,12 +235,20 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
if (tcp_listener_init(&listener->listener, p, ep, streambuf_listener_newconn, &cb->obj))
goto fail;
obj_put(cb);
return 0;
fail:
obj_put(cb);
return -1;
}
void streambuf_listener_shutdown(struct streambuf_listener *listener) {
if (!listener)
return;
poller_del_item(listener->poller, listener->listener.fd);
close_socket(&listener->listener);
g_hash_table_destroy(listener->streams);
}
void streambuf_stream_close(struct streambuf_stream *s) {
streambuf_stream_closed(s->sock.fd, s, 0);


+ 7
- 2
daemon/udp_listener.c View File

@ -55,13 +55,18 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
}
}
static void __ulc_free(void *p) {
struct udp_listener_callback *cb = p;
obj_put_o(cb->p);
}
int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
udp_listener_callback_t func, struct obj *obj)
{
struct poller_item i;
struct udp_listener_callback *cb;
cb = obj_alloc("udp_listener_callback", sizeof(*cb), NULL);
cb = obj_alloc("udp_listener_callback", sizeof(*cb), __ulc_free);
cb->func = func;
cb->p = obj_get_o(obj);
cb->ul = sock;
@ -77,11 +82,11 @@ int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
if (poller_add_item(p, &i))
goto fail;
obj_put(cb);
return 0;
fail:
close_socket(sock);
obj_put_o(obj);
obj_put(cb);
return -1;
}

+ 1
- 0
include/control_ng.h View File

@ -36,6 +36,7 @@ struct control_ng {
struct obj obj;
struct cookie_cache cookie_cache;
socket_t udp_listeners[2];
struct poller *poller;
};
struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char);


+ 1
- 0
include/cookie_cache.h View File

@ -22,5 +22,6 @@ void cookie_cache_init(struct cookie_cache *);
str *cookie_cache_lookup(struct cookie_cache *, const str *);
void cookie_cache_insert(struct cookie_cache *, const str *, const str *);
void cookie_cache_remove(struct cookie_cache *, const str *);
void cookie_cache_cleanup(struct cookie_cache *);
#endif

+ 3
- 0
include/obj.h View File

@ -86,6 +86,9 @@ INLINE void __obj_put(struct obj *o);
#endif
#define obj_release(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0)
#include "log.h"


+ 1
- 0
include/poller.h View File

@ -31,6 +31,7 @@ struct poller;
struct poller *poller_new(void);
void poller_free(struct poller **);
int poller_add_item(struct poller *, struct poller_item *);
int poller_update_item(struct poller *, struct poller_item *);
int poller_del_item(struct poller *, int);


+ 1
- 0
include/tcp_listener.h View File

@ -40,6 +40,7 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
streambuf_callback_t closed_func,
streambuf_callback_t timer_func,
struct obj *obj);
void streambuf_listener_shutdown(struct streambuf_listener *);
void streambuf_stream_close(struct streambuf_stream *);
void streambuf_stream_shutdown(struct streambuf_stream *);


Loading…
Cancel
Save