diff options
Diffstat (limited to 'src/ioq.c')
-rw-r--r-- | src/ioq.c | 279 |
1 files changed, 201 insertions, 78 deletions
@@ -2,14 +2,17 @@ // SPDX-License-Identifier: 0BSD #include "ioq.h" +#include "atomic.h" +#include "bfstd.h" +#include "bit.h" +#include "config.h" +#include "diag.h" #include "dir.h" -#include "list.h" #include "lock.h" #include "sanity.h" #include <assert.h> #include <errno.h> #include <pthread.h> -#include <stdbool.h> #include <stdlib.h> /** @@ -28,102 +31,225 @@ struct ioq_req { /** * An I/O queue command. */ -struct ioq_cmd { - union { - struct ioq_req req; - struct ioq_res res; - }; - - struct ioq_cmd *next; +union ioq_cmd { + struct ioq_req req; + struct ioq_res res; }; /** - * An MPMC queue of I/O commands. + * A monitor for an I/O queue slot. */ -struct ioqq { - pthread_mutex_t mutex; - pthread_cond_t cond; - - bool stop; - - struct ioq_cmd *head; - struct ioq_cmd **tail; +struct ioq_monitor { + cache_align pthread_mutex_t mutex; + pthread_cond_t full; + pthread_cond_t empty; }; -static struct ioqq *ioqq_create(void) { - struct ioqq *ioqq = malloc(sizeof(*ioqq)); - if (!ioqq) { +/** Initialize an ioq_monitor. */ +static int ioq_monitor_init(struct ioq_monitor *monitor) { + if (mutex_init(&monitor->mutex, NULL) != 0) { goto fail; } - if (mutex_init(&ioqq->mutex, NULL) != 0) { - goto fail_free; + if (cond_init(&monitor->full, NULL) != 0) { + goto fail_mutex; } - if (cond_init(&ioqq->cond, NULL) != 0) { - goto fail_mutex; + if (cond_init(&monitor->empty, NULL) != 0) { + goto fail_full; } - ioqq->stop = false; - SLIST_INIT(ioqq); - return ioqq; + return 0; +fail_full: + cond_destroy(&monitor->full); fail_mutex: - mutex_destroy(&ioqq->mutex); -fail_free: - free(ioqq); + mutex_destroy(&monitor->mutex); fail: - return NULL; + return -1; } -/** Push a command onto the queue. */ -static void ioqq_push(struct ioqq *ioqq, struct ioq_cmd *cmd) { - mutex_lock(&ioqq->mutex); - SLIST_APPEND(ioqq, cmd); - mutex_unlock(&ioqq->mutex); - cond_signal(&ioqq->cond); +/** Destroy an ioq_monitor. */ +static void ioq_monitor_destroy(struct ioq_monitor *monitor) { + cond_destroy(&monitor->empty); + cond_destroy(&monitor->full); + mutex_destroy(&monitor->mutex); } -/** Pop a command from a queue. */ -static struct ioq_cmd *ioqq_pop(struct ioqq *ioqq) { - mutex_lock(&ioqq->mutex); +/** + * A slot in an I/O queue. + */ +struct ioq_slot { + struct ioq_monitor *monitor; + union ioq_cmd *cmd; +}; + +/** Initialize an ioq_slot. */ +static void ioq_slot_init(struct ioq_slot *slot, struct ioq_monitor *monitor) { + slot->monitor = monitor; + slot->cmd = NULL; +} - while (!ioqq->stop && !ioqq->head) { - cond_wait(&ioqq->cond, &ioqq->mutex); +/** Push a command into a slot. */ +static void ioq_slot_push(struct ioq_slot *slot, union ioq_cmd *cmd) { + struct ioq_monitor *monitor = slot->monitor; + + mutex_lock(&monitor->mutex); + while (slot->cmd) { + cond_wait(&monitor->empty, &monitor->mutex); } + slot->cmd = cmd; + mutex_unlock(&monitor->mutex); - struct ioq_cmd *cmd = SLIST_POP(ioqq); - mutex_unlock(&ioqq->mutex); - return cmd; + cond_broadcast(&monitor->full); } -/** Pop a command from a queue without blocking. */ -static struct ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { - if (!mutex_trylock(&ioqq->mutex)) { - return NULL; +/** Pop a command from a slot. */ +static union ioq_cmd *ioq_slot_pop(struct ioq_slot *slot) { + struct ioq_monitor *monitor = slot->monitor; + + mutex_lock(&monitor->mutex); + while (!slot->cmd) { + cond_wait(&monitor->full, &monitor->mutex); } + union ioq_cmd *ret = slot->cmd; + slot->cmd = NULL; + mutex_unlock(&monitor->mutex); - struct ioq_cmd *cmd = SLIST_POP(ioqq); - mutex_unlock(&ioqq->mutex); - return cmd; + cond_broadcast(&monitor->empty); + + return ret; } -/** Stop a queue, waking up any waiters. */ -static void ioqq_stop(struct ioqq *ioqq) { - mutex_lock(&ioqq->mutex); - ioqq->stop = true; - mutex_unlock(&ioqq->mutex); - cond_broadcast(&ioqq->cond); +/** Pop a command from a slot, if one exists. */ +static union ioq_cmd *ioq_slot_trypop(struct ioq_slot *slot) { + struct ioq_monitor *monitor = slot->monitor; + + if (!mutex_trylock(&monitor->mutex)) { + return NULL; + } + + union ioq_cmd *ret = slot->cmd; + slot->cmd = NULL; + + mutex_unlock(&monitor->mutex); + + if (ret) { + cond_broadcast(&monitor->empty); + } + return ret; } +/** + * An MPMC queue of I/O commands. + */ +struct ioqq { + /** Circular buffer index mask. */ + size_t mask; + + /** Number of monitors. */ + size_t nmonitors; + /** Array of monitors used by the slots. */ + struct ioq_monitor *monitors; + + /** Index of next writer. */ + cache_align atomic size_t head; + /** Index of next reader. */ + cache_align atomic size_t tail; + + /** The circular buffer itself. */ + cache_align struct ioq_slot slots[]; +}; + +// If we assign slots sequentially, threads will likely be operating on +// consecutive slots. If these slots are in the same cache line, that will +// result in false sharing. We can mitigate this by assigning slots with a +// stride larger than a cache line e.g. 0, 9, 18, ..., 1, 10, 19, ... +// As long as the stride is relatively prime to circular buffer length, we'll +// still use every available slot. Since the length is a power of two, that +// means the stride must be odd. + +#define IOQ_STRIDE ((FALSE_SHARING_SIZE / sizeof(struct ioq_slot)) | 1) +bfs_static_assert(IOQ_STRIDE % 2 == 1); + +/** Destroy an I/O command queue. */ static void ioqq_destroy(struct ioqq *ioqq) { - if (ioqq) { - cond_destroy(&ioqq->cond); - mutex_destroy(&ioqq->mutex); - free(ioqq); + for (size_t i = 0; i < ioqq->nmonitors; ++i) { + ioq_monitor_destroy(&ioqq->monitors[i]); + } + free(ioqq->monitors); + free(ioqq); +} + +/** Create an I/O command queue. */ +static struct ioqq *ioqq_create(size_t size) { + // Circular buffer size must be a power of two + size = bit_ceil(size); + + struct ioqq *ioqq = xmemalign(alignof(struct ioqq), flex_sizeof(struct ioqq, slots, size)); + if (!ioqq) { + return NULL; + } + + // Use a pool of monitors + size_t nmonitors = size < 64 ? size : 64; + ioqq->nmonitors = 0; + ioqq->monitors = xmemalign(alignof(struct ioq_monitor), nmonitors * sizeof(struct ioq_monitor)); + if (!ioqq->monitors) { + ioqq_destroy(ioqq); + return NULL; + } + + for (size_t i = 0; i < nmonitors; ++i) { + if (ioq_monitor_init(&ioqq->monitors[i]) != 0) { + ioqq_destroy(ioqq); + return NULL; + } + ++ioqq->nmonitors; + } + + ioqq->mask = size - 1; + + atomic_init(&ioqq->head, 0); + atomic_init(&ioqq->tail, 0); + + for (size_t i = 0; i < size; ++i) { + ioq_slot_init(&ioqq->slots[i], &ioqq->monitors[i % nmonitors]); + } + + return ioqq; +} + +/** Push a command onto the queue. */ +static void ioqq_push(struct ioqq *ioqq, union ioq_cmd *cmd) { + size_t i = fetch_add(&ioqq->head, IOQ_STRIDE, relaxed); + ioq_slot_push(&ioqq->slots[i & ioqq->mask], cmd); +} + +/** Pop a command from a queue. */ +static union ioq_cmd *ioqq_pop(struct ioqq *ioqq) { + size_t i = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); + return ioq_slot_pop(&ioqq->slots[i & ioqq->mask]); +} + +/** Pop a command from a queue if one is available. */ +static union ioq_cmd *ioqq_trypop(struct ioqq *ioqq) { + size_t i = load(&ioqq->tail, relaxed); + union ioq_cmd *cmd = ioq_slot_trypop(&ioqq->slots[i & ioqq->mask]); + if (cmd) { +#ifdef NDEBUG + store(&ioqq->tail, i + IOQ_STRIDE, relaxed); +#else + size_t j = fetch_add(&ioqq->tail, IOQ_STRIDE, relaxed); + bfs_assert(j == i, "ioqq_trypop() only supports a single consumer"); +#endif } + return cmd; } +/** Sentinel stop command. */ +static union ioq_cmd IOQ_STOP; + struct ioq { /** The depth of the queue. */ size_t depth; @@ -146,8 +272,8 @@ static void *ioq_work(void *ptr) { struct ioq *ioq = ptr; while (true) { - struct ioq_cmd *cmd = ioqq_pop(ioq->pending); - if (!cmd) { + union ioq_cmd *cmd = ioqq_pop(ioq->pending); + if (cmd == &IOQ_STOP) { break; } @@ -176,16 +302,17 @@ struct ioq *ioq_create(size_t depth, size_t threads) { ioq->depth = depth; ioq->size = 0; + ioq->pending = NULL; ioq->ready = NULL; ioq->nthreads = 0; - ioq->pending = ioqq_create(); + ioq->pending = ioqq_create(depth); if (!ioq->pending) { goto fail; } - ioq->ready = ioqq_create(); + ioq->ready = ioqq_create(depth); if (!ioq->ready) { goto fail; } @@ -218,7 +345,7 @@ int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) { return -1; } - struct ioq_cmd *cmd = malloc(sizeof(*cmd)); + union ioq_cmd *cmd = malloc(sizeof(*cmd)); if (!cmd) { return -1; } @@ -228,8 +355,8 @@ int ioq_opendir(struct ioq *ioq, int dfd, const char *path, void *ptr) { req->path = path; req->ptr = ptr; - ++ioq->size; ioqq_push(ioq->pending, cmd); + ++ioq->size; return 0; } @@ -238,11 +365,7 @@ struct ioq_res *ioq_pop(struct ioq *ioq) { return NULL; } - struct ioq_cmd *cmd = ioqq_pop(ioq->ready); - if (!cmd) { - return NULL; - } - + union ioq_cmd *cmd = ioqq_pop(ioq->ready); --ioq->size; return &cmd->res; } @@ -252,7 +375,7 @@ struct ioq_res *ioq_trypop(struct ioq *ioq) { return NULL; } - struct ioq_cmd *cmd = ioqq_trypop(ioq->ready); + union ioq_cmd *cmd = ioqq_trypop(ioq->ready); if (!cmd) { return NULL; } @@ -262,7 +385,7 @@ struct ioq_res *ioq_trypop(struct ioq *ioq) { } void ioq_free(struct ioq *ioq, struct ioq_res *res) { - struct ioq_cmd *cmd = (struct ioq_cmd *)res; + union ioq_cmd *cmd = (union ioq_cmd *)res; free(cmd); } @@ -271,8 +394,8 @@ void ioq_destroy(struct ioq *ioq) { return; } - if (ioq->pending) { - ioqq_stop(ioq->pending); + for (size_t i = 0; i < ioq->nthreads; ++i) { + ioqq_push(ioq->pending, &IOQ_STOP); } for (size_t i = 0; i < ioq->nthreads; ++i) { |