diff --git a/lib/auxlib.h b/lib/auxlib.h
index ca173b326..16222568a 100644
--- a/lib/auxlib.h
+++ b/lib/auxlib.h
@@ -641,8 +641,19 @@ INLINE double atomic64_div(const atomic64 *n, const atomic64 *d) {
 }
 
 #define atomic_get_na(x) __atomic_load_n(x, __ATOMIC_RELAXED)
+#define atomic_get(x) __atomic_load_n(x, __ATOMIC_SEQ_CST)
 #define atomic_set_na(x,y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
-#define atomic_inc_na(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED);
+#define atomic_set(x,y) __atomic_store_n(x, y, __ATOMIC_SEQ_CST)
+#define atomic_inc_na(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
+#define atomic_inc(x) __atomic_fetch_add(x, 1, __ATOMIC_SEQ_CST)
+#define atomic_dec(x) __atomic_fetch_sub(x, 1, __ATOMIC_SEQ_CST)
+#define atomic_add(x,y) __atomic_fetch_add(x, y, __ATOMIC_SEQ_CST)
+#define atomic_add_na(x,y) __atomic_fetch_add(x, y, __ATOMIC_RELAXED)
+#define atomic_sub(x,y) __atomic_fetch_sub(x, y, __ATOMIC_SEQ_CST)
+#define atomic_sub_na(x,y) __atomic_fetch_sub(x, y, __ATOMIC_RELAXED)
+#define atomic_exchange(x,y) __atomic_exchange_n(x, y, __ATOMIC_SEQ_CST)
+#define atomic_exchange_na(x,y) __atomic_exchange_n(x, y, __ATOMIC_RELAXED)
+#define atomic_compare_exchange(x,y) __atomic_exchange_n(x, y, __ATOMIC_SEQ_CST)
 
 /*** ATOMIC BITFIELD OPERATIONS ***/
 
diff --git a/lib/bufferpool.c b/lib/bufferpool.c
index b54addbb6..62d23bf4d 100644
--- a/lib/bufferpool.c
+++ b/lib/bufferpool.c
@@ -6,15 +6,18 @@
 static_assert((BUFFERPOOL_SHARD_SIZE & (BUFFERPOOL_SHARD_SIZE - 1)) == 0,
 		"BUFFERPOOL_SHARD_SIZE is not a power of two");
 
-TYPED_GQUEUE(shard, struct bpool_shard)
+struct bpool_shard;
 
 struct bufferpool {
 	void *(*alloc)(void);
 	void (*dealloc)(void *);
-	mutex_t lock;
-	shard_q empty_shards;
-	shard_q full_shards;
-	bool destroy;
+	unsigned int refs; // sum of all refs from shards, plus the handle itself
+
+	rwlock_t shards_lock;
+	struct bpool_shard **shards;
+	unsigned int num_shards;
+	unsigned int max_shards;
+	unsigned int empty_shard_idx;
 };
 
 struct bpool_shard {
@@ -24,35 +27,37 @@ struct bpool_shard {
 	void *empty; // head of usable buffer, head == empty if empty
 	void *end;
 	void *head;
-	bool full;
-	shard_list link;
 	unsigned int (*recycle)(void *);
 	void *arg;
+	bool full;
 };
 
 struct bufferpool *bufferpool_new(void *(*alloc)(void), void (*dealloc)(void *)) {
 	struct bufferpool *ret = g_new0(__typeof(*ret), 1);
 	ret->alloc = alloc;
-	mutex_init(&ret->lock);
-	t_queue_init(&ret->empty_shards);
-	t_queue_init(&ret->full_shards);
 	ret->dealloc = dealloc;
+	ret->refs = 1; // count the bufferpool handle itself as a reference
+	rwlock_init(&ret->shards_lock);
+	ret->max_shards = 8;
+	ret->shards = g_new0(struct bpool_shard *, ret->max_shards);
 	return ret;
 }
 
-// bufferpool is locked and shard is in "full" list but with zero refs
+// shard has zero refs and is marked as full
static void bufferpool_recycle(struct bpool_shard *shard) {
 	struct bufferpool *bp = shard->bp;
-	shard->head = shard->empty;
+	atomic_set_na(&shard->head, shard->empty);
 
+	unsigned int refs = 0;
 	if (shard->recycle)
-		shard->refs += shard->recycle(shard->arg);
+		refs = shard->recycle(shard->arg);
 
-	if (shard->refs == 0) {
-		shard->full = false;
-		t_queue_unlink(&bp->full_shards, &shard->link);
-		t_queue_push_tail_link(&bp->empty_shards, &shard->link);
+	if (refs) {
+		atomic_add(&shard->refs, refs);
+		atomic_add(&bp->refs, refs);
 	}
+	else
+		atomic_set(&shard->full, false);
 }
 
 static void bufferpool_dealloc(struct bpool_shard *shard) {
@@ -60,14 +65,6 @@
 	bp->dealloc(shard->buf);
 }
 
-// bufferpool is locked
-static void shard_check_full(struct bpool_shard *shard) {
-	if (shard->refs != 0 || !shard->full)
-		return;
-
-	bufferpool_recycle(shard);
-}
-
 static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
 	void *buf = bp->alloc();
 	if (!buf)
 		return NULL;
@@ -80,7 +77,6 @@ static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
 	ret->bp = bp;
 	ret->buf = buf;
 	ret->end = buf + BUFFERPOOL_SHARD_SIZE;
-	ret->link.data = ret;
 	struct bpool_shard **head = buf;
 	*head = ret;
 
@@ -95,56 +91,207 @@ static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
 	return ret;
 }
 
-void *bufferpool_alloc(struct bufferpool *bp, size_t len) {
-	if (len > BUFFERPOOL_SHARD_SIZE)
-		return NULL;
+static void bpool_shard_destroy(struct bpool_shard *shard) {
+	bufferpool_dealloc(shard);
+	g_free(shard);
+}
+
+// called when references drop to zero
+static void __bufferpool_destroy(struct bufferpool *bp) {
+	for (unsigned int i = 0; i < bp->num_shards; i++) {
+		struct bpool_shard *shard = bp->shards[i];
+		bpool_shard_destroy(shard);
+	}
+
+	g_free(bp->shards);
+	rwlock_destroy(&bp->shards_lock);
+	g_free(bp);
+}
+
+// may destroy bufferpool
+static inline void __bufferpool_unref_n(struct bufferpool *bp, unsigned int n) {
+	assert(atomic_get_na(&bp->refs) >= n);
 
-	LOCK(&bp->lock);
+	unsigned int refs = atomic_sub(&bp->refs, n);
+	if (refs != n)
+		return;
+
+	// no more references
+	__bufferpool_destroy(bp);
+}
 
-	// check existing shards if one has enough room. if not, create a new one
+static inline void __bufferpool_unref(struct bufferpool *bp) {
+	__bufferpool_unref_n(bp, 1);
+}
+
+static void bufferpool_shard_unref(struct bpool_shard *shard) {
+	assert(atomic_get_na(&shard->refs) != 0);
 
-	struct bpool_shard *shard;
+	bool full = atomic_get(&shard->full);
+	unsigned int refs = atomic_dec(&shard->refs);
+	// if shard was set to full and this was the last reference, we can recycle
+	if (!full || refs != 1)
+		return;
+
+	// return shard to empty list (or reserve again)
+	bufferpool_recycle(shard);
+}
+
+// must hold reference on the bufferpool
+// must hold the lock in R, may intermittently be released
+static struct bpool_shard *bufferpool_make_shard(struct bufferpool *bp) {
+	struct bpool_shard *shard = bufferpool_new_shard(bp);
+	if (!shard) // epic fail
+		return NULL;
+
+	// Find a place to insert it
 
 	while (true) {
-		if (!bp->empty_shards.length) {
-			shard = bufferpool_new_shard(bp);
-			t_queue_push_tail_link(&bp->empty_shards, &shard->link);
-			break;
+		unsigned int idx = atomic_get_na(&bp->num_shards);
+
+		// Is there room to insert?
+		if (idx < bp->max_shards) {
+			// Attempt to insert. Slot must be empty
+			struct bpool_shard *expected = NULL;
+			if (!__atomic_compare_exchange_n(&bp->shards[idx], &expected, shard,
+						false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+				continue; // Somebody beat us to it. Try again
+
+			// Success. Record the new count
+			atomic_set(&bp->num_shards, idx + 1);
+
+			// We now definitely have a new empty shard. Tell everybody to use it
+			// and return success
+			atomic_set_na(&bp->empty_shard_idx, idx);
+
+			return shard;
 		}
 
-		shard = bp->empty_shards.head->data;
-		if (shard->head + len <= shard->end)
-			break;
-		t_queue_unlink(&bp->empty_shards, &shard->link);
-		t_queue_push_tail_link(&bp->full_shards, &shard->link);
+		// Out of room. Now it gets difficult. We must resize
+		unsigned int old_size = bp->max_shards;
+
+		rwlock_unlock_r(&bp->shards_lock);
+
+		// Allocate new array first
+		unsigned int new_size = old_size * 2;
+		struct bpool_shard **new_shards = g_new(struct bpool_shard *, new_size);
+
+		rwlock_lock_w(&bp->shards_lock);
+
+		// Double check, somebody might have beaten us
+		if (bp->max_shards != old_size) {
+			// OK, just try again
+			rwlock_unlock_w(&bp->shards_lock);
+			g_free(new_shards);
+			rwlock_lock_r(&bp->shards_lock);
+			continue;
+		}
 
-		shard->full = true;
-		shard_check_full(shard);
+		// Copy, initialise, and swap
+		memcpy(new_shards, bp->shards, sizeof(*bp->shards) * old_size);
+		memset(new_shards + old_size, 0, sizeof(*bp->shards) * (new_size - old_size));
+		struct bpool_shard **old_shards = bp->shards;
+		bp->shards = new_shards;
+		bp->max_shards = new_size;
+
+		rwlock_unlock_w(&bp->shards_lock);
+
+		g_free(old_shards);
+
+		// OK, now try again
+		rwlock_lock_r(&bp->shards_lock);
 	}
+}
 
-	// allocate buffer from shard
+void *bufferpool_alloc(struct bufferpool *bp, size_t len) {
+	len = BUFFERPOOL_ALIGN(len);
 
-	void *ret = shard->head;
-	shard->refs++;
-	shard->head += BUFFERPOOL_ALIGN(len);
-	return ret;
+	if (len > BUFFERPOOL_SHARD_SIZE - BUFFERPOOL_OVERHEAD)
+		return NULL;
+
+	atomic_inc(&bp->refs);
+
+	// Check existing shards if one has enough room. If not, create a new one
+
+	rwlock_lock_r(&bp->shards_lock);
+
+	// Outer loop: To retry after creating a new shard if it was needed
+	while (true) {
+		unsigned int idx = atomic_get_na(&bp->empty_shard_idx);
+		unsigned int start = idx;
+
+		// Inner loop: To cycle through all existing shards, looking for room
+		while (true) {
+			if (idx >= atomic_get_na(&bp->num_shards)) {
+				if (idx == 0)
+					break; // we don't have any shards
+				if (start == 0)
+					break; // circled around, found nothing
+				idx = 0;
+			}
+
+			struct bpool_shard *shard = atomic_get_na(&bp->shards[idx]);
+
+			// Only attempt allocation if known not to be full. This comes first
+			if (!atomic_get(&shard->full)) {
+				// Register as a reference
+				atomic_inc(&shard->refs);
+
+				// Attempt to allocate
+				void *ret = atomic_add(&shard->head, len);
+
+				// Was the allocation successful? (Shard not full)
+				if (ret + len <= shard->end) {
+					rwlock_unlock_r(&bp->shards_lock);
+
+					// remember empty index for next user
+					if (idx != start)
+						atomic_set_na(&bp->empty_shard_idx, idx);
+					return ret;
+				}
+
+				// Shard full. Go to next one and try again
+				// Set to full first, then drop reference
+				atomic_set(&shard->full, true);
+				bufferpool_shard_unref(shard);
+			}
+
+			idx++;
+			if (idx == start)
+				break; // exhausted all our options
+		}
+
+		// Found nothing. Must create new shard and put it into the array
+		if (!bufferpool_make_shard(bp)) {
+			// disaster struck
+			rwlock_unlock_r(&bp->shards_lock);
+			__bufferpool_unref(bp);
+			return NULL;
+		}
+	}
 }
 
+// Get a completely empty shard. Create one if needed.
+// XXX can be improved to avoid always using entire shards?
+// XXX doesn't currently mix with alloc/unref because of "full" being racy
 void *bufferpool_reserve(struct bufferpool *bp, unsigned int refs, unsigned int (*recycle)(void *), void *arg) {
-	LOCK(&bp->lock);
+	atomic_add(&bp->refs, refs);
+	rwlock_lock_r(&bp->shards_lock);
 
-	// get a completely empty shard. create one if needed
+	struct bpool_shard *shard = bufferpool_make_shard(bp);
 
-	struct bpool_shard *shard = t_queue_peek_head(&bp->empty_shards);
-	if (shard && shard->head == shard->empty && shard->refs == 0)
-		t_queue_unlink(&bp->empty_shards, &shard->link);
-	else
-		shard = bufferpool_new_shard(bp);
+	if (!shard) {
+		// disaster struck
+		rwlock_unlock_r(&bp->shards_lock);
+		__bufferpool_unref(bp);
+		return NULL;
+	}
 
-	// set references, set recycle callback, move to full list
-	shard->refs = refs;
-	shard->full = true;
-	t_queue_push_tail_link(&bp->full_shards, &shard->link);
+	// set references, set recycle callback
+	assert(atomic_get_na(&shard->refs) == 0);
+	atomic_set(&shard->refs, refs);
+
+	atomic_set(&shard->full, true);
 	shard->recycle = recycle;
 	shard->arg = arg;
 
@@ -156,23 +303,6 @@ static struct bpool_shard *bpool_find_shard(void *p) {
 	return *head;
 }
 
-static void bpool_shard_destroy(struct bpool_shard *shard) {
-	bufferpool_dealloc(shard);
-	g_free(shard);
-}
-
-static void bpool_shard_delayed_destroy(struct bufferpool *bp, struct bpool_shard *shard) {
-	if (shard->full) {
-		shard_list *link = t_queue_find(&bp->full_shards, shard);
-		t_queue_unlink(&bp->full_shards, link);
-	}
-	else {
-		shard_list *link = t_queue_find(&bp->empty_shards, shard);
-		t_queue_unlink(&bp->empty_shards, link);
-	}
-	bpool_shard_destroy(shard);
-}
-
 void bufferpool_unref(void *p) {
 	if (!p)
 		return;
@@ -180,38 +310,21 @@ void bufferpool_unref(void *p) {
 	struct bpool_shard *shard = bpool_find_shard(p);
 	if (!shard) // should only happen during shutdown
 		return;
-	struct bufferpool *bpool = shard->bp;
-
-	{
-		LOCK(&bpool->lock);
-		assert(shard->refs != 0);
-		shard->refs--;
-		// handle delayed destruction
-		if (!bpool->destroy) {
-			shard_check_full(shard);
-			return;
-		}
-		// wait for refs to drop to zero, then remove/free shard, and destroy pool if no shards left
-		if (shard->refs > 0)
-			return;
-		bpool_shard_delayed_destroy(bpool, shard);
-		if (bpool->full_shards.length || bpool->empty_shards.length)
-			return; // still some left
-	}
-	// no shards left, can destroy now
-	bufferpool_destroy(bpool);
+
+	bufferpool_shard_unref(shard);
+	__bufferpool_unref(shard->bp);
 }
 
+// currently called from synchronous context relative to bufferpool_destroy, so no need
+// to check for delayed destruction
 void bufferpool_release(void *p) {
 	if (!p)
 		return;
 
 	struct bpool_shard *shard = bpool_find_shard(p);
 
-	struct bufferpool *bpool = shard->bp;
-	LOCK(&bpool->lock);
-	assert(shard->refs != 0);
-	shard->refs = 0;
+	unsigned int refs = atomic_exchange_na(&shard->refs, 0);
+	__bufferpool_unref_n(shard->bp, refs);
 }
 
 void *bufferpool_ref(void *p) {
@@ -219,39 +332,17 @@ void *bufferpool_ref(void *p) {
 		return NULL;
 
 	struct bpool_shard *shard = bpool_find_shard(p);
-	struct bufferpool *bpool = shard->bp;
-	LOCK(&bpool->lock);
-	assert(shard->refs != 0);
-	shard->refs++;
-	return p;
-}
+
+	assert(atomic_get_na(&shard->refs) != 0);
 
-static void bpool_destroy_shards(shard_q *q) {
-	shard_list *l = q->head;
-	while (l) {
-		shard_list *n = l->next;
-		struct bpool_shard *shard = l->data;
-		if (shard->refs == 0) {
-			t_queue_unlink(q, l);
-			bpool_shard_destroy(shard);
-		}
-		l = n;
-	}
+	atomic_inc(&shard->refs);
+	atomic_inc(&shard->bp->refs);
+
+	return p;
 }
 
 void bufferpool_destroy(struct bufferpool *bp) {
-	{
-		LOCK(&bp->lock);
-		bpool_destroy_shards(&bp->full_shards);
-		bpool_destroy_shards(&bp->empty_shards);
-		if (bp->full_shards.length || bp->empty_shards.length) {
-			// deferred destruction
-			bp->destroy = true;
-			return;
-		}
-	}
-
-	g_free(bp);
+	__bufferpool_unref(bp);
 }
 
 void bufferpool_init(void) {
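
For context only (not part of the patch): below is a minimal, self-contained sketch of the fetch-add bump allocation that the new bufferpool_alloc() relies on, written directly against the GCC __atomic builtins that the new atomic_*() macros wrap. All names here (SHARD_SIZE, struct shard, shard_alloc(), shard_unref()) are made up for illustration and are not rtpengine identifiers; shard recycling, the shard array, the rwlock and the pool-level reference counting from the patch are deliberately omitted.

/* Standalone illustration only -- not rtpengine code. Each allocation takes
 * a reference, then advances the head pointer atomically, and treats an
 * overshoot past the end of the buffer as "shard full". */

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define SHARD_SIZE (1 << 16)	/* hypothetical shard size */

struct shard {
	unsigned char *buf;	/* start of the shard's memory */
	unsigned char *end;	/* one past the end of the buffer */
	unsigned char *head;	/* next free byte, advanced atomically */
	unsigned int refs;	/* outstanding allocations */
	bool full;		/* set once a bump overshoots the end */
};

static void *shard_alloc(struct shard *s, size_t len) {
	if (__atomic_load_n(&s->full, __ATOMIC_SEQ_CST))
		return NULL;

	/* register as a reference first, then try to bump the head */
	__atomic_fetch_add(&s->refs, 1, __ATOMIC_SEQ_CST);
	unsigned char *ret = __atomic_fetch_add(&s->head, len, __ATOMIC_SEQ_CST);
	if (ret + len <= s->end)
		return ret;

	/* overshot the end: mark the shard full and drop our reference */
	__atomic_store_n(&s->full, true, __ATOMIC_SEQ_CST);
	__atomic_fetch_sub(&s->refs, 1, __ATOMIC_SEQ_CST);
	return NULL;
}

static void shard_unref(struct shard *s) {
	/* a real pool would recycle the shard once refs drops to zero
	 * and the shard has been marked full */
	__atomic_fetch_sub(&s->refs, 1, __ATOMIC_SEQ_CST);
}

int main(void) {
	struct shard s = { .refs = 0, .full = false };
	s.buf = malloc(SHARD_SIZE);
	s.end = s.buf + SHARD_SIZE;
	s.head = s.buf;

	void *p = shard_alloc(&s, 512);
	printf("allocated %p from shard starting at %p\n", p, (void *) s.buf);
	if (p)
		shard_unref(&s);

	free(s.buf);
	return 0;
}

The property this is meant to show, which the patch depends on, is that a losing racer does no harm: the head pointer may overshoot the end of the shard, but every caller checks its own returned offset against the end, and whoever overshoots first flips the full flag so later callers skip the shard until it is recycled. SEQ_CST ordering is used throughout here for simplicity, mirroring the non-_na macro variants added to auxlib.h.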