summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bench/ioq.c1
-rw-r--r--src/bftw.c5
-rw-r--r--src/ioq.c49
-rw-r--r--src/ioq.h5
-rw-r--r--tests/ioq.c1
5 files changed, 44 insertions, 17 deletions
diff --git a/bench/ioq.c b/bench/ioq.c
index 0886e9e..b3fbfbf 100644
--- a/bench/ioq.c
+++ b/bench/ioq.c
@@ -304,6 +304,7 @@ int main(int argc, char *argv[]) {
break;
}
}
+ ioq_submit(ioq);
}
while (pop(ioq, &lap, true));
diff --git a/src/bftw.c b/src/bftw.c
index 5725825..f822456 100644
--- a/src/bftw.c
+++ b/src/bftw.c
@@ -1008,6 +1008,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
return -1;
}
+ ioq_submit(ioq);
struct ioq_ent *ent = ioq_pop(ioq, block);
if (!ent) {
return -1;
@@ -1957,6 +1958,10 @@ static void bftw_flush(struct bftw_state *state) {
bftw_queue_flush(&state->dirq);
bftw_ioq_opendirs(state);
+
+ if (state->ioq) {
+ ioq_submit(state->ioq);
+ }
}
/** Close the current directory. */
diff --git a/src/ioq.c b/src/ioq.c
index 763f5ba..1280598 100644
--- a/src/ioq.c
+++ b/src/ioq.c
@@ -428,13 +428,6 @@ static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t s
} while (size > 0);
}
-/** Pop an entry from the queue. */
-static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
- size_t i = fetch_add(&ioqq->tail, 1, relaxed);
- ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
- return ioq_slot_pop(ioqq, slot, block);
-}
-
/** Pop a batch of entries from the queue. */
static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) {
size_t mask = ioqq->slot_mask;
@@ -468,7 +461,7 @@ static void ioq_batch_reset(struct ioq_batch *batch) {
/** Check if a batch is empty. */
static bool ioq_batch_empty(const struct ioq_batch *batch) {
- return batch->head == batch->tail;
+ return batch->head >= batch->tail;
}
/** Send a batch to a queue. */
@@ -506,6 +499,15 @@ static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool bloc
/** Pop an entry from a batch, filling it first if necessary. */
static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
if (ioq_batch_empty(batch)) {
+ // For non-blocking pops, make sure that each ioq_batch_pop()
+ // corresponds to a single (amortized) increment of ioqq->head.
+ // Otherwise, we start skipping many slots and batching ends up
+ // degrading performance.
+ if (!block && batch->head < IOQ_BATCH) {
+ ++batch->head;
+ return NULL;
+ }
+
if (!ioq_batch_fill(ioqq, batch, block)) {
return NULL;
}
@@ -560,11 +562,16 @@ struct ioq {
struct arena xbufs;
#endif
- /** Pending I/O requests. */
+ /** Pending I/O request queue. */
struct ioqq *pending;
- /** Ready I/O responses. */
+ /** Ready I/O response queue. */
struct ioqq *ready;
+ /** Pending request batch. */
+ struct ioq_batch pending_batch;
+ /** Ready request batch. */
+ struct ioq_batch ready_batch;
+
/** The number of background threads. */
size_t nthreads;
/** The background threads themselves. */
@@ -1175,7 +1182,7 @@ int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) {
ent->nop.type = type;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1187,7 +1194,7 @@ int ioq_close(struct ioq *ioq, int fd, void *ptr) {
ent->close.fd = fd;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1203,7 +1210,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path,
args->path = path;
args->flags = flags;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1215,7 +1222,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {
ent->closedir.dir = dir;
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
@@ -1239,16 +1246,23 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
}
#endif
- ioqq_push(ioq->pending, ent);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}
+void ioq_submit(struct ioq *ioq) {
+ ioq_batch_flush(ioq->pending, &ioq->pending_batch);
+}
+
struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) {
+ // Don't forget to submit before popping
+ bfs_assert(ioq_batch_empty(&ioq->pending_batch));
+
if (ioq->size == 0) {
return NULL;
}
- return ioqq_pop(ioq->ready, block);
+ return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block);
}
void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
@@ -1266,7 +1280,8 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
void ioq_cancel(struct ioq *ioq) {
if (!exchange(&ioq->cancel, true, relaxed)) {
- ioqq_push(ioq->pending, &IOQ_STOP);
+ ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP);
+ ioq_submit(ioq);
}
}
diff --git a/src/ioq.h b/src/ioq.h
index da0a525..5eaa066 100644
--- a/src/ioq.h
+++ b/src/ioq.h
@@ -190,6 +190,11 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr);
int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags flags, struct bfs_stat *buf, void *ptr);
/**
+ * Submit any buffered requests.
+ */
+void ioq_submit(struct ioq *ioq);
+
+/**
* Pop a response from the queue.
*
* @ioq
diff --git a/tests/ioq.c b/tests/ioq.c
index f067436..1a0da97 100644
--- a/tests/ioq.c
+++ b/tests/ioq.c
@@ -49,6 +49,7 @@ static void check_ioq_push_block(void) {
int ret = ioq_opendir(ioq, dir, AT_FDCWD, ".", 0, NULL);
bfs_everify(ret == 0, "ioq_opendir()");
}
+ ioq_submit(ioq);
bfs_verify(ioq_capacity(ioq) == 0);
// Now cancel the queue, pushing an additional IOQ_STOP message