41 changes: 28 additions & 13 deletions fs/fuse/dev_uring.c
@@ -23,8 +23,12 @@ MODULE_PARM_DESC(enable_uring,
 #define FUSE_RING_HEADER_PG 0
 #define FUSE_RING_PAYLOAD_PG 1
 
+/* Threshold that determines if a better queue should be searched for */
 #define FUSE_URING_Q_THRESHOLD 2
 
+/* Number of (re)tries to find a better queue */
+#define FUSE_URING_Q_TRIES 3
+
 /* redfs only to allow patch backports */
 #define IO_URING_F_TASK_DEAD (1 << 13)
 
@@ -1557,7 +1561,7 @@ static struct fuse_ring_queue *fuse_uring_select_queue(struct fuse_ring *ring,
 						       bool background)
 {
 	unsigned int qid;
-	int node, retries = 0;
+	int node, tries = 0;
 	unsigned int nr_queues;
 	unsigned int cpu = task_cpu(current);
 	struct fuse_ring_queue *queue, *primary_queue = NULL;
@@ -1582,26 +1586,36 @@ static struct fuse_ring_queue *fuse_uring_select_queue(struct fuse_ring *ring,
 
 	nr_queues = READ_ONCE(ring->numa_q_map[node].nr_queues);
 	if (nr_queues) {
+		/* prefer the queue that corresponds to the current cpu */
+		queue = READ_ONCE(ring->queues[cpu]);
+		if (queue) {
+			if (queue->nr_reqs <= FUSE_URING_Q_THRESHOLD)
+				return queue;
+			primary_queue = queue;
+		}
+
 		qid = ring->numa_q_map[node].cpu_to_qid[cpu];
 		if (WARN_ON_ONCE(qid >= ring->max_nr_queues))
 			return NULL;
-		queue = READ_ONCE(ring->queues[qid]);
+		if (qid != cpu) {
+			queue = READ_ONCE(ring->queues[qid]);
 
-		/* Might happen on teardown */
-		if (unlikely(!queue))
-			return NULL;
+			/* Might happen on teardown */
+			if (unlikely(!queue))
+				return NULL;
 
-		if (queue->nr_reqs < FUSE_URING_Q_THRESHOLD)
-			return queue;
+			if (queue->nr_reqs <= FUSE_URING_Q_THRESHOLD)
+				return queue;
+		}
 
 		/* Retries help for load balancing */
-		if (retries < FUSE_URING_Q_THRESHOLD) {
-			if (!retries)
+		if (tries < FUSE_URING_Q_TRIES && tries + 1 < nr_queues) {
+			if (!primary_queue)
 				primary_queue = queue;
 
-			/* Increase cpu, assuming it will map to a differet qid*/
+			/* Increase cpu, assuming it will map to a different qid*/
 			cpu++;
-			retries++;
+			tries++;
 			goto retry;
 		}
 	}
@@ -1612,9 +1626,10 @@ static struct fuse_ring_queue *fuse_uring_select_queue(struct fuse_ring *ring,
 
 	/* global registered queue bitmap */
 	qid = ring->q_map.cpu_to_qid[cpu];
-	if (WARN_ON_ONCE(qid >= ring->max_nr_queues))
-		/* Might happen on teardown */
+	if (WARN_ON_ONCE(qid >= ring->max_nr_queues)) {
+		/* Might happen on teardown */
 		return NULL;
+	}
 	return READ_ONCE(ring->queues[qid]);
 }
 
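The queue-selection change in dev_uring.c is easier to reason about outside the kernel. Below is a minimal, self-contained userspace sketch of the policy the diff introduces, under the assumption of a toy setup where the queue id equals the CPU index: take the current CPU's queue when it holds at most FUSE_URING_Q_THRESHOLD requests, otherwise probe up to FUSE_URING_Q_TRIES further CPUs (and never more queues than exist) and fall back to the first busy candidate. struct toy_queue, select_queue() and main() are illustrative stand-ins, not the kernel's fuse_ring structures.

#include <stdio.h>

#define Q_THRESHOLD 2	/* mirrors FUSE_URING_Q_THRESHOLD */
#define Q_TRIES     3	/* mirrors FUSE_URING_Q_TRIES */

struct toy_queue {
	unsigned int qid;
	unsigned int nr_reqs;
};

/* Pick a queue for 'cpu' from queues[0..nr_queues-1]; here qid == array index. */
static struct toy_queue *select_queue(struct toy_queue *queues,
				      unsigned int nr_queues, unsigned int cpu)
{
	struct toy_queue *primary = NULL;
	unsigned int tries = 0;

	for (;;) {
		struct toy_queue *q = &queues[cpu % nr_queues];

		/* A lightly loaded queue is taken immediately. */
		if (q->nr_reqs <= Q_THRESHOLD)
			return q;

		/* Remember the first busy candidate as the fallback. */
		if (!primary)
			primary = q;

		/* Bounded probing, and never probe more queues than exist. */
		if (tries >= Q_TRIES || tries + 1 >= nr_queues)
			return primary;

		cpu++;		/* assume the next cpu maps to a different qid */
		tries++;
	}
}

int main(void)
{
	struct toy_queue queues[] = {
		{ .qid = 0, .nr_reqs = 5 },
		{ .qid = 1, .nr_reqs = 4 },
		{ .qid = 2, .nr_reqs = 1 },
		{ .qid = 3, .nr_reqs = 6 },
	};
	struct toy_queue *q = select_queue(queues, 4, 0);

	printf("selected qid %u with %u requests\n", q->qid, q->nr_reqs);
	return 0;
}

With the sample loads above the call starts at cpu 0, skips the two busy queues and settles on qid 2, which is the behaviour the bounded-retry loop in fuse_uring_select_queue() aims for.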
43 changes: 32 additions & 11 deletions fs/fuse/file.c
@@ -818,6 +818,18 @@ static void fuse_aio_complete(struct fuse_io_priv *io, int err, ssize_t pos)
 	struct inode *inode = file_inode(io->iocb->ki_filp);
 	struct fuse_conn *fc = get_fuse_conn(inode);
 	struct fuse_inode *fi = get_fuse_inode(inode);
+	struct address_space *mapping = io->iocb->ki_filp->f_mapping;
+
+	/*
+	 * As in generic_file_direct_write(), invalidate after the
+	 * write, to invalidate read-ahead cache that may have competed
+	 * with the write.
+	 */
+	if (io->write && res && mapping->nrpages) {
+		invalidate_inode_pages2_range(mapping,
+				io->offset >> PAGE_SHIFT,
+				(io->offset + res - 1) >> PAGE_SHIFT);
+	}
 
 	spin_lock(&fi->lock);
 	fi->attr_version = atomic64_inc_return(&fc->attr_version);
@@ -1199,9 +1211,11 @@ static ssize_t fuse_send_write(struct fuse_io_args *ia, loff_t pos,
 {
 	struct kiocb *iocb = ia->io->iocb;
 	struct file *file = iocb->ki_filp;
+	struct address_space *mapping = file->f_mapping;
 	struct fuse_file *ff = file->private_data;
 	struct fuse_mount *fm = ff->fm;
 	struct fuse_write_in *inarg = &ia->write.in;
+	ssize_t written;
 	ssize_t err;
 
 	fuse_write_args_fill(ia, ff, pos, count);
@@ -1215,10 +1229,26 @@ static ssize_t fuse_send_write(struct fuse_io_args *ia, loff_t pos,
 		return fuse_async_req_send(fm, ia, count);
 
 	err = fuse_simple_request(fm, &ia->ap.args);
-	if (!err && ia->write.out.size > count)
+	written = ia->write.out.size;
+	if (!err && written > count)
 		err = -EIO;
 
-	return err ?: ia->write.out.size;
+	/*
+	 * Without FOPEN_DIRECT_IO, generic_file_direct_write() does the
+	 * invalidation for us.
+	 */
+	if (!err && written && mapping->nrpages &&
+	    (ff->open_flags & FOPEN_DIRECT_IO)) {
+		/*
+		 * As in generic_file_direct_write(), invalidate after the
+		 * write, to invalidate read-ahead cache that may have competed
+		 * with the write.
+		 */
+		invalidate_inode_pages2_range(mapping, pos >> PAGE_SHIFT,
+					      (pos + written - 1) >> PAGE_SHIFT);
+	}
+
+	return err ?: written;
 }
 
 bool fuse_write_update_attr(struct inode *inode, loff_t pos, ssize_t written)
@@ -1766,15 +1796,6 @@ ssize_t fuse_direct_io(struct fuse_io_priv *io, struct iov_iter *iter,
 	if (res > 0)
 		*ppos = pos;
 
-	if (res > 0 && write && fopen_direct_io) {
-		/*
-		 * As in generic_file_direct_write(), invalidate after the
-		 * write, to invalidate read-ahead cache that may have competed
-		 * with the write.
-		 */
-		invalidate_inode_pages2_range(mapping, idx_from, idx_to);
-	}
-
 	return res > 0 ? res : err;
 }
 EXPORT_SYMBOL_GPL(fuse_direct_io);
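For reference, the invalidate_inode_pages2_range() calls added in fuse_aio_complete() and fuse_send_write() map the written byte range [pos, pos + written) to the inclusive page-index range [pos >> PAGE_SHIFT, (pos + written - 1) >> PAGE_SHIFT]. The short userspace sketch below only illustrates that arithmetic; PAGE_SHIFT = 12 (4 KiB pages) is assumed for the example, and show_range() is a hypothetical helper, not kernel code.

#include <stdio.h>

#define PAGE_SHIFT 12	/* assumed 4 KiB pages for the example */

/* Print the inclusive page-index range covered by the byte range [pos, pos + written). */
static void show_range(long long pos, long long written)
{
	long long first = pos >> PAGE_SHIFT;
	long long last = (pos + written - 1) >> PAGE_SHIFT;

	printf("write of %lld bytes at offset %lld invalidates pages %lld..%lld\n",
	       written, pos, first, last);
}

int main(void)
{
	show_range(0, 4096);		/* exactly one page: 0..0 */
	show_range(4000, 200);		/* straddles a page boundary: 0..1 */
	show_range(8192, 12288);	/* three full pages: 2..4 */
	return 0;
}

The "- 1" matters: without it, a write that ends exactly on a page boundary would needlessly invalidate the following, untouched page.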