diff options
-rw-r--r-- | bench/ioq.c | 1 | ||||
-rw-r--r-- | src/bftw.c | 5 | ||||
-rw-r--r-- | src/ioq.c | 49 | ||||
-rw-r--r-- | src/ioq.h | 5 | ||||
-rw-r--r-- | tests/ioq.c | 1 |
5 files changed, 44 insertions, 17 deletions
diff --git a/bench/ioq.c b/bench/ioq.c index 0886e9e..b3fbfbf 100644 --- a/bench/ioq.c +++ b/bench/ioq.c @@ -304,6 +304,7 @@ int main(int argc, char *argv[]) { break; } } + ioq_submit(ioq); } while (pop(ioq, &lap, true)); @@ -1008,6 +1008,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) { return -1; } + ioq_submit(ioq); struct ioq_ent *ent = ioq_pop(ioq, block); if (!ent) { return -1; @@ -1957,6 +1958,10 @@ static void bftw_flush(struct bftw_state *state) { bftw_queue_flush(&state->dirq); bftw_ioq_opendirs(state); + + if (state->ioq) { + ioq_submit(state->ioq); + } } /** Close the current directory. */ @@ -428,13 +428,6 @@ static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t s } while (size > 0); } -/** Pop an entry from the queue. */ -static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) { - size_t i = fetch_add(&ioqq->tail, 1, relaxed); - ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask]; - return ioq_slot_pop(ioqq, slot, block); -} - /** Pop a batch of entries from the queue. */ static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) { size_t mask = ioqq->slot_mask; @@ -468,7 +461,7 @@ static void ioq_batch_reset(struct ioq_batch *batch) { /** Check if a batch is empty. */ static bool ioq_batch_empty(const struct ioq_batch *batch) { - return batch->head == batch->tail; + return batch->head >= batch->tail; } /** Send a batch to a queue. */ @@ -506,6 +499,15 @@ static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool bloc /** Pop an entry from a batch, filling it first if necessary. */ static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) { if (ioq_batch_empty(batch)) { + // For non-blocking pops, make sure that each ioq_batch_pop() + // corresponds to a single (amortized) increment of ioqq->head. + // Otherwise, we start skipping many slots and batching ends up + // degrading performance. + if (!block && batch->head < IOQ_BATCH) { + ++batch->head; + return NULL; + } + if (!ioq_batch_fill(ioqq, batch, block)) { return NULL; } @@ -560,11 +562,16 @@ struct ioq { struct arena xbufs; #endif - /** Pending I/O requests. */ + /** Pending I/O request queue. */ struct ioqq *pending; - /** Ready I/O responses. */ + /** Ready I/O response queue. */ struct ioqq *ready; + /** Pending request batch. */ + struct ioq_batch pending_batch; + /** Ready request batch. */ + struct ioq_batch ready_batch; + /** The number of background threads. */ size_t nthreads; /** The background threads themselves. */ @@ -1175,7 +1182,7 @@ int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) { ent->nop.type = type; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1187,7 +1194,7 @@ int ioq_close(struct ioq *ioq, int fd, void *ptr) { ent->close.fd = fd; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1203,7 +1210,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path, args->path = path; args->flags = flags; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1215,7 +1222,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) { ent->closedir.dir = dir; - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } @@ -1239,16 +1246,23 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla } #endif - ioqq_push(ioq->pending, ent); + ioq_batch_push(ioq->pending, &ioq->pending_batch, ent); return 0; } +void ioq_submit(struct ioq *ioq) { + ioq_batch_flush(ioq->pending, &ioq->pending_batch); +} + struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) { + // Don't forget to submit before popping + bfs_assert(ioq_batch_empty(&ioq->pending_batch)); + if (ioq->size == 0) { return NULL; } - return ioqq_pop(ioq->ready, block); + return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block); } void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { @@ -1266,7 +1280,8 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) { void ioq_cancel(struct ioq *ioq) { if (!exchange(&ioq->cancel, true, relaxed)) { - ioqq_push(ioq->pending, &IOQ_STOP); + ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP); + ioq_submit(ioq); } } @@ -190,6 +190,11 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr); int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags flags, struct bfs_stat *buf, void *ptr); /** + * Submit any buffered requests. + */ +void ioq_submit(struct ioq *ioq); + +/** * Pop a response from the queue. * * @ioq diff --git a/tests/ioq.c b/tests/ioq.c index f067436..1a0da97 100644 --- a/tests/ioq.c +++ b/tests/ioq.c @@ -49,6 +49,7 @@ static void check_ioq_push_block(void) { int ret = ioq_opendir(ioq, dir, AT_FDCWD, ".", 0, NULL); bfs_everify(ret == 0, "ioq_opendir()"); } + ioq_submit(ioq); bfs_verify(ioq_capacity(ioq) == 0); // Now cancel the queue, pushing an additional IOQ_STOP message |