diff --git a/daemon/call.c b/daemon/call.c index 8dd1e3ca2..3023c3068 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -83,13 +83,16 @@ static void unkernelize(struct peer *); -static void stream_closed(int fd, void *p) { - struct streamrelay *r = p; +static void stream_closed(int fd, void *p, uintptr_t u) { + struct callstream *cs = p; + struct streamrelay *r; struct call *c; int i; socklen_t j; - c = r->up->up->call; + r = &cs->peers[u >> 1].rtps[u & 1]; + assert(r->fd == fd); + c = cs->call; j = sizeof(i); getsockopt(fd, SOL_SOCKET, SO_ERROR, &i, &j); @@ -196,7 +199,7 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ if (pe->confirmed || !pe->filled || r->idx != 0) goto forward; - if (!c->lookup_done || m->poller->now <= c->lookup_done + 3) + if (!c->lookup_done || poller_now(m->poller) <= c->lookup_done + 3) goto peerinfo; mylog(LOG_DEBUG, LOG_PREFIX_C "Confirmed peer information for port %u - %s:%u", @@ -302,7 +305,7 @@ drop: r->stats.bytes += l; m->statsps.packets++; m->statsps.bytes += l; - r->last = m->poller->now; + r->last = poller_now(m->poller); return 0; } @@ -310,8 +313,9 @@ drop: -static void stream_readable(int fd, void *p) { - struct streamrelay *r = p; +static void stream_readable(int fd, void *p, uintptr_t u) { + struct callstream *cs = p; + struct streamrelay *r; char buf[8192]; int ret; struct sockaddr_storage ss; @@ -320,6 +324,9 @@ static void stream_readable(int fd, void *p) { unsigned int sinlen; void *sinp; + r = &cs->peers[u >> 1].rtps[u & 1]; + assert(r->fd == fd); + for (;;) { sinlen = sizeof(ss); ret = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &ss, &sinlen); @@ -327,7 +334,7 @@ static void stream_readable(int fd, void *p) { if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) break; - stream_closed(fd, r); + stream_closed(fd, r, 0); break; } if (ret >= sizeof(buf)) @@ -467,7 +474,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { else if (IN6_IS_ADDR_UNSPECIFIED(&sr->peer.ip46)) check = cm->silent_timeout; - if (po->now - sr->last < check) + if (poller_now(po) - sr->last < check) goto good; } } @@ -568,7 +575,7 @@ static void callmaster_timer(void *ptr) { DS(errors); if (ke->stats.packets != sr->kstats.packets) - sr->last = po->now; + sr->last = poller_now(po); sr->kstats.packets = ke->stats.packets; sr->kstats.bytes = ke->stats.bytes; @@ -595,19 +602,20 @@ next: struct callmaster *callmaster_new(struct poller *p) { struct callmaster *c; - c = g_slice_alloc0(sizeof(*c)); + c = obj_alloc0("callmaster", sizeof(*c), NULL); c->callhash = g_hash_table_new(g_str_hash, g_str_equal); if (!c->callhash) goto fail; c->poller = p; - poller_timer(p, callmaster_timer, c); + poller_timer(p, callmaster_timer, &c->obj); + obj_put(&c->obj); return c; fail: - g_slice_free1(sizeof(*c), c); + obj_put(&c->obj); return NULL; } @@ -876,7 +884,8 @@ static void steal_peer(struct peer *dest, struct peer *src) { srs->peer_advertised.port = 0; pi.fd = sr->fd; - pi.ptr = sr; + pi.obj = &sr->up->up->obj; + pi.uintp = i | (dest->idx << 1); pi.readable = stream_readable; pi.closed = stream_closed; @@ -894,10 +903,9 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 po = ca->callmaster->poller; - ZERO(*s); ZERO(pi); - s->call = ca; + s->call = obj_get(&ca->obj); DBG("setting new callstream num to %i", num); s->num = num; @@ -915,7 +923,7 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 r->fd = -1; r->idx = j; r->up = p; - r->last = po->now; + r->last = poller_now(po); } tport = (i == 0) ? port1 : port2; @@ -927,7 +935,8 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 r = &p->rtps[j]; pi.fd = r->fd; - pi.ptr = r; + pi.obj = &s->obj; + pi.uintp = (i << 1) | j; pi.readable = stream_readable; pi.closed = stream_closed; @@ -939,6 +948,31 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 +static void callstream_free(void *ptr) { + struct callstream *s = ptr; + int i, j; + struct peer *p; + struct streamrelay *r; + + for (i = 0; i < 2; i++) { + p = &s->peers[i]; + + free(p->tag); + free(p->mediatype); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + + if (r->fd != -1) { + close(r->fd); + bit_array_clear(ports_used, r->localport); + r->fd = -1; + } + } + } + obj_put(&s->call->obj); +} + static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { GQueue *q; GList *i, *l; @@ -989,7 +1023,7 @@ found: if (!opmode) { /* request */ DBG("creating new callstream"); - cs = g_slice_alloc(sizeof(*cs)); + cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); if (!r) { /* nothing found to re-use, open new ports */ @@ -1012,7 +1046,7 @@ found: } } - g_queue_push_tail(q, cs); + g_queue_push_tail(q, cs); /* hand over the ref */ ZERO(c->lookup_done); continue; } @@ -1070,12 +1104,12 @@ got_cs: /* need a new call stream after all */ DBG("case 4"); cs_o = cs; - cs = g_slice_alloc(sizeof(*cs)); + cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); callstream_init(cs, c, 0, 0, t->num); steal_peer(&cs->peers[0], &cs_o->peers[0]); p = &cs->peers[1]; setup_peer(p, t, tag); - g_queue_push_tail(c->callstreams, cs_o); + g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */ } time(&c->lookup_done); @@ -1136,39 +1170,24 @@ static void kill_callstream(struct callstream *s) { unkernelize(p); - free(p->tag); - free(p->mediatype); - for (j = 0; j < 2; j++) { r = &p->rtps[j]; - if (r->fd != -1) { + if (r->fd != -1) poller_del_item(s->call->callmaster->poller, r->fd); - close(r->fd); - bit_array_clear(ports_used, r->localport); - } } } - - g_slice_free1(sizeof(*s), s); } static void call_destroy(struct call *c) { struct callmaster *m = c->callmaster; struct callstream *s; - g_hash_table_remove(m->callhash, c->callid); + g_hash_table_remove(m->callhash, c->callid); /* steal this ref */ if (redis_delete) redis_delete(c); - g_hash_table_destroy(c->infohash); - g_hash_table_destroy(c->branches); - if (c->calling_agent) - free(c->calling_agent); - if (c->called_agent) - free(c->called_agent); - mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); while (c->callstreams->head) { s = g_queue_pop_head(c->callstreams); @@ -1190,11 +1209,10 @@ static void call_destroy(struct call *c) { s->peers[1].rtps[1].localport, s->peers[1].rtps[1].stats.packets, s->peers[1].rtps[1].stats.bytes, s->peers[1].rtps[1].stats.errors); kill_callstream(s); + obj_put(&s->obj); } - g_queue_free(c->callstreams); - free(c->callid); - g_slice_free1(sizeof(*c), c); + obj_put(&c->obj); } @@ -1280,16 +1298,29 @@ static guint g_str_hash0(gconstpointer v) { return g_str_hash(v); } +static void call_free(void *p) { + struct call *c = p; + + g_hash_table_destroy(c->infohash); + g_hash_table_destroy(c->branches); + if (c->calling_agent) + free(c->calling_agent); + if (c->called_agent) + free(c->called_agent); + g_queue_free(c->callstreams); + free(c->callid); +} + static struct call *call_create(const char *callid, struct callmaster *m) { struct call *c; mylog(LOG_NOTICE, LOG_PREFIX_C "Creating new call", callid); /* XXX will spam syslog on recovery from DB */ - c = g_slice_alloc0(sizeof(*c)); + c = obj_alloc0("call", sizeof(*c), call_free); c->callmaster = m; c->callid = strdup(callid); c->callstreams = g_queue_new(); - c->created = m->poller->now; + c->created = poller_now(m->poller); c->infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); c->branches = g_hash_table_new_full(g_str_hash0, g_str_equal0, free, NULL); return c; @@ -1302,8 +1333,10 @@ struct call *call_get_or_create(const char *callid, const char *viabranch, struc if (!c) { /* completely new call-id, create call */ c = call_create(callid, m); - g_hash_table_insert(m->callhash, c->callid, c); + g_hash_table_insert(m->callhash, c->callid, obj_get(&c->obj)); } + else + obj_hold(&c->obj); if (viabranch && !g_hash_table_lookup(c->branches, viabranch)) g_hash_table_insert(c->branches, strdup(viabranch), (void *) 0x1); @@ -1382,12 +1415,14 @@ char *call_update_udp(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); c->log_info = NULL; + obj_put(&c->obj); return ret; fail: mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); c->log_info = NULL; + obj_put(&c->obj); return ret; } @@ -1405,6 +1440,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { asprintf(&ret, "%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->ipv4)); return ret; } + obj_hold(&c->obj); c->log_info = out[RE_UDP_UL_CALLID]; strdupfree(&c->called_agent, "UNKNOWN(udp)"); @@ -1423,12 +1459,14 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); c->log_info = NULL; + obj_put(&c->obj); return ret; fail: mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); c->log_info = NULL; + obj_put(&c->obj); return ret; } @@ -1451,6 +1489,7 @@ char *call_request(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); + obj_put(&c->obj); return ret; } @@ -1466,6 +1505,8 @@ char *call_lookup(const char **out, struct callmaster *m) { return NULL; } + obj_hold(&c->obj); + strdupfree(&c->called_agent, out[RE_TCP_RL_AGENT] ? : "UNKNOWN"); info_parse(out[RE_TCP_RL_INFO], &c->infohash); s = streams_parse(out[RE_TCP_RL_STREAMS]); @@ -1477,6 +1518,7 @@ char *call_lookup(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); + obj_put(&c->obj); return ret; } @@ -1496,6 +1538,7 @@ char *call_delete_udp(const char **out, struct callmaster *m) { mylog(LOG_INFO, LOG_PREFIX_C "Call-ID to delete not found", out[RE_UDP_D_CALLID]); goto err; } + obj_hold(&c->obj); c->log_info = out[RE_UDP_D_VIABRANCH]; if (out[RE_UDP_D_FROMTAG] && *out[RE_UDP_D_FROMTAG]) { @@ -1552,8 +1595,10 @@ err: goto out; out: - if (c) + if (c) { c->log_info = NULL; + obj_put(&c->obj); + } return ret; } @@ -1564,8 +1609,10 @@ void call_delete(const char **out, struct callmaster *m) { if (!c) return; + obj_hold(&c->obj); /* delete whole list, as we don't have branches in tcp controller */ call_destroy(c); + obj_put(&c->obj); } @@ -1590,7 +1637,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { (char *) g_hash_table_lookup(c->infohash, "from"), (char *) g_hash_table_lookup(c->infohash, "to"), c->calling_agent, c->called_agent, - (int) (m->poller->now - c->created)); + (int) (poller_now(m->poller) - c->created)); for (l = c->callstreams->head; l; l = l->next) { cs = l->data; @@ -1620,7 +1667,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { (long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes, "active", p->codec ? : "unknown", - p->mediatype, (int) (m->poller->now - r1->last)); + p->mediatype, (int) (poller_now(m->poller) - r1->last)); } } diff --git a/daemon/call.h b/daemon/call.h index c41dff302..d1f7c4e07 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -10,6 +10,7 @@ #include "control.h" #include "control_udp.h" +#include "obj.h" struct poller; struct control_stream; @@ -67,12 +68,15 @@ struct peer { int confirmed:1; }; struct callstream { + struct obj obj; struct peer peers[2]; struct call *call; int num; }; struct call { + struct obj obj; + struct callmaster *callmaster; GQueue *callstreams; @@ -90,6 +94,8 @@ struct call { }; struct callmaster { + struct obj obj; + GHashTable *callhash; u_int16_t lastport; struct stats statsps; diff --git a/daemon/control.c b/daemon/control.c index eb819bad0..c61aef78a 100644 --- a/daemon/control.c +++ b/daemon/control.c @@ -12,10 +12,13 @@ #include "log.h" #include "call.h" + + + static pcre *parse_re; static pcre_extra *parse_ree; -static void control_stream_closed(int fd, void *p) { +static void control_stream_closed(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct control *c; @@ -23,23 +26,22 @@ static void control_stream_closed(int fd, void *p) { c = s->control; - c->stream_head = g_list_remove_link(c->stream_head, &s->link); + c->streams = g_list_remove(c->streams, s); + obj_put(&s->obj); if (poller_del_item(s->poller, fd)) abort(); - close(fd); - - streambuf_destroy(s->inbuf); - streambuf_destroy(s->outbuf); - free(s); } static void control_list(struct control *c, struct control_stream *s) { struct control_stream *i; + GList *l; - for (i = (void *) c->stream_head; i; i = (void *) i->link.next) - streambuf_printf(s->outbuf, DF "\n", DP(s->inaddr)); + for (l = c->streams; l; l = l->next) { + i = l->data; + streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr)); + } streambuf_printf(s->outbuf, "End.\n"); } @@ -98,16 +100,16 @@ static int control_stream_parse(struct control_stream *s, char *line) { } -static void control_stream_timer(int fd, void *p) { +static void control_stream_timer(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct poller *o = s->poller; - if ((o->now - s->inbuf->active) >= 60 || (o->now - s->outbuf->active) >= 60) - control_stream_closed(s->fd, s); + if ((poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60) + control_stream_closed(s->fd, s, 0); } -static void control_stream_readable(int fd, void *p) { +static void control_stream_readable(int fd, void *p, uintptr_t u) { struct control_stream *s = p; char *line; int ret; @@ -131,21 +133,29 @@ static void control_stream_readable(int fd, void *p) { return; close: - control_stream_closed(fd, s); + control_stream_closed(fd, s, 0); } -static void control_stream_writeable(int fd, void *p) { +static void control_stream_writeable(int fd, void *p, uintptr_t u) { struct control_stream *s = p; if (streambuf_writeable(s->outbuf)) - control_stream_closed(fd, s); + control_stream_closed(fd, s, 0); } -static void control_closed(int fd, void *p) { +static void control_closed(int fd, void *p, uintptr_t u) { abort(); } -static void control_incoming(int fd, void *p) { +static void control_stream_free(void *p) { + struct control_stream *s = p; + + close(s->fd); + streambuf_destroy(s->inbuf); + streambuf_destroy(s->outbuf); +} + +static void control_incoming(int fd, void *p, uintptr_t u) { int nfd; struct control *c = p; struct control_stream *s; @@ -161,8 +171,14 @@ static void control_incoming(int fd, void *p) { mylog(LOG_INFO, "New control connection from " DF, DP(sin)); - s = malloc(sizeof(*s)); - ZERO(*s); + s = obj_alloc0("control_stream", sizeof(*s), control_stream_free); + + s->fd = nfd; + s->control = c; + s->poller = c->poller; + s->inbuf = streambuf_new(c->poller, nfd); + s->outbuf = streambuf_new(c->poller, nfd); + memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); ZERO(i); i.fd = nfd; @@ -170,23 +186,18 @@ static void control_incoming(int fd, void *p) { i.readable = control_stream_readable; i.writeable = control_stream_writeable; i.timer = control_stream_timer; - i.ptr = s; + i.obj = &s->obj; + if (poller_add_item(c->poller, &i)) goto fail; - s->fd = nfd; - s->control = c; - s->poller = c->poller; - s->inbuf = streambuf_new(c->poller, nfd); - s->outbuf = streambuf_new(c->poller, nfd); - memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); - c->stream_head = g_list_link(c->stream_head, &s->link); + /* let the list steal our own ref */ + c->streams = g_list_prepend(c->streams, s); return; fail: - free(s); - close(nfd); + obj_put(&s->obj); } @@ -219,8 +230,7 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru goto fail; - c = malloc(sizeof(*c)); - ZERO(*c); + c = obj_alloc0("control", sizeof(*c), NULL); c->fd = fd; c->poller = p; @@ -230,14 +240,15 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru i.fd = fd; i.closed = control_closed; i.readable = control_incoming; - i.ptr = c; + i.obj = &c->obj; if (poller_add_item(p, &i)) goto fail2; + obj_put(&c->obj); return c; fail2: - free(c); + obj_put(&c->obj); fail: close(fd); return NULL; diff --git a/daemon/control.h b/daemon/control.h index 0f83119b8..623ac04a4 100644 --- a/daemon/control.h +++ b/daemon/control.h @@ -9,6 +9,9 @@ #include #include +#include "obj.h" + + #define RE_TCP_RL_CMD 1 #define RE_TCP_RL_CALLID 2 #define RE_TCP_RL_STREAMS 3 @@ -31,8 +34,9 @@ struct callmaster; + struct control_stream { - GList link; /* must be first */ + struct obj obj; int fd; struct streambuf *inbuf; @@ -45,15 +49,18 @@ struct control_stream { struct control { + struct obj obj; + int fd; - GList *stream_head; + GList *streams; struct poller *poller; struct callmaster *callmaster; }; + struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *); diff --git a/daemon/control_udp.c b/daemon/control_udp.c index 31ec331ad..6f6d5b3b5 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -15,11 +15,11 @@ #include "call.h" -static void control_udp_closed(int fd, void *p) { +static void control_udp_closed(int fd, void *p, uintptr_t x) { abort(); } -static void control_udp_incoming(int fd, void *p) { +static void control_udp_incoming(int fd, void *p, uintptr_t x) { struct control_udp *u = p; int ret, len; char buf[8192]; @@ -87,7 +87,7 @@ static void control_udp_incoming(int fd, void *p) { pcre_get_substring_list(buf, ovec, ret, &out); - if (u->poller->now - u->oven_time >= 30) { + if (poller_now(u->poller) - u->oven_time >= 30) { g_hash_table_remove_all(u->stale_cookies); #if GLIB_CHECK_VERSION(2,14,0) g_string_chunk_clear(u->stale_chunks); @@ -98,7 +98,7 @@ static void control_udp_incoming(int fd, void *p) { u->fresh_chunks = g_string_chunk_new(4 * 1024); #endif swap_ptrs(&u->stale_cookies, &u->fresh_cookies); - u->oven_time = u->poller->now; /* baked new cookies! */ + u->oven_time = poller_now(u->poller); /* baked new cookies! */ } /* XXX better hashing */ @@ -187,8 +187,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 goto fail; - c = malloc(sizeof(*c)); - ZERO(*c); + c = obj_alloc0("control_udp", sizeof(*c), NULL); c->fd = fd; c->poller = p; @@ -197,7 +196,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 c->stale_cookies = g_hash_table_new(g_str_hash, g_str_equal); c->fresh_chunks = g_string_chunk_new(4 * 1024); c->stale_chunks = g_string_chunk_new(4 * 1024); - c->oven_time = p->now; + c->oven_time = poller_now(p); c->parse_re = pcre_compile( /* cookie cmd flags callid viabranch:5 */ "^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \ @@ -221,7 +220,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 i.fd = fd; i.closed = control_udp_closed; i.readable = control_udp_incoming; - i.ptr = c; + i.obj = &c->obj; if (poller_add_item(p, &i)) goto fail2; diff --git a/daemon/control_udp.h b/daemon/control_udp.h index 815121872..5aae4183a 100644 --- a/daemon/control_udp.h +++ b/daemon/control_udp.h @@ -9,6 +9,9 @@ #include #include #include +#include "obj.h" + + #define RE_UDP_COOKIE 1 #define RE_UDP_UL_CMD 2 @@ -39,6 +42,8 @@ struct callmaster; struct control_udp { + struct obj obj; + int fd; struct poller *poller; diff --git a/daemon/obj.h b/daemon/obj.h new file mode 100644 index 000000000..37fd45172 --- /dev/null +++ b/daemon/obj.h @@ -0,0 +1,157 @@ +#ifndef _OBJ_H_ +#define _OBJ_H_ + + + +#include +#include +#include +#include +#include + +#include "log.h" + + + + +#define OBJ_DEBUG 1 + + + + +struct obj { +#if OBJ_DEBUG + u_int32_t magic; + char *type; +#endif + volatile gint ref; + void (*free_func)(void *); + unsigned int size; +}; + + + + + +#if OBJ_DEBUG + +#define OBJ_MAGIC 0xf1eef1ee + +#define obj_alloc(t,a,b) __obj_alloc(a,b,t,__FILE__,__LINE__) +#define obj_alloc0(t,a,b) __obj_alloc0(a,b,t,__FILE__,__LINE__) +#define obj_hold(a) __obj_hold(a,__FILE__,__LINE__) +#define obj_get(a) __obj_get(a,__FILE__,__LINE__) +#define obj_put(a) __obj_put(a,__FILE__,__LINE__) + +#else + +#define obj_alloc(t,a,b) __obj_alloc(a,b) +#define obj_alloc0(t,a,b) __obj_alloc0(a,b) +#define obj_hold(a) __obj_hold(a) +#define obj_get(a) __obj_get(a) +#define obj_put(a) __obj_put(a) + +#endif + +static inline void __obj_init(struct obj *o, unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + o->magic = OBJ_MAGIC; + o->type = strdup(type); + mylog(LOG_DEBUG, "obj_allocX(\"%s\", size %u) -> %p [%s:%u]", type, size, o, file, line); +#endif + o->ref = 1; + o->free_func = free_func; + o->size = size; +} + +static inline void *__obj_alloc(unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { + struct obj *r; + + r = g_slice_alloc(size); + __obj_init(r, size, free_func +#if OBJ_DEBUG + , type, file, line +#endif + ); + return r; +} + +static inline void *__obj_alloc0(unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { + struct obj *r; + + r = g_slice_alloc0(size); + __obj_init(r, size, free_func +#if OBJ_DEBUG + , type, file, line +#endif + ); + return r; +} + +static inline struct obj *__obj_hold(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + assert(o->magic == OBJ_MAGIC); + mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt before %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + g_atomic_int_inc(&o->ref); +#if OBJ_DEBUG + mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt after %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + return o; +} + +static inline void *__obj_get(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { + return __obj_hold(o +#if OBJ_DEBUG + , file, line +#endif + ); +} + +static inline void __obj_put(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + assert(o->magic == OBJ_MAGIC); + mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt before %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + if (!g_atomic_int_dec_and_test(&o->ref)) + return; +#if OBJ_DEBUG + mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt after %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); + free(o->type); +#endif + if (o->free_func) + o->free_func(o); + g_slice_free1(o->size, o); +} + + + +#endif diff --git a/daemon/poller.c b/daemon/poller.c index b5525e672..fcd0d4e9e 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -12,15 +12,37 @@ #include "poller.h" #include "aux.h" +#include "obj.h" struct timer_item { - void (*func)(void *); - void *ptr; + struct obj obj; + void (*func)(void *); + struct obj *obj_ptr; }; +struct poller_item_int { + struct obj obj; + struct poller_item item; + + int blocked:1; + int error:1; +}; + +struct poller { + int fd; + GMutex lock; + struct poller_item_int **items; + unsigned int items_size; + GList *timers; + + time_t now; +}; + + + @@ -34,42 +56,58 @@ struct poller *poller_new(void) { p->fd = epoll_create1(0); if (p->fd == -1) abort(); + g_mutex_init(&p->lock); return p; } -static int epoll_events(struct poller_item *i) { - return EPOLLHUP | EPOLLERR | ((i->writeable && i->blocked) ? EPOLLOUT : 0) | (i->readable ? EPOLLIN : 0); +static int epoll_events(struct poller_item *it, struct poller_item_int *ii) { + if (!it) + it = &ii->item; + return EPOLLHUP | EPOLLERR | + ((it->writeable && ii && ii->blocked) ? EPOLLOUT : 0) | + (it->readable ? EPOLLIN : 0); } static void poller_fd_timer(void *p) { - struct poller_item *it = p; + struct poller_item_int *it = p; - if (it->timer) - it->timer(it->fd, it->ptr); + if (it->item.timer) + it->item.timer(it->item.fd, it->item.obj, it->item.uintp); } -int poller_add_item(struct poller *p, struct poller_item *i) { - struct poller_item *ip; +static void poller_item_free(void *p) { + struct poller_item_int *i = p; + obj_put(i->item.obj); +} + + +/* unlocks on return */ +static int __poller_add_item(struct poller *p, struct poller_item *i, int has_lock) { + struct poller_item_int *ip; unsigned int u; struct epoll_event e; if (!p || !i) - return -1; + goto fail_lock; if (i->fd < 0) - return -1; + goto fail_lock; if (!i->readable && !i->writeable) - return -1; + goto fail_lock; if (!i->closed) - return -1; + goto fail_lock; + + if (!has_lock) + g_mutex_lock(&p->lock); if (i->fd < p->items_size && p->items[i->fd]) - return -1; + goto fail; - e.events = epoll_events(i); + ZERO(e); + e.events = epoll_events(i, NULL); e.data.fd = i->fd; if (epoll_ctl(p->fd, EPOLL_CTL_ADD, i->fd, &e)) abort(); @@ -81,58 +119,92 @@ int poller_add_item(struct poller *p, struct poller_item *i) { memset(p->items + u, 0, sizeof(*p->items) * (p->items_size - u - 1)); } - ip = g_slice_alloc(sizeof(*ip)); - memcpy(ip, i, sizeof(*ip)); - p->items[i->fd] = ip; + ip = obj_alloc0("poller_item_int", sizeof(*ip), poller_item_free); + memcpy(&ip->item, i, sizeof(*i)); + obj_hold(ip->item.obj); /* new ref in *ip */ + p->items[i->fd] = obj_get(&ip->obj); + + g_mutex_unlock(&p->lock); if (i->timer) - poller_timer(p, poller_fd_timer, ip); + poller_timer(p, poller_fd_timer, &ip->obj); + + obj_put(&ip->obj); return 0; + +fail: + g_mutex_unlock(&p->lock); + return -1; +fail_lock: + if (has_lock) + g_mutex_unlock(&p->lock); + return -1; +} + + +int poller_add_item(struct poller *p, struct poller_item *i) { + return __poller_add_item(p, i, 0); } int poller_find_timer(gconstpointer a, gconstpointer b) { const struct timer_item *it = a; - const struct poller_item *x = b; + const struct obj *x = b; - if (it->ptr == x) + if (it->obj_ptr == x) return 0; return 1; } int poller_del_item(struct poller *p, int fd) { - struct poller_item *it; + struct poller_item_int *it; GList *l; + struct timer_item *ti; if (!p || fd < 0) return -1; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return -1; + goto fail; if (!p->items || !(it = p->items[fd])) - return -1; + goto fail; if (epoll_ctl(p->fd, EPOLL_CTL_DEL, fd, NULL)) abort(); - if (it->timer) { - l = g_list_find_custom(p->timers, it, poller_find_timer); - if (!l) - abort(); - g_slice_free1(sizeof(struct timer_item), l->data); - p->timers = g_list_delete_link(p->timers, l); + p->items[fd] = NULL; /* stealing the ref */ + + g_mutex_unlock(&p->lock); + + if (it->item.timer) { + while (1) { + /* rare but possible race with poller_add_item above */ + l = g_list_find_custom(p->timers, &it->obj, poller_find_timer); + if (l) + break; + } + p->timers = g_list_remove_link(p->timers, l); + ti = l->data; + obj_put(&ti->obj); + g_list_free_1(l); } - g_slice_free1(sizeof(*it), it); - p->items[fd] = NULL; + obj_put(&it->obj); return 0; + +fail: + g_mutex_unlock(&p->lock); + return -1; } int poller_update_item(struct poller *p, struct poller_item *i) { - struct poller_item *np; + struct poller_item_int *np; if (!p || !i) return -1; @@ -143,14 +215,21 @@ int poller_update_item(struct poller *p, struct poller_item *i) { if (!i->closed) return -1; + g_mutex_lock(&p->lock); + if (i->fd >= p->items_size || !(np = p->items[i->fd])) - return poller_add_item(p, i); + return __poller_add_item(p, i, 1); + + obj_hold(i->obj); + obj_put(np->item.obj); + np->item.obj = i->obj; + np->item.uintp = i->uintp; + np->item.readable = i->readable; + np->item.writeable = i->writeable; + np->item.closed = i->closed; + /* updating timer is not supported */ - np->ptr = i->ptr; - np->readable = i->readable; - np->writeable = i->writeable; - np->closed = i->closed; - np->timer = i->timer; + g_mutex_unlock(&p->lock); return 0; } @@ -158,7 +237,7 @@ int poller_update_item(struct poller *p, struct poller_item *i) { int poller_poll(struct poller *p, int timeout) { int ret, i; - struct poller_item *it; + struct poller_item_int *it; time_t last; GList *li, *ne; struct timer_item *ti; @@ -166,8 +245,12 @@ int poller_poll(struct poller *p, int timeout) { if (!p) return -1; + + g_mutex_lock(&p->lock); + + ret = -1; if (!p->items || !p->items_size) - return -1; + goto out; last = p->now; p->now = time(NULL); @@ -175,13 +258,19 @@ int poller_poll(struct poller *p, int timeout) { for (li = p->timers; li; li = ne) { ne = li->next; ti = li->data; - ti->func(ti->ptr); + /* XXX not safe */ + g_mutex_unlock(&p->lock); + ti->func(ti->obj_ptr); + g_mutex_lock(&p->lock); } - return p->items_size; + ret = p->items_size; + goto out; } + g_mutex_unlock(&p->lock); errno = 0; ret = epoll_wait(p->fd, evs, sizeof(evs) / sizeof(*evs), timeout); + g_mutex_lock(&p->lock); if (errno == EINTR) ret = 0; @@ -198,33 +287,44 @@ int poller_poll(struct poller *p, int timeout) { if (!it) continue; + obj_hold(&it->obj); + g_mutex_unlock(&p->lock); + if (it->error) { - it->closed(it->fd, it->ptr); - continue; + it->item.closed(it->item.fd, it->item.obj, it->item.uintp); + goto next; } if ((ev->events & (POLLERR | POLLHUP))) - it->closed(it->fd, it->ptr); + it->item.closed(it->item.fd, it->item.obj, it->item.uintp); else if ((ev->events & POLLOUT)) { + g_mutex_lock(&p->lock); it->blocked = 0; - e.events = epoll_events(it); - e.data.fd = it->fd; - if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->fd, &e)) + ZERO(e); + e.events = epoll_events(NULL, it); + e.data.fd = it->item.fd; + if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->item.fd, &e)) abort(); - it->writeable(it->fd, it->ptr); + g_mutex_unlock(&p->lock); + it->item.writeable(it->item.fd, it->item.obj, it->item.uintp); } else if ((ev->events & POLLIN)) - it->readable(it->fd, it->ptr); + it->item.readable(it->item.fd, it->item.obj, it->item.uintp); else if (!ev->events) - continue; + goto next; else abort(); + +next: + obj_put(&it->obj); + g_mutex_lock(&p->lock); } out: + g_mutex_unlock(&p->lock); return ret; } @@ -234,63 +334,94 @@ void poller_blocked(struct poller *p, int fd) { if (!p || fd < 0) return; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return; + goto fail; if (!p->items || !p->items[fd]) - return; - if (!p->items[fd]->writeable) - return; + goto fail; + if (!p->items[fd]->item.writeable) + goto fail; p->items[fd]->blocked = 1; - e.events = epoll_events(p->items[fd]); + ZERO(e); + e.events = epoll_events(NULL, p->items[fd]); e.data.fd = fd; if (epoll_ctl(p->fd, EPOLL_CTL_MOD, fd, &e)) abort(); + +fail: + g_mutex_unlock(&p->lock); } void poller_error(struct poller *p, int fd) { if (!p || fd < 0) return; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return; + goto fail; if (!p->items || !p->items[fd]) - return; - if (!p->items[fd]->writeable) - return; + goto fail; + if (!p->items[fd]->item.writeable) + goto fail; p->items[fd]->error = 1; p->items[fd]->blocked = 1; + +fail: + g_mutex_unlock(&p->lock); } int poller_isblocked(struct poller *p, int fd) { + int ret; + if (!p || fd < 0) return -1; + + g_mutex_lock(&p->lock); + + ret = -1; if (fd >= p->items_size) - return -1; + goto out; if (!p->items || !p->items[fd]) - return -1; - if (!p->items[fd]->writeable) - return -1; + goto out; + if (!p->items[fd]->item.writeable) + goto out; - return p->items[fd]->blocked; + ret = p->items[fd]->blocked ? 1 : 0; + +out: + g_mutex_unlock(&p->lock); + return ret; } +static void timer_item_free(void *p) { + struct timer_item *i = p; + obj_put(i->obj_ptr); +} -int poller_timer(struct poller *p, void (*f)(void *), void *ptr) { +int poller_timer(struct poller *p, void (*f)(void *), struct obj *o) { struct timer_item *i; - if (!p || !f) + if (!o || !f) return -1; - i = g_slice_alloc0(sizeof(*i)); + i = obj_alloc0("timer_item", sizeof(*i), timer_item_free); i->func = f; - i->ptr = ptr; + i->obj_ptr = obj_hold(o); p->timers = g_list_prepend(p->timers, i); return 0; } + +time_t poller_now(struct poller *p) { + return p->now; +} diff --git a/daemon/poller.h b/daemon/poller.h index 01b79d359..ca0386b63 100644 --- a/daemon/poller.h +++ b/daemon/poller.h @@ -4,32 +4,27 @@ #include +#include #include #include +#include "obj.h" struct poller_item { int fd; - void *ptr; + struct obj *obj; + uintptr_t uintp; - void (*readable)(int, void *); - void (*writeable)(int, void *); - void (*closed)(int, void *); - void (*timer)(int, void *); - - int blocked:1; - int error:1; + void (*readable)(int, void *, uintptr_t); + void (*writeable)(int, void *, uintptr_t); + void (*closed)(int, void *, uintptr_t); + void (*timer)(int, void *, uintptr_t); }; -struct poller { - int fd; - struct poller_item **items; - unsigned int items_size; - GList *timers; +struct poller; + - time_t now; -}; struct poller *poller_new(void); @@ -40,8 +35,9 @@ int poller_poll(struct poller *, int); void poller_blocked(struct poller *, int); int poller_isblocked(struct poller *, int); void poller_error(struct poller *, int); +time_t poller_now(struct poller *); -int poller_timer(struct poller *, void (*)(void *), void *); +int poller_timer(struct poller *, void (*)(void *), struct obj *); #endif diff --git a/daemon/streambuf.c b/daemon/streambuf.c index ccb55c0fe..4d8f86787 100644 --- a/daemon/streambuf.c +++ b/daemon/streambuf.c @@ -21,7 +21,7 @@ struct streambuf *streambuf_new(struct poller *p, int fd) { b->buf = g_string_new(""); b->fd = fd; b->poller = p; - b->active = p->now; + b->active = poller_now(p); return b; } @@ -52,7 +52,7 @@ int streambuf_writeable(struct streambuf *b) { if (ret > 0) { g_string_erase(b->buf, 0, ret); - b->active = b->poller->now; + b->active = poller_now(b->poller); } if (ret != out) { @@ -80,7 +80,7 @@ int streambuf_readable(struct streambuf *b) { } g_string_append_len(b->buf, buf, ret); - b->active = b->poller->now; + b->active = poller_now(b->poller); } return 0; @@ -162,7 +162,7 @@ void streambuf_write(struct streambuf *b, char *s, unsigned int len) { s += ret; len -= ret; - b->active = b->poller->now; + b->active = poller_now(b->poller); } if (b->buf->len > 5242880)