
MT#55283 make bufferpool (mostly) lock-free

Change-Id: I9c6803c3b01deacafc20bcca4c6ddd89fd69d3e9
pull/1923/head
Richard Fuchs, 9 months ago
commit bcbe751dec
2 changed files with 231 additions and 129 deletions
  1. lib/auxlib.h (+12 -1)
  2. lib/bufferpool.c (+219 -128)

lib/auxlib.h (+12 -1)

@@ -641,8 +641,19 @@ INLINE double atomic64_div(const atomic64 *n, const atomic64 *d) {
}
#define atomic_get_na(x) __atomic_load_n(x, __ATOMIC_RELAXED)
#define atomic_get(x) __atomic_load_n(x, __ATOMIC_SEQ_CST)
#define atomic_set_na(x,y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
#define atomic_inc_na(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED);
#define atomic_set(x,y) __atomic_store_n(x, y, __ATOMIC_SEQ_CST)
#define atomic_inc_na(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
#define atomic_inc(x) __atomic_fetch_add(x, 1, __ATOMIC_SEQ_CST)
#define atomic_dec(x) __atomic_fetch_sub(x, 1, __ATOMIC_SEQ_CST)
#define atomic_add(x,y) __atomic_fetch_add(x, y, __ATOMIC_SEQ_CST)
#define atomic_add_na(x,y) __atomic_fetch_add(x, y, __ATOMIC_RELAXED)
#define atomic_sub(x,y) __atomic_fetch_sub(x, y, __ATOMIC_SEQ_CST)
#define atomic_sub_na(x,y) __atomic_fetch_sub(x, y, __ATOMIC_RELAXED)
#define atomic_exchange(x,y) __atomic_exchange_n(x, y, __ATOMIC_SEQ_CST)
#define atomic_exchange_na(x,y) __atomic_exchange_n(x, y, __ATOMIC_RELAXED)
#define atomic_compare_exchange(x,y,z) __atomic_compare_exchange_n(x, y, z, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
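
These wrappers map straight onto the GCC/Clang __atomic builtins, so the fetch-style macros (atomic_inc, atomic_dec, atomic_add, atomic_sub and their relaxed _na variants) return the value the variable held before the operation. A minimal sketch, not part of this commit, of the release pattern that bufferpool.c builds on below; struct object and the obj_* helpers are illustrative only:

#include <stdbool.h>
// assumes lib/auxlib.h is included for the atomic_* wrappers above

struct object {
	unsigned int refs;    // lifetime-critical: sequentially consistent ops
	unsigned int packets; // statistics only: relaxed (_na) is good enough
};

static void obj_get(struct object *o) {
	atomic_inc(&o->refs);
}

// Returns true when the caller dropped the last reference:
// __atomic_fetch_sub() returns the previous value, so seeing 1 here
// means the counter just reached 0 and the object can be torn down.
static bool obj_put(struct object *o) {
	return atomic_dec(&o->refs) == 1;
}

static void obj_count_packet(struct object *o) {
	atomic_inc_na(&o->packets);
}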
/*** ATOMIC BITFIELD OPERATIONS ***/


lib/bufferpool.c (+219 -128)

@@ -6,15 +6,18 @@
static_assert((BUFFERPOOL_SHARD_SIZE & (BUFFERPOOL_SHARD_SIZE - 1)) == 0,
"BUFFERPOOL_SHARD_SIZE is not a power of two");
TYPED_GQUEUE(shard, struct bpool_shard)
struct bpool_shard;
struct bufferpool {
void *(*alloc)(void);
void (*dealloc)(void *);
mutex_t lock;
shard_q empty_shards;
shard_q full_shards;
bool destroy;
unsigned int refs; // sum of all refs from shards, plus the handle itself
rwlock_t shards_lock;
struct bpool_shard **shards;
unsigned int num_shards;
unsigned int max_shards;
unsigned int empty_shard_idx;
};
struct bpool_shard {
@@ -24,35 +27,37 @@ struct bpool_shard {
void *empty; // head of usable buffer, head == empty if empty
void *end;
void *head;
bool full;
shard_list link;
unsigned int (*recycle)(void *);
void *arg;
bool full;
};
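
Each shard is one BUFFERPOOL_SHARD_SIZE allocation that carries its own header pointer at the front, with head bumped forward on every allocation and reset to empty on recycle. A rough layout sketch (offsets are illustrative; the exact header size is governed by BUFFERPOOL_OVERHEAD, which is referenced but not defined in this hunk):

/*
 * One shard = one BUFFERPOOL_SHARD_SIZE allocation:
 *
 *   buf                        empty                  head             end
 *    |  shard header pointer  |  handed-out buffers  |  free space      |
 *    |<------ overhead ------>|<------- usable allocation area -------->|
 *
 * head is advanced by bufferpool_alloc() and reset back to empty by
 * bufferpool_recycle() once all references to the shard are gone.
 */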
struct bufferpool *bufferpool_new(void *(*alloc)(void), void (*dealloc)(void *)) {
struct bufferpool *ret = g_new0(__typeof(*ret), 1);
ret->alloc = alloc;
mutex_init(&ret->lock);
t_queue_init(&ret->empty_shards);
t_queue_init(&ret->full_shards);
ret->dealloc = dealloc;
ret->refs = 1; // count the bufferpool handle itself as a reference
rwlock_init(&ret->shards_lock);
ret->max_shards = 8;
ret->shards = g_new0(struct bpool_shard *, ret->max_shards);
return ret;
}
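
In the new version, bufferpool_new() only stores the two callbacks and sets up the lock-free shard array; actual shard memory comes from the alloc callback, one BUFFERPOOL_SHARD_SIZE chunk per call. A usage sketch, assuming the backing buffer has to be aligned to BUFFERPOOL_SHARD_SIZE so that a pointer into a shard can be masked back to its header (that alignment requirement, and the my_shard_alloc/my_shard_free names, are assumptions, not from this commit):

#include <stdlib.h>

// Hypothetical backing allocator: hands out one full shard per call.
// BUFFERPOOL_SHARD_SIZE is a power of two (see the static_assert above),
// so it is trivially a multiple of the requested alignment.
static void *my_shard_alloc(void) {
	return aligned_alloc(BUFFERPOOL_SHARD_SIZE, BUFFERPOOL_SHARD_SIZE);
}

static void my_shard_free(void *p) {
	free(p);
}

static struct bufferpool *my_pool_new(void) {
	return bufferpool_new(my_shard_alloc, my_shard_free);
}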
// bufferpool is locked and shard is in "full" list but with zero refs
// shard has zero refs and is marked as full
static void bufferpool_recycle(struct bpool_shard *shard) {
struct bufferpool *bp = shard->bp;
shard->head = shard->empty;
atomic_set_na(&shard->head, shard->empty);
unsigned int refs = 0;
if (shard->recycle)
shard->refs += shard->recycle(shard->arg);
refs = shard->recycle(shard->arg);
if (shard->refs == 0) {
shard->full = false;
t_queue_unlink(&bp->full_shards, &shard->link);
t_queue_push_tail_link(&bp->empty_shards, &shard->link);
if (refs) {
atomic_add(&shard->refs, refs);
atomic_add(&bp->refs, refs);
}
else
atomic_set(&shard->full, false);
}
static void bufferpool_dealloc(struct bpool_shard *shard) {
@@ -60,14 +65,6 @@ static void bufferpool_dealloc(struct bpool_shard *shard) {
bp->dealloc(shard->buf);
}
// bufferpool is locked
static void shard_check_full(struct bpool_shard *shard) {
if (shard->refs != 0 || !shard->full)
return;
bufferpool_recycle(shard);
}
static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
void *buf = bp->alloc();
if (!buf)
@@ -80,7 +77,6 @@ static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
ret->bp = bp;
ret->buf = buf;
ret->end = buf + BUFFERPOOL_SHARD_SIZE;
ret->link.data = ret;
struct bpool_shard **head = buf;
*head = ret;
@@ -95,56 +91,207 @@ static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) {
return ret;
}
void *bufferpool_alloc(struct bufferpool *bp, size_t len) {
if (len > BUFFERPOOL_SHARD_SIZE)
return NULL;
static void bpool_shard_destroy(struct bpool_shard *shard) {
bufferpool_dealloc(shard);
g_free(shard);
}
// called when references drop to zero
static void __bufferpool_destroy(struct bufferpool *bp) {
for (unsigned int i = 0; i < bp->num_shards; i++) {
struct bpool_shard *shard = bp->shards[i];
bpool_shard_destroy(shard);
}
g_free(bp->shards);
rwlock_destroy(&bp->shards_lock);
g_free(bp);
}
// may destroy bufferpool
static inline void __bufferpool_unref_n(struct bufferpool *bp, unsigned int n) {
assert(atomic_get_na(&bp->refs) >= n);
LOCK(&bp->lock);
unsigned int refs = atomic_sub(&bp->refs, n);
if (refs != n)
return;
// no more references
__bufferpool_destroy(bp);
}
// check existing shards if one has enough room. if not, create a new one
static inline void __bufferpool_unref(struct bufferpool *bp) {
__bufferpool_unref_n(bp, 1);
}
static void bufferpool_shard_unref(struct bpool_shard *shard) {
assert(atomic_get_na(&shard->refs) != 0);
struct bpool_shard *shard;
bool full = atomic_get(&shard->full);
unsigned int refs = atomic_dec(&shard->refs);
// if shard was set to full and this was the last reference, we can recycle
if (!full || refs != 1)
return;
// return shard to empty list (or reserve again)
bufferpool_recycle(shard);
}
// must hold reference on the bufferpool
// must hold the lock in R, may intermittently be released
static struct bpool_shard *bufferpool_make_shard(struct bufferpool *bp) {
struct bpool_shard *shard = bufferpool_new_shard(bp);
if (!shard) // epic fail
return NULL;
// Find a place to insert it
while (true) {
if (!bp->empty_shards.length) {
shard = bufferpool_new_shard(bp);
t_queue_push_tail_link(&bp->empty_shards, &shard->link);
break;
unsigned int idx = atomic_get_na(&bp->num_shards);
// Is there room to insert?
if (idx < bp->max_shards) {
// Attempt to insert. Slot must be empty
struct bpool_shard *expected = NULL;
if (!__atomic_compare_exchange_n(&bp->shards[idx], &expected, shard,
false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue; // Somebody beat us to it. Try again
// Success. Record the new count
atomic_set(&bp->num_shards, idx + 1);
// We now definitely have a new empty shard. Tell everybody to use it
// and return success
atomic_set_na(&bp->empty_shard_idx, idx);
return shard;
}
shard = bp->empty_shards.head->data;
if (shard->head + len <= shard->end)
break;
t_queue_unlink(&bp->empty_shards, &shard->link);
t_queue_push_tail_link(&bp->full_shards, &shard->link);
// Out of room. Now it gets difficult. We must resize
unsigned int old_size = bp->max_shards;
rwlock_unlock_r(&bp->shards_lock);
// Allocate new array first
unsigned int new_size = old_size * 2;
struct bpool_shard **new_shards = g_new(struct bpool_shard *, new_size);
rwlock_lock_w(&bp->shards_lock);
// Double check, somebody might have beaten us
if (bp->max_shards != old_size) {
// OK, just try again
rwlock_unlock_w(&bp->shards_lock);
g_free(new_shards);
rwlock_lock_r(&bp->shards_lock);
continue;
}
shard->full = true;
shard_check_full(shard);
// Copy, initialise, and swap
memcpy(new_shards, bp->shards, sizeof(*bp->shards) * old_size);
memset(new_shards + old_size, 0, sizeof(*bp->shards) * (new_size - old_size));
struct bpool_shard **old_shards = bp->shards;
bp->shards = new_shards;
bp->max_shards = new_size;
rwlock_unlock_w(&bp->shards_lock);
g_free(old_shards);
// OK, now try again
rwlock_lock_r(&bp->shards_lock);
}
}
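
The insertion above follows a common lock-free publication pattern: claim the empty slot with a compare-and-swap first, and only then advance num_shards, so readers can never observe a count that covers an unfilled slot. A stripped-down sketch of the same idiom, using the auxlib.h wrappers and illustrative names (not code from this commit):

#include <stdbool.h>

// Append 'item' to a fixed-capacity pointer array without a lock.
// Returns false if the array is full (the real code resizes under the
// write lock instead).
static bool lockfree_append(void **slots, unsigned int *count,
		unsigned int max, void *item)
{
	while (true) {
		unsigned int idx = atomic_get_na(count);
		if (idx >= max)
			return false;
		void *expected = NULL;
		// Claim the slot: succeeds only if it is still empty.
		if (!__atomic_compare_exchange_n(&slots[idx], &expected, item,
				false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			continue; // lost the race; re-read the count and retry
		// Publish the count only after the slot holds a valid pointer.
		atomic_set(count, idx + 1);
		return true;
	}
}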
// allocate buffer from shard
void *bufferpool_alloc(struct bufferpool *bp, size_t len) {
len = BUFFERPOOL_ALIGN(len);
void *ret = shard->head;
shard->refs++;
shard->head += BUFFERPOOL_ALIGN(len);
return ret;
if (len > BUFFERPOOL_SHARD_SIZE - BUFFERPOOL_OVERHEAD)
return NULL;
atomic_inc(&bp->refs);
// Check existing shards if one has enough room. If not, create a new one
rwlock_lock_r(&bp->shards_lock);
// Outer loop: To retry after creating a new shard if it was needed
while (true) {
unsigned int idx = atomic_get_na(&bp->empty_shard_idx);
unsigned int start = idx;
// Inner loop: To cycle through all existing shards, looking for room
while (true) {
if (idx >= atomic_get_na(&bp->num_shards)) {
if (idx == 0)
break; // we don't have any shards
if (start == 0)
break; // circled around, found nothing
idx = 0;
}
struct bpool_shard *shard = atomic_get_na(&bp->shards[idx]);
// Only attempt allocation if known not to be full. This comes first
if (!atomic_get(&shard->full)) {
// Register as a reference
atomic_inc(&shard->refs);
// Attempt to allocate
void *ret = atomic_add(&shard->head, len);
// Was the allocation successful? (Shard not full)
if (ret + len <= shard->end) {
rwlock_unlock_r(&bp->shards_lock);
// remember empty index for next user
if (idx != start)
atomic_set_na(&bp->empty_shard_idx, idx);
return ret;
}
// Shard full. Go to next one and try again
// Set to full first, then drop reference
atomic_set(&shard->full, true);
bufferpool_shard_unref(shard);
}
idx++;
if (idx == start)
break; // exhausted all our options
}
// Found nothing. Must create new shard and put it into the array
if (!bufferpool_make_shard(bp)) {
// disaster struck
rwlock_unlock_r(&bp->shards_lock);
__bufferpool_unref(bp);
return NULL;
}
}
}
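
The allocation fast path is a lock-free bump pointer: __atomic_fetch_add() returns the previous head, so concurrent callers receive disjoint [ret, ret + len) ranges, and a result that lands past end simply means this caller lost the race and the shard is full. A minimal sketch of the idiom on a plain byte arena (illustrative names, not from this commit):

#include <stddef.h>
#include <stdint.h>

struct bump_arena {
	uintptr_t head; // address of the next free byte, advanced atomically
	uintptr_t end;  // one past the last usable byte
};

// Returns a private 'len'-byte region, or NULL once the arena is spent.
static void *bump_alloc(struct bump_arena *a, size_t len) {
	// fetch_add returns the old head; each caller gets a distinct range
	uintptr_t ret = __atomic_fetch_add(&a->head, len, __ATOMIC_SEQ_CST);
	if (ret + len <= a->end)
		return (void *) ret;
	return NULL; // overshot: arena exhausted (head stays past 'end')
}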
// Get a completely empty shard. Create one if needed.
// XXX can be improved to avoid always using entire shards?
// XXX doesn't currently mix with alloc/unref because of "full" being racy
void *bufferpool_reserve(struct bufferpool *bp, unsigned int refs, unsigned int (*recycle)(void *), void *arg) {
LOCK(&bp->lock);
atomic_add(&bp->refs, refs);
rwlock_lock_r(&bp->shards_lock);
// get a completely empty shard. create one if needed
struct bpool_shard *shard = bufferpool_make_shard(bp);
struct bpool_shard *shard = t_queue_peek_head(&bp->empty_shards);
if (shard && shard->head == shard->empty && shard->refs == 0)
t_queue_unlink(&bp->empty_shards, &shard->link);
else
shard = bufferpool_new_shard(bp);
if (!shard) {
// disaster struck
rwlock_unlock_r(&bp->shards_lock);
__bufferpool_unref(bp);
return NULL;
}
// set references, set recycle callback, move to full list
shard->refs = refs;
shard->full = true;
t_queue_push_tail_link(&bp->full_shards, &shard->link);
// set references, set recycle callback
assert(atomic_get_na(&shard->refs) == 0);
atomic_set(&shard->refs, refs);
atomic_set(&shard->full, true);
shard->recycle = recycle;
shard->arg = arg;
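
The hunk ends inside bufferpool_reserve(), which hands out an entire empty shard with a pre-charged reference count: once those references are gone, the recycle callback either returns a fresh count to re-arm the shard or 0 to send it back into normal circulation (see bufferpool_recycle() above). A hedged usage sketch; the ring structure, the callback, and the assumption that the returned pointer is the start of the shard's usable memory are mine, not from this commit:

#include <stdbool.h>

#define RING_SLOTS 64 /* illustrative number of fixed-size slots */

struct my_ring {
	void *base; // start of the reserved shard memory
};

// Hypothetical recycle callback, invoked once the last of the granted
// references is dropped. A non-zero return re-arms the shard with that
// many new references; returning 0 releases the shard back to the pool.
static unsigned int my_ring_recycle(void *arg) {
	struct my_ring *ring = arg;
	(void) ring; // refill bookkeeping would go here
	return RING_SLOTS;
}

static bool my_ring_setup(struct my_ring *ring, struct bufferpool *bp) {
	ring->base = bufferpool_reserve(bp, RING_SLOTS, my_ring_recycle, ring);
	return ring->base != NULL;
}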
@@ -156,23 +303,6 @@ static struct bpool_shard *bpool_find_shard(void *p) {
return *head;
}
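
bufferpool_unref(), bufferpool_ref() and bufferpool_release() all start from a raw buffer pointer, so bpool_find_shard() has to map that pointer back to its shard; the shard pointer written to the very start of each buffer in bufferpool_new_shard() makes this possible. The lookup itself is outside this hunk; one plausible sketch, assuming shard buffers are aligned to BUFFERPOOL_SHARD_SIZE (an assumption, not shown in the diff):

#include <stdint.h>

// Mask the pointer down to the start of its power-of-two-sized, equally
// aligned shard buffer; the first bytes of that buffer hold the
// struct bpool_shard pointer stored by bufferpool_new_shard().
static struct bpool_shard *find_shard_sketch(void *p) {
	uintptr_t base = (uintptr_t) p & ~((uintptr_t) BUFFERPOOL_SHARD_SIZE - 1);
	struct bpool_shard **head = (void *) base;
	return *head;
}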
static void bpool_shard_destroy(struct bpool_shard *shard) {
bufferpool_dealloc(shard);
g_free(shard);
}
static void bpool_shard_delayed_destroy(struct bufferpool *bp, struct bpool_shard *shard) {
if (shard->full) {
shard_list *link = t_queue_find(&bp->full_shards, shard);
t_queue_unlink(&bp->full_shards, link);
}
else {
shard_list *link = t_queue_find(&bp->empty_shards, shard);
t_queue_unlink(&bp->empty_shards, link);
}
bpool_shard_destroy(shard);
}
void bufferpool_unref(void *p) {
if (!p)
return;
@@ -180,38 +310,21 @@ void bufferpool_unref(void *p) {
struct bpool_shard *shard = bpool_find_shard(p);
if (!shard) // should only happen during shutdown
return;
struct bufferpool *bpool = shard->bp;
{
LOCK(&bpool->lock);
assert(shard->refs != 0);
shard->refs--;
// handle delayed destruction
if (!bpool->destroy) {
shard_check_full(shard);
return;
}
// wait for refs to drop to zero, then remove/free shard, and destroy pool if no shards left
if (shard->refs > 0)
return;
bpool_shard_delayed_destroy(bpool, shard);
if (bpool->full_shards.length || bpool->empty_shards.length)
return; // still some left
}
// no shards left, can destroy now
bufferpool_destroy(bpool);
bufferpool_shard_unref(shard);
__bufferpool_unref(shard->bp);
}
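
Each allocated buffer therefore pins two counters, one on its shard and one on the pool, and both are dropped here. A brief usage sketch of the per-buffer lifecycle (assuming a pool created as in the earlier sketch):

static void lifecycle_example(struct bufferpool *bp) {
	// Takes one shard reference and one pool reference.
	void *buf = bufferpool_alloc(bp, 1500);
	if (!buf)
		return;

	// A second owner: bumps both counters again.
	void *other = bufferpool_ref(buf);

	// Each owner drops its own reference; the shard is recycled and the
	// pool possibly freed only when the last reference disappears.
	bufferpool_unref(buf);
	bufferpool_unref(other);
}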
// currently called from synchronous context relative to bufferpool_destroy, so no need
// to check for delayed destruction
void bufferpool_release(void *p) {
if (!p)
return;
struct bpool_shard *shard = bpool_find_shard(p);
struct bufferpool *bpool = shard->bp;
LOCK(&bpool->lock);
assert(shard->refs != 0);
shard->refs = 0;
unsigned int refs = atomic_exchange_na(&shard->refs, 0);
__bufferpool_unref_n(shard->bp, refs);
}
void *bufferpool_ref(void *p) {
@@ -219,39 +332,17 @@ void *bufferpool_ref(void *p) {
return NULL;
struct bpool_shard *shard = bpool_find_shard(p);
struct bufferpool *bpool = shard->bp;
LOCK(&bpool->lock);
assert(shard->refs != 0);
shard->refs++;
return p;
}
assert(atomic_get_na(&shard->refs) != 0);
static void bpool_destroy_shards(shard_q *q) {
shard_list *l = q->head;
while (l) {
shard_list *n = l->next;
struct bpool_shard *shard = l->data;
if (shard->refs == 0) {
t_queue_unlink(q, l);
bpool_shard_destroy(shard);
}
l = n;
}
atomic_inc(&shard->refs);
atomic_inc(&shard->bp->refs);
return p;
}
void bufferpool_destroy(struct bufferpool *bp) {
{
LOCK(&bp->lock);
bpool_destroy_shards(&bp->full_shards);
bpool_destroy_shards(&bp->empty_shards);
if (bp->full_shards.length || bp->empty_shards.length) {
// deferred destruction
bp->destroy = true;
return;
}
}
g_free(bp);
__bufferpool_unref(bp);
}
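
Because the pool handle counts as a reference of its own (set to 1 in bufferpool_new()), bufferpool_destroy() is now just an unref: the pool lingers while buffers are still outstanding, and __bufferpool_destroy() runs when the last of them is returned. A short sketch of that ordering, reusing the hypothetical helpers from the sketches above:

static void shutdown_example(struct bufferpool *bp) {
	void *leftover = bufferpool_alloc(bp, 64);

	// bp->refs is now 2 (the handle plus the buffer), so this only
	// drops the handle's reference and defers the actual teardown.
	bufferpool_destroy(bp);

	// Dropping the last buffer reference takes bp->refs to 0, which
	// frees all shards and the pool itself.
	if (leftover)
		bufferpool_unref(leftover);
}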
void bufferpool_init(void) {

