summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2024-01-26 13:32:26 -0500
committerTavian Barnes <tavianator@tavianator.com>2024-01-31 17:00:28 -0500
commit43ec427535e2d21a3d8ec36d98889f1bc0515938 (patch)
treea75995d222b4f69fa6b983dbed1c79f34bb0dabf
parent6f0091981e38210e36ee830694f61cb695f2b99b (diff)
downloadbfs-43ec427535e2d21a3d8ec36d98889f1bc0515938.tar.xz
bftw: New bftw_queue abstraction
-rw-r--r--src/bftw.c366
1 files changed, 292 insertions, 74 deletions
diff --git a/src/bftw.c b/src/bftw.c
index 02dd051..ce08dba 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -9,6 +9,8 @@
*
* - struct bftw_list: A linked list of bftw_file's.
*
+ * - struct bftw_queue: A multi-stage queue of bftw_file's.
+ *
* - struct bftw_cache: An LRU list of bftw_file's with open file descriptors,
* used for openat() to minimize the amount of path re-traversals.
*
@@ -125,7 +127,7 @@ struct bftw_file {
/** The next file to open/close/visit. */
struct bftw_file *next;
/** The next directory to read. */
- struct { struct bftw_file *next; } to_read;
+ struct { struct bftw_file *next; } ready;
/** LRU list node. */
struct {
@@ -171,6 +173,254 @@ struct bftw_list {
};
/**
+ * bftw_queue flags.
+ */
+enum bftw_qflags {
+ /** Track the sync/async service balance. */
+ BFTW_QBALANCE = 1 << 0,
+ /** Buffer files before adding them to the queue. */
+ BFTW_QBUFFER = 1 << 1,
+ /** Use LIFO (stack/DFS) ordering. */
+ BFTW_QLIFO = 1 << 2,
+ /** Maintain a strict order. */
+ BFTW_QORDER = 1 << 3,
+};
+
+/**
+ * A queue of bftw_file's that may be serviced asynchronously.
+ *
+ * A bftw_queue comprises three linked lists each tracking different stages.
+ * When BFTW_QBUFFER is set, files are initially pushed to the buffer:
+ *
+ * ╔═══╗ ╔═══╦═══╗
+ * buffer: ║ 𝘩 ║ ║ 𝘩 ║ 𝘪 ║
+ * ╠═══╬═══╦═══╗ ╠═══╬═══╬═══╗
+ * waiting: ║ e ║ f ║ g ║ → ║ e ║ f ║ g ║
+ * ╠═══╬═══╬═══╬═══╗ ╠═══╬═══╬═══╬═══╗
+ * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║
+ * ╚═══╩═══╩═══╩═══╝ ╚═══╩═══╩═══╩═══╝
+ *
+ * When bftw_queue_flush() is called, the files in the buffer are appended to
+ * the waiting list (or prepended, if BFTW_QLIFO is set):
+ *
+ * ╔═╗
+ * buffer: ║ ║
+ * ╠═╩═╦═══╦═══╦═══╦═══╗
+ * waiting: ║ e ║ f ║ g ║ h ║ i ║
+ * ╠═══╬═══╬═══╬═══╬═══╝
+ * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║
+ * ╚═══╩═══╩═══╩═══╝
+ *
+ * Using the buffer gives a more natural ordering for BFTW_QLIFO, and allows
+ * files to be sorted before adding them to the waiting list. If BFTW_QBUFFER
+ * is not set, files are pushed directly to the waiting list instead.
+ *
+ * Files on the waiting list are waiting to be "serviced" asynchronously by the
+ * ioq (for example, by an ioq_opendir() or ioq_stat() call). While they are
+ * being serviced, they are detached from the queue by bftw_queue_detach() and
+ * are not tracked by the queue at all:
+ *
+ * ╔═╗
+ * buffer: ║ ║
+ * ╠═╩═╦═══╦═══╗ ⎛ ┌───┬───┐ ⎞
+ * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ 𝓯 │ ⎟
+ * ╠═══╬═══╬═══╬═══╗ ⎝ └───┴───┘ ⎠
+ * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║
+ * ╚═══╩═══╩═══╩═══╝
+ *
+ * When their async service is complete, files are reattached to the queue by
+ * bftw_queue_attach(), this time on the ready list:
+ *
+ * ╔═╗
+ * buffer: ║ ║
+ * ╠═╩═╦═══╦═══╗ ⎛ ┌───┐ ⎞
+ * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ ⎟
+ * ╠═══╬═══╬═══╬═══╦═══╗ ⎝ └───┘ ⎠
+ * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ 𝕗 ║
+ * ╚═══╩═══╩═══╩═══╩═══╝
+ *
+ * Files are added to the ready list in the order they are finished by the ioq.
+ * bftw_queue_pop() pops a file from the ready list if possible. Otherwise, it
+ * pops from the waiting list, and the file must be serviced synchronously.
+ *
+ * However, if BFTW_QORDER is set, files must be popped in the exact order they
+ * are added to the waiting list (to maintain sorted order). In this case,
+ * files are added to the waiting and ready lists at the same time. The
+ * file->ioqueued flag is set while it is in-service, so that bftw() can wait
+ * for it to be truly ready before using it.
+ *
+ * ╔═╗
+ * buffer: ║ ║
+ * ╠═╩═╦═══╦═══╗ ⎛ ┌───┐ ⎞
+ * waiting: ║ g ║ h ║ i ║ ⎜ ioq: │ 𝓮 │ ⎟
+ * ╠═══╬═══╬═══╬═══╦═══╦═══╦═══╦═══╦═══╗ ⎝ └───┘ ⎠
+ * ready: ║ 𝕒 ║ 𝕓 ║ 𝕔 ║ 𝕕 ║ 𝓮 ║ 𝕗 ║ g ║ h ║ i ║
+ * ╚═══╩═══╩═══╩═══╩═══╩═══╩═══╩═══╩═══╝
+ *
+ * If BFTW_QBALANCE is set, queue->imbalance tracks the delta between async
+ * service (negative) and synchronous service (positive). The queue is
+ * considered "balanced" when this number is non-negative. Only a balanced
+ * queue will perform any async service, ensuring work is fairly distributed
+ * between the main thread and the ioq.
+ *
+ * BFTW_QBALANCE is only set for single-threaded ioqs. When an ioq has multiple
+ * threads, it is faster to wait for the ioq to complete an operation than it is
+ * to perform it on the main thread.
+ */
+struct bftw_queue {
+ /** Queue flags. */
+ enum bftw_qflags flags;
+ /** A buffer of files to be enqueued together. */
+ struct bftw_list buffer;
+ /** A list of files which are waiting to be serviced. */
+ struct bftw_list waiting;
+ /** A list of already-serviced files. */
+ struct bftw_list ready;
+ /** Tracks the imbalance between synchronous and async service. */
+ unsigned long imbalance;
+};
+
+/** Initialize a queue. */
+static void bftw_queue_init(struct bftw_queue *queue, enum bftw_qflags flags) {
+ queue->flags = flags;
+ SLIST_INIT(&queue->buffer);
+ SLIST_INIT(&queue->waiting);
+ SLIST_INIT(&queue->ready);
+ queue->imbalance = 0;
+}
+
+/** Add a file to the queue. */
+static void bftw_queue_push(struct bftw_queue *queue, struct bftw_file *file) {
+ if (queue->flags & BFTW_QBUFFER) {
+ SLIST_APPEND(&queue->buffer, file);
+ } else if (queue->flags & BFTW_QLIFO) {
+ SLIST_PREPEND(&queue->waiting, file);
+ if (queue->flags & BFTW_QORDER) {
+ SLIST_PREPEND(&queue->ready, file, ready);
+ }
+ } else {
+ SLIST_APPEND(&queue->waiting, file);
+ if (queue->flags & BFTW_QORDER) {
+ SLIST_APPEND(&queue->ready, file, ready);
+ }
+ }
+}
+
+/** Add any buffered files to the queue. */
+static void bftw_queue_flush(struct bftw_queue *queue) {
+ if (!(queue->flags & BFTW_QBUFFER)) {
+ return;
+ }
+
+ if (queue->flags & BFTW_QORDER) {
+ // When sorting, add files to the ready list at the same time
+ // (and in the same order) as they are added to the waiting list
+ struct bftw_file **cursor = (queue->flags & BFTW_QLIFO)
+ ? &queue->ready.head
+ : queue->ready.tail;
+ for_slist (struct bftw_file, file, &queue->buffer) {
+ cursor = SLIST_INSERT(&queue->ready, cursor, file, ready);
+ }
+ }
+
+ if (queue->flags & BFTW_QLIFO) {
+ SLIST_EXTEND(&queue->buffer, &queue->waiting);
+ }
+
+ SLIST_EXTEND(&queue->waiting, &queue->buffer);
+}
+
+/** Update the queue imbalance. */
+static void bftw_queue_balance(struct bftw_queue *queue, long delta) {
+ queue->imbalance += delta;
+}
+
+/** Check if the queue is properly balanced for async work. */
+static bool bftw_queue_balanced(const struct bftw_queue *queue) {
+ if (queue->flags & BFTW_QBALANCE) {
+ return (long)queue->imbalance >= 0;
+ } else {
+ return true;
+ }
+}
+
+/** Detatch the next waiting file to service it asynchronously. */
+static void bftw_queue_detach(struct bftw_queue *queue, struct bftw_file *file) {
+ if (file == SLIST_HEAD(&queue->buffer)) {
+ // To maintain order, we can't detach any files until they're
+ // added to the waiting/ready lists
+ bfs_assert(!(queue->flags & BFTW_QORDER));
+ SLIST_POP(&queue->buffer);
+ } else if (file == SLIST_HEAD(&queue->waiting)) {
+ SLIST_POP(&queue->waiting);
+ } else {
+ bfs_bug("Detached file was not buffered or waiting");
+ }
+
+ file->ioqueued = true;
+ bftw_queue_balance(queue, -1);
+}
+
+/** Reattach a serviced file to the queue. */
+static void bftw_queue_attach(struct bftw_queue *queue, struct bftw_file *file) {
+ file->ioqueued = false;
+
+ if (!(queue->flags & BFTW_QORDER)) {
+ SLIST_APPEND(&queue->ready, file, ready);
+ }
+}
+
+/** Get the next waiting file. */
+static struct bftw_file *bftw_queue_waiting(const struct bftw_queue *queue) {
+ if (!(queue->flags & BFTW_QBUFFER)) {
+ return SLIST_HEAD(&queue->waiting);
+ }
+
+ if (queue->flags & BFTW_QORDER) {
+ // Don't detach files until they're on the waiting/ready lists
+ return SLIST_HEAD(&queue->waiting);
+ }
+
+ const struct bftw_list *prefix = &queue->waiting;
+ const struct bftw_list *suffix = &queue->buffer;
+ if (queue->flags & BFTW_QLIFO) {
+ prefix = &queue->buffer;
+ suffix = &queue->waiting;
+ }
+
+ struct bftw_file *file = SLIST_HEAD(prefix);
+ if (!file) {
+ file = SLIST_HEAD(suffix);
+ }
+ return file;
+}
+
+/** Get the next ready file. */
+static struct bftw_file *bftw_queue_ready(const struct bftw_queue *queue) {
+ return SLIST_HEAD(&queue->ready);
+}
+
+/** Pop a file from the queue. */
+static struct bftw_file *bftw_queue_pop(struct bftw_queue *queue) {
+ // Don't pop until we've had a chance to sort the buffer
+ bfs_assert(SLIST_EMPTY(&queue->buffer));
+
+ struct bftw_file *file = SLIST_POP(&queue->ready, ready);
+
+ if (!file || file == SLIST_HEAD(&queue->waiting)) {
+ // If no files are ready, try the waiting list. Or, if
+ // BFTW_QORDER is set, we may need to pop from both lists.
+ file = SLIST_POP(&queue->waiting);
+ if (file) {
+ // This file will be serviced synchronously
+ bftw_queue_balance(queue, +1);
+ }
+ }
+
+ return file;
+}
+
+/**
* A cache of open directories.
*/
struct bftw_cache {
@@ -354,7 +604,7 @@ static struct bftw_file *bftw_file_new(struct bftw_cache *cache, struct bftw_fil
}
SLIST_ITEM_INIT(file);
- SLIST_ITEM_INIT(file, to_read);
+ SLIST_ITEM_INIT(file, ready);
LIST_ITEM_INIT(file, lru);
file->refcount = 1;
@@ -430,17 +680,11 @@ struct bftw_state {
struct ioq *ioq;
/** The number of I/O threads. */
size_t nthreads;
- /** Tracks the imbalance between main thread and background I/O. */
- long imbalance;
-
- /** A batch of directories to open. */
- struct bftw_list dir_batch;
- /** The queue of directories to open. */
- struct bftw_list to_open;
- /** The queue of directories to read. */
- struct bftw_list to_read;
+
/** The queue of unpinned directories to unwrap. */
struct bftw_list to_close;
+ /** The queue of directories to open/read. */
+ struct bftw_queue dirq;
/** A batch of files to enqueue. */
struct bftw_list file_batch;
@@ -543,13 +787,22 @@ static int bftw_state_init(struct bftw_state *state, const struct bftw_args *arg
state->dir_flags |= BFS_DIR_WHITEOUTS;
}
- state->imbalance = 0;
-
- SLIST_INIT(&state->dir_batch);
- SLIST_INIT(&state->to_open);
- SLIST_INIT(&state->to_read);
SLIST_INIT(&state->to_close);
+ enum bftw_qflags qflags = 0;
+ if (state->strategy != BFTW_BFS && !(state->flags & BFTW_BUFFER)) {
+ // For a depth-first, unbuffered search, queue directories in
+ // LIFO order
+ qflags |= BFTW_QBUFFER | BFTW_QLIFO;
+ }
+ if (state->flags & BFTW_SORT) {
+ qflags |= BFTW_QORDER;
+ }
+ if (nthreads == 1) {
+ qflags |= BFTW_QBALANCE;
+ }
+ bftw_queue_init(&state->dirq, qflags);
+
SLIST_INIT(&state->file_batch);
SLIST_INIT(&state->to_visit);
@@ -575,12 +828,6 @@ static void bftw_unpin_dir(struct bftw_state *state, struct bftw_file *file, boo
}
}
-/** Adjust the I/O queue balance. */
-static void bftw_ioq_balance(struct bftw_state *state, long delta) {
- // Avoid signed overflow
- state->imbalance = (unsigned long)state->imbalance + (unsigned long)delta;
-}
-
/** Pop a response from the I/O queue. */
static int bftw_ioq_pop(struct bftw_state *state, bool block) {
struct ioq *ioq = state->ioq;
@@ -612,7 +859,6 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
case IOQ_OPENDIR:
file = ent->ptr;
- file->ioqueued = false;
++cache->capacity;
parent = file->parent;
@@ -627,9 +873,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
bftw_freedir(cache, dir);
}
- if (!(state->flags & BFTW_SORT)) {
- SLIST_APPEND(&state->to_read, file, to_read);
- }
+ bftw_queue_attach(&state->dirq, file);
break;
case IOQ_STAT:
@@ -647,19 +891,14 @@ static int bftw_ioq_reserve(struct bftw_state *state) {
return -1;
}
- // With only one background thread, we should balance I/O between it and
- // the main thread. With more than one background thread, it's faster
- // to wait on background I/O than it is to do it on the main thread.
- bool balance = state->nthreads <= 1;
- if (balance && state->imbalance < 0) {
- return -1;
- }
-
if (ioq_capacity(ioq) > 0) {
return 0;
}
- if (bftw_ioq_pop(state, !balance) < 0) {
+ // With more than one background thread, it's faster to wait on
+ // background I/O than it is to do it on the main thread
+ bool block = state->nthreads > 1;
+ if (bftw_ioq_pop(state, block) < 0) {
return -1;
}
@@ -892,9 +1131,7 @@ static int bftw_ioq_opendir(struct bftw_state *state, struct bftw_file *file) {
goto free;
}
- file->ioqueued = true;
--cache->capacity;
- bftw_ioq_balance(state, -1);
return 0;
free:
@@ -908,39 +1145,26 @@ fail:
}
/** Open a batch of directories asynchronously. */
-static void bftw_ioq_opendirs(struct bftw_state *state, struct bftw_list *queue) {
- for_slist (struct bftw_file, dir, queue) {
- if (bftw_ioq_opendir(state, dir) != 0) {
+static void bftw_ioq_opendirs(struct bftw_state *state) {
+ while (bftw_queue_balanced(&state->dirq)) {
+ struct bftw_file *dir = bftw_queue_waiting(&state->dirq);
+ if (!dir) {
+ break;
+ }
+
+ if (bftw_ioq_opendir(state, dir) == 0) {
+ bftw_queue_detach(&state->dirq, dir);
+ } else {
break;
}
- SLIST_POP(queue);
}
}
/** Push a directory onto the queue. */
static void bftw_push_dir(struct bftw_state *state, struct bftw_file *file) {
bfs_assert(file->type == BFS_DIR);
-
- struct bftw_list *queue;
- if (state->strategy == BFTW_BFS || (state->flags & BFTW_BUFFER)) {
- // In breadth-first mode, or if we're already buffering files,
- // we can push directly to the to_open queue
- queue = &state->to_open;
- } else {
- // For a depth-first, unbuffered search, add directories to a
- // batch, then push the patch to the front of the queue
- queue = &state->dir_batch;
- }
-
- SLIST_APPEND(queue, file);
-
- if (state->flags & BFTW_SORT) {
- // When sorting, directories are kept in order on the to_read
- // list; otherwise, they are only added once they are open
- SLIST_APPEND(&state->to_read, file, to_read);
- }
-
- bftw_ioq_opendirs(state, queue);
+ bftw_queue_push(&state->dirq, file);
+ bftw_ioq_opendirs(state);
}
/** Pop a directory to read from the queue. */
@@ -956,9 +1180,9 @@ static bool bftw_pop_dir(struct bftw_state *state) {
return false;
}
} else {
- while (SLIST_EMPTY(&state->to_read)) {
+ while (!bftw_queue_ready(&state->dirq)) {
// Block if we have no other files/dirs to visit, or no room in the cache
- bool have_dirs = !SLIST_EMPTY(&state->to_open);
+ bool have_dirs = bftw_queue_waiting(&state->dirq);
bool have_room = cache->capacity > 0 && cache->dirlimit > 0;
bool block = !(have_dirs || have_files) || !have_room;
@@ -968,10 +1192,7 @@ static bool bftw_pop_dir(struct bftw_state *state) {
}
}
- struct bftw_file *file = SLIST_POP(&state->to_read, to_read);
- if (!file || file == state->to_open.head) {
- file = SLIST_POP(&state->to_open);
- }
+ struct bftw_file *file = bftw_queue_pop(&state->dirq);
if (!file) {
return false;
}
@@ -1051,8 +1272,6 @@ static struct bfs_dir *bftw_file_opendir(struct bftw_state *state, struct bftw_f
return NULL;
}
- bftw_ioq_balance(state, +1);
-
if (bfs_opendir(dir, fd, NULL, state->dir_flags) != 0) {
bftw_freedir(cache, dir);
return NULL;
@@ -1428,14 +1647,13 @@ static void bftw_batch_finish(struct bftw_state *state) {
}
if (state->strategy != BFTW_BFS) {
- SLIST_EXTEND(&state->dir_batch, &state->to_open);
SLIST_EXTEND(&state->file_batch, &state->to_visit);
}
- SLIST_EXTEND(&state->to_open, &state->dir_batch);
SLIST_EXTEND(&state->to_visit, &state->file_batch);
- bftw_ioq_opendirs(state, &state->to_open);
+ bftw_queue_flush(&state->dirq);
+ bftw_ioq_opendirs(state);
}
/** Close the current directory. */
@@ -1525,7 +1743,7 @@ static int bftw_state_destroy(struct bftw_state *state) {
state->ioq = NULL;
}
- SLIST_EXTEND(&state->to_open, &state->dir_batch);
+ bftw_queue_flush(&state->dirq);
SLIST_EXTEND(&state->to_visit, &state->file_batch);
do {
bftw_gc(state, BFTW_VISIT_NONE);