From b28e718ee4ae3ac8e72335702e15abbf06649f9c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 22 Feb 2019 09:20:54 -0500 Subject: [PATCH] TT#52651 generalise streambuf interface Change-Id: I7d5ab8ffe13e52d4dbb1901531cc13fcc173d60d --- daemon/poller.c | 9 ++++++--- include/poller.h | 7 ++++--- lib/streambuf.c | 38 ++++++++++++++++++++++++++++---------- lib/streambuf.h | 9 ++++++++- recording-daemon/poller.c | 6 +++--- recording-daemon/poller.h | 6 +++--- 6 files changed, 52 insertions(+), 23 deletions(-) diff --git a/daemon/poller.c b/daemon/poller.c index 2aeac20b6..f3335dc34 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -366,7 +366,8 @@ out: } -void poller_blocked(struct poller *p, int fd) { +void poller_blocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); struct epoll_event e; if (!p || fd < 0) @@ -392,7 +393,8 @@ fail: mutex_unlock(&p->lock); } -void poller_error(struct poller *p, int fd) { +void poller_error(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); if (!p || fd < 0) return; @@ -412,7 +414,8 @@ fail: mutex_unlock(&p->lock); } -int poller_isblocked(struct poller *p, int fd) { +int poller_isblocked(struct poller *p, void *fdp) { + int fd = GPOINTER_TO_INT(fdp); int ret; if (!p || fd < 0) diff --git a/include/poller.h b/include/poller.h index 8ede2f343..01f72a6ff 100644 --- a/include/poller.h +++ b/include/poller.h @@ -34,9 +34,10 @@ struct poller *poller_new(void); int poller_add_item(struct poller *, struct poller_item *); int poller_update_item(struct poller *, struct poller_item *); int poller_del_item(struct poller *, int); -void poller_blocked(struct poller *, int); -int poller_isblocked(struct poller *, int); -void poller_error(struct poller *, int); + +void poller_blocked(struct poller *, void *); +int poller_isblocked(struct poller *, void *); +void poller_error(struct poller *, void *); int poller_poll(struct poller *, int); void poller_timer_loop(void *); diff --git a/lib/streambuf.c b/lib/streambuf.c index 26ff48118..87c67c61c 100644 --- a/lib/streambuf.c +++ b/lib/streambuf.c @@ -14,20 +14,38 @@ +static ssize_t __fd_write(void *, const void *, size_t); +static ssize_t __fd_read(void *, void *, size_t); -struct streambuf *streambuf_new(struct poller *p, int fd) { +static const struct streambuf_funcs __fd_funcs = { + .write = __fd_write, + .read = __fd_read, +}; + +static ssize_t __fd_write(void *fd, const void *b, size_t s) { + return write(GPOINTER_TO_INT(fd), b, s); +} +static ssize_t __fd_read(void *fd, void *b, size_t s) { + return read(GPOINTER_TO_INT(fd), b, s); +} + +struct streambuf *streambuf_new_ptr(struct poller *p, void *fd_ptr, const struct streambuf_funcs *funcs) { struct streambuf *b; b = g_slice_alloc0(sizeof(*b)); mutex_init(&b->lock); b->buf = g_string_new(""); - b->fd = fd; + b->fd_ptr = fd_ptr; b->poller = p; b->active = rtpe_now.tv_sec; + b->funcs = funcs; return b; } +struct streambuf *streambuf_new(struct poller *p, int fd) { + return streambuf_new_ptr(p, GINT_TO_POINTER(fd), &__fd_funcs); +} void streambuf_destroy(struct streambuf *b) { @@ -47,7 +65,7 @@ int streambuf_writeable(struct streambuf *b) { break; out = (b->buf->len > 1024) ? 1024 : b->buf->len; - ret = write(b->fd, b->buf->str, out); + ret = b->funcs->write(b->fd_ptr, b->buf->str, out); if (ret < 0) { if (errno == EINTR) @@ -65,7 +83,7 @@ int streambuf_writeable(struct streambuf *b) { } if (ret != out) { - poller_blocked(b->poller, b->fd); + poller_blocked(b->poller, b->fd_ptr); break; } } @@ -81,7 +99,7 @@ int streambuf_readable(struct streambuf *b) { mutex_lock(&b->lock); for (;;) { - ret = read(b->fd, buf, 1024); + ret = b->funcs->read(b->fd_ptr, buf, 1024); if (ret == 0) { // don't discard already read data in the buffer @@ -188,18 +206,18 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { mutex_lock(&b->lock); - while (len && !poller_isblocked(b->poller, b->fd)) { + while (len && !poller_isblocked(b->poller, b->fd_ptr)) { out = (len > 1024) ? 1024 : len; - ret = write(b->fd, s, out); + ret = b->funcs->write(b->fd_ptr, s, out); if (ret < 0) { if (errno == EINTR) continue; if (errno != EAGAIN && errno != EWOULDBLOCK) { - poller_error(b->poller, b->fd); + poller_error(b->poller, b->fd_ptr); break; } - poller_blocked(b->poller, b->fd); + poller_blocked(b->poller, b->fd_ptr); break; } if (ret == 0) @@ -211,7 +229,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { } if (b->buf->len > 5242880) - poller_error(b->poller, b->fd); + poller_error(b->poller, b->fd_ptr); else if (len) g_string_append_len(b->buf, s, len); diff --git a/lib/streambuf.h b/lib/streambuf.h index 72a4ae12a..e47810b5d 100644 --- a/lib/streambuf.h +++ b/lib/streambuf.h @@ -18,18 +18,25 @@ struct poller; +struct streambuf_funcs { + ssize_t (*write)(void *, const void *, size_t); + ssize_t (*read)(void *, void *, size_t); +}; struct streambuf { mutex_t lock; GString *buf; - int fd; + void *fd_ptr; struct poller *poller; time_t active; int eof; + const struct streambuf_funcs + *funcs; }; struct streambuf *streambuf_new(struct poller *, int); +struct streambuf *streambuf_new_ptr(struct poller *, void *, const struct streambuf_funcs *); void streambuf_destroy(struct streambuf *); int streambuf_writeable(struct streambuf *); int streambuf_readable(struct streambuf *); diff --git a/recording-daemon/poller.c b/recording-daemon/poller.c index e6d7cfa42..4fb1eb3ea 100644 --- a/recording-daemon/poller.c +++ b/recording-daemon/poller.c @@ -1,10 +1,10 @@ #include "poller.h" -void poller_blocked(struct poller *p, int fd) { +void poller_blocked(struct poller *p, void *fdp) { p->blocked = 1; } -int poller_isblocked(struct poller *p, int fd) { +int poller_isblocked(struct poller *p, void *fdp) { return p->blocked ? 1 : 0; } -void poller_error(struct poller *p, int fd) { +void poller_error(struct poller *p, void *fdp) { } diff --git a/recording-daemon/poller.h b/recording-daemon/poller.h index 91e242300..89218c14a 100644 --- a/recording-daemon/poller.h +++ b/recording-daemon/poller.h @@ -10,9 +10,9 @@ struct poller { int intro:1; }; -void poller_blocked(struct poller *, int); -int poller_isblocked(struct poller *, int); -void poller_error(struct poller *, int); +void poller_blocked(struct poller *, void *); +int poller_isblocked(struct poller *, void *); +void poller_error(struct poller *, void *); #endif