author     Tavian Barnes <tavianator@tavianator.com>  2023-10-02 13:09:41 -0400
committer  Tavian Barnes <tavianator@tavianator.com>  2023-10-02 13:09:41 -0400
commit     1c775d0128a797370bbc0bbd527b4bbbc9d0b83d (patch)
tree       dba15bad6c144786f462c0fed7ba727d4823b93e /src/ioq.c
parent     fed31013ebfa6b0c5165f015da5b96ce524224eb (diff)
parent     1afa241472709b32baf5e3e1fd3ba6ebd5fd1bf6 (diff)
download   bfs-1c775d0128a797370bbc0bbd527b4bbbc9d0b83d.tar.xz
Merge branch 'io-uring'
Diffstat (limited to 'src/ioq.c')
-rw-r--r--  src/ioq.c | 307
1 file changed, 270 insertions(+), 37 deletions(-)
diff --git a/src/ioq.c b/src/ioq.c
index d3ba2de..04b9c0d 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -16,6 +16,10 @@
#include <pthread.h>
#include <stdlib.h>
+#if BFS_USE_LIBURING
+# include <liburing.h>
+#endif
+
/**
* A monitor for an I/O queue slot.
*/
@@ -280,6 +284,21 @@ static struct ioq_ent *ioqq_trypop(struct ioqq *ioqq) {
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;
+/** I/O queue thread-specific data. */
+struct ioq_thread {
+ /** The thread handle. */
+ pthread_t id;
+ /** Pointer back to the I/O queue. */
+ struct ioq *parent;
+
+#if BFS_USE_LIBURING
+ /** io_uring instance. */
+ struct io_uring ring;
+ /** Any error that occurred initializing the ring. */
+ int ring_err;
+#endif
+};
+
struct ioq {
/** The depth of the queue. */
size_t depth;
@@ -299,60 +318,247 @@ struct ioq {
/** The number of background threads. */
size_t nthreads;
/** The background threads themselves. */
- pthread_t threads[];
+ struct ioq_thread threads[];
};
-/** Background thread entry point. */
-static void *ioq_work(void *ptr) {
- struct ioq *ioq = ptr;
+/** Cancel a request if we need to. */
+static bool ioq_check_cancel(struct ioq *ioq, struct ioq_ent *ent) {
+ if (!load(&ioq->cancel, relaxed)) {
+ return false;
+ }
- while (true) {
- struct ioq_ent *ent = ioqq_pop(ioq->pending);
- if (ent == &IOQ_STOP) {
- break;
+ // Always close(), even if we're cancelled, just like a real EINTR
+ if (ent->op == IOQ_CLOSE || ent->op == IOQ_CLOSEDIR) {
+ return false;
+ }
+
+ ent->ret = -1;
+ ent->error = EINTR;
+ ioqq_push(ioq->ready, ent);
+ return true;
+}
+
+/** Handle a single request synchronously. */
+static void ioq_handle(struct ioq *ioq, struct ioq_ent *ent) {
+ int ret;
+
+ switch (ent->op) {
+ case IOQ_CLOSE:
+ ret = xclose(ent->close.fd);
+ break;
+
+ case IOQ_OPENDIR:
+ ret = bfs_opendir(ent->opendir.dir, ent->opendir.dfd, ent->opendir.path);
+ if (ret == 0) {
+ bfs_polldir(ent->opendir.dir);
}
+ break;
+
+ case IOQ_CLOSEDIR:
+ ret = bfs_closedir(ent->closedir.dir);
+ break;
+
+ default:
+ bfs_bug("Unknown ioq_op %d", (int)ent->op);
+ ret = -1;
+ errno = ENOSYS;
+ break;
+ }
+
+ ent->ret = ret;
+ ent->error = ret == 0 ? 0 : errno;
+
+ ioqq_push(ioq->ready, ent);
+}
- bool cancel = load(&ioq->cancel, relaxed);
+#if BFS_USE_LIBURING
+/** io_uring worker state. */
+struct ioq_ring_state {
+ /** The I/O queue. */
+ struct ioq *ioq;
+ /** The io_uring. */
+ struct io_uring *ring;
+ /** The current ioq->pending slot. */
+ ioq_slot *slot;
+ /** Number of prepped, unsubmitted SQEs. */
+ size_t prepped;
+ /** Number of submitted, unreaped SQEs. */
+ size_t submitted;
+ /** Whether to stop the loop. */
+ bool stop;
+};
+
+/** Pop a request for ioq_ring_prep(). */
+static struct ioq_ent *ioq_ring_pop(struct ioq_ring_state *state) {
+ if (state->stop) {
+ return NULL;
+ }
+
+ // Advance to the next slot if necessary
+ struct ioq *ioq = state->ioq;
+ if (!state->slot) {
+ state->slot = ioqq_read(ioq->pending);
+ }
+
+ // Block if we have nothing else to do
+ bool block = !state->prepped && !state->submitted;
+ struct ioq_ent *ret = ioq_slot_pop(ioq->pending, state->slot, block);
+
+ if (ret) {
+ // Got an entry, move to the next slot next time
+ state->slot = NULL;
+ }
+
+ if (ret == &IOQ_STOP) {
+ state->stop = true;
+ ret = NULL;
+ }
+
+ return ret;
+}
- ent->ret = -1;
+/** Prep a single SQE. */
+static void ioq_prep_sqe(struct io_uring_sqe *sqe, struct ioq_ent *ent) {
+ switch (ent->op) {
+ case IOQ_CLOSE:
+ io_uring_prep_close(sqe, ent->close.fd);
+ break;
+
+ case IOQ_OPENDIR:
+ io_uring_prep_openat(sqe, ent->opendir.dfd, ent->opendir.path, O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0);
+ break;
+
+#if BFS_USE_UNWRAPDIR
+ case IOQ_CLOSEDIR:
+ io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
+ break;
+#endif
+
+ default:
+ bfs_bug("Unknown ioq_op %d", (int)ent->op);
+ io_uring_prep_nop(sqe);
+ break;
+ }
- switch (ent->op) {
- case IOQ_CLOSE:
- // Always close(), even if we're cancelled, just like a real EINTR
- ent->ret = xclose(ent->close.fd);
+ io_uring_sqe_set_data(sqe, ent);
+}
+
+/** Prep a batch of SQEs. */
+static bool ioq_ring_prep(struct ioq_ring_state *state) {
+ struct ioq *ioq = state->ioq;
+ struct io_uring *ring = state->ring;
+
+ while (io_uring_sq_space_left(ring)) {
+ struct ioq_ent *ent = ioq_ring_pop(state);
+ if (!ent) {
break;
+ }
+
+ if (ioq_check_cancel(ioq, ent)) {
+ continue;
+ }
- case IOQ_OPENDIR:
- if (!cancel) {
- struct ioq_opendir *args = &ent->opendir;
- ent->ret = bfs_opendir(args->dir, args->dfd, args->path);
- if (ent->ret == 0) {
- bfs_polldir(args->dir);
- }
+#if !BFS_USE_UNWRAPDIR
+ if (ent->op == IOQ_CLOSEDIR) {
+ ioq_handle(ioq, ent);
+ continue;
+ }
+#endif
+
+ struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+ ioq_prep_sqe(sqe, ent);
+ ++state->prepped;
+ }
+
+ return state->prepped || state->submitted;
+}
+
+/** Reap a batch of CQEs. */
+static void ioq_ring_reap(struct ioq_ring_state *state) {
+ struct ioq *ioq = state->ioq;
+ struct io_uring *ring = state->ring;
+
+ while (state->prepped) {
+ int ret = io_uring_submit_and_wait(ring, 1);
+ if (ret > 0) {
+ state->prepped -= ret;
+ state->submitted += ret;
+ }
+ }
+
+ while (state->submitted) {
+ struct io_uring_cqe *cqe;
+ if (io_uring_wait_cqe(ring, &cqe) < 0) {
+ continue;
+ }
+
+ struct ioq_ent *ent = io_uring_cqe_get_data(cqe);
+ ent->ret = cqe->res >= 0 ? cqe->res : -1;
+ ent->error = cqe->res < 0 ? -cqe->res : 0;
+ io_uring_cqe_seen(ring, cqe);
+ --state->submitted;
+
+ if (ent->op == IOQ_OPENDIR && ent->ret >= 0) {
+ int fd = ent->ret;
+ if (ioq_check_cancel(ioq, ent)) {
+ xclose(fd);
+ continue;
}
- break;
- case IOQ_CLOSEDIR:
- ent->ret = bfs_closedir(ent->closedir.dir);
- break;
+ ent->ret = bfs_opendir(ent->opendir.dir, fd, NULL);
+ if (ent->ret == 0) {
+ // TODO: io_uring_prep_getdents()
+ bfs_polldir(ent->opendir.dir);
+ } else {
+ ent->error = errno;
+ }
+ }
+
+ ioqq_push(ioq->ready, ent);
+ }
+}
+
+/** io_uring worker loop. */
+static void ioq_ring_work(struct ioq_thread *thread) {
+ struct ioq_ring_state state = {
+ .ioq = thread->parent,
+ .ring = &thread->ring,
+ };
- default:
- bfs_bug("Unknown ioq_op %d", (int)ent->op);
- errno = ENOSYS;
+ while (ioq_ring_prep(&state)) {
+ ioq_ring_reap(&state);
+ }
+}
+#endif
+
+/** Synchronous syscall loop. */
+static void ioq_sync_work(struct ioq_thread *thread) {
+ struct ioq *ioq = thread->parent;
+
+ while (true) {
+ struct ioq_ent *ent = ioqq_pop(ioq->pending);
+ if (ent == &IOQ_STOP) {
break;
}
- if (cancel) {
- ent->error = EINTR;
- } else if (ent->ret < 0) {
- ent->error = errno;
- } else {
- ent->error = 0;
+ if (!ioq_check_cancel(ioq, ent)) {
+ ioq_handle(ioq, ent);
}
+ }
+}
- ioqq_push(ioq->ready, ent);
+/** Background thread entry point. */
+static void *ioq_work(void *ptr) {
+ struct ioq_thread *thread = ptr;
+
+#if BFS_USE_LIBURING
+ if (thread->ring_err == 0) {
+ ioq_ring_work(thread);
+ return NULL;
}
+#endif
+ ioq_sync_work(thread);
return NULL;
}
@@ -376,7 +582,30 @@ struct ioq *ioq_create(size_t depth, size_t nthreads) {
}
for (size_t i = 0; i < nthreads; ++i) {
- if (thread_create(&ioq->threads[i], NULL, ioq_work, ioq) != 0) {
+ struct ioq_thread *thread = &ioq->threads[i];
+ thread->parent = ioq;
+
+#if BFS_USE_LIBURING
+ struct ioq_thread *prev = i ? &ioq->threads[i - 1] : NULL;
+ if (prev && prev->ring_err) {
+ thread->ring_err = prev->ring_err;
+ } else {
+ // Share io-wq workers between rings
+ struct io_uring_params params = {0};
+ if (prev) {
+ params.flags |= IORING_SETUP_ATTACH_WQ;
+ params.wq_fd = prev->ring.ring_fd;
+ }
+
+ size_t entries = depth / nthreads;
+ if (entries < 16) {
+ entries = 16;
+ }
+ thread->ring_err = -io_uring_queue_init_params(entries, &thread->ring, &params);
+ }
+#endif
+
+ if (thread_create(&thread->id, NULL, ioq_work, thread) != 0) {
goto fail;
}
++ioq->nthreads;
@@ -496,7 +725,11 @@ void ioq_destroy(struct ioq *ioq) {
ioq_cancel(ioq);
for (size_t i = 0; i < ioq->nthreads; ++i) {
- thread_join(ioq->threads[i], NULL);
+ struct ioq_thread *thread = &ioq->threads[i];
+ thread_join(thread->id, NULL);
+#if BFS_USE_LIBURING
+ io_uring_queue_exit(&thread->ring);
+#endif
}
ioqq_destroy(ioq->ready);
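
For reference, the following minimal standalone sketch (not part of this commit; the queue depth and directory path are placeholders, and error handling is abbreviated) shows the liburing pattern the new worker code relies on: two rings sharing one io-wq backend via IORING_SETUP_ATTACH_WQ, as in ioq_create() above, followed by a single submit/reap round along the lines of ioq_ring_reap(). Build with: cc sketch.c -o sketch -luring

#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void) {
	struct io_uring first, second;

	// The first ring owns the io-wq worker pool
	struct io_uring_params params = {0};
	if (io_uring_queue_init_params(16, &first, &params) < 0) {
		return 1;
	}

	// The second ring attaches to the first ring's workers,
	// like each ioq_thread after the first in ioq_create()
	struct io_uring_params attach = {0};
	attach.flags |= IORING_SETUP_ATTACH_WQ;
	attach.wq_fd = first.ring_fd;
	if (io_uring_queue_init_params(16, &second, &attach) < 0) {
		io_uring_queue_exit(&first);
		return 1;
	}

	// Prep one openat() SQE (cf. ioq_prep_sqe() for IOQ_OPENDIR);
	// "." is just a placeholder directory
	struct io_uring_sqe *sqe = io_uring_get_sqe(&second);
	io_uring_prep_openat(sqe, AT_FDCWD, ".", O_RDONLY | O_CLOEXEC | O_DIRECTORY, 0);
	io_uring_sqe_set_data(sqe, NULL);

	// Submit the prepped SQE and wait for its completion (cf. ioq_ring_reap())
	io_uring_submit_and_wait(&second, 1);

	struct io_uring_cqe *cqe;
	if (io_uring_wait_cqe(&second, &cqe) == 0) {
		int res = cqe->res;
		printf("openat: %s\n", res >= 0 ? "ok" : strerror(-res));
		if (res >= 0) {
			close(res);
		}
		io_uring_cqe_seen(&second, cqe);
	}

	io_uring_queue_exit(&second);
	io_uring_queue_exit(&first);
	return 0;
}

Attaching to the first ring's work queue means the kernel keeps one shared pool of io-wq threads for all worker rings rather than one pool per thread, which is what the "Share io-wq workers between rings" comment in ioq_create() is getting at.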