Skip to content

core: Propagate authority override from LB exactly once #11862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,15 @@ public final ClientStream newStream(
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
ClientStream stream = transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), callOptions,
tracers);
// User code provided authority takes precedence over the LB provided one; this will be
// overwritten by ClientCallImpl if the application sets an authority override
if (pickResult.getAuthorityOverride() != null) {
stream.setAuthority(pickResult.getAuthorityOverride());
}
return stream;
}
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
Expand Down Expand Up @@ -287,10 +293,6 @@ final void reprocess(@Nullable SubchannelPicker picker) {
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
stream.setAuthority(pickResult.getAuthorityOverride());
}
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Expand All @@ -301,7 +303,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
Runnable runnable = stream.createRealStream(transport);
Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
if (runnable != null) {
executor.execute(runnable);
}
Expand Down Expand Up @@ -354,7 +356,7 @@ private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
}

/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -364,6 +366,13 @@ private Runnable createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
if (authorityOverride != null) {
// User code provided authority takes precedence over the LB provided one; this will be
// overwritten by an enqueud call from ClientCallImpl if the application sets an authority
// override. We must call the real stream directly because stream.start() has likely already
// been called on the delayed stream.
realStream.setAuthority(authorityOverride);
}
return setStream(realStream);
}

Expand Down
54 changes: 20 additions & 34 deletions core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,26 +503,11 @@ public void uncaughtException(Thread t, Throwable e) {
}

@Test
public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFromLbIsIgnored() {
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
method, headers, callOptions, tracers);
delayedStream.start(mock(ClientStreamListener.class));
SubchannelPicker picker = mock(SubchannelPicker.class);
PickResult pickResult = PickResult.withSubchannel(
mockSubchannel, null, "authority-override-hostname-from-lb");
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);

delayedTransport.reprocess(picker);
fakeExecutor.runDueTasks();

verify(mockRealStream, never()).setAuthority("authority-override-hostname-from-lb");
}

@Test
public void
reprocess_authorityOverrideNotInCallOptions_authorityOverrideFromLbIsSetIntoStream() {
public void reprocess_authorityOverrideFromLb() {
InOrder inOrder = inOrder(mockRealStream);
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
method, headers, callOptions.withAuthority(null), tracers);
delayedStream.setAuthority("authority-override-from-calloptions");
delayedStream.start(mock(ClientStreamListener.class));
SubchannelPicker picker = mock(SubchannelPicker.class);
PickResult pickResult = PickResult.withSubchannel(
Expand All @@ -536,7 +521,10 @@ public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFro
delayedTransport.reprocess(picker);
fakeExecutor.runDueTasks();

verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
// Must be set before start(), and may be overwritten
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
}

@Test
Expand All @@ -563,28 +551,26 @@ public void reprocess_NoPendingStream() {
}

@Test
public void newStream_assignsTransport_authorityFromCallOptionsSupersedesAuthorityFromLB() {
public void newStream_authorityOverrideFromLb() {
InOrder inOrder = inOrder(mockRealStream);
SubchannelPicker picker = mock(SubchannelPicker.class);
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
PickResult pickResult = PickResult.withSubchannel(
subchannel, null, "authority-override-hostname-from-lb");
mockSubchannel, null, "authority-override-hostname-from-lb");
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
ArgumentCaptor.forClass(CallOptions.class);
when(mockRealTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), callOptionsArgumentCaptor.capture(),
ArgumentMatchers.<ClientStreamTracer[]>any()))
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any()))
.thenReturn(mockRealStream);
delayedTransport.reprocess(picker);
verifyNoMoreInteractions(picker);
verifyNoMoreInteractions(transportListener);

CallOptions callOptions =
CallOptions.DEFAULT.withAuthority("authority-override-hosstname-from-calloptions");
delayedTransport.newStream(method, headers, callOptions, tracers);
assertThat(callOptionsArgumentCaptor.getValue().getAuthority()).isEqualTo(
"authority-override-hosstname-from-calloptions");
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers);
assertThat(stream).isSameInstanceAs(mockRealStream);
stream.setAuthority("authority-override-from-calloptions");
stream.start(mock(ClientStreamListener.class));

// Must be set before start(), and may be overwritten
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
}

@Test
Expand Down
Loading