diff --git a/daemon/main.c b/daemon/main.c index ff254134b..ca381467a 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1320,6 +1320,7 @@ static void create_everything(void) { rtpe_poller_del_item = uring_poller_del_item; rtpe_poller_del_item_callback = uring_poller_del_item_callback; rtpe_poller_blocked = uring_poller_blocked; + rtpe_poller_isblocked = uring_poller_isblocked; rtpe_poller_error = uring_poller_error; } #endif diff --git a/lib/poller.c b/lib/poller.c index ee58e4c50..064a532ea 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -39,6 +39,7 @@ bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; +bool (*rtpe_poller_isblocked)(struct poller *, void *) = poller_isblocked; void (*rtpe_poller_error)(struct poller *, void *) = poller_error; @@ -307,6 +308,30 @@ __thread unsigned int (*uring_thread_loop)(void) = __uring_thread_loop_dummy; #endif +bool poller_isblocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); + int ret; + + if (!p || fd < 0) + return false; + + LOCK(&p->lock); + + ret = -1; + if (fd >= p->items->len) + goto out; + struct poller_item_int *it; + if (!(it = p->items->pdata[fd])) + goto out; + if (!it->item.writeable) + goto out; + + ret = !!it->blocked; + +out: + return ret; +} + void poller_loop(void *d) { struct poller *p = d; int poller_size = rtpe_common_config_ptr->poller_size; diff --git a/lib/poller.h b/lib/poller.h index b5f4c28b7..d53b614e8 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -37,6 +37,7 @@ bool poller_del_item(struct poller *, int); bool poller_del_item_callback(struct poller *, int, void (*)(void *), void *); void poller_blocked(struct poller *, void *); +bool poller_isblocked(struct poller *, void *); void poller_error(struct poller *, void *); void poller_loop(void *); @@ -45,6 +46,7 @@ extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *); extern bool (*rtpe_poller_del_item)(struct poller *, int); extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *); extern void (*rtpe_poller_blocked)(struct poller *, void *); +extern bool (*rtpe_poller_isblocked)(struct poller *, void *); extern void (*rtpe_poller_error)(struct poller *, void *); diff --git a/lib/streambuf.c b/lib/streambuf.c index 61a092583..4b6c0bec6 100644 --- a/lib/streambuf.c +++ b/lib/streambuf.c @@ -209,7 +209,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { mutex_lock(&b->lock); - while (len) { + while (len && !rtpe_poller_isblocked(b->poller, b->fd_ptr)) { out = (len > 1024) ? 1024 : len; ret = b->funcs->write(b->fd_ptr, s, out); diff --git a/lib/uring.c b/lib/uring.c index 068f3f603..61056d7ed 100644 --- a/lib/uring.c +++ b/lib/uring.c @@ -29,6 +29,7 @@ struct poller { GPtrArray *evs; // holds uring_poll_event by fd struct bufferpool *bufferpool; struct uring_buffer *buffers[BUFFER_POOLS]; + GArray *blocked; }; struct poller_req { @@ -141,6 +142,7 @@ struct poller *uring_poller_new(void) { nonblock(ret->waker_fds[0]); nonblock(ret->waker_fds[1]); ret->evs = g_ptr_array_new(); + ret->blocked = g_array_new(false, true, sizeof(char)); ret->bufferpool = bufferpool_new(g_malloc, g_free, BUFFER_SIZE * BUFFERS_COUNT); for (int i = 0; i < BUFFER_POOLS; i++) { @@ -161,6 +163,7 @@ void uring_poller_free(struct poller **pp) { close((*pp)->waker_fds[0]); close((*pp)->waker_fds[1]); g_ptr_array_free((*pp)->evs, true); + g_array_free((*pp)->blocked, true); for (int i = 0; i < BUFFER_POOLS; i++) { bufferpool_release((*pp)->buffers[i]->buf); g_free((*pp)->buffers[i]); @@ -211,9 +214,24 @@ void uring_poller_blocked(struct poller *p, void *fdp) { LOCK(&p->lock); + if (p->blocked->len <= req->fd) + g_array_set_size(p->blocked, req->fd + 1); + g_array_index(p->blocked, char, req->fd) = 1; + g_queue_push_tail(&p->reqs, req); uring_poller_wake(p); } +bool uring_poller_isblocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); + if (fd < 0) + return false; + + LOCK(&p->lock); + + if (p->blocked->len <= fd) + return false; + return !!g_array_index(p->blocked, char, fd); +} void uring_poller_error(struct poller *p, void *fdp) { struct poller_req *req = g_new0(__typeof(*req), 1); req->type = ERROR; @@ -314,6 +332,7 @@ static void uring_poll_removed(struct uring_req *req, int32_t res, uint32_t flag struct uring_poll_unblocked { struct uring_req req; // must be first struct poller_item it; + struct poller *poller; }; static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t flags) { struct uring_poll_unblocked *ureq = (__typeof(ureq)) req; @@ -328,8 +347,12 @@ static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t fl ureq->it.fd, res); closed = true; } - else + else { + struct poller *p = ureq->poller; + if (p->blocked->len > ureq->it.fd) + g_array_index(p->blocked, char, ureq->it.fd) = 0; ureq->it.writeable(ureq->it.fd, ureq->it.obj); + } assert((flags & IORING_CQE_F_MORE) == 0); @@ -432,6 +455,7 @@ static void uring_poller_do_blocked(struct poller *p, struct poller_req *preq) { struct uring_poll_unblocked *ureq = uring_alloc_req(sizeof(*ureq), uring_poll_unblocked); ureq->it = ereq->it; + ureq->poller = p; if (ureq->it.obj) obj_hold_o(ureq->it.obj); struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); diff --git a/lib/uring.h b/lib/uring.h index 43e8cfa01..9e59c9b5b 100644 --- a/lib/uring.h +++ b/lib/uring.h @@ -49,6 +49,7 @@ void uring_poller_clear(struct poller *); bool uring_poller_add_item(struct poller *p, struct poller_item *i); bool uring_poller_del_item(struct poller *p, int fd); void uring_poller_blocked(struct poller *p, void *fdp); +bool uring_poller_isblocked(struct poller *p, void *fdp); void uring_poller_error(struct poller *p, void *fdp); bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg); diff --git a/recording-daemon/poller.c b/recording-daemon/poller.c index 48d75e3b7..32331205b 100644 --- a/recording-daemon/poller.c +++ b/recording-daemon/poller.c @@ -3,7 +3,7 @@ void poller_blocked(struct poller *p, void *fdp) { p->state = PS_WRITE_BLOCKED; } -int poller_isblocked(struct poller *p, void *fdp) { +bool poller_isblocked(struct poller *p, void *fdp) { return p->state != PS_OPEN; } void poller_error(struct poller *p, void *fdp) { diff --git a/recording-daemon/poller.h b/recording-daemon/poller.h index c4d5f2c29..ede317a44 100644 --- a/recording-daemon/poller.h +++ b/recording-daemon/poller.h @@ -1,6 +1,7 @@ #ifndef __POLLER_H__ #define __POLLER_H__ +#include // dummy poller struct poller { @@ -16,7 +17,9 @@ struct poller { void poller_blocked(struct poller *, void *); void poller_error(struct poller *, void *); +bool poller_isblocked(struct poller *, void *); +#define rtpe_poller_isblocked poller_isblocked #define rtpe_poller_blocked poller_blocked #define rtpe_poller_error poller_error