Skip to content

add "resource_timer_is_transient_failure" server feature #12063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
9 changes: 6 additions & 3 deletions xds/src/main/java/io/grpc/xds/client/Bootstrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,20 @@ public abstract static class ServerInfo {

public abstract boolean isTrustedXdsServer();

public abstract boolean resourceTimerIsTransientError();

@VisibleForTesting
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig, false, false);
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
false, false, false);
}

@VisibleForTesting
public static ServerInfo create(
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
boolean isTrustedXdsServer) {
boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
ignoreResourceDeletion, isTrustedXdsServer);
ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
}
}

Expand Down
14 changes: 13 additions & 1 deletion xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public abstract class BootstrapperImpl extends Bootstrapper {

public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
"GRPC_EXPERIMENTAL_XDS_FALLBACK";
public static final String GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING =
"GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING";

// Client features.
@VisibleForTesting
Expand All @@ -54,10 +56,16 @@ public abstract class BootstrapperImpl extends Bootstrapper {
// Server features.
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
private static final String
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";

@VisibleForTesting
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);

@VisibleForTesting
public static boolean xdsDataErrorHandlingEnabled
= GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING, false);

protected final XdsLogger logger;

protected FileReader reader = LocalFileReader.INSTANCE;
Expand Down Expand Up @@ -247,18 +255,22 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo

Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);

boolean resourceTimerIsTransientError = false;
boolean ignoreResourceDeletion = false;
// "For forward compatibility reasons, the client will ignore any entry in the list that it
// does not understand, regardless of type."
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
}
servers.add(
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
serverFeatures != null
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER)));
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
resourceTimerIsTransientError));
}
return servers.build();
}
Expand Down
6 changes: 5 additions & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ public static ResourceMetadata newResourceMetadataDoesNotExist() {
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
}

public static ResourceMetadata newResourceMetadataTimeout() {
return new ResourceMetadata(ResourceMetadataStatus.TIMEOUT, "", 0, false, null, null);
}

public static ResourceMetadata newResourceMetadataAcked(
Any rawResource, String version, long updateTimeNanos) {
checkNotNull(rawResource, "rawResource");
Expand Down Expand Up @@ -256,7 +260,7 @@ public UpdateFailureState getErrorState() {
* config_dump.proto</a>
*/
public enum ResourceMetadataStatus {
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED
UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED, TIMEOUT
}

/**
Expand Down
20 changes: 16 additions & 4 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled;
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;

Expand Down Expand Up @@ -67,6 +68,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30;

private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
Expand Down Expand Up @@ -738,6 +740,9 @@ void restartTimer() {
// When client becomes ready, it triggers a restartTimer for all relevant subscribers.
return;
}
ServerInfo serverInfo = activeCpc.getServerInfo();
int timeoutSec = xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError()
? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC;

class ResourceNotFound implements Runnable {
@Override
Expand All @@ -761,8 +766,7 @@ public String toString() {
respTimer.cancel();
}
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService);
}

void stopTimer() {
Expand Down Expand Up @@ -840,6 +844,8 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn
// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
boolean resourceTimerIsTransientError =
xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
Expand All @@ -854,14 +860,20 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn
if (!absent) {
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
metadata = resourceTimerIsTransientError ? ResourceMetadata.newResourceMetadataTimeout() :
ResourceMetadata.newResourceMetadataDoesNotExist();
for (ResourceWatcher<T> watcher : watchers.keySet()) {
if (processingTracker != null) {
processingTracker.startTask();
}
watchers.get(watcher).execute(() -> {
try {
watcher.onResourceDoesNotExist(resource);
if (resourceTimerIsTransientError) {
watcher.onError(Status.UNAVAILABLE.withDescription(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test for watcher.onError and the timeoutSec value when resource timer is transient error.

"Timed out waiting for resource " + resource + " from xDS server"));
} else {
watcher.onResourceDoesNotExist(resource);
}
} finally {
if (processingTracker != null) {
processingTracker.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3549,7 +3549,7 @@ private static Filter buildHttpConnectionManagerFilter(HttpFilter... httpFilters

private XdsResourceType.Args getXdsResourceTypeArgs(boolean isTrustedServer) {
return new XdsResourceType.Args(
ServerInfo.create("http://td", "", false, isTrustedServer), "1.0", null, null, null, null
ServerInfo.create("http://td", "", false, isTrustedServer, false), "1.0", null, null, null, null
);
}
}
154 changes: 129 additions & 25 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.BootstrapperImpl;
import io.grpc.xds.client.EnvoyProtoData.Node;
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.client.Locality;
Expand Down Expand Up @@ -304,6 +305,30 @@ public long currentTimeNanos() {
private final BindableService adsService = createAdsService();
private final BindableService lrsService = createLrsService();

private XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override
public XdsTransport create(ServerInfo serverInfo) {
if (serverInfo.target().equals(SERVER_URI)) {
return new GrpcXdsTransport(channel);
}
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForCustomAuthority);
}
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForEmptyAuthority);
}
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
}
};

@Before
public void setUp() throws IOException {
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
Expand All @@ -322,32 +347,9 @@ public void setUp() throws IOException {
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override
public XdsTransport create(ServerInfo serverInfo) {
if (serverInfo.target().equals(SERVER_URI)) {
return new GrpcXdsTransport(channel);
}
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForCustomAuthority);
}
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return new GrpcXdsTransport(channelForEmptyAuthority);
}
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
}
};

xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
true);
true, false);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
Expand Down Expand Up @@ -3155,6 +3157,108 @@ public void flowControlAbsent() throws Exception {
verify(anotherWatcher).onError(any());
}

@Test
@SuppressWarnings("unchecked")
public void resourceTimerIsTransientError_schedulesExtendedTimeout() {
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS,
false, true, true);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(serverInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of())
.build();
xdsClient = new XdsClientImpl(
xdsTransportFactory,
bootstrapInfo,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
xdsClientMetricReporter);
ResourceWatcher<CdsUpdate> watcher = mock(ResourceWatcher.class);
String resourceName = "cluster.googleapis.com";

xdsClient.watchXdsResource(
XdsClusterResource.getInstance(),
resourceName,
watcher,
fakeClock.getScheduledExecutorService());

ScheduledTask task = Iterables.getOnlyElement(
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
assertThat(task.getDelay(TimeUnit.SECONDS))
.isEqualTo(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC);
fakeClock.runDueTasks();
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
}

@Test
@SuppressWarnings("unchecked")
public void resourceTimerIsTransientError_callsOnErrorUnavailable() {
BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(),
true, true);
BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.of())))
.build();
xdsClient = new XdsClientImpl(
xdsTransportFactory,
bootstrapInfo,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
xdsClientMetricReporter);
String timeoutResource = CDS_RESOURCE + "_timeout";
ResourceWatcher<CdsUpdate> timeoutWatcher = mock(ResourceWatcher.class);

xdsClient.watchXdsResource(
XdsClusterResource.getInstance(),
timeoutResource,
timeoutWatcher,
fakeClock.getScheduledExecutorService());

assertThat(resourceDiscoveryCalls).hasSize(1);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.verifyRequest(CDS, ImmutableList.of(timeoutResource), "", "", NODE);
fakeClock.forwardTime(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
fakeClock.runDueTasks();
ArgumentCaptor<Status> errorCaptor = ArgumentCaptor.forClass(Status.class);
verify(timeoutWatcher).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo(
"Timed out waiting for resource " + timeoutResource + " from xDS server");
BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
}

private Answer<Void> blockUpdate(CyclicBarrier barrier) {
return new Answer<Void>() {
@Override
Expand Down Expand Up @@ -4220,7 +4324,7 @@ private XdsClientImpl createXdsClient(String serverUri) {
private BootstrapInfo buildBootStrap(String serverUri) {

ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
ignoreResourceDeletion(), true);
ignoreResourceDeletion(), true, false);

return Bootstrapper.BootstrapInfo.builder()
.servers(Collections.singletonList(xdsServerInfo))
Expand Down
4 changes: 2 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,13 @@ public void resolving_targetAuthorityInAuthoritiesMap() {
String serviceAuthority = "[::FFFF:129.144.52.38]:80";
bootstrapInfo = BootstrapInfo.builder()
.servers(ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true, true)))
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))
.node(Node.newBuilder().build())
.authorities(
ImmutableMap.of(targetAuthority, AuthorityInfo.create(
"xdstp://" + targetAuthority + "/envoy.config.listener.v3.Listener/%s?foo=1&bar=2",
ImmutableList.of(ServerInfo.create(
"td.googleapis.com", InsecureChannelCredentials.create(), true, true)))))
"td.googleapis.com", InsecureChannelCredentials.create(), true, true, false)))))
.build();
expectedLdsResourceName = "xdstp://xds.authority.com/envoy.config.listener.v3.Listener/"
+ "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public static Bootstrapper.BootstrapInfo buildBootStrap(List<String> serverUris)

List<ServerInfo> serverInfos = new ArrayList<>();
for (String uri : serverUris) {
serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true));
serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true, false));
}
EnvoyProtoData.Node node = EnvoyProtoData.Node.newBuilder().setId("node-id").build();

Expand Down