PWX-18756 fix race in restart and new req processing (#190)
* address race between restart and new req processing

Signed-off-by: Lakshmi Narasimhan Sundararajan <lns@portworx.com>
sulakshm authored Feb 25, 2021
1 parent c38f514 commit 07f1b44
Showing 3 changed files with 40 additions and 37 deletions.
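
Why this fixes the race (a hedged reconstruction from the diff below, not from a reproducer):

/*
 * Old ordering: fuse_request_send_nowait() allocated the unique id and
 * published the request in fc->request_map BEFORE entering the RCU
 * section that checks connection state.
 *
 *   CPU0 (new request)                   CPU1 (restart/abort)
 *   ---------------------------------   ------------------------------
 *   unique = fuse_get_unique(fc);
 *   request_map[unique & mask] = req;
 *                                        allow_disconnected = 0;
 *                                        synchronize_rcu();
 *                                        fuse_end_queued_requests(fc);
 *   rcu_read_lock();
 *   gate already closed: req fails,
 *   but its request_map slot is never
 *   cleaned up -> stale entry.
 *
 * The fix moves id allocation and map publication into queue_request(),
 * which now runs only inside the RCU read section, after the
 * READ_ONCE(fc->allow_disconnected) check, so the grace period on CPU1
 * cannot complete while a request is being registered.
 */
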
dev.c: 31 changes (16 additions, 15 deletions)
@@ -141,7 +141,13 @@ static void fuse_put_unique(struct fuse_conn *fc, u64 uid)
 {
 	struct fuse_per_cpu_ids *my_ids;
 	int num_free;
-	int cpu = get_cpu();
+	int cpu;
+
+	if (uid == 0) {
+		return;
+	}
+
+	cpu = get_cpu();
 
 	my_ids = per_cpu_ptr(fc->per_cpu_ids, cpu);
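
With id allocation moved into queue_request() (next hunk), a request rejected at the allow_disconnected gate reaches request_end() with in.unique still zero, and fuse_put_unique() must tolerate that. A minimal sketch of the new invariant (my reading of the diff, not verbatim source):

	u64 uid = req->in.unique;   /* stays 0 if queue_request() never ran */
	fuse_put_unique(fc, uid);   /* no-op for uid == 0: id was never allocated */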

@@ -176,6 +182,9 @@ static void queue_request(struct fuse_conn *fc, struct fuse_req *req)
 	struct rdwr_in *rdwr;
 	struct fuse_queue_cb *cb = &fc->queue->requests_cb;
 
+	req->in.unique = fuse_get_unique(fc);
+	fc->request_map[req->in.unique & (FUSE_MAX_REQUEST_IDS - 1)] = req;
+
 	spin_lock(&cb->w.lock);
 	write = cb->w.write;
 	if (write - cb->w.read >= FUSE_REQUEST_QUEUE_SIZE) {
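
A side note on the slot computation: indexing with "unique & (FUSE_MAX_REQUEST_IDS - 1)" works only because FUSE_MAX_REQUEST_IDS is a power of two, so the mask is a cheap modulo. Sketch (the constant here is an assumption, not this repo's value):

	#define FUSE_MAX_REQUEST_IDS (1 << 16)   /* assumed; must be a power of two */

	static struct fuse_req *request_slot(struct fuse_conn *fc, u64 unique)
	{
		return fc->request_map[unique & (FUSE_MAX_REQUEST_IDS - 1)];
	}
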
@@ -222,24 +231,16 @@ static void request_end(struct fuse_conn *fc, struct fuse_req *req,
 	if (shouldfree) fuse_request_free(req);
 }
 
-void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req, bool force)
+void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req)
 {
-	req->in.unique = fuse_get_unique(fc);
-	fc->request_map[req->in.unique & (FUSE_MAX_REQUEST_IDS - 1)] = req;
-
 	/*
 	 * Ensures checking the value of allow_disconnected and adding request to
 	 * queue is done atomically.
 	 */
 	rcu_read_lock();
 
-	if (force) {
-		queue_request(fc, req);
-		if (fc->connected || fc->allow_disconnected) {
-			fuse_conn_wakeup(fc);
-		}
-		rcu_read_unlock();
-	} else if (fc->connected || fc->allow_disconnected) {
+	// 'allow_disconnected' check subsumes 'connected' as well
+	if (READ_ONCE(fc->allow_disconnected)) {
 		queue_request(fc, req);
 		rcu_read_unlock();
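
The single allow_disconnected test replaces both the old force path and the old "connected || allow_disconnected" test. Rough lifecycle of the two flags as this patch leaves it (my reading; see the pxd.c hunks below):

	control open:     connected = 1, allow_disconnected = 1
	control release:  connected = 0    (px restarting; keep queueing)
	timeout abort:    allow_disconnected = 0    (give up; fail new I/O)

Because allow_disconnected stays set across a planned restart, checking it alone subsumes the connected check, and the force flag becomes dead weight: every caller now takes the same gated path.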

@@ -1267,8 +1268,8 @@ struct fuse_conn *fuse_conn_get(struct fuse_conn *fc)
 void fuse_abort_conn(struct fuse_conn *fc)
 {
 	spin_lock(&fc->lock);
-	if (fc->connected) {
-		fc->connected = 0;
+	if (READ_ONCE(fc->connected)) {
+		WRITE_ONCE(fc->connected, 0);
 		fuse_end_queued_requests(fc);
 		wake_up_all(&fc->waitq);
 		kill_fasync(&fc->fasync, SIGIO, POLL_IN);
@@ -1281,7 +1282,7 @@ int fuse_dev_release(struct inode *inode, struct file *file)
 	struct fuse_conn *fc = fuse_get_conn(file);
 	if (fc) {
 		spin_lock(&fc->lock);
-		fc->connected = 0;
+		WRITE_ONCE(fc->connected, 0);
 		fuse_end_queued_requests(fc);
 		spin_unlock(&fc->lock);
 		fuse_conn_put(fc);
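
The READ_ONCE()/WRITE_ONCE() conversions here and below are the companion change: connected and allow_disconnected are now also read without fc->lock held (for example in pxd_ioctl_dump_fc_info() and the first check in pxd_timeout_store()), so the annotations keep the compiler from tearing, fusing, or caching those accesses and flag them as intentionally lockless. A minimal sketch of the pattern, assuming writers still serialize against each other with fc->lock:

	static void mark_disconnected(struct fuse_conn *fc)
	{
		spin_lock(&fc->lock);
		WRITE_ONCE(fc->connected, 0);      /* single untorn store */
		spin_unlock(&fc->lock);
	}

	static bool still_connected(struct fuse_conn *fc)
	{
		return READ_ONCE(fc->connected);   /* lockless read, no fc->lock */
	}
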
fuse_i.h: 2 changes (1 addition, 1 deletion)
@@ -252,7 +252,7 @@ struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc);
 /**
  * Send a request in the background
  */
-void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req, bool force);
+void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req);
 
 /* Abort all requests */
 void fuse_abort_conn(struct fuse_conn *fc);
pxd.c: 44 changes (23 additions, 21 deletions)
@@ -88,7 +88,7 @@ static int pxd_open(struct block_device *bdev, fmode_t mode)
 	int err = 0;
 
 	spin_lock(&fc->lock);
-	if (!fc->connected) {
+	if (!READ_ONCE(fc->connected)) {
 		err = -ENXIO;
 	} else {
 		spin_lock(&pxd_dev->lock);
@@ -131,7 +131,7 @@ static long pxd_ioctl_dump_fc_info(void)
 	}
 	printk(KERN_INFO "%s: pxd_ctx: %s ndevices: %lu",
 		__func__, ctx->name, ctx->num_devices);
-	printk(KERN_INFO "\tFC: connected: %d", ctx->fc.connected);
+	printk(KERN_INFO "\tFC: connected: %d", READ_ONCE(ctx->fc.connected));
 	}
 	return 0;
 }
@@ -843,7 +843,7 @@ int pxd_initiate_ioswitch(struct pxd_device *pxd_dev, int code)
 	req->pxd_rdwr_in.size = 0;
 	req->pxd_rdwr_in.flags = PXD_FLAGS_SYNC;
 
-	fuse_request_send_nowait(&pxd_dev->ctx->fc, req, true);
+	fuse_request_send_nowait(&pxd_dev->ctx->fc, req);
 	return 0;
 }

@@ -931,7 +931,7 @@ void pxd_reroute_slowpath(struct request_queue *q, struct bio *bio)
 		return;
 	}
 
-	fuse_request_send_nowait(&pxd_dev->ctx->fc, req, true);
+	fuse_request_send_nowait(&pxd_dev->ctx->fc, req);
 }
 
 // fastpath uses this path to punt requests to slowpath
@@ -974,7 +974,7 @@ void pxd_make_request_slowpath(struct request_queue *q, struct bio *bio)
 		return BLK_QC_RETVAL;
 	}
 
-	fuse_request_send_nowait(&pxd_dev->ctx->fc, req, false);
+	fuse_request_send_nowait(&pxd_dev->ctx->fc, req);
 	return BLK_QC_RETVAL;
 }

@@ -983,6 +983,7 @@ static void pxd_rq_fn(struct request_queue *q)
 {
 	struct pxd_device *pxd_dev = q->queuedata;
 	struct fuse_req *req;
+	struct fuse_conn *fc = &pxd_dev->ctx->fc;
 
 	for (;;) {
 		struct request *rq;
@@ -993,7 +994,7 @@ static void pxd_rq_fn(struct request_queue *q)
 			break;
 
 		/* Filter out block requests we don't understand. */
-		if (BLK_RQ_IS_PASSTHROUGH(rq)) {
+		if (BLK_RQ_IS_PASSTHROUGH(rq) || !READ_ONCE(fc->allow_disconnected)) {
 			__blk_end_request_all(rq, 0);
 			continue;
 		}
@@ -1028,7 +1029,7 @@ static void pxd_rq_fn(struct request_queue *q)
 			continue;
 		}
 
-		fuse_request_send_nowait(&pxd_dev->ctx->fc, req, false);
+		fuse_request_send_nowait(fc, req);
 		spin_lock_irq(&pxd_dev->qlock);
 	}
 }
@@ -1040,8 +1041,9 @@ static blk_status_t pxd_queue_rq(struct blk_mq_hw_ctx *hctx,
 	struct request *rq = bd->rq;
 	struct pxd_device *pxd_dev = rq->q->queuedata;
 	struct fuse_req *req = blk_mq_rq_to_pdu(rq);
+	struct fuse_conn *fc = &pxd_dev->ctx->fc;
 
-	if (BLK_RQ_IS_PASSTHROUGH(rq))
+	if (BLK_RQ_IS_PASSTHROUGH(rq) || !READ_ONCE(fc->allow_disconnected))
 		return BLK_STS_IOERR;
 
 	pxd_printk("%s: dev m %d g %lld %s at %ld len %d bytes %d pages "
@@ -1062,7 +1064,7 @@ static blk_status_t pxd_queue_rq(struct blk_mq_hw_ctx *hctx,
 		return BLK_STS_IOERR;
 	}
 
-	fuse_request_send_nowait(&pxd_dev->ctx->fc, req, false);
+	fuse_request_send_nowait(fc, req);
 
 	return BLK_STS_OK;
 }
@@ -1648,12 +1650,12 @@ ssize_t pxd_timeout_store(struct device *dev, struct device_attribute *attr,
 		return -EINVAL;
 	}
 
-	if (!ctx->fc.connected) {
+	if (!READ_ONCE(ctx->fc.connected)) {
 		cancel_delayed_work_sync(&ctx->abort_work);
 	}
 	spin_lock(&ctx->lock);
 	pxd_timeout_secs = new_timeout_secs;
-	if (!ctx->fc.connected) {
+	if (!READ_ONCE(ctx->fc.connected)) {
 		schedule_delayed_work(&ctx->abort_work, pxd_timeout_secs * HZ);
 	}
 	spin_unlock(&ctx->lock);
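
The doubled check around the abort timer is deliberate: cancel_delayed_work_sync() can sleep, so it must run before taking the spinlock, and the flag is rechecked under ctx->lock before re-arming so a connection that came back in between does not get an abort scheduled against it. The same lines with explanatory comments (comments are mine):

	if (!READ_ONCE(ctx->fc.connected))
		cancel_delayed_work_sync(&ctx->abort_work);   /* may sleep: outside lock */

	spin_lock(&ctx->lock);
	pxd_timeout_secs = new_timeout_secs;
	if (!READ_ONCE(ctx->fc.connected))   /* recheck under lock before re-arming */
		schedule_delayed_work(&ctx->abort_work, pxd_timeout_secs * HZ);
	spin_unlock(&ctx->lock);
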
@@ -1919,7 +1921,7 @@ static int pxd_nodewipe_cleanup(struct pxd_context *ctx)
 {
 	struct list_head *cur;
 
-	if (ctx->fc.connected) {
+	if (READ_ONCE(ctx->fc.connected)) {
 		return -EINVAL;
 	}

@@ -1949,7 +1951,7 @@ static ssize_t pxd_release_store(struct device *dev,
 	printk("pxd kernel node wipe action initiated\n");
 	for (i = 0; i < pxd_num_contexts; ++i) {
 		ctx = &pxd_contexts[i];
-		if (ctx->fc.connected) {
+		if (READ_ONCE(ctx->fc.connected)) {
 			printk("%s px is still connected... cannot release\n", __func__);
 			break;
 		}
@@ -2076,7 +2078,7 @@ static int pxd_control_open(struct inode *inode, struct file *file)
 	}
 
 	fc = &ctx->fc;
-	if (fc->connected == 1) {
+	if (READ_ONCE(fc->connected) == 1) {
 		printk(KERN_ERR "%s: pxd-control-%d(%lld) already open\n", __func__,
 			ctx->id, ctx->open_seq);
 		return -EINVAL;
@@ -2090,10 +2092,10 @@ static int pxd_control_open(struct inode *inode, struct file *file)
 
 	spin_lock(&ctx->lock);
 	pxd_timeout_secs = PXD_TIMER_SECS_DEFAULT;
-	fc->connected = 1;
+	WRITE_ONCE(fc->connected, 1);
 	spin_unlock(&ctx->lock);
 
-	fc->allow_disconnected = 1;
+	WRITE_ONCE(fc->allow_disconnected, 1);
 	file->private_data = fc;
 
 	pxdctx_set_connected(ctx, true);
@@ -2116,10 +2118,10 @@ static int pxd_control_release(struct inode *inode, struct file *file)
 	}
 
 	spin_lock(&ctx->lock);
-	if (ctx->fc.connected == 0) {
+	if (READ_ONCE(ctx->fc.connected) == 0) {
 		pxd_printk("%s: not opened\n", __func__);
 	} else {
-		ctx->fc.connected = 0;
+		WRITE_ONCE(ctx->fc.connected, 0);
 	}
 
 	schedule_delayed_work(&ctx->abort_work, pxd_timeout_secs * HZ);
@@ -2147,12 +2149,12 @@ static void pxd_abort_context(struct work_struct *work)
 			abort_work);
 	struct fuse_conn *fc = &ctx->fc;
 
-	BUG_ON(fc->connected);
+	BUG_ON(READ_ONCE(fc->connected));
 
 	printk(KERN_ERR "PXD_TIMEOUT (%s:%u): Aborting all requests...",
 		ctx->name, ctx->id);
 
-	fc->allow_disconnected = 0;
+	WRITE_ONCE(fc->allow_disconnected, 0);
 
 	/* Let other threads see the value of allow_disconnected. */
 	synchronize_rcu();
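
This is the writer side of the fix, pairing with the rcu_read_lock() section in fuse_request_send_nowait(): close the gate, then let synchronize_rcu() wait out every submitter that saw allow_disconnected == 1 before the queued requests are torn down. A minimal model (my reading; the actual teardown continues further down this function):

	WRITE_ONCE(fc->allow_disconnected, 0);   /* close the gate */
	synchronize_rcu();                       /* wait for in-flight submitters */
	/* request_map and the queue are now stable; safe to abort everything */
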
@@ -2375,7 +2377,7 @@ void pxd_exit(void)
 
 	for (i = 0; i < pxd_num_contexts; ++i) {
 		/* force cleanup @@@ */
-		pxd_contexts[i].fc.connected = true;
+		WRITE_ONCE(pxd_contexts[i].fc.connected, 1);
 		pxd_context_destroy(&pxd_contexts[i]);
 	}

