Skip to content

core: remember last pick status in no real stream #11851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 14, 2025
20 changes: 16 additions & 4 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ public final ClientStream newStream(
if (state.shutdownStatus != null) {
return new FailingClientStream(state.shutdownStatus, tracers);
}
PickResult pickResult = null;
if (state.lastPicker != null) {
PickResult pickResult = state.lastPicker.pickSubchannel(args);
pickResult = state.lastPicker.pickSubchannel(args);
callOptions = args.getCallOptions();
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null
Expand All @@ -156,7 +157,7 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers);
return createPendingStream(args, tracers, pickResult);
}
state = newerState;
}
Expand All @@ -171,9 +172,12 @@ public final ClientStream newStream(
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(
PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
Expand Down Expand Up @@ -293,6 +297,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
if (callOptions.isWaitForReady() && pickResult.hasResult()) {
stream.lastPickStatus = pickResult.getStatus();
}
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Expand Down Expand Up @@ -349,6 +356,7 @@ private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
this.args = args;
Expand Down Expand Up @@ -405,6 +413,10 @@ protected void onEarlyCancellation(Status reason) {
public void appendTimeoutInsight(InsightBuilder insight) {
if (args.getCallOptions().isWaitForReady()) {
insight.append("wait_for_ready");
Status status = lastPickStatus;
if (status != null && !status.isOk()) {
insight.appendKeyValue("Last Pick Failure", status);
}
}
super.appendTimeoutInsight(insight);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,23 @@ public void pendingStream_appendTimeoutInsight_waitForReady() {
.matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]");
}

@Test
public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure() {
ClientStream stream = delayedTransport.newStream(
method, headers, callOptions.withWaitForReady(), tracers);
stream.start(streamListener);
SubchannelPicker picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withError(Status.PERMISSION_DENIED));
delayedTransport.reprocess(picker);
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[wait_for_ready, "
+ "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\},"
+ " buffered_nanos=[0-9]+, waiting_for_connection]");
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down