Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DirectPath calls do not duplicate the request metadata #3663

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import io.grpc.Deadline;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -186,11 +185,10 @@ public GrpcCallContext nullToSelf(ApiCallContext inputContext) {
@Override
public GrpcCallContext withCredentials(Credentials newCredentials) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the one main concern about this is that this is a behavior breaking change if there are uses that manually create a GrpcCallContext and use withCredentials. I don't think there are really any valid uses cases for this, but I will need to check for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that makes sense.

What's also concerning is, by looking at the CI failures, it seems many tests have been building the channel from InstantiatingGrpcChannelProvider without actually giving it a credentials. And MoreCallCredentials.from() throws exception for null credentials.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those showcase tests seems to be fixable within this PR. But I'm unsure about the downstream compatibility tests and those on Cloud Build (I lack permission to view logs).

Copy link
Contributor

@lqiu96 lqiu96 Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The showcase tests were following this: https://github.com/googleapis/gapic-showcase?tab=readme-ov-file#example-for-java-grpc

Essentially it's a local server that is not expected to do any authentication. I think we just need a way to be able to hit the local server with no credentials provided.

The downstream tests for our partner teams is a bit concerning. I think we'll need to investigate them to see if they're just a testing configuration issue or a logic issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially it's a local server that is not expected to do any authentication. I think we just need a way to be able to hit the local server with no credentials provided.

Would it make sense if we allow null credentials in the case of one-way TLS? I believe there are no valid use cases to go without credentials here when mTLS or DirectPath are used.

The downstream tests for our partner teams is a bit concerning. I think we'll need to investigate them to see if they're just a testing configuration issue or a deeper issue.

Agreed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize a tricky thing we missed earlier.

The API setChannelConfigurator takes an option that gets applied to the channel builder. In various tests (downstream or showcase), ManagedChannelBuilder::usePlainText is given. While users are not supposed to use insecure channels in production, this is reasonable and often convenient in tests when the server is turned on locally. This change is breaking that behavior.

Basically, either we have to change all tests (setting up TLS can be difficult), or we need a way to allow insecure channels w/ credentials, and TlsChannelCredentials.create() doesn't support that. For the second approach, IIUC, it goes back to the need of supporting adding call credentials to a managed channel builder that is possibly configured to build insecure channels. cc: @ejona86

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using InsecureChannelCredentials.create() instead of TlsChannelCredentials would be fine for those specific tests. But the existing sdk API works poorly for that.

You can add CallCredentials per-RPC, either on the stub, in the CallOptions, or with an interceptor (that adds it to the CallOptions).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes unfortunately the existing SDK API doesn't take InsecureChannelCredentials explicilty.

The client SDK has been attaching call credentials to the CallOptions. Now the problem is that DirectPath (via GoogleDefaultChannelCredentials) gets or creates its own call credentials. So currently, this is causing the same request metadata to appear twice. Because we want to adopt bound credentials in DirectPath, it's not ideal to change this GoogleDefaultChannelCredentials to build without call credentials. Nor would it be good for backward compatibility reasons.

So as this PR is trying to do, we want to stop attaching call credentials to CallOptions in all gRPC cases. But then we need to make sure all gRPC cases can get the call credentials from the channel. ManagedChannelBuilder doesn't allow it. This is where we are stuck.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add an interceptor to the channel that attaches CallCreds to the CallOptions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add an interceptor to the channel that attaches CallCreds to the CallOptions.

Ah. I didn't realize this from your first comment. Will try it. Thanks!

Preconditions.checkNotNull(newCredentials);
CallCredentials callCredentials = MoreCallCredentials.from(newCredentials);
return new GrpcCallContext(
channel,
newCredentials,
callOptions.withCallCredentials(callCredentials),
callOptions,
timeout,
streamWaitTimeout,
streamIdleTimeout,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2025 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.auth.Credentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.MoreCallCredentials;

/**
* This interceptor is package-private and intended only for client library usage. It will be added
* to inject a CallCredentials to the RPC's CallOptions for two use cases: 1. Local Testing:
* NoCredentialsProvider or null Credentials is passed in 2. Non-Oauth2 Credentials (i.e.
* ApiKeyCredentials) which do not have an Access Token
*/
class GrpcCallCredentialsInterceptor implements ClientInterceptor {

private final Credentials credentials;

GrpcCallCredentialsInterceptor(Credentials credentials) {
this.credentials = credentials;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// If user manually sets a CallCredentials, do not override
if (callOptions.getCredentials() == null) {
callOptions = callOptions.withCallCredentials(MoreCallCredentials.from(credentials));
}

ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
super.start(responseListener, headers);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,17 @@ private ManagedChannel createSingleChannel() throws IOException {

ManagedChannelBuilder<?> builder;

// CallCredentials will be attached to newly created channel if there are valid Credentials
// provided. Valid Credentials that can be used as CallCredentials are non-null instances
// of valid GoogleCredentials. There are certain Credentials that cannot be used as
// CallCredentials
// (i.e. ApiKeyCredentials).
boolean needsCallCredentialsInterceptor = credentials != null;

// Check DirectPath traffic.
boolean useDirectPathXds = false;
if (canUseDirectPath()) {
needsCallCredentialsInterceptor = false;
CallCredentials callCreds = MoreCallCredentials.from(credentials);
// altsCallCredentials may be null and GoogleDefaultChannelCredentials
// will solely use callCreds. Otherwise it uses altsCallCredentials
Expand Down Expand Up @@ -665,10 +673,10 @@ private ManagedChannel createSingleChannel() throws IOException {
// which will be used to fetch MTLS_S2A hard bound tokens from the metdata server.
channelCredentials =
CompositeChannelCredentials.create(channelCredentials, mtlsS2ACallCredentials);
needsCallCredentialsInterceptor = false;
}
builder = Grpc.newChannelBuilder(endpoint, channelCredentials);
} else {
// Use default if we cannot initialize channel credentials via DCA or S2A.
builder = ManagedChannelBuilder.forAddress(serviceAddress, port);
}
}
Expand All @@ -678,6 +686,11 @@ private ManagedChannel createSingleChannel() throws IOException {
// See https://github.com/googleapis/gapic-generator/issues/2816
builder.disableServiceConfigLookUp();
}

// This is intercepted first to ensure that CallCredentials is added to CallOptions
if (needsCallCredentialsInterceptor) {
builder = builder.intercept(new GrpcCallCredentialsInterceptor(credentials));
}
builder =
builder
.intercept(new GrpcChannelUUIDInterceptor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ void testWithCredentials() {
GrpcCallContext emptyContext = GrpcCallContext.createDefault();
assertNull(emptyContext.getCallOptions().getCredentials());
GrpcCallContext context = emptyContext.withCredentials(credentials);
assertNotNull(context.getCallOptions().getCredentials());
// The gRPC call credentials will be embedded into the channel credentials
// and not attached to call options.
assertNull(context.getCallOptions().getCredentials());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,11 @@ void testWithPoolSize() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Credentials credentials = Mockito.mock(Credentials.class);

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setCredentials(credentials)
.build()
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
Expand All @@ -234,6 +237,7 @@ void testToBuilder() {
new ArrayList<>();
hardBoundTokenTypes.add(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS);
hardBoundTokenTypes.add(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.MTLS_S2A);
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
Expand All @@ -244,6 +248,7 @@ void testToBuilder() {
.setKeepAliveTimeDuration(keepaliveTime)
.setKeepAliveTimeoutDuration(keepaliveTimeout)
.setKeepAliveWithoutCalls(Boolean.TRUE)
.setCredentials(credentials)
.setChannelConfigurator(channelConfigurator)
.setChannelsPerCpu(2.5)
.setDirectPathServiceConfig(directPathServiceConfig)
Expand Down Expand Up @@ -274,6 +279,7 @@ void testWithInterceptorsAndMultipleChannels() throws Exception {

private void testWithInterceptors(int numChannels) throws Exception {
final GrpcInterceptorProvider interceptorProvider = Mockito.mock(GrpcInterceptorProvider.class);
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
Expand All @@ -282,6 +288,7 @@ private void testWithInterceptors(int numChannels) throws Exception {
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setInterceptorProvider(interceptorProvider)
.setCredentials(credentials)
.build();

Mockito.verify(interceptorProvider, Mockito.never()).getInterceptors();
Expand All @@ -303,6 +310,7 @@ void testChannelConfigurator() throws IOException {

ManagedChannelBuilder<?> swappedBuilder = Mockito.mock(ManagedChannelBuilder.class);
ManagedChannel fakeChannel = Mockito.mock(ManagedChannel.class);
Credentials credentials = Mockito.mock(Credentials.class);
Mockito.when(swappedBuilder.build()).thenReturn(fakeChannel);

Mockito.when(channelConfigurator.apply(channelBuilderCaptor.capture()))
Expand All @@ -315,6 +323,7 @@ void testChannelConfigurator() throws IOException {
.setExecutor(Mockito.mock(Executor.class))
.setChannelConfigurator(channelConfigurator)
.setPoolSize(numChannels)
.setCredentials(credentials)
.build()
.getTransportChannel();

Expand Down Expand Up @@ -486,8 +495,11 @@ void testWithIPv6Address() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Credentials credentials = Mockito.mock(Credentials.class);

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setCredentials(credentials)
.build()
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
Expand All @@ -501,6 +513,8 @@ void testWithIPv6Address() throws IOException {
// Test that if ChannelPrimer is provided, it is called during creation
@Test
void testWithPrimeChannel() throws IOException {
Credentials credentials = Mockito.mock(Credentials.class);

// create channelProvider with different pool sizes to verify ChannelPrimer is called the
// correct number of times
for (int poolSize = 1; poolSize < 5; poolSize++) {
Expand All @@ -512,6 +526,7 @@ void testWithPrimeChannel() throws IOException {
.setPoolSize(poolSize)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setCredentials(credentials)
.setChannelPrimer(mockChannelPrimer)
.build();

Expand Down Expand Up @@ -600,10 +615,14 @@ void testWithCustomDirectPathServiceConfig() {
@Override
protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider)
throws IOException, GeneralSecurityException {

Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider channelProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint("localhost:8080")
.setMtlsProvider(provider)
.setCredentials(credentials)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.build();
Expand All @@ -630,9 +649,14 @@ private void createAndCloseTransportChannel(InstantiatingGrpcChannelProvider pro
testLogDirectPathMisconfig_AttemptDirectPathNotSetAndAttemptDirectPathXdsSetViaBuilder_warns()
throws Exception {
FakeLogHandler logHandler = new FakeLogHandler();
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider.LOG.addHandler(logHandler);
InstantiatingGrpcChannelProvider provider =
createChannelProviderBuilderForDirectPathLogTests().setAttemptDirectPathXds().build();
createChannelProviderBuilderForDirectPathLogTests()
.setAttemptDirectPathXds()
.setCredentials(credentials)
.build();
createAndCloseTransportChannel(provider);
assertThat(logHandler.getAllMessages())
.contains(
Expand All @@ -645,9 +669,10 @@ void testLogDirectPathMisconfig_AttemptDirectPathNotSetAndAttemptDirectPathXdsSe
throws Exception {
FakeLogHandler logHandler = new FakeLogHandler();
InstantiatingGrpcChannelProvider.LOG.addHandler(logHandler);
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider provider =
createChannelProviderBuilderForDirectPathLogTests().build();
createChannelProviderBuilderForDirectPathLogTests().setCredentials(credentials).build();
createAndCloseTransportChannel(provider);
assertThat(logHandler.getAllMessages())
.contains(
Expand All @@ -672,11 +697,14 @@ void testLogDirectPathMisconfig_shouldNotLogInTheBuilder() {
@Test
void testLogDirectPathMisconfigWrongCredential() throws Exception {
FakeLogHandler logHandler = new FakeLogHandler();
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider.LOG.addHandler(logHandler);
InstantiatingGrpcChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setAttemptDirectPathXds()
.setAttemptDirectPath(true)
.setCredentials(credentials)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setEndpoint(DEFAULT_ENDPOINT)
Expand All @@ -697,11 +725,14 @@ void testLogDirectPathMisconfigWrongCredential() throws Exception {
@Test
void testLogDirectPathMisconfigNotOnGCE() throws Exception {
FakeLogHandler logHandler = new FakeLogHandler();
Credentials credentials = Mockito.mock(Credentials.class);

InstantiatingGrpcChannelProvider.LOG.addHandler(logHandler);
InstantiatingGrpcChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setAttemptDirectPathXds()
.setAttemptDirectPath(true)
.setCredentials(credentials)
.setAllowNonDefaultServiceAccount(true)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutor(Mockito.mock(Executor.class))
Expand Down
Loading