Browse Source

TT#26757 add tcp_listener framework

Change-Id: I402d36637235ba0cc03e77d426f4dd9cbc4722a9
changes/21/17421/9
Richard Fuchs 8 years ago
parent
commit
d10952a029
17 changed files with 452 additions and 245 deletions
  1. +1
    -1
      daemon/Makefile
  2. +6
    -4
      daemon/call_interfaces.c
  3. +2
    -2
      daemon/call_interfaces.h
  4. +4
    -4
      daemon/control_ng.c
  5. +1
    -1
      daemon/control_ng.h
  6. +63
    -191
      daemon/control_tcp.c
  7. +2
    -3
      daemon/control_tcp.h
  8. +5
    -5
      daemon/control_udp.c
  9. +1
    -1
      daemon/control_udp.h
  10. +32
    -0
      daemon/socket.c
  11. +2
    -0
      daemon/socket.h
  12. +46
    -12
      daemon/streambuf.c
  13. +3
    -0
      daemon/streambuf.h
  14. +227
    -0
      daemon/tcp_listener.c
  15. +47
    -0
      daemon/tcp_listener.h
  16. +8
    -12
      daemon/udp_listener.c
  17. +2
    -9
      daemon/udp_listener.h

+ 1
- 1
daemon/Makefile View File

@ -55,7 +55,7 @@ include ../lib/lib.Makefile
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c
LIBSRCS= loglib.c auxlib.c rtplib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)


+ 6
- 4
daemon/call_interfaces.c View File

@ -22,6 +22,8 @@
#include "recording.h"
#include "rtplib.h"
#include "ssrc.h"
#include "tcp_listener.h"
#include "streambuf.h"
@ -417,7 +419,7 @@ void call_delete_tcp(char **out, struct callmaster *m) {
call_delete_branch(m, &callid, NULL, NULL, NULL, NULL, -1);
}
static void call_status_iterator(struct call *c, struct control_stream *s) {
static void call_status_iterator(struct call *c, struct streambuf_stream *s) {
// GList *l;
// struct callstream *cs;
// struct peer *p;
@ -429,7 +431,7 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
// m = c->callmaster;
// mutex_lock(&c->master_lock);
control_stream_printf(s, "session "STR_FORMAT" - - - - %lli\n",
streambuf_printf(s->outbuf, "session "STR_FORMAT" - - - - %lli\n",
STR_FMT(&c->callid),
timeval_diff(&g_now, &c->created) / 1000000);
@ -438,13 +440,13 @@ static void call_status_iterator(struct call *c, struct control_stream *s) {
// mutex_unlock(&c->master_lock);
}
void calls_status_tcp(struct callmaster *m, struct control_stream *s) {
void calls_status_tcp(struct callmaster *m, struct streambuf_stream *s) {
GQueue q = G_QUEUE_INIT;
struct call *c;
callmaster_get_all_calls(m, &q);
control_stream_printf(s, "proxy %u "UINT64F"/%i/%i\n",
streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n",
g_queue_get_length(&q),
atomic64_get(&m->stats.bytes), 0, 0);


+ 2
- 2
daemon/call_interfaces.h View File

@ -14,7 +14,7 @@
struct call;
struct call_stats;
struct callmaster;
struct control_stream;
struct streambuf_stream;
struct sockaddr_in6;
struct sdp_ng_flags {
@ -69,7 +69,7 @@ extern int dtls_passive_def;
str *call_request_tcp(char **, struct callmaster *);
str *call_lookup_tcp(char **, struct callmaster *);
void call_delete_tcp(char **, struct callmaster *);
void calls_status_tcp(struct callmaster *, struct control_stream *);
void calls_status_tcp(struct callmaster *, struct streambuf_stream *);
str *call_update_udp(char **, struct callmaster *, const char*, const endpoint_t *);
str *call_lookup_udp(char **, struct callmaster *);


+ 4
- 4
daemon/control_ng.c View File

@ -104,7 +104,7 @@ struct control_ng_stats* get_control_ng_stats(struct control_ng* c, const sockad
}
static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr,
struct udp_listener *ul)
socket_t *ul)
{
struct control_ng *c = (void *) obj;
bencode_buffer_t bencbuf;
@ -289,7 +289,7 @@ send_only:
iov[2].iov_base = to_send->s;
iov[2].iov_len = to_send->len;
socket_sendiov(&ul->sock, iov, iovlen, sin);
socket_sendiov(ul, iov, iovlen, sin);
if (resp)
cookie_cache_insert(&c->cookie_cache, &cookie, &reply);
@ -319,12 +319,12 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, struct callm
if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj))
goto fail2;
if (tos)
set_tos(&c->udp_listeners[0].sock,tos);
set_tos(&c->udp_listeners[0],tos);
if (ipv46_any_convert(ep)) {
if (udp_listener_init(&c->udp_listeners[1], p, ep, control_ng_incoming, &c->obj))
goto fail2;
if (tos)
set_tos(&c->udp_listeners[1].sock,tos);
set_tos(&c->udp_listeners[1],tos);
}
return c;


+ 1
- 1
daemon/control_ng.h View File

@ -27,7 +27,7 @@ struct control_ng {
struct obj obj;
struct callmaster *callmaster;
struct cookie_cache cookie_cache;
struct udp_listener udp_listeners[2];
socket_t udp_listeners[2];
};
struct control_ng *control_ng_new(struct poller *, endpoint_t *, struct callmaster *, unsigned char);


+ 63
- 191
daemon/control_tcp.c View File

@ -18,35 +18,19 @@
#include "call_interfaces.h"
#include "socket.h"
#include "log_funcs.h"
#include "tcp_listener.h"
struct control_stream {
struct obj obj;
int fd;
mutex_t lock;
struct streambuf *inbuf;
struct streambuf *outbuf;
struct sockaddr_in inaddr;
struct control_tcp *control;
struct poller *poller;
int linked:1;
};
struct control_tcp {
struct obj obj;
int fd;
struct streambuf_listener listeners[2];
pcre *parse_re;
pcre_extra *parse_ree;
mutex_t lock;
GList *streams;
struct poller *poller;
struct callmaster *callmaster;
};
@ -54,65 +38,48 @@ struct control_tcp {
static void control_stream_closed(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
struct control_tcp *c;
GList *l = NULL;
//static void control_stream_closed(int fd, void *p, uintptr_t u) {
static void control_stream_closed(struct streambuf_stream *s) {
ilog(LOG_INFO, "Control connection from %s closed", s->addr);
}
ilog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr));
c = s->control;
static void control_list(struct control_tcp *c, struct streambuf_stream *s) {
for (int i = 0; i < G_N_ELEMENTS(c->listeners); i++) {
if (!c->listeners[i].listener.family || !c->listeners[i].poller)
continue; // not used
restart:
mutex_lock(&c->lock);
if (s->linked) {
/* we might get called when it's not quite linked yet */
l = g_list_find(c->streams, s);
if (!l) {
mutex_unlock(&c->lock);
goto restart;
}
c->streams = g_list_delete_link(c->streams, l);
s->linked = 0;
}
mutex_unlock(&c->lock);
if (l)
obj_put(s);
poller_del_item(s->poller, fd);
}
mutex_lock(&c->listeners[i].lock);
GList *streams = g_hash_table_get_values(c->listeners[i].streams);
for (GList *l = streams; l; l = l->next) {
struct streambuf_stream *cl = l->data;
streambuf_printf(s->outbuf, "%s\n", cl->addr);
}
static void control_list(struct control_tcp *c, struct control_stream *s) {
struct control_stream *i;
GList *l;
mutex_unlock(&c->listeners[i].lock);
mutex_lock(&c->lock);
for (l = c->streams; l; l = l->next) {
i = l->data;
mutex_lock(&s->lock);
streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr));
mutex_unlock(&s->lock);
g_list_free(streams);
}
mutex_unlock(&c->lock);
streambuf_printf(s->outbuf, "End.\n");
}
static int control_stream_parse(struct control_stream *s, char *line) {
static int control_stream_parse(struct streambuf_stream *s, char *line) {
int ovec[60];
int ret;
char **out;
struct control_tcp *c = s->control;
struct control_tcp *c = (void *) s->parent;
str *output = NULL;
ret = pcre_exec(c->parse_re, c->parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec));
if (ret <= 0) {
ilog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line);
ilog(LOG_WARNING, "Unable to parse command line from %s: %s", s->addr, line);
return -1;
}
ilog(LOG_INFO, "Got valid command from " DF ": %s", DP(s->inaddr), line);
ilog(LOG_INFO, "Got valid command from %s: %s", s->addr, line);
pcre_get_substring_list(line, ovec, ret, (const char ***) &out);
@ -132,151 +99,64 @@ static int control_stream_parse(struct control_stream *s, char *line) {
else if (!strcmp(out[RE_TCP_DIV_CMD], "status"))
calls_status_tcp(c->callmaster, s);
else if (!strcmp(out[RE_TCP_DIV_CMD], "build") || !strcmp(out[RE_TCP_DIV_CMD], "version"))
control_stream_printf(s, "Version: %s\n", RTPENGINE_VERSION);
streambuf_printf(s->outbuf, "Version: %s\n", RTPENGINE_VERSION);
else if (!strcmp(out[RE_TCP_DIV_CMD], "controls"))
control_list(c, s);
else if (!strcmp(out[RE_TCP_DIV_CMD], "quit") || !strcmp(out[RE_TCP_DIV_CMD], "exit"))
;
if (output) {
mutex_lock(&s->lock);
streambuf_write_str(s->outbuf, output);
mutex_unlock(&s->lock);
free(output);
}
pcre_free(out);
log_info_clear();
return -1;
return 1;
}
static void control_stream_timer(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
int i;
mutex_lock(&s->lock);
i = (poller_now - s->inbuf->active) >= 60 || (poller_now - s->outbuf->active) >= 60;
mutex_unlock(&s->lock);
if (i)
control_stream_closed(s->fd, s, 0);
static void control_stream_timer(struct streambuf_stream *s) {
if ((poller_now - s->inbuf->active) >= 60 || (poller_now - s->outbuf->active) >= 60)
control_stream_closed(s);
}
static void control_stream_readable(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
//static void control_stream_readable(int fd, void *p, uintptr_t u) {
static void control_stream_readable(struct streambuf_stream *s) {
char *line;
int ret;
mutex_lock(&s->lock);
if (streambuf_readable(s->inbuf))
goto close;
while ((line = streambuf_getline(s->inbuf))) {
mutex_unlock(&s->lock);
ilog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line);
ilog(LOG_DEBUG, "Got control line from %s: %s", s->addr, line);
ret = control_stream_parse(s, line);
free(line);
if (ret == 1) {
streambuf_stream_shutdown(s);
break;
}
if (ret)
goto close_nolock;
mutex_lock(&s->lock);
goto close;
}
if (streambuf_bufsize(s->inbuf) > 1024) {
ilog(LOG_WARNING, "Buffer length exceeded in control connection from " DF, DP(s->inaddr));
ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr);
goto close;
}
mutex_unlock(&s->lock);
return;
close:
mutex_unlock(&s->lock);
close_nolock:
control_stream_closed(fd, s, 0);
}
static void control_stream_writeable(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
if (streambuf_writeable(s->outbuf))
control_stream_closed(fd, s, 0);
}
static void control_closed(int fd, void *p, uintptr_t u) {
abort();
streambuf_stream_close(s);
}
static void control_stream_free(void *p) {
struct control_stream *s = p;
close(s->fd);
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
mutex_destroy(&s->lock);
static void control_incoming(struct streambuf_stream *s) {
ilog(LOG_INFO, "New TCP control connection from %s", s->addr);
}
static void control_incoming(int fd, void *p, uintptr_t u) {
int nfd;
struct control_tcp *c = p;
struct control_stream *s;
struct poller_item i;
struct sockaddr_in sin;
socklen_t sinl;
next:
sinl = sizeof(sin);
nfd = accept(fd, (struct sockaddr *) &sin, &sinl);
if (nfd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
goto next;
}
nonblock(nfd);
ilog(LOG_INFO, "New control connection from " DF, DP(sin));
s = obj_alloc0("control_stream", sizeof(*s), control_stream_free);
s->fd = nfd;
s->control = c;
s->poller = c->poller;
s->inbuf = streambuf_new(c->poller, nfd);
s->outbuf = streambuf_new(c->poller, nfd);
memcpy(&s->inaddr, &sin, sizeof(s->inaddr));
mutex_init(&s->lock);
s->linked = 1;
ZERO(i);
i.fd = nfd;
i.closed = control_stream_closed;
i.readable = control_stream_readable;
i.writeable = control_stream_writeable;
i.timer = control_stream_timer;
i.obj = &s->obj;
if (poller_add_item(c->poller, &i))
goto fail;
mutex_lock(&c->lock);
/* let the list steal our own ref */
c->streams = g_list_prepend(c->streams, s);
mutex_unlock(&c->lock);
goto next;
fail:
obj_put(s);
goto next;
}
struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, struct callmaster *m) {
socket_t sock;
struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) {
struct control_tcp *c;
struct poller_item i;
const char *errptr;
int erroff;
@ -285,14 +165,28 @@ struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, stru
if (!m)
return NULL;
if (open_socket(&sock, SOCK_STREAM, ep->port, &ep->address))
return NULL;
c = obj_alloc0("control", sizeof(*c), NULL);
if (listen(sock.fd, 5))
if (streambuf_listener_init(&c->listeners[0], p, ep,
control_incoming, control_stream_readable,
control_stream_closed,
control_stream_timer,
&c->obj))
{
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
goto fail;
c = obj_alloc0("control", sizeof(*c), NULL);
}
if (ipv46_any_convert(ep)) {
if (streambuf_listener_init(&c->listeners[1], p, ep,
control_incoming, control_stream_readable,
control_stream_closed,
control_stream_timer,
&c->obj))
{
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
goto fail;
}
}
c->parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
@ -300,36 +194,14 @@ struct control_tcp *control_tcp_new(struct poller *p, const endpoint_t *ep, stru
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
c->parse_ree = pcre_study(c->parse_re, 0, &errptr);
c->fd = sock.fd;
c->poller = p;
c->callmaster = m;
mutex_init(&c->lock);
ZERO(i);
i.fd = sock.fd;
i.closed = control_closed;
i.readable = control_incoming;
i.obj = &c->obj;
if (poller_add_item(p, &i))
goto fail2;
obj_put(c);
return c;
fail2:
obj_put(c);
fail:
close_socket(&sock);
// XXX streambuf_listener_close ...
obj_put(c);
return NULL;
}
void control_stream_printf(struct control_stream *s, const char *f, ...) {
va_list va;
va_start(va, f);
mutex_lock(&s->lock);
streambuf_vprintf(s->outbuf, f, va);
mutex_unlock(&s->lock);
va_end(va);
}

+ 2
- 3
daemon/control_tcp.h View File

@ -32,12 +32,11 @@
struct poller;
struct callmaster;
struct control_tcp;
struct control_stream;
struct streambuf_stream;
struct control_tcp *control_tcp_new(struct poller *, const endpoint_t *, struct callmaster *);
void control_stream_printf(struct control_stream *, const char *, ...) __attribute__ ((format (printf, 2, 3)));
struct control_tcp *control_tcp_new(struct poller *, endpoint_t *, struct callmaster *);


+ 5
- 5
daemon/control_udp.c View File

@ -22,7 +22,7 @@
static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr,
struct udp_listener *ul) {
socket_t *ul) {
struct control_udp *u = (void *) obj;
int ret;
int ovec[100];
@ -60,7 +60,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
iovlen = 2;
}
socket_sendiov(&ul->sock, iov, iovlen, sin);
socket_sendiov(ul, iov, iovlen, sin);
pcre_free(out);
@ -75,7 +75,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
reply = cookie_cache_lookup(&u->cookie_cache, &cookie);
if (reply) {
ilog(LOG_INFO, "Detected command from udp:%s as a duplicate", addr);
socket_sendto(&ul->sock, reply->s, reply->len, sin);
socket_sendto(ul, reply->s, reply->len, sin);
free(reply);
goto out;
}
@ -118,11 +118,11 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
iov[2].iov_len = 9;
iovlen++;
}
socket_sendiov(&ul->sock, iov, iovlen, sin);
socket_sendiov(ul, iov, iovlen, sin);
}
if (reply) {
socket_sendto(&ul->sock, reply->s, reply->len, sin);
socket_sendto(ul, reply->s, reply->len, sin);
cookie_cache_insert(&u->cookie_cache, &cookie, reply);
free(reply);
}


+ 1
- 1
daemon/control_udp.h View File

@ -50,7 +50,7 @@ struct control_udp {
struct callmaster *callmaster;
struct cookie_cache cookie_cache;
struct udp_listener udp_listeners[2];
socket_t udp_listeners[2];
pcre *parse_re;
pcre_extra *parse_ree;


+ 32
- 0
daemon/socket.c View File

@ -25,6 +25,8 @@ static int __ip4_is_specified(const sockaddr_t *a);
static int __ip6_is_specified(const sockaddr_t *a);
static int __ip_bind(socket_t *s, unsigned int, const sockaddr_t *);
static int __ip_connect(socket_t *s, const endpoint_t *);
static int __ip_listen(socket_t *s, int backlog);
static int __ip_accept(socket_t *s, socket_t *new_sock);
static int __ip_timestamping(socket_t *s);
static int __ip4_sockaddr2endpoint(endpoint_t *, const void *);
static int __ip6_sockaddr2endpoint(endpoint_t *, const void *);
@ -76,6 +78,8 @@ static struct socket_family __socket_families[__SF_LAST] = {
.addrport2sockaddr = __ip4_addrport2sockaddr,
.bind = __ip_bind,
.connect = __ip_connect,
.listen = __ip_listen,
.accept = __ip_accept,
.timestamping = __ip_timestamping,
.recvfrom = __ip_recvfrom,
.recvfrom_ts = __ip_recvfrom_ts,
@ -105,6 +109,8 @@ static struct socket_family __socket_families[__SF_LAST] = {
.addrport2sockaddr = __ip6_addrport2sockaddr,
.bind = __ip_bind,
.connect = __ip_connect,
.listen = __ip_listen,
.accept = __ip_accept,
.timestamping = __ip_timestamping,
.recvfrom = __ip_recvfrom,
.recvfrom_ts = __ip_recvfrom_ts,
@ -261,6 +267,30 @@ static int __ip_connect(socket_t *s, const endpoint_t *ep) {
}
return 0;
}
static int __ip_listen(socket_t *s, int backlog) {
return listen(s->fd, backlog);
}
static int __ip_accept(socket_t *s, socket_t *newsock) {
int nfd;
struct sockaddr_storage sin;
socklen_t sinlen;
ZERO(*newsock);
sinlen = sizeof(sin);
nfd = accept(s->fd, (struct sockaddr *) &sin, &sinlen);
if (nfd == -1) {
__C_DBG("accept fail, fd=%d, port=%d", s->fd, s->local.port);
return -1;
}
newsock->fd = nfd;
newsock->family = s->family;
newsock->local = s->local;
s->family->sockaddr2endpoint(&newsock->remote, &sin);
return 0;
}
static ssize_t __ip_recvfrom_ts(socket_t *s, void *buf, size_t len, endpoint_t *ep, struct timeval *tv) {
ssize_t ret;
struct sockaddr_storage sin;
@ -619,6 +649,8 @@ int open_socket(socket_t *r, int type, unsigned int port, const sockaddr_t *sa)
nonblock(r->fd);
reuseaddr(r->fd);
if (r->family->af == AF_INET6)
ipv6only(r->fd, 1);
if (port > 0xffff) {
__C_DBG("open socket fail, port=%d > 0xfffffd", port);


+ 2
- 0
daemon/socket.h View File

@ -64,6 +64,8 @@ struct socket_family {
int (*addrport2sockaddr)(void *, const sockaddr_t *, unsigned int);
int (*bind)(socket_t *, unsigned int, const sockaddr_t *);
int (*connect)(socket_t *, const endpoint_t *);
int (*listen)(socket_t *, int);
int (*accept)(socket_t *, socket_t *);
int (*timestamping)(socket_t *);
ssize_t (*recvfrom)(socket_t *, void *, size_t, endpoint_t *);
ssize_t (*recvfrom_ts)(socket_t *, void *, size_t, endpoint_t *, struct timeval *);


+ 46
- 12
daemon/streambuf.c View File

@ -13,12 +13,15 @@
struct streambuf *streambuf_new(struct poller *p, int fd) {
struct streambuf *b;
b = malloc(sizeof(*b));
ZERO(*b);
mutex_init(&b->lock);
b->buf = g_string_new("");
b->fd = fd;
b->poller = p;
@ -38,6 +41,8 @@ int streambuf_writeable(struct streambuf *b) {
int ret;
unsigned int out;
mutex_lock(&b->lock);
for (;;) {
if (!b->buf->len)
break;
@ -48,8 +53,10 @@ int streambuf_writeable(struct streambuf *b) {
if (ret < 0) {
if (errno == EINTR)
continue;
if (errno != EAGAIN && errno != EWOULDBLOCK)
if (errno != EAGAIN && errno != EWOULDBLOCK) {
mutex_unlock(&b->lock);
return -1;
}
ret = 0;
}
@ -64,6 +71,7 @@ int streambuf_writeable(struct streambuf *b) {
}
}
mutex_unlock(&b->lock);
return 0;
}
@ -71,16 +79,24 @@ int streambuf_readable(struct streambuf *b) {
int ret;
char buf[1024];
mutex_lock(&b->lock);
for (;;) {
ret = read(b->fd, buf, 1024);
if (ret == 0)
return -1;
if (ret == 0) {
// don't discard already read data in the buffer
b->eof = 1;
ret = b->buf->len ? -2 : -1;
mutex_unlock(&b->lock);
return ret;
}
if (ret < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
mutex_unlock(&b->lock);
return -1;
}
@ -88,15 +104,18 @@ int streambuf_readable(struct streambuf *b) {
b->active = poller_now;
}
mutex_unlock(&b->lock);
return 0;
}
char *streambuf_getline(struct streambuf *b) {
char *p;
int len;
int len, to_del;
char *s = NULL;
mutex_lock(&b->lock);
for (;;) {
if (s) {
free(s);
@ -104,19 +123,29 @@ char *streambuf_getline(struct streambuf *b) {
}
p = memchr(b->buf->str, '\n', b->buf->len);
if (!p)
return NULL;
len = p - b->buf->str;
if (len == 0) {
g_string_erase(b->buf, 0, 1);
continue;
if (!p) {
if (b->eof) {
// use entire string
len = b->buf->len;
to_del = len;
}
else
break;
}
else {
len = p - b->buf->str;
to_del = len + 1;
if (len == 0) {
// blank line, skip it
g_string_erase(b->buf, 0, 1);
continue;
}
}
s = malloc(len + 1);
memcpy(s, b->buf->str, len);
s[len] = '\0';
g_string_erase(b->buf, 0, len + 1);
g_string_erase(b->buf, 0, to_del);
if (s[--len] == '\r') {
if (len == 0)
@ -127,6 +156,7 @@ char *streambuf_getline(struct streambuf *b) {
break;
}
mutex_unlock(&b->lock);
return s;
}
@ -157,6 +187,8 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
unsigned int out;
int ret;
mutex_lock(&b->lock);
while (len && !poller_isblocked(b->poller, b->fd)) {
out = (len > 1024) ? 1024 : len;
ret = write(b->fd, s, out);
@ -183,4 +215,6 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
poller_error(b->poller, b->fd);
else if (len)
g_string_append_len(b->buf, s, len);
mutex_unlock(&b->lock);
}

+ 3
- 0
daemon/streambuf.h View File

@ -10,6 +10,7 @@
#include "compat.h"
#include "str.h"
#include "aux.h"
@ -18,10 +19,12 @@ struct poller;
struct streambuf {
mutex_t lock;
GString *buf;
int fd;
struct poller *poller;
time_t active;
int eof;
};


+ 227
- 0
daemon/tcp_listener.c View File

@ -0,0 +1,227 @@
#include "tcp_listener.h"
#include <errno.h>
#include "poller.h"
#include "obj.h"
#include "socket.h"
#include "aux.h"
#include "log.h"
#include "streambuf.h"
struct tcp_listener_callback {
struct obj obj;
tcp_listener_callback_t func;
socket_t *ul;
struct obj *p;
};
struct streambuf_callback {
struct obj obj;
streambuf_callback_t newconn_func;
streambuf_callback_t newdata_func;
streambuf_callback_t closed_func;
streambuf_callback_t timer_func;
struct streambuf_listener *listener;
struct obj *parent;
};
static void tcp_listener_incoming(int fd, void *p, uintptr_t x) {
struct tcp_listener_callback *cb = p;
int ret;
char addr[64];
socket_t *listener;
socket_t newsock;
listener = cb->ul;
for (;;) {
ret = listener->family->accept(listener, &newsock);
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno != EWOULDBLOCK && errno != EAGAIN)
ilog(LOG_WARNING, "Error accepting TCP connection: %s", strerror(errno));
return;
}
nonblock(newsock.fd);
endpoint_print(&newsock.remote, addr, sizeof(addr));
cb->func(cb->p, &newsock, addr, listener);
}
}
static void tcp_listener_closed(int fd, void *p, uintptr_t u) {
abort();
}
int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
tcp_listener_callback_t func, struct obj *obj)
{
struct poller_item i;
struct tcp_listener_callback *cb;
cb = obj_alloc("tcp_listener_callback", sizeof(*cb), NULL);
cb->func = func;
cb->p = obj_get_o(obj);
cb->ul = sock;
if (open_socket(sock, SOCK_STREAM, ep->port, &ep->address))
goto fail;
if (sock->family->listen(sock, 5))
goto fail;
ZERO(i);
i.fd = sock->fd;
i.closed = tcp_listener_closed;
i.readable = tcp_listener_incoming;
i.obj = &cb->obj;
if (poller_add_item(p, &i))
goto fail;
return 0;
fail:
close_socket(sock);
obj_put_o(obj);
obj_put(cb);
return -1;
}
static void streambuf_stream_free(void *p) {
struct streambuf_stream *s = p;
close_socket(&s->sock);
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
obj_put(s->cb);
obj_put_o(s->parent);
free(s->addr);
}
static void streambuf_stream_closed(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p;
if (s->sock.fd == -1)
return;
s->cb->closed_func(s);
struct streambuf_listener *l = s->listener;
mutex_lock(&l->lock);
int ret = g_hash_table_remove(l->streams, s);
mutex_unlock(&l->lock);
poller_del_item(s->listener->poller, s->sock.fd);
if (ret)
obj_put(s);
}
static void streambuf_stream_timer(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p;
s->cb->timer_func(s);
}
static void streambuf_stream_readable(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p;
int ret = streambuf_readable(s->inbuf);
if (ret == -1)
goto close;
s->cb->newdata_func(s);
if (ret == -2)
goto close;
return;
close:
streambuf_stream_closed(fd, s, 0);
}
static void streambuf_stream_writeable(int fd, void *p, uintptr_t u) {
struct streambuf_stream *s = p;
if (streambuf_writeable(s->outbuf))
streambuf_stream_closed(fd, s, 0);
}
static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *addr, socket_t *listener_sock) {
struct streambuf_callback *cb = (void *) p;
struct streambuf_stream *s;
struct streambuf_listener *listener;
struct poller_item i;
listener = cb->listener;
s = obj_alloc0("streambuf_stream", sizeof(*s), streambuf_stream_free);
s->sock = *newsock;
s->inbuf = streambuf_new(listener->poller, newsock->fd);
s->outbuf = streambuf_new(listener->poller, newsock->fd);
s->listener = listener;
s->cb = obj_get(cb);
s->parent = obj_get_o(cb->parent);
s->addr = strdup(addr);
ZERO(i);
i.fd = newsock->fd;
i.closed = streambuf_stream_closed;
i.readable = streambuf_stream_readable;
i.writeable = streambuf_stream_writeable;
i.timer = streambuf_stream_timer;
i.obj = &s->obj;
if (poller_add_item(listener->poller, &i))
goto fail;
if (cb->newconn_func)
cb->newconn_func(s);
mutex_lock(&listener->lock);
g_hash_table_insert(listener->streams, s, s); // hand over ref
mutex_unlock(&listener->lock);
return;
fail:
obj_put(s);
}
int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep,
streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func,
streambuf_callback_t closed_func,
streambuf_callback_t timer_func,
struct obj *obj)
{
struct streambuf_callback *cb;
ZERO(*listener);
listener->poller = p;
mutex_init(&listener->lock);
listener->streams = g_hash_table_new(g_direct_hash, g_direct_equal);
cb = obj_alloc("streambuf_callback", sizeof(*cb), NULL);
cb->newconn_func = newconn_func;
cb->newdata_func = newdata_func;
cb->closed_func = closed_func;
cb->timer_func = timer_func;
cb->parent = obj_get_o(obj);
cb->listener = listener;
if (tcp_listener_init(&listener->listener, p, ep, streambuf_listener_newconn, &cb->obj))
goto fail;
return 0;
fail:
obj_put(cb);
return -1;
}
void streambuf_stream_close(struct streambuf_stream *s) {
streambuf_stream_closed(s->sock.fd, s, 0);
}
void streambuf_stream_shutdown(struct streambuf_stream *s) {
shutdown(s->sock.fd, SHUT_WR);
}

+ 47
- 0
daemon/tcp_listener.h View File

@ -0,0 +1,47 @@
#ifndef _TCP_LISTENER_H_
#define _TCP_LISTENER_H_
#include "socket.h"
#include "obj.h"
struct poller;
struct obj;
struct streambuf_callback;
struct streambuf_stream;
typedef void (*tcp_listener_callback_t)(struct obj *p, socket_t *sock, char *addr, socket_t *);
typedef void (*streambuf_callback_t)(struct streambuf_stream *);
struct streambuf_listener {
socket_t listener;
struct poller *poller;
mutex_t lock;
GHashTable *streams;
};
struct streambuf_stream {
struct obj obj;
socket_t sock;
struct streambuf_listener *listener;
struct streambuf_callback *cb;
struct obj *parent;
char *addr;
struct streambuf *inbuf,
*outbuf;
};
int tcp_listener_init(socket_t *, struct poller *p, const endpoint_t *, tcp_listener_callback_t, struct obj *);
int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep,
streambuf_callback_t newconn_func,
streambuf_callback_t newdata_func,
streambuf_callback_t closed_func,
streambuf_callback_t timer_func,
struct obj *obj);
void streambuf_stream_close(struct streambuf_stream *);
void streambuf_stream_shutdown(struct streambuf_stream *);
#endif

+ 8
- 12
daemon/udp_listener.c View File

@ -17,7 +17,7 @@
struct udp_listener_callback {
struct obj obj;
udp_listener_callback_t func;
struct udp_listener *ul;
socket_t *ul;
struct obj *p;
};
@ -31,13 +31,11 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
char buf[0x10000];
char addr[64];
str str;
struct udp_listener *ul;
socket_t *listener;
endpoint_t sin;
str.s = buf;
ul = cb->ul;
listener = &ul->sock;
listener = cb->ul;
for (;;) {
len = socket_recvfrom(listener, buf, sizeof(buf)-1, &sin);
@ -53,11 +51,11 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
endpoint_print(&sin, addr, sizeof(addr));
str.len = len;
cb->func(cb->p, &str, &sin, addr, ul);
cb->func(cb->p, &str, &sin, addr, listener);
}
}
int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t *ep,
int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep,
udp_listener_callback_t func, struct obj *obj)
{
struct poller_item i;
@ -66,15 +64,13 @@ int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t
cb = obj_alloc("udp_listener_callback", sizeof(*cb), NULL);
cb->func = func;
cb->p = obj_get_o(obj);
cb->ul = u;
cb->ul = sock;
if (open_socket(&u->sock, SOCK_DGRAM, ep->port, &ep->address))
if (open_socket(sock, SOCK_DGRAM, ep->port, &ep->address))
goto fail;
ipv6only(u->sock.fd, 1);
ZERO(i);
i.fd = u->sock.fd;
i.fd = sock->fd;
i.closed = udp_listener_closed;
i.readable = udp_listener_incoming;
i.obj = &cb->obj;
@ -84,7 +80,7 @@ int udp_listener_init(struct udp_listener *u, struct poller *p, const endpoint_t
return 0;
fail:
close_socket(&u->sock);
close_socket(sock);
obj_put_o(obj);
obj_put(cb);
return -1;


+ 2
- 9
daemon/udp_listener.h View File

@ -9,16 +9,9 @@
struct poller;
struct obj;
struct udp_listener;
typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr,
struct udp_listener *);
typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr, socket_t *);
struct udp_listener {
socket_t sock;
struct poller *poller;
};
int udp_listener_init(struct udp_listener *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *);
int udp_listener_init(socket_t *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *);
#endif

Loading…
Cancel
Save