summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ioq.c131
1 files changed, 84 insertions, 47 deletions
diff --git a/src/ioq.c b/src/ioq.c
index f71be68..73883dd 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -450,30 +450,68 @@ static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t si
#define IOQ_BATCH (FALSE_SHARING_SIZE / sizeof(ioq_slot))
/**
- * A batch of entries to send all at once.
+ * A batch of I/O queue entries.
*/
struct ioq_batch {
- /** The current batch size. */
- size_t size;
+ /** The start of the batch. */
+ size_t head;
+ /** The end of the batch. */
+ size_t tail;
/** The array of entries. */
struct ioq_ent *entries[IOQ_BATCH];
};
-/** Send the batch to a queue. */
+/** Reset a batch. */
+static void ioq_batch_reset(struct ioq_batch *batch) {
+ batch->head = batch->tail = 0;
+}
+
+/** Check if a batch is empty. */
+static bool ioq_batch_empty(const struct ioq_batch *batch) {
+ return batch->head == batch->tail;
+}
+
+/** Send a batch to a queue. */
static void ioq_batch_flush(struct ioqq *ioqq, struct ioq_batch *batch) {
- if (batch->size > 0) {
- ioqq_push_batch(ioqq, batch->entries, batch->size);
- batch->size = 0;
+ if (batch->tail > 0) {
+ ioqq_push_batch(ioqq, batch->entries, batch->tail);
+ ioq_batch_reset(batch);
}
}
-/** An an entry to a batch, flushing if necessary. */
+/** Push an entry to a batch, flushing if necessary. */
static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct ioq_ent *ent) {
- if (batch->size >= IOQ_BATCH) {
+ batch->entries[batch->tail++] = ent;
+
+ if (batch->tail >= IOQ_BATCH) {
ioq_batch_flush(ioqq, batch);
}
+}
+
+/** Fill a batch from a queue. */
+static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+ ioqq_pop_batch(ioqq, batch->entries, IOQ_BATCH, block);
+
+ ioq_batch_reset(batch);
+ for (size_t i = 0; i < IOQ_BATCH; ++i) {
+ struct ioq_ent *ent = batch->entries[i];
+ if (ent) {
+ batch->entries[batch->tail++] = ent;
+ }
+ }
+
+ return batch->tail > 0;
+}
+
+/** Pop an entry from a batch, filling it first if necessary. */
+static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
+ if (ioq_batch_empty(batch)) {
+ if (!ioq_batch_fill(ioqq, batch, block)) {
+ return NULL;
+ }
+ }
- batch->entries[batch->size++] = ent;
+ return batch->entries[batch->head++];
}
/** Sentinel stop command. */
@@ -665,7 +703,7 @@ static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, str
/** Check if ioq_ring_reap() has work to do. */
static bool ioq_ring_empty(struct ioq_ring_state *state) {
- return !state->prepped && !state->submitted && !state->ready.size;
+ return !state->prepped && !state->submitted && ioq_batch_empty(&state->ready);
}
/** Prep a single SQE. */
@@ -694,31 +732,31 @@ static bool ioq_ring_prep(struct ioq_ring_state *state) {
struct ioq *ioq = state->ioq;
struct io_uring *ring = state->ring;
- struct ioq_ent *pending[IOQ_BATCH];
- while (io_uring_sq_space_left(ring) >= IOQ_BATCH) {
- bool block = ioq_ring_empty(state);
- ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, block);
-
- bool any = false;
- for (size_t i = 0; i < IOQ_BATCH; ++i) {
- struct ioq_ent *ent = pending[i];
- if (ent == &IOQ_STOP) {
- ioqq_push(ioq->pending, &IOQ_STOP);
- state->stop = true;
- goto done;
- } else if (ent) {
- ioq_prep_sqe(state, ent);
- any = true;
+ struct ioq_batch pending;
+ ioq_batch_reset(&pending);
+
+ while (true) {
+ if (ioq_batch_empty(&pending)) {
+ if (io_uring_sq_space_left(ring) < IOQ_BATCH) {
+ break;
}
}
- if (!any) {
+ bool block = ioq_ring_empty(state);
+ struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, block);
+ if (!ent) {
+ break;
+ } else if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, ent);
+ state->stop = true;
break;
}
+
+ ioq_prep_sqe(state, ent);
}
-done:
+ bfs_assert(ioq_batch_empty(&pending));
return !ioq_ring_empty(state);
}
@@ -816,30 +854,29 @@ static void ioq_ring_work(struct ioq_thread *thread) {
static void ioq_sync_work(struct ioq_thread *thread) {
struct ioq *ioq = thread->parent;
- bool stop = false;
- while (!stop) {
- struct ioq_ent *pending[IOQ_BATCH];
- ioqq_pop_batch(ioq->pending, pending, IOQ_BATCH, true);
+ struct ioq_batch pending, ready;
+ ioq_batch_reset(&pending);
+ ioq_batch_reset(&ready);
- struct ioq_batch ready;
- ready.size = 0;
+ while (true) {
+ if (ioq_batch_empty(&pending)) {
+ ioq_batch_flush(ioq->ready, &ready);
+ }
- for (size_t i = 0; i < IOQ_BATCH; ++i) {
- struct ioq_ent *ent = pending[i];
- if (ent == &IOQ_STOP) {
- ioqq_push(ioq->pending, &IOQ_STOP);
- stop = true;
- break;
- } else if (ent) {
- if (!ioq_check_cancel(ioq, ent)) {
- ioq_dispatch_sync(ioq, ent);
- }
- ioq_batch_push(ioq->ready, &ready, ent);
- }
+ struct ioq_ent *ent = ioq_batch_pop(ioq->pending, &pending, true);
+ if (ent == &IOQ_STOP) {
+ ioqq_push(ioq->pending, ent);
+ break;
}
- ioq_batch_flush(ioq->ready, &ready);
+ if (!ioq_check_cancel(ioq, ent)) {
+ ioq_dispatch_sync(ioq, ent);
+ }
+ ioq_batch_push(ioq->ready, &ready, ent);
}
+
+ bfs_assert(ioq_batch_empty(&pending));
+ ioq_batch_flush(ioq->ready, &ready);
}
/** Background thread entry point. */