Skip to content

Commit 7585b16

Browse files
authored
core: remember last pick status in no real stream (#11851)
1 parent 122b683 commit 7585b16

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,9 @@ public final ClientStream newStream(
129129
if (state.shutdownStatus != null) {
130130
return new FailingClientStream(state.shutdownStatus, tracers);
131131
}
132+
PickResult pickResult = null;
132133
if (state.lastPicker != null) {
133-
PickResult pickResult = state.lastPicker.pickSubchannel(args);
134+
pickResult = state.lastPicker.pickSubchannel(args);
134135
callOptions = args.getCallOptions();
135136
// User code provided authority takes precedence over the LB provided one.
136137
if (callOptions.getAuthority() == null
@@ -156,7 +157,7 @@ public final ClientStream newStream(
156157
synchronized (lock) {
157158
PickerState newerState = pickerState;
158159
if (state == newerState) {
159-
return createPendingStream(args, tracers);
160+
return createPendingStream(args, tracers, pickResult);
160161
}
161162
state = newerState;
162163
}
@@ -171,9 +172,12 @@ public final ClientStream newStream(
171172
* schedule tasks on syncContext.
172173
*/
173174
@GuardedBy("lock")
174-
private PendingStream createPendingStream(
175-
PickSubchannelArgs args, ClientStreamTracer[] tracers) {
175+
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
176+
PickResult pickResult) {
176177
PendingStream pendingStream = new PendingStream(args, tracers);
178+
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
179+
pendingStream.lastPickStatus = pickResult.getStatus();
180+
}
177181
pendingStreams.add(pendingStream);
178182
if (getPendingStreamsCount() == 1) {
179183
syncContext.executeLater(reportTransportInUse);
@@ -293,6 +297,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {
293297
for (final PendingStream stream : toProcess) {
294298
PickResult pickResult = picker.pickSubchannel(stream.args);
295299
CallOptions callOptions = stream.args.getCallOptions();
300+
if (callOptions.isWaitForReady() && pickResult.hasResult()) {
301+
stream.lastPickStatus = pickResult.getStatus();
302+
}
296303
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
297304
callOptions.isWaitForReady());
298305
if (transport != null) {
@@ -349,6 +356,7 @@ private class PendingStream extends DelayedStream {
349356
private final PickSubchannelArgs args;
350357
private final Context context = Context.current();
351358
private final ClientStreamTracer[] tracers;
359+
private volatile Status lastPickStatus;
352360

353361
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
354362
this.args = args;
@@ -405,6 +413,10 @@ protected void onEarlyCancellation(Status reason) {
405413
public void appendTimeoutInsight(InsightBuilder insight) {
406414
if (args.getCallOptions().isWaitForReady()) {
407415
insight.append("wait_for_ready");
416+
Status status = lastPickStatus;
417+
if (status != null && !status.isOk()) {
418+
insight.appendKeyValue("Last Pick Failure", status);
419+
}
408420
}
409421
super.appendTimeoutInsight(insight);
410422
}

core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,23 @@ public void pendingStream_appendTimeoutInsight_waitForReady() {
745745
.matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]");
746746
}
747747

748+
@Test
749+
public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure() {
750+
ClientStream stream = delayedTransport.newStream(
751+
method, headers, callOptions.withWaitForReady(), tracers);
752+
stream.start(streamListener);
753+
SubchannelPicker picker = mock(SubchannelPicker.class);
754+
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
755+
.thenReturn(PickResult.withError(Status.PERMISSION_DENIED));
756+
delayedTransport.reprocess(picker);
757+
InsightBuilder insight = new InsightBuilder();
758+
stream.appendTimeoutInsight(insight);
759+
assertThat(insight.toString())
760+
.matches("\\[wait_for_ready, "
761+
+ "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\},"
762+
+ " buffered_nanos=[0-9]+, waiting_for_connection]");
763+
}
764+
748765
private static TransportProvider newTransportProvider(final ClientTransport transport) {
749766
return new TransportProvider() {
750767
@Override

0 commit comments

Comments
 (0)