|
|
|
@ -57,11 +57,18 @@ static ssize_t __socket_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t |
|
|
|
static unsigned int __dummy_thread_loop(void) { |
|
|
|
return 0; |
|
|
|
} |
|
|
|
static void *__dummy_alloc(void *stack_storage, size_t len) { |
|
|
|
return stack_storage; |
|
|
|
} |
|
|
|
static void __dummy_free(struct uring_req *dummy) { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
__thread struct uring_methods uring_methods = { |
|
|
|
.sendmsg = __socket_sendmsg, |
|
|
|
.thread_loop = __dummy_thread_loop, |
|
|
|
.free = __dummy_free, |
|
|
|
.__alloc_req = __dummy_alloc, |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
@ -70,6 +77,13 @@ __thread struct uring_methods uring_methods = { |
|
|
|
#include <liburing.h> |
|
|
|
|
|
|
|
|
|
|
|
#define uring_alloc_req(T, fn) ({ \ |
|
|
|
T *__ret = g_new0(T, 1); \ |
|
|
|
__ret->req.handler = (fn); \ |
|
|
|
__ret; \ |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
struct uring_buffer_req { |
|
|
|
struct uring_req req; |
|
|
|
}; |
|
|
|
@ -107,6 +121,14 @@ static unsigned int __uring_thread_loop(void) { |
|
|
|
return num; |
|
|
|
} |
|
|
|
|
|
|
|
static void *__uring_alloc(void *dummy, size_t len) { |
|
|
|
return g_malloc(len); |
|
|
|
} |
|
|
|
|
|
|
|
static void __uring_free(struct uring_req *r) { |
|
|
|
g_free(r); |
|
|
|
} |
|
|
|
|
|
|
|
void uring_thread_init(void) { |
|
|
|
struct io_uring_params params = {0}; |
|
|
|
int ret = io_uring_queue_init_params(rtpe_common_config_ptr->io_uring_buffers, &rtpe_uring, ¶ms); |
|
|
|
@ -115,6 +137,8 @@ void uring_thread_init(void) { |
|
|
|
|
|
|
|
uring_methods.sendmsg = __uring_sendmsg; |
|
|
|
uring_methods.thread_loop = __uring_thread_loop; |
|
|
|
uring_methods.__alloc_req = __uring_alloc; |
|
|
|
uring_methods.free = __uring_free; |
|
|
|
} |
|
|
|
|
|
|
|
void uring_thread_cleanup(void) { |
|
|
|
@ -319,7 +343,7 @@ static void uring_poll_event(struct uring_req *req, int32_t res, uint32_t flags) |
|
|
|
if (p->evs->len > ereq->it.fd && p->evs->pdata[ereq->it.fd] == ereq) |
|
|
|
p->evs->pdata[ereq->it.fd] = NULL; |
|
|
|
} |
|
|
|
uring_req_free(&ereq->req); |
|
|
|
g_free(&ereq->req); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -337,7 +361,7 @@ static void uring_poll_removed(struct uring_req *req, int32_t res, uint32_t flag |
|
|
|
rreq->callback(rreq->arg); |
|
|
|
else |
|
|
|
close(rreq->fd); |
|
|
|
uring_req_free(req); |
|
|
|
g_free(req); |
|
|
|
} |
|
|
|
|
|
|
|
struct uring_poll_unblocked { |
|
|
|
@ -372,7 +396,7 @@ static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t fl |
|
|
|
|
|
|
|
if (ureq->it.obj) |
|
|
|
obj_put_o(ureq->it.obj); |
|
|
|
uring_req_free(req); |
|
|
|
g_free(req); |
|
|
|
} |
|
|
|
|
|
|
|
struct uring_poll_recv { |
|
|
|
@ -438,7 +462,7 @@ static void uring_poll_recv(struct uring_req *req, int32_t res, uint32_t flags) |
|
|
|
//ilog(LOG_INFO, "last uring recv event for fd %i for %p (%i)", rreq->it.fd, rreq->it.obj, rreq->it.obj->ref); |
|
|
|
if (rreq->it.obj) |
|
|
|
obj_put_o(rreq->it.obj); |
|
|
|
uring_req_free(&rreq->req); |
|
|
|
g_free(&rreq->req); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -493,7 +517,7 @@ static void uring_poller_do_buffers(struct poller *p, struct poller_req *preq) { |
|
|
|
struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); |
|
|
|
io_uring_prep_provide_buffers(sqe, preq->buf, BUFFER_SIZE, BUFFERS_COUNT, 0, |
|
|
|
preq->num * BUFFERS_COUNT); |
|
|
|
struct uring_buffer_req *breq = uring_alloc_buffer_req(struct uring_buffer_req); |
|
|
|
struct uring_buffer_req *breq = uring_alloc_req(struct uring_buffer_req, uring_req_free); |
|
|
|
io_uring_sqe_set_data(sqe, breq); // XXX no content? not needed? |
|
|
|
} |
|
|
|
static void uring_poller_do_recv(struct poller *p, struct poller_req *preq) { |
|
|
|
@ -584,7 +608,7 @@ void uring_poller_poll(struct poller *p) { |
|
|
|
} |
|
|
|
|
|
|
|
void uring_poller_clear(struct poller *p) { |
|
|
|
struct uring_buffer_req *req = uring_alloc_buffer_req(struct uring_buffer_req); |
|
|
|
struct uring_buffer_req *req = uring_alloc_req(struct uring_buffer_req, uring_req_free); |
|
|
|
struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); |
|
|
|
io_uring_prep_cancel(sqe, 0, IORING_ASYNC_CANCEL_ANY); |
|
|
|
io_uring_sqe_set_data(sqe, req); |
|
|
|
|