Browse Source

MT#56374 eliminate poller_timer functionality

All timers have been moved to their dedicated timer threads, making this
mechanism obsolete.

The only victim is the timeout handling for TCP control streams. Since
other TCP streams aren't using timeout handling either, and the TCP
control socket is barely used by anyone, we can live with not having a
dedicated timeout for these streams for now.

Change-Id: I83d9b9a844f4f494ad37b44f5d1312f272beff3f
pull/1694/head
Richard Fuchs 2 years ago
parent
commit
f9f9348d4f
8 changed files with 0 additions and 214 deletions
  1. +0
    -1
      daemon/cli.c
  2. +0
    -1
      daemon/control_ng.c
  3. +0
    -7
      daemon/control_tcp.c
  4. +0
    -6
      daemon/main.c
  5. +0
    -182
      daemon/poller.c
  6. +0
    -11
      daemon/tcp_listener.c
  7. +0
    -5
      include/poller.h
  8. +0
    -1
      include/tcp_listener.h

+ 0
- 1
daemon/cli.c View File

@ -1266,7 +1266,6 @@ struct cli *cli_new(struct poller *p, endpoint_t *ep) {
if (streambuf_listener_init(&c->listener, p, ep, if (streambuf_listener_init(&c->listener, p, ep,
cli_incoming, cli_stream_readable, cli_incoming, cli_stream_readable,
NULL, NULL,
NULL,
&c->obj)) &c->obj))
{ {
ilogs(control, LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); ilogs(control, LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));


+ 0
- 1
daemon/control_ng.c View File

@ -570,7 +570,6 @@ struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep) {
if (streambuf_listener_init(&ctrl_ng->tcp_listener, p, ep, if (streambuf_listener_init(&ctrl_ng->tcp_listener, p, ep,
control_incoming, control_stream_readable, control_incoming, control_stream_readable,
control_closed, control_closed,
NULL,
&ctrl_ng->obj)) { &ctrl_ng->obj)) {
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
goto fail; goto fail;


+ 0
- 7
daemon/control_tcp.c View File

@ -113,12 +113,6 @@ static int control_stream_parse(struct streambuf_stream *s, char *line) {
} }
static void control_stream_timer(struct streambuf_stream *s) {
if ((rtpe_now.tv_sec - s->inbuf->active) >= 60 || (rtpe_now.tv_sec - s->outbuf->active) >= 60)
control_stream_closed(s);
}
//static void control_stream_readable(int fd, void *p, uintptr_t u) { //static void control_stream_readable(int fd, void *p, uintptr_t u) {
static void control_stream_readable(struct streambuf_stream *s) { static void control_stream_readable(struct streambuf_stream *s) {
char *line; char *line;
@ -172,7 +166,6 @@ struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) {
if (streambuf_listener_init(&c->listener, p, ep, if (streambuf_listener_init(&c->listener, p, ep,
control_incoming, control_stream_readable, control_incoming, control_stream_readable,
control_stream_closed, control_stream_closed,
control_stream_timer,
&c->obj)) &c->obj))
{ {
ilogs(control, LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); ilogs(control, LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));


+ 0
- 6
daemon/main.c View File

@ -1340,12 +1340,6 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL, "signal handler"); thread_create_detach(sighandler, NULL, "signal handler");
/* a single thread which is running a poller_timer_loop,
* it calls each second a loop and if it finds something it does some work.
*/
thread_create_detach_prio(poller_timer_loop, rtpe_poller, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "poller timer");
/* load monitoring thread */ /* load monitoring thread */
thread_create_looper(load_thread, rtpe_config.idle_scheduling, thread_create_looper(load_thread, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "load monitor", 500000); rtpe_config.idle_priority, "load monitor", 500000);


+ 0
- 182
daemon/poller.c View File

@ -12,24 +12,14 @@
#include <sys/epoll.h> #include <sys/epoll.h>
#include <glib.h> #include <glib.h>
#include <sys/time.h> #include <sys/time.h>
#include <main.h>
#include <redis.h>
#include <hiredis/adapters/libevent.h>
#include "helpers.h"
#include "obj.h" #include "obj.h"
#include "log_funcs.h" #include "log_funcs.h"
struct timer_item {
struct obj obj;
void (*func)(void *);
struct obj *obj_ptr;
};
struct poller_item_int { struct poller_item_int {
struct obj obj; struct obj obj;
struct poller_item item; struct poller_item item;
@ -43,12 +33,6 @@ struct poller {
mutex_t lock; mutex_t lock;
struct poller_item_int **items; struct poller_item_int **items;
unsigned int items_size; unsigned int items_size;
mutex_t timers_lock;
GSList *timers;
mutex_t timers_add_del_lock; /* nested below timers_lock */
GSList *timers_add;
GSList *timers_del;
}; };
struct poller_map { struct poller_map {
@ -125,16 +109,10 @@ struct poller *poller_new(void) {
if (p->fd == -1) if (p->fd == -1)
abort(); abort();
mutex_init(&p->lock); mutex_init(&p->lock);
mutex_init(&p->timers_lock);
mutex_init(&p->timers_add_del_lock);
return p; return p;
} }
static void __ti_put(void *p) {
struct timer_item *ti = p;
obj_put(ti);
}
void poller_free(struct poller **pp) { void poller_free(struct poller **pp) {
struct poller *p = *pp; struct poller *p = *pp;
for (unsigned int i = 0; i < p->items_size; i++) { for (unsigned int i = 0; i < p->items_size; i++) {
@ -144,9 +122,6 @@ void poller_free(struct poller **pp) {
p->items[i] = NULL; p->items[i] = NULL;
obj_put(ip); obj_put(ip);
} }
g_slist_free_full(p->timers, __ti_put);
g_slist_free_full(p->timers_add, __ti_put);
g_slist_free_full(p->timers_del, __ti_put);
if (p->fd != -1) if (p->fd != -1)
close(p->fd); close(p->fd);
p->fd = -1; p->fd = -1;
@ -166,14 +141,6 @@ static int epoll_events(struct poller_item *it, struct poller_item_int *ii) {
} }
static void poller_fd_timer(void *p) {
struct poller_item_int *it = p;
if (it->item.timer)
it->item.timer(it->item.fd, it->item.obj, it->item.uintp);
}
static void poller_item_free(void *p) { static void poller_item_free(void *p) {
struct poller_item_int *i = p; struct poller_item_int *i = p;
obj_put_o(i->item.obj); obj_put_o(i->item.obj);
@ -223,9 +190,6 @@ static int __poller_add_item(struct poller *p, struct poller_item *i, int has_lo
mutex_unlock(&p->lock); mutex_unlock(&p->lock);
if (i->timer)
poller_add_timer(p, poller_fd_timer, &ip->obj);
obj_put(ip); obj_put(ip);
return 0; return 0;
@ -265,9 +229,6 @@ int poller_del_item(struct poller *p, int fd) {
mutex_unlock(&p->lock); mutex_unlock(&p->lock);
if (it->item.timer)
poller_del_timer(p, poller_fd_timer, &it->obj);
obj_put(it); obj_put(it);
return 0; return 0;
@ -302,7 +263,6 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
np->item.readable = i->readable; np->item.readable = i->readable;
np->item.writeable = i->writeable; np->item.writeable = i->writeable;
np->item.closed = i->closed; np->item.closed = i->closed;
/* updating timer is not supported */
mutex_unlock(&p->lock); mutex_unlock(&p->lock);
@ -310,72 +270,6 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
} }
/* timers_lock and timers_add_del_lock must be held */
static void poller_timers_mod(struct poller *p) {
GSList *l, **ll, **kk;
struct timer_item *ti, *tj;
ll = &p->timers_add;
while (*ll) {
l = *ll;
*ll = l->next;
l->next = p->timers;
p->timers = l;
}
ll = &p->timers_del;
while (*ll) {
ti = (*ll)->data;
kk = &p->timers;
while (*kk) {
tj = (*kk)->data;
if (tj->func != ti->func)
goto next;
if (tj->obj_ptr != ti->obj_ptr)
goto next;
goto found;
next:
kk = &(*kk)->next;
}
/* deleted a timer that wasn't added yet. possible race, otherwise bug */
ll = &(*ll)->next;
continue;
found:
l = *ll;
*ll = (*ll)->next;
obj_put_o(l->data);
g_slist_free_1(l);
l = *kk;
*kk = (*kk)->next;
obj_put_o(l->data);
g_slist_free_1(l);
}
}
static void poller_timers_run(struct poller *p) {
GSList *l;
struct timer_item *ti;
mutex_lock(&p->timers_lock);
mutex_lock(&p->timers_add_del_lock);
poller_timers_mod(p);
mutex_unlock(&p->timers_add_del_lock);
for (l = p->timers; l; l = l->next) {
ti = l->data;
ti->func(ti->obj_ptr);
log_info_reset();
}
mutex_lock(&p->timers_add_del_lock);
poller_timers_mod(p);
mutex_unlock(&p->timers_add_del_lock);
mutex_unlock(&p->timers_lock);
}
int poller_poll(struct poller *p, int timeout) { int poller_poll(struct poller *p, int timeout) {
int ret, i; int ret, i;
struct poller_item_int *it; struct poller_item_int *it;
@ -528,82 +422,6 @@ out:
return ret; return ret;
} }
static void timer_item_free(void *p) {
struct timer_item *i = p;
if (i->obj_ptr)
obj_put_o(i->obj_ptr);
}
static int poller_timer_link(struct poller *p, GSList **lp, void (*f)(void *), struct obj *o) {
struct timer_item *i;
if (!f)
return -1;
i = obj_alloc0("timer_item", sizeof(*i), timer_item_free);
i->func = f;
i->obj_ptr = o ? obj_hold_o(o) : NULL;
mutex_lock(&p->timers_add_del_lock);
*lp = g_slist_prepend(*lp, i);
// coverity[lock_order : FALSE]
if (!mutex_trylock(&p->timers_lock)) {
poller_timers_mod(p);
mutex_unlock(&p->timers_lock);
}
mutex_unlock(&p->timers_add_del_lock);
return 0;
}
int poller_del_timer(struct poller *p, void (*f)(void *), struct obj *o) {
return poller_timer_link(p, &p->timers_del, f, o);
}
int poller_add_timer(struct poller *p, void (*f)(void *), struct obj *o) {
return poller_timer_link(p, &p->timers_add, f, o);
}
/* run in thread separate from poller_poll() */
void poller_timer_loop(void *d) {
struct poller *p = d;
while (!rtpe_shutdown) {
// run once a second on top of each second
struct timeval now;
gettimeofday(&now, NULL);
struct timeval next = { rtpe_now.tv_sec + 1, 0 };
if (now.tv_sec >= next.tv_sec)
goto now;
long long sleeptime = timeval_diff(&next, &now);
if (sleeptime <= 0)
goto now;
thread_cancel_enable();
usleep(sleeptime);
thread_cancel_disable();
continue;
now:
gettimeofday(&rtpe_now, NULL);
if (rtpe_redis_write && rtpe_redis_write->async_ev &&
(rtpe_redis_write->async_last + rtpe_config.redis_delete_async_interval
<= rtpe_now.tv_sec))
{
redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_LOOPBREAK);
rtpe_redis_write->async_last = rtpe_now.tv_sec;
}
poller_timers_run(p);
}
}
void poller_loop(void *d) { void poller_loop(void *d) {
struct poller_map *map = d; struct poller_map *map = d;
poller_map_add(map); poller_map_add(map);


+ 0
- 11
daemon/tcp_listener.c View File

@ -22,7 +22,6 @@ struct streambuf_callback {
streambuf_callback_t newconn_func; streambuf_callback_t newconn_func;
streambuf_callback_t newdata_func; streambuf_callback_t newdata_func;
streambuf_callback_t closed_func; streambuf_callback_t closed_func;
streambuf_callback_t timer_func;
struct streambuf_listener *listener; struct streambuf_listener *listener;
struct obj *parent; struct obj *parent;
}; };
@ -125,13 +124,6 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) {
obj_put(s); obj_put(s);
} }
static void streambuf_stream_timer(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p;
if (s->cb->timer_func)
s->cb->timer_func(s);
}
static void streambuf_stream_readable(int fd, void *p, uintptr_t u) { static void streambuf_stream_readable(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p; struct streambuf_stream *s = p;
@ -180,7 +172,6 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
i.closed = streambuf_stream_closed; i.closed = streambuf_stream_closed;
i.readable = streambuf_stream_readable; i.readable = streambuf_stream_readable;
i.writeable = streambuf_stream_writeable; i.writeable = streambuf_stream_writeable;
i.timer = streambuf_stream_timer;
i.obj = &s->obj; i.obj = &s->obj;
if (cb->newconn_func) if (cb->newconn_func)
@ -219,7 +210,6 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
streambuf_callback_t newconn_func, streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func, streambuf_callback_t newdata_func,
streambuf_callback_t closed_func, streambuf_callback_t closed_func,
streambuf_callback_t timer_func,
struct obj *obj) struct obj *obj)
{ {
struct streambuf_callback *cb; struct streambuf_callback *cb;
@ -234,7 +224,6 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
cb->newconn_func = newconn_func; cb->newconn_func = newconn_func;
cb->newdata_func = newdata_func; cb->newdata_func = newdata_func;
cb->closed_func = closed_func; cb->closed_func = closed_func;
cb->timer_func = timer_func;
cb->parent = obj_get_o(obj); cb->parent = obj_get_o(obj);
cb->listener = listener; cb->listener = listener;


+ 0
- 5
include/poller.h View File

@ -24,7 +24,6 @@ struct poller_item {
poller_func_t readable; poller_func_t readable;
poller_func_t writeable; poller_func_t writeable;
poller_func_t closed; poller_func_t closed;
poller_func_t timer;
}; };
struct poller; struct poller;
@ -44,12 +43,8 @@ int poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *); void poller_error(struct poller *, void *);
int poller_poll(struct poller *, int); int poller_poll(struct poller *, int);
void poller_timer_loop(void *);
void poller_loop(void *); void poller_loop(void *);
void poller_loop2(void *); void poller_loop2(void *);
int poller_add_timer(struct poller *, void (*)(void *), struct obj *);
int poller_del_timer(struct poller *, void (*)(void *), struct obj *);
#endif #endif

+ 0
- 1
include/tcp_listener.h View File

@ -38,7 +38,6 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller *
streambuf_callback_t newconn_func, streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func, streambuf_callback_t newdata_func,
streambuf_callback_t closed_func, streambuf_callback_t closed_func,
streambuf_callback_t timer_func,
struct obj *obj); struct obj *obj);
void streambuf_listener_shutdown(struct streambuf_listener *); void streambuf_listener_shutdown(struct streambuf_listener *);


Loading…
Cancel
Save