Skip to content

Commit

Permalink
Fixed a bug during reconnect.
Browse files Browse the repository at this point in the history
rpc_find_pdu() removes the pdu from the waitpdu list, this pdu is added to
rpc->pdu. If a reconnect happens during this time, we were not correctly
re-queuing this pdu to rpc->outqueue. This was causing application to forever
wait for the callback.

(cherry picked from commit 6499e2a)
Signed-off-by: Ronnie Sahlberg <ronniesahlberg@gmail.com>
  • Loading branch information
Nagendra Tomar authored and sahlberg committed Aug 27, 2024
1 parent 6ff65cb commit ae86b44
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 6 deletions.
8 changes: 8 additions & 0 deletions include/libnfs-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ struct rpc_iovec_cursor {
* At any point these many new bytes need to be read into this cursor.
*/
size_t remaining_size;

/*
* Following ref are used to reset iov[] in case we need to resend
* this request, (possibly) after a reconnect.
*/
struct iovec *iov_ref;
int iovcnt_ref;
};

enum input_state {
Expand Down Expand Up @@ -819,6 +826,7 @@ void rpc_shrink_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v,
void rpc_memcpy_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v,
const void *src, size_t len);
void rpc_free_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v);
void rpc_reset_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v);
const struct nfs_fh *nfs_get_rootfh(struct nfs_context *nfs);

int nfs_normalize_path(struct nfs_context *nfs, char *path);
Expand Down
35 changes: 35 additions & 0 deletions lib/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ void rpc_advance_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v,

/* remaining_size can only be 0 when iovcnt is 0 and v.v. */
assert((v->iovcnt == 0) == (v->remaining_size == 0));
assert(v->iovcnt <= v->iovcnt_ref);
assert(v->iov >= v->base);
assert(v->iov <= v->iov_ref);
assert(v->iov_ref == (v->base + v->iovcnt_ref));
}

/*
Expand Down Expand Up @@ -721,6 +725,10 @@ void rpc_shrink_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v,

/* remaining_size can only be 0 when iovcnt is 0 and v.v. */
assert((v->iovcnt == 0) == (v->remaining_size == 0));
assert(v->iovcnt <= v->iovcnt_ref);
assert(v->iov >= v->base);
assert(v->iov <= v->iov_ref);
assert(v->iov_ref == (v->base + v->iovcnt_ref));
}

/*
Expand Down Expand Up @@ -752,6 +760,33 @@ void rpc_memcpy_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v,

/* remaining_size can only be 0 when iovcnt is 0 and v.v. */
assert((v->iovcnt == 0) == (v->remaining_size == 0));
assert(v->iovcnt <= v->iovcnt_ref);
assert(v->iov >= v->base);
assert(v->iov <= v->iov_ref);
assert(v->iov_ref == (v->base + v->iovcnt_ref));
}

void rpc_reset_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v)
{
int i;

if (!v->base) {
return;
}

assert(v->iovcnt <= v->iovcnt_ref);
assert(v->iov >= v->base);
assert(v->iov <= v->iov_ref);
assert(v->iov_ref == (v->base + v->iovcnt_ref));

v->iovcnt = v->iovcnt_ref;
v->iov = v->base;

v->remaining_size = 0;
for (i = 0; i < v->iovcnt_ref; i++) {
v->iov[i] = v->iov_ref[i];
v->remaining_size += v->iov[i].iov_len;
}
}

void rpc_free_cursor(struct rpc_context *rpc, struct rpc_iovec_cursor *v)
Expand Down
10 changes: 8 additions & 2 deletions lib/pdu.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,16 @@ void rpc_reset_queue(struct rpc_queue *q)
*/
void rpc_enqueue(struct rpc_queue *q, struct rpc_pdu *pdu)
{
if (q->head == NULL)
if (q->head == NULL) {
q->head = pdu;
else
} else {
/* Ensure same pdu not being requeued */
assert(q->head != pdu);
assert(q->tail != pdu);
assert(q->tail->next != pdu);

q->tail->next = pdu;
}
q->tail = pdu;
pdu->next = NULL;
}
Expand Down
19 changes: 18 additions & 1 deletion lib/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ static void rpc_finished_pdu(struct rpc_context *rpc)
}
rpc->state = READ_RM;
rpc->inpos = 0;
if (rpc->is_udp == 0 || rpc->is_broadcast == 0) {
if (rpc->pdu && (rpc->is_udp == 0 || rpc->is_broadcast == 0)) {
rpc_free_pdu(rpc, rpc->pdu);
rpc->pdu = NULL;
}
Expand Down Expand Up @@ -1741,6 +1741,23 @@ rpc_reconnect_requeue(struct rpc_context *rpc)
rpc_reset_queue(q);
}
rpc->waitpdu_len = 0;

/*
* If there's any half-read PDU, that needs to be restarted too.
*/
if (rpc->pdu) {
rpc_return_to_queue(&rpc->outqueue, rpc->pdu);
/* Retransmit on reconnect */
INC_STATS(rpc, num_retransmitted);
/*
* Reset output and input cursors as we have to re-send the
* whole pdu again.
*/
rpc->pdu->out.num_done = 0;
rpc_reset_cursor(rpc, &rpc->pdu->in);
rpc->pdu = NULL;
}

#ifdef HAVE_MULTITHREADING
if (rpc->multithreading_enabled) {
nfs_mt_mutex_unlock(&rpc->rpc_mutex);
Expand Down
15 changes: 12 additions & 3 deletions nfs/nfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,27 @@ rpc_nfs3_readv_task(struct rpc_context *rpc, rpc_cb cb,
return NULL;
}

pdu->in.base = (struct iovec *) malloc(sizeof(struct iovec) * iovcnt);
/*
* Allocate twice the iovec space, first half will be used for iov[].
* This will be updated as data is read into user buffers.
* Second half is for iov_ref[]. This is not used in happy path. Only
* if we need to resend the request we need it to reset the cursor to
* the original iovec.
* See rpc_reset_cursor().
*/
pdu->in.base = (struct iovec *) malloc(sizeof(struct iovec) * iovcnt * 2);
if (!pdu->in.base) {
rpc_set_error(rpc, "error: Failed to allocate memory");
rpc_free_pdu(rpc, pdu);
return NULL;
}

pdu->in.iov = pdu->in.base;
pdu->in.iovcnt = iovcnt;
pdu->in.iov_ref = pdu->in.base + iovcnt;
pdu->in.iovcnt = pdu->in.iovcnt_ref = iovcnt;

for (i = 0; i < iovcnt; i++) {
pdu->in.iov[i] = iov[i];
pdu->in.iov[i] = pdu->in.iov_ref[i] = iov[i];
pdu->in.remaining_size += iov[i].iov_len;
}

Expand Down

0 comments on commit ae86b44

Please sign in to comment.