Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: delay retriable stream master listener close until all sub streams are closed #9754

Merged
merged 3 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
private Status savedCancellationReason;
private SavedCloseMasterListenerReason savedCloseMasterListenerReason;

// Used for recording the share of buffer used for the current call out of the channel buffer.
// This field would not be necessary if there is no channel buffer limit.
Expand Down Expand Up @@ -222,9 +222,10 @@ private void commitAndRun(Substream winningSubstream) {
}
}

@Nullable // returns null when cancelled
// Returning null means no new sub streams should be created, e.g. the call was cancelled or
// another close condition has been met for the retriableStream.
@Nullable
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
// increment only when >= 0, i.e. not cancelled
int inFlight;
do {
inFlight = inFlightSubStreams.get();
Expand Down Expand Up @@ -506,11 +507,8 @@ public final void cancel(final Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
savedCancellationReason = reason;
runnable.run();
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
}
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

Expand Down Expand Up @@ -816,14 +814,30 @@ private void freezeHedging() {
}

private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
metadata);
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
}
}

/**
 * Holder for the three arguments of a pending {@code masterListener.closed(...)} call.
 *
 * <p>{@code safeCloseMasterListener} stores an instance in {@code savedCloseMasterListenerReason}
 * before checking {@code inFlightSubStreams}; the sub stream {@code closed} callback later reads
 * the fields back and passes them to {@code masterListener.closed(...)} once its decrement of
 * {@code inFlightSubStreams} yields {@code Integer.MIN_VALUE}.
 *
 * <p>All fields are {@code final}; the referenced objects are stored as-is, without copying.
 */
private static final class SavedCloseMasterListenerReason {
// Status to report to the master listener.
private final Status status;
// Progress value to report alongside the status.
private final RpcProgress progress;
// Metadata to report alongside the status.
private final Metadata metadata;

// Captures the arguments exactly as given; no validation or defensive copy is performed.
SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
this.status = status;
this.progress = progress;
this.metadata = metadata;
}
}

private interface BufferEntry {
Expand Down Expand Up @@ -864,8 +878,17 @@ public void closed(
}

if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCancellationReason != null;
safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata());
assert savedCloseMasterListenerReason != null;
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(savedCloseMasterListenerReason.status,
savedCloseMasterListenerReason.progress,
savedCloseMasterListenerReason.metadata);
}
});
return;
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2151,14 +2151,19 @@ public Void answer(InvocationOnMock in) {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(
Status.CANCELLED, PROCESSED, new Metadata());
sublistenerCaptor4.getValue().closed(
Status.CANCELLED, PROCESSED, new Metadata());
inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();

insight = new InsightBuilder();
hedgingStream.appendTimeoutInsight(insight);
assertThat(insight.toString()).isEqualTo(
"[closed=[UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:81]]");
"[closed=[UNAVAILABLE, INTERNAL, CANCELLED, CANCELLED], "
+ "committed=[remote_addr=2.2.2.2:81]]");
}

@Test
Expand Down Expand Up @@ -2425,6 +2430,7 @@ public void hedging_pushback_positive() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor3.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
inOrder.verify(masterListener).closed(fatal, PROCESSED, metadata);
inOrder.verifyNoMoreInteractions();
}
Expand Down Expand Up @@ -2605,6 +2611,8 @@ public void hedging_transparentRetry() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
sublistenerCaptor4.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
verify(masterListener).closed(status, REFUSED, metadata);
}

Expand Down Expand Up @@ -2645,6 +2653,9 @@ public void hedging_transparentRetryNotAllowed() {
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue()
.closed(Status.CANCELLED, REFUSED, new Metadata());
// The master listener close should wait until all substreams are closed.
verify(masterListener).closed(status, REFUSED, metadata);
}

Expand Down