diff --git a/daemon/Makefile b/daemon/Makefile index a93bb134a..21c5870c0 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -55,7 +55,7 @@ include ../lib/lib.Makefile SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c + media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index af3191c82..13492005f 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -22,6 +22,8 @@ #include "recording.h" #include "rtplib.h" #include "ssrc.h" +#include "tcp_listener.h" +#include "streambuf.h" @@ -417,7 +419,7 @@ void call_delete_tcp(char **out, struct callmaster *m) { call_delete_branch(m, &callid, NULL, NULL, NULL, NULL, -1); } -static void call_status_iterator(struct call *c, struct control_stream *s) { +static void call_status_iterator(struct call *c, struct streambuf_stream *s) { // GList *l; // struct callstream *cs; // struct peer *p; @@ -429,7 +431,7 @@ static void call_status_iterator(struct call *c, struct control_stream *s) { // m = c->callmaster; // mutex_lock(&c->master_lock); - control_stream_printf(s, "session "STR_FORMAT" - - - - %lli\n", + streambuf_printf(s->outbuf, "session "STR_FORMAT" - - - - %lli\n", STR_FMT(&c->callid), timeval_diff(&g_now, &c->created) / 1000000); @@ -438,13 +440,13 @@ static void call_status_iterator(struct call *c, struct control_stream *s) { // mutex_unlock(&c->master_lock); } -void calls_status_tcp(struct callmaster *m, struct control_stream *s) { +void calls_status_tcp(struct callmaster *m, struct streambuf_stream *s) { GQueue q = G_QUEUE_INIT; struct call *c; callmaster_get_all_calls(m, &q); - control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n", + streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n", g_queue_get_length(&q), atomic64_get(&m->stats.bytes), 0, 0); diff --git a/daemon/call_interfaces.h b/daemon/call_interfaces.h index 4887dc971..c87af22d1 100644 --- a/daemon/call_interfaces.h +++ b/daemon/call_interfaces.h @@ -14,7 +14,7 @@ struct call; struct call_stats; struct callmaster; -struct control_stream; +struct streambuf_stream; struct sockaddr_in6; struct sdp_ng_flags { @@ -69,7 +69,7 @@ extern int dtls_passive_def; str *call_request_tcp(char **, struct callmaster *); str *call_lookup_tcp(char **, struct callmaster *); void call_delete_tcp(char **, struct callmaster *); -void calls_status_tcp(struct callmaster *, struct control_stream *); +void calls_status_tcp(struct callmaster *, struct streambuf_stream *); str *call_update_udp(char **, struct callmaster *, const char*, const endpoint_t *); str *call_lookup_udp(char **, struct callmaster *); diff --git a/daemon/control_ng.c b/daemon/control_ng.c index ee46baafd..0c5d37930 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -104,7 +104,7 @@ struct control_ng_stats* get_control_ng_stats(struct control_ng* c, const sockad } static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr, - struct udp_listener *ul) + socket_t *ul) { struct control_ng *c = (void *) obj; bencode_buffer_t bencbuf; @@ -289,7 +289,7 @@ send_only: iov[2].iov_base = to_send->s; iov[2].iov_len = to_send->len; - socket_sendiov(&ul->sock, iov, iovlen, sin); + socket_sendiov(ul, iov, iovlen, sin); if (resp) cookie_cache_insert(&c->cookie_cache, &cookie, &reply); @@ -319,12 +319,12 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, struct callm if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj)) goto fail2; if (tos) - set_tos(&c->udp_listeners[0].sock,tos); + set_tos(&c->udp_listeners[0],tos); if (ipv46_any_convert(ep)) { if (udp_listener_init(&c->udp_listeners[1], p, ep, control_ng_incoming, &c->obj)) goto fail2; if (tos) - set_tos(&c->udp_listeners[1].sock,tos); + set_tos(&c->udp_listeners[1],tos); } return c; diff --git a/daemon/control_ng.h b/daemon/control_ng.h index 027ab8234..9851fdae7 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -27,7 +27,7 @@ struct control_ng { struct obj obj; struct callmaster *callmaster; struct cookie_cache cookie_cache; - struct udp_listener udp_listeners[2]; + socket_t udp_listeners[2]; }; struct control_ng *control_ng_new(struct poller *, endpoint_t *, struct callmaster *, unsigned char); diff --git a/daemon/control_tcp.c b/daemon/control_tcp.c index f65a048cb..6ffb29d3f 100644 --- a/daemon/control_tcp.c +++ b/daemon/control_tcp.c @@ -18,35 +18,19 @@ #include "call_interfaces.h" #include "socket.h" #include "log_funcs.h" +#include "tcp_listener.h" -struct control_stream { - struct obj obj; - - int fd; - mutex_t lock; - struct streambuf *inbuf; - struct streambuf *outbuf; - struct sockaddr_in inaddr; - - struct control_tcp *control; - struct poller *poller; - int linked:1; -}; - - struct control_tcp { struct obj obj; - int fd; + struct streambuf_listener listeners[2]; + pcre *parse_re; pcre_extra *parse_ree; - mutex_t lock; - GList *streams; - struct poller *poller; struct callmaster *callmaster; }; @@ -54,65 +38,48 @@ struct control_tcp { -static void control_stream_closed(int fd, void *p, uintptr_t u) { - struct control_stream *s = p; - struct control_tcp *c; - GList *l = NULL; +//static void control_stream_closed(int fd, void *p, uintptr_t u) { +static void control_stream_closed(struct streambuf_stream *s) { + ilog(LOG_INFO, "Control connection from %s closed", s->addr); +} - ilog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr)); - c = s->control; +static void control_list(struct control_tcp *c, struct streambuf_stream *s) { + for (int i = 0; i < G_N_ELEMENTS(c->listeners); i++) { + if (!c->listeners[i].listener.family || !c->listeners[i].poller) + continue; // not used -restart: - mutex_lock(&c->lock); - if (s->linked) { - /* we might get called when it's not quite linked yet */ - l = g_list_find(c->streams, s); - if (!l) { - mutex_unlock(&c->lock); - goto restart; - } - c->streams = g_list_delete_link(c->streams, l); - s->linked = 0; - } - mutex_unlock(&c->lock); - if (l) - obj_put(s); - poller_del_item(s->poller, fd); -} + mutex_lock(&c->listeners[i].lock); + GList *streams = g_hash_table_get_values(c->listeners[i].streams); + for (GList *l = streams; l; l = l->next) { + struct streambuf_stream *cl = l->data; + streambuf_printf(s->outbuf, "%s\n", cl->addr); + } -static void control_list(struct control_tcp *c, struct control_stream *s) { - struct control_stream *i; - GList *l; + mutex_unlock(&c->listeners[i].lock); - mutex_lock(&c->lock); - for (l = c->streams; l; l = l->next) { - i = l->data; - mutex_lock(&s->lock); - streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr)); - mutex_unlock(&s->lock); + g_list_free(streams); } - mutex_unlock(&c->lock); streambuf_printf(s->outbuf, "End.\n"); } -static int control_stream_parse(struct control_stream *s, char *line) { +static int control_stream_parse(struct streambuf_stream *s, char *line) { int ovec[60]; int ret; char **out; - struct control_tcp *c = s->control; + struct control_tcp *c = (void *) s->parent; str *output = NULL; ret = pcre_exec(c->parse_re, c->parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec)); if (ret <= 0) { - ilog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line); + ilog(LOG_WARNING, "Unable to parse command line from %s: %s", s->addr, line); return -1; } - ilog(LOG_INFO, "Got valid command from " DF ": %s", DP(s->inaddr), line); + ilog(LOG_INFO, "Got valid command from %s: %s", s->addr, line); pcre_get_substring_list(line, ovec, ret, (const char ***) &out); @@ -132,151 +99,64 @@ static int control_stream_parse(struct control_stream *s, char *line) { else if (!strcmp(out[RE_TCP_DIV_CMD], "status")) calls_status_tcp(c->callmaster, s); else if (!strcmp(out[RE_TCP_DIV_CMD], "build") || !strcmp(out[RE_TCP_DIV_CMD], "version")) - control_stream_printf(s, "Version: %s\n", RTPENGINE_VERSION); + streambuf_printf(s->outbuf, "Version: %s\n", RTPENGINE_VERSION); else if (!strcmp(out[RE_TCP_DIV_CMD], "controls")) control_list(c, s); else if (!strcmp(out[RE_TCP_DIV_CMD], "quit") || !strcmp(out[RE_TCP_DIV_CMD], "exit")) ; if (output) { - mutex_lock(&s->lock); streambuf_write_str(s->outbuf, output); - mutex_unlock(&s->lock); free(output); } pcre_free(out); log_info_clear(); - return -1; + return 1; } -static void control_stream_timer(int fd, void *p, uintptr_t u) { - struct control_stream *s = p; - int i; - - mutex_lock(&s->lock); - i = (poller_now - s->inbuf->active) >= 60 || (poller_now - s->outbuf->active) >= 60; - mutex_unlock(&s->lock); - - if (i) - control_stream_closed(s->fd, s, 0); +static void control_stream_timer(struct streambuf_stream *s) { + if ((poller_now - s->inbuf->active) >= 60 || (poller_now - s->outbuf->active) >= 60) + control_stream_closed(s); } -static void control_stream_readable(int fd, void *p, uintptr_t u) { - struct control_stream *s = p; +//static void control_stream_readable(int fd, void *p, uintptr_t u) { +static void control_stream_readable(struct streambuf_stream *s) { char *line; int ret; - mutex_lock(&s->lock); - - if (streambuf_readable(s->inbuf)) - goto close; - while ((line = streambuf_getline(s->inbuf))) { - mutex_unlock(&s->lock); - ilog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line); + ilog(LOG_DEBUG, "Got control line from %s: %s", s->addr, line); ret = control_stream_parse(s, line); free(line); + if (ret == 1) { + streambuf_stream_shutdown(s); + break; + } if (ret) - goto close_nolock; - mutex_lock(&s->lock); + goto close; } if (streambuf_bufsize(s->inbuf) > 1024) { - ilog(LOG_WARNING, "Buffer length exceeded in control connection from " DF, DP(s->inaddr)); + ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr); goto close; } - mutex_unlock(&s->lock); return; close: - mutex_unlock(&s->lock); -close_nolock: - control_stream_closed(fd, s, 0); -} - -static void control_stream_writeable(int fd, void *p, uintptr_t u) { - struct control_stream *s = p; - - if (streambuf_writeable(s->outbuf)) - control_stream_closed(fd, s, 0); -} - -static void control_closed(int fd, void *p, uintptr_t u) { - abort(); + streambuf_stream_close(s); } -static void control_stream_free(void *p) { - struct control_stream *s = p; - - close(s->fd); - streambuf_destroy(s->inbuf); - streambuf_destroy(s->outbuf); - mutex_destroy(&s->lock); +static void control_incoming(struct streambuf_stream *s) { + ilog(LOG_INFO, "New TCP control connection from %s", s->addr); } -static void control_incoming(int fd, void *p, uintptr_t u) { - int nfd; - struct control_tcp *c = p; - struct control_stream *s; - struct poller_item i; - struct sockaddr_in sin; - socklen_t sinl; - -next: - sinl = sizeof(sin); - nfd = accept(fd, (struct sockaddr *) &sin, &sinl); - if (nfd == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - return; - goto next; - } - nonblock(nfd); - - ilog(LOG_INFO, "New control connection from " DF, DP(sin)); - - s = obj_alloc0("control_stream", sizeof(*s), control_stream_free); - - s->fd = nfd; - s->control = c; - s->poller = c->poller; - s->inbuf = streambuf_new(c->poller, nfd); - s->outbuf = streambuf_new(c->poller, nfd); - memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); - mutex_init(&s->lock); - s->linked = 1; - - ZERO(i); - i.fd = nfd; - i.closed = control_stream_closed; - i.readable = control_stream_readable; - i.writeable = control_stream_writeable; - i.timer = control_stream_timer; - i.obj = &s->obj; - - if (poller_add_item(c->poller, &i)) - goto fail; - mutex_lock(&c->lock); - /* let the list steal our own ref */ - c->streams = g_list_prepend(c->streams, s); - mutex_unlock(&c->lock); - - goto next; - -fail: - obj_put(s); - goto next; -} - - -struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, struct callmaster *m) { - socket_t sock; +struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { struct control_tcp *c; - struct poller_item i; const char *errptr; int erroff; @@ -285,14 +165,28 @@ struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, stru if (!m) return NULL; - if (open_socket(&sock, SOCK_STREAM, ep->port, &ep->address)) - return NULL; + c = obj_alloc0("control", sizeof(*c), NULL); - if (listen(sock.fd, 5)) + if (streambuf_listener_init(&c->listeners[0], p, ep, + control_incoming, control_stream_readable, + control_stream_closed, + control_stream_timer, + &c->obj)) + { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); goto fail; - - - c = obj_alloc0("control", sizeof(*c), NULL); + } + if (ipv46_any_convert(ep)) { + if (streambuf_listener_init(&c->listeners[1], p, ep, + control_incoming, control_stream_readable, + control_stream_closed, + control_stream_timer, + &c->obj)) + { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + } c->parse_re = pcre_compile( /* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */ @@ -300,36 +194,14 @@ struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, stru PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); c->parse_ree = pcre_study(c->parse_re, 0, &errptr); - c->fd = sock.fd; c->poller = p; c->callmaster = m; - mutex_init(&c->lock); - - ZERO(i); - i.fd = sock.fd; - i.closed = control_closed; - i.readable = control_incoming; - i.obj = &c->obj; - if (poller_add_item(p, &i)) - goto fail2; obj_put(c); return c; -fail2: - obj_put(c); fail: - close_socket(&sock); + // XXX streambuf_listener_close ... + obj_put(c); return NULL; } - - -void control_stream_printf(struct control_stream *s, const char *f, ...) { - va_list va; - - va_start(va, f); - mutex_lock(&s->lock); - streambuf_vprintf(s->outbuf, f, va); - mutex_unlock(&s->lock); - va_end(va); -} diff --git a/daemon/control_tcp.h b/daemon/control_tcp.h index 06b3c298f..9ca94c969 100644 --- a/daemon/control_tcp.h +++ b/daemon/control_tcp.h @@ -32,12 +32,11 @@ struct poller; struct callmaster; struct control_tcp; -struct control_stream; +struct streambuf_stream; -struct control_tcp *control_tcp_new(struct poller *, const endpoint_t *, struct callmaster *); -void control_stream_printf(struct control_stream *, const char *, ...) __attribute__ ((format (printf, 2, 3))); +struct control_tcp *control_tcp_new(struct poller *, endpoint_t *, struct callmaster *); diff --git a/daemon/control_udp.c b/daemon/control_udp.c index 0cf4eeee4..c64a3e6c4 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -22,7 +22,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr, - struct udp_listener *ul) { + socket_t *ul) { struct control_udp *u = (void *) obj; int ret; int ovec[100]; @@ -60,7 +60,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si iovlen = 2; } - socket_sendiov(&ul->sock, iov, iovlen, sin); + socket_sendiov(ul, iov, iovlen, sin); pcre_free(out); @@ -75,7 +75,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si reply = cookie_cache_lookup(&u->cookie_cache, &cookie); if (reply) { ilog(LOG_INFO, "Detected command from udp:%s as a duplicate", addr); - socket_sendto(&ul->sock, reply->s, reply->len, sin); + socket_sendto(ul, reply->s, reply->len, sin); free(reply); goto out; } @@ -118,11 +118,11 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si iov[2].iov_len = 9; iovlen++; } - socket_sendiov(&ul->sock, iov, iovlen, sin); + socket_sendiov(ul, iov, iovlen, sin); } if (reply) { - socket_sendto(&ul->sock, reply->s, reply->len, sin); + socket_sendto(ul, reply->s, reply->len, sin); cookie_cache_insert(&u->cookie_cache, &cookie, reply); free(reply); } diff --git a/daemon/control_udp.h b/daemon/control_udp.h index 0381c39e0..f30a45c14 100644 --- a/daemon/control_udp.h +++ b/daemon/control_udp.h @@ -50,7 +50,7 @@ struct control_udp { struct callmaster *callmaster; struct cookie_cache cookie_cache; - struct udp_listener udp_listeners[2]; + socket_t udp_listeners[2]; pcre *parse_re; pcre_extra *parse_ree; diff --git a/daemon/socket.c b/daemon/socket.c index 00f731c49..b1fe71f94 100644 --- a/daemon/socket.c +++ b/daemon/socket.c @@ -25,6 +25,8 @@ static int __ip4_is_specified(const sockaddr_t *a); static int __ip6_is_specified(const sockaddr_t *a); static int __ip_bind(socket_t *s, unsigned int, const sockaddr_t *); static int __ip_connect(socket_t *s, const endpoint_t *); +static int __ip_listen(socket_t *s, int backlog); +static int __ip_accept(socket_t *s, socket_t *new_sock); static int __ip_timestamping(socket_t *s); static int __ip4_sockaddr2endpoint(endpoint_t *, const void *); static int __ip6_sockaddr2endpoint(endpoint_t *, const void *); @@ -76,6 +78,8 @@ static struct socket_family __socket_families[__SF_LAST] = { .addrport2sockaddr = __ip4_addrport2sockaddr, .bind = __ip_bind, .connect = __ip_connect, + .listen = __ip_listen, + .accept = __ip_accept, .timestamping = __ip_timestamping, .recvfrom = __ip_recvfrom, .recvfrom_ts = __ip_recvfrom_ts, @@ -105,6 +109,8 @@ static struct socket_family __socket_families[__SF_LAST] = { .addrport2sockaddr = __ip6_addrport2sockaddr, .bind = __ip_bind, .connect = __ip_connect, + .listen = __ip_listen, + .accept = __ip_accept, .timestamping = __ip_timestamping, .recvfrom = __ip_recvfrom, .recvfrom_ts = __ip_recvfrom_ts, @@ -261,6 +267,30 @@ static int __ip_connect(socket_t *s, const endpoint_t *ep) { } return 0; } +static int __ip_listen(socket_t *s, int backlog) { + return listen(s->fd, backlog); +} +static int __ip_accept(socket_t *s, socket_t *newsock) { + int nfd; + struct sockaddr_storage sin; + socklen_t sinlen; + + ZERO(*newsock); + + sinlen = sizeof(sin); + nfd = accept(s->fd, (struct sockaddr *) &sin, &sinlen); + if (nfd == -1) { + __C_DBG("accept fail, fd=%d, port=%d", s->fd, s->local.port); + return -1; + } + + newsock->fd = nfd; + newsock->family = s->family; + newsock->local = s->local; + s->family->sockaddr2endpoint(&newsock->remote, &sin); + + return 0; +} static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *tv) { ssize_t ret; struct sockaddr_storage sin; @@ -619,6 +649,8 @@ int open_socket(socket_t *r, int type, unsigned int port, const sockaddr_t *sa) nonblock(r->fd); reuseaddr(r->fd); + if (r->family->af == AF_INET6) + ipv6only(r->fd, 1); if (port > 0xffff) { __C_DBG("open socket fail, port=%d > 0xfffffd", port); diff --git a/daemon/socket.h b/daemon/socket.h index 5c57fa5a5..7a010d849 100644 --- a/daemon/socket.h +++ b/daemon/socket.h @@ -64,6 +64,8 @@ struct socket_family { int (*addrport2sockaddr)(void *, const sockaddr_t *, unsigned int); int (*bind)(socket_t *, unsigned int, const sockaddr_t *); int (*connect)(socket_t *, const endpoint_t *); + int (*listen)(socket_t *, int); + int (*accept)(socket_t *, socket_t *); int (*timestamping)(socket_t *); ssize_t (*recvfrom)(socket_t *, void *, size_t, endpoint_t *); ssize_t (*recvfrom_ts)(socket_t *, void *, size_t, endpoint_t *, struct timeval *); diff --git a/daemon/streambuf.c b/daemon/streambuf.c index 24a7c241d..c7da2dfc1 100644 --- a/daemon/streambuf.c +++ b/daemon/streambuf.c @@ -13,12 +13,15 @@ + + struct streambuf *streambuf_new(struct poller *p, int fd) { struct streambuf *b; b = malloc(sizeof(*b)); ZERO(*b); + mutex_init(&b->lock); b->buf = g_string_new(""); b->fd = fd; b->poller = p; @@ -38,6 +41,8 @@ int streambuf_writeable(struct streambuf *b) { int ret; unsigned int out; + mutex_lock(&b->lock); + for (;;) { if (!b->buf->len) break; @@ -48,8 +53,10 @@ int streambuf_writeable(struct streambuf *b) { if (ret < 0) { if (errno == EINTR) continue; - if (errno != EAGAIN && errno != EWOULDBLOCK) + if (errno != EAGAIN && errno != EWOULDBLOCK) { + mutex_unlock(&b->lock); return -1; + } ret = 0; } @@ -64,6 +71,7 @@ int streambuf_writeable(struct streambuf *b) { } } + mutex_unlock(&b->lock); return 0; } @@ -71,16 +79,24 @@ int streambuf_readable(struct streambuf *b) { int ret; char buf[1024]; + mutex_lock(&b->lock); + for (;;) { ret = read(b->fd, buf, 1024); - if (ret == 0) - return -1; + if (ret == 0) { + // don't discard already read data in the buffer + b->eof = 1; + ret = b->buf->len ? -2 : -1; + mutex_unlock(&b->lock); + return ret; + } if (ret < 0) { if (errno == EINTR) continue; if (errno == EAGAIN || errno == EWOULDBLOCK) break; + mutex_unlock(&b->lock); return -1; } @@ -88,15 +104,18 @@ int streambuf_readable(struct streambuf *b) { b->active = poller_now; } + mutex_unlock(&b->lock); return 0; } char *streambuf_getline(struct streambuf *b) { char *p; - int len; + int len, to_del; char *s = NULL; + mutex_lock(&b->lock); + for (;;) { if (s) { free(s); @@ -104,19 +123,29 @@ char *streambuf_getline(struct streambuf *b) { } p = memchr(b->buf->str, '\n', b->buf->len); - if (!p) - return NULL; - - len = p - b->buf->str; - if (len == 0) { - g_string_erase(b->buf, 0, 1); - continue; + if (!p) { + if (b->eof) { + // use entire string + len = b->buf->len; + to_del = len; + } + else + break; + } + else { + len = p - b->buf->str; + to_del = len + 1; + if (len == 0) { + // blank line, skip it + g_string_erase(b->buf, 0, 1); + continue; + } } s = malloc(len + 1); memcpy(s, b->buf->str, len); s[len] = '\0'; - g_string_erase(b->buf, 0, len + 1); + g_string_erase(b->buf, 0, to_del); if (s[--len] == '\r') { if (len == 0) @@ -127,6 +156,7 @@ char *streambuf_getline(struct streambuf *b) { break; } + mutex_unlock(&b->lock); return s; } @@ -157,6 +187,8 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { unsigned int out; int ret; + mutex_lock(&b->lock); + while (len && !poller_isblocked(b->poller, b->fd)) { out = (len > 1024) ? 1024 : len; ret = write(b->fd, s, out); @@ -183,4 +215,6 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { poller_error(b->poller, b->fd); else if (len) g_string_append_len(b->buf, s, len); + + mutex_unlock(&b->lock); } diff --git a/daemon/streambuf.h b/daemon/streambuf.h index a7f50ef26..0b79d95e2 100644 --- a/daemon/streambuf.h +++ b/daemon/streambuf.h @@ -10,6 +10,7 @@ #include "compat.h" #include "str.h" +#include "aux.h" @@ -18,10 +19,12 @@ struct poller; struct streambuf { + mutex_t lock; GString *buf; int fd; struct poller *poller; time_t active; + int eof; }; diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c new file mode 100644 index 000000000..c7fd6a155 --- /dev/null +++ b/daemon/tcp_listener.c @@ -0,0 +1,227 @@ +#include "tcp_listener.h" + +#include + +#include "poller.h" +#include "obj.h" +#include "socket.h" +#include "aux.h" +#include "log.h" +#include "streambuf.h" + +struct tcp_listener_callback { + struct obj obj; + tcp_listener_callback_t func; + socket_t *ul; + struct obj *p; +}; +struct streambuf_callback { + struct obj obj; + streambuf_callback_t newconn_func; + streambuf_callback_t newdata_func; + streambuf_callback_t closed_func; + streambuf_callback_t timer_func; + struct streambuf_listener *listener; + struct obj *parent; +}; + +static void tcp_listener_incoming(int fd, void *p, uintptr_t x) { + struct tcp_listener_callback *cb = p; + int ret; + char addr[64]; + socket_t *listener; + socket_t newsock; + + listener = cb->ul; + + for (;;) { + ret = listener->family->accept(listener, &newsock); + if (ret == -1) { + if (errno == EINTR) + continue; + if (errno != EWOULDBLOCK && errno != EAGAIN) + ilog(LOG_WARNING, "Error accepting TCP connection: %s", strerror(errno)); + return; + } + nonblock(newsock.fd); + + endpoint_print(&newsock.remote, addr, sizeof(addr)); + + cb->func(cb->p, &newsock, addr, listener); + } +} + +static void tcp_listener_closed(int fd, void *p, uintptr_t u) { + abort(); +} + +int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, + tcp_listener_callback_t func, struct obj *obj) +{ + struct poller_item i; + struct tcp_listener_callback *cb; + + cb = obj_alloc("tcp_listener_callback", sizeof(*cb), NULL); + cb->func = func; + cb->p = obj_get_o(obj); + cb->ul = sock; + + if (open_socket(sock, SOCK_STREAM, ep->port, &ep->address)) + goto fail; + if (sock->family->listen(sock, 5)) + goto fail; + + ZERO(i); + i.fd = sock->fd; + i.closed = tcp_listener_closed; + i.readable = tcp_listener_incoming; + i.obj = &cb->obj; + if (poller_add_item(p, &i)) + goto fail; + + return 0; + +fail: + close_socket(sock); + obj_put_o(obj); + obj_put(cb); + return -1; +} + +static void streambuf_stream_free(void *p) { + struct streambuf_stream *s = p; + close_socket(&s->sock); + streambuf_destroy(s->inbuf); + streambuf_destroy(s->outbuf); + obj_put(s->cb); + obj_put_o(s->parent); + free(s->addr); +} + +static void streambuf_stream_closed(int fd, void *p, uintptr_t u) { + struct streambuf_stream *s = p; + + if (s->sock.fd == -1) + return; + + s->cb->closed_func(s); + + struct streambuf_listener *l = s->listener; + mutex_lock(&l->lock); + int ret = g_hash_table_remove(l->streams, s); + mutex_unlock(&l->lock); + poller_del_item(s->listener->poller, s->sock.fd); + if (ret) + obj_put(s); +} + +static void streambuf_stream_timer(int fd, void *p, uintptr_t u) { + struct streambuf_stream *s = p; + s->cb->timer_func(s); +} + + +static void streambuf_stream_readable(int fd, void *p, uintptr_t u) { + struct streambuf_stream *s = p; + + int ret = streambuf_readable(s->inbuf); + if (ret == -1) + goto close; + s->cb->newdata_func(s); + if (ret == -2) + goto close; + + return; + +close: + streambuf_stream_closed(fd, s, 0); +} + +static void streambuf_stream_writeable(int fd, void *p, uintptr_t u) { + struct streambuf_stream *s = p; + + if (streambuf_writeable(s->outbuf)) + streambuf_stream_closed(fd, s, 0); +} + + +static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *addr, socket_t *listener_sock) { + struct streambuf_callback *cb = (void *) p; + struct streambuf_stream *s; + struct streambuf_listener *listener; + struct poller_item i; + + listener = cb->listener; + + s = obj_alloc0("streambuf_stream", sizeof(*s), streambuf_stream_free); + s->sock = *newsock; + s->inbuf = streambuf_new(listener->poller, newsock->fd); + s->outbuf = streambuf_new(listener->poller, newsock->fd); + s->listener = listener; + s->cb = obj_get(cb); + s->parent = obj_get_o(cb->parent); + s->addr = strdup(addr); + + ZERO(i); + i.fd = newsock->fd; + i.closed = streambuf_stream_closed; + i.readable = streambuf_stream_readable; + i.writeable = streambuf_stream_writeable; + i.timer = streambuf_stream_timer; + i.obj = &s->obj; + + if (poller_add_item(listener->poller, &i)) + goto fail; + + if (cb->newconn_func) + cb->newconn_func(s); + + mutex_lock(&listener->lock); + g_hash_table_insert(listener->streams, s, s); // hand over ref + mutex_unlock(&listener->lock); + + return; + +fail: + obj_put(s); +} + +int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep, + streambuf_callback_t newconn_func, + streambuf_callback_t newdata_func, + streambuf_callback_t closed_func, + streambuf_callback_t timer_func, + struct obj *obj) +{ + struct streambuf_callback *cb; + + ZERO(*listener); + + listener->poller = p; + mutex_init(&listener->lock); + listener->streams = g_hash_table_new(g_direct_hash, g_direct_equal); + + cb = obj_alloc("streambuf_callback", sizeof(*cb), NULL); + cb->newconn_func = newconn_func; + cb->newdata_func = newdata_func; + cb->closed_func = closed_func; + cb->timer_func = timer_func; + cb->parent = obj_get_o(obj); + cb->listener = listener; + + if (tcp_listener_init(&listener->listener, p, ep, streambuf_listener_newconn, &cb->obj)) + goto fail; + + return 0; + +fail: + obj_put(cb); + return -1; +} + +void streambuf_stream_close(struct streambuf_stream *s) { + streambuf_stream_closed(s->sock.fd, s, 0); +} +void streambuf_stream_shutdown(struct streambuf_stream *s) { + shutdown(s->sock.fd, SHUT_WR); +} diff --git a/daemon/tcp_listener.h b/daemon/tcp_listener.h new file mode 100644 index 000000000..0bb297a79 --- /dev/null +++ b/daemon/tcp_listener.h @@ -0,0 +1,47 @@ +#ifndef _TCP_LISTENER_H_ +#define _TCP_LISTENER_H_ + +#include "socket.h" +#include "obj.h" + + +struct poller; +struct obj; +struct streambuf_callback; +struct streambuf_stream; + +typedef void (*tcp_listener_callback_t)(struct obj *p, socket_t *sock, char *addr, socket_t *); +typedef void (*streambuf_callback_t)(struct streambuf_stream *); + +struct streambuf_listener { + socket_t listener; + struct poller *poller; + mutex_t lock; + GHashTable *streams; +}; +struct streambuf_stream { + struct obj obj; + socket_t sock; + struct streambuf_listener *listener; + struct streambuf_callback *cb; + struct obj *parent; + char *addr; + struct streambuf *inbuf, + *outbuf; + +}; + +int tcp_listener_init(socket_t *, struct poller *p, const endpoint_t *, tcp_listener_callback_t, struct obj *); + +int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep, + streambuf_callback_t newconn_func, + streambuf_callback_t newdata_func, + streambuf_callback_t closed_func, + streambuf_callback_t timer_func, + struct obj *obj); + +void streambuf_stream_close(struct streambuf_stream *); +void streambuf_stream_shutdown(struct streambuf_stream *); + + +#endif diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 209038dc4..656df6e70 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -17,7 +17,7 @@ struct udp_listener_callback { struct obj obj; udp_listener_callback_t func; - struct udp_listener *ul; + socket_t *ul; struct obj *p; }; @@ -31,13 +31,11 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { char buf[0x10000]; char addr[64]; str str; - struct udp_listener *ul; socket_t *listener; endpoint_t sin; str.s = buf; - ul = cb->ul; - listener = &ul->sock; + listener = cb->ul; for (;;) { len = socket_recvfrom(listener, buf, sizeof(buf)-1, &sin); @@ -53,11 +51,11 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { endpoint_print(&sin, addr, sizeof(addr)); str.len = len; - cb->func(cb->p, &str, &sin, addr, ul); + cb->func(cb->p, &str, &sin, addr, listener); } } -int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t *ep, +int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, udp_listener_callback_t func, struct obj *obj) { struct poller_item i; @@ -66,15 +64,13 @@ int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t cb = obj_alloc("udp_listener_callback", sizeof(*cb), NULL); cb->func = func; cb->p = obj_get_o(obj); - cb->ul = u; + cb->ul = sock; - if (open_socket(&u->sock, SOCK_DGRAM, ep->port, &ep->address)) + if (open_socket(sock, SOCK_DGRAM, ep->port, &ep->address)) goto fail; - ipv6only(u->sock.fd, 1); - ZERO(i); - i.fd = u->sock.fd; + i.fd = sock->fd; i.closed = udp_listener_closed; i.readable = udp_listener_incoming; i.obj = &cb->obj; @@ -84,7 +80,7 @@ int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t return 0; fail: - close_socket(&u->sock); + close_socket(sock); obj_put_o(obj); obj_put(cb); return -1; diff --git a/daemon/udp_listener.h b/daemon/udp_listener.h index d69ef6d82..5aa44c107 100644 --- a/daemon/udp_listener.h +++ b/daemon/udp_listener.h @@ -9,16 +9,9 @@ struct poller; struct obj; -struct udp_listener; -typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr, - struct udp_listener *); +typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr, socket_t *); -struct udp_listener { - socket_t sock; - struct poller *poller; -}; - -int udp_listener_init(struct udp_listener *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *); +int udp_listener_init(socket_t *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *); #endif