From 67fc2e156a8cc5aee79906f461d92ef3a772fe27 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Fri, 7 Feb 2025 16:33:17 -0800 Subject: [PATCH] Add new classes for eliminating xds config tears (#11740) * Framework definition to support A74 --- xds/build.gradle | 2 + xds/src/main/java/io/grpc/xds/XdsConfig.java | 192 +++++ .../io/grpc/xds/XdsDependencyManager.java | 769 +++++++++++++++++ .../io/grpc/xds/client/XdsClientImpl.java | 3 +- .../java/io/grpc/xds/ControlPlaneRule.java | 21 +- .../io/grpc/xds/XdsClientFallbackTest.java | 2 +- .../io/grpc/xds/XdsDependencyManagerTest.java | 784 ++++++++++++++++++ .../grpc/xds/XdsTestControlPlaneService.java | 9 +- .../test/java/io/grpc/xds/XdsTestUtils.java | 423 ++++++++++ .../client/CommonBootstrapperTestUtils.java | 6 + 10 files changed, 2186 insertions(+), 25 deletions(-) create mode 100644 xds/src/main/java/io/grpc/xds/XdsConfig.java create mode 100644 xds/src/main/java/io/grpc/xds/XdsDependencyManager.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsTestUtils.java diff --git a/xds/build.gradle b/xds/build.gradle index c51fc2819d7..cdd4924cab3 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -46,6 +46,7 @@ dependencies { thirdpartyImplementation project(':grpc-protobuf'), project(':grpc-stub') compileOnly sourceSets.thirdparty.output + testCompileOnly sourceSets.thirdparty.output implementation project(':grpc-stub'), project(':grpc-core'), project(':grpc-util'), @@ -59,6 +60,7 @@ dependencies { libraries.protobuf.java.util def nettyDependency = implementation project(':grpc-netty') + testImplementation project(':grpc-api') testImplementation project(':grpc-rls') testImplementation project(':grpc-inprocess') testImplementation testFixtures(project(':grpc-core')), diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java new file mode 100644 index 00000000000..999ee0d4b0c --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -0,0 +1,192 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableMap; +import io.grpc.StatusOr; +import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; +import io.grpc.xds.XdsListenerResource.LdsUpdate; +import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents the xDS configuration tree for a specified Listener. + */ +final class XdsConfig { + private final LdsUpdate listener; + private final RdsUpdate route; + private final VirtualHost virtualHost; + private final ImmutableMap> clusters; + private final int hashCode; + + XdsConfig(LdsUpdate listener, RdsUpdate route, Map> clusters, + VirtualHost virtualHost) { + this(listener, route, virtualHost, ImmutableMap.copyOf(clusters)); + } + + public XdsConfig(LdsUpdate listener, RdsUpdate route, VirtualHost virtualHost, + ImmutableMap> clusters) { + this.listener = listener; + this.route = route; + this.virtualHost = virtualHost; + this.clusters = clusters; + + hashCode = Objects.hash(listener, route, virtualHost, clusters); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof XdsConfig)) { + return false; + } + + XdsConfig o = (XdsConfig) obj; + + return hashCode() == o.hashCode() && Objects.equals(listener, o.listener) + && Objects.equals(route, o.route) && Objects.equals(virtualHost, o.virtualHost) + && Objects.equals(clusters, o.clusters); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("XdsConfig{") + .append("\n listener=").append(listener) + .append(",\n route=").append(route) + .append(",\n virtualHost=").append(virtualHost) + .append(",\n clusters=").append(clusters) + .append("\n}"); + return builder.toString(); + } + + public LdsUpdate getListener() { + return listener; + } + + public RdsUpdate getRoute() { + return route; + } + + public VirtualHost getVirtualHost() { + return virtualHost; + } + + public ImmutableMap> getClusters() { + return clusters; + } + + static final class XdsClusterConfig { + private final String clusterName; + private final CdsUpdate clusterResource; + private final StatusOr endpoint; //Will be null for non-EDS clusters + + XdsClusterConfig(String clusterName, CdsUpdate clusterResource, + StatusOr endpoint) { + this.clusterName = checkNotNull(clusterName, "clusterName"); + this.clusterResource = checkNotNull(clusterResource, "clusterResource"); + this.endpoint = endpoint; + } + + @Override + public int hashCode() { + int endpointHash = (endpoint != null) ? endpoint.hashCode() : 0; + return clusterName.hashCode() + clusterResource.hashCode() + endpointHash; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof XdsClusterConfig)) { + return false; + } + XdsClusterConfig o = (XdsClusterConfig) obj; + return Objects.equals(clusterName, o.clusterName) + && Objects.equals(clusterResource, o.clusterResource) + && Objects.equals(endpoint, o.endpoint); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("XdsClusterConfig{clusterName=").append(clusterName) + .append(", clusterResource=").append(clusterResource) + .append(", endpoint=").append(endpoint).append("}"); + return builder.toString(); + } + + public String getClusterName() { + return clusterName; + } + + public CdsUpdate getClusterResource() { + return clusterResource; + } + + public StatusOr getEndpoint() { + return endpoint; + } + } + + static final class XdsConfigBuilder { + private LdsUpdate listener; + private RdsUpdate route; + private Map> clusters = new HashMap<>(); + private VirtualHost virtualHost; + + XdsConfigBuilder setListener(LdsUpdate listener) { + this.listener = checkNotNull(listener, "listener"); + return this; + } + + XdsConfigBuilder setRoute(RdsUpdate route) { + this.route = checkNotNull(route, "route"); + return this; + } + + XdsConfigBuilder addCluster(String name, StatusOr clusterConfig) { + checkNotNull(name, "name"); + checkNotNull(clusterConfig, "clusterConfig"); + clusters.put(name, clusterConfig); + return this; + } + + XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) { + this.virtualHost = checkNotNull(virtualHost, "virtualHost"); + return this; + } + + XdsConfig build() { + checkNotNull(listener, "listener"); + checkNotNull(route, "route"); + return new XdsConfig(listener, route, clusters, virtualHost); + } + } + + public interface XdsClusterSubscriptionRegistry { + Closeable subscribeToCluster(String clusterName); + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java new file mode 100644 index 00000000000..d2af47bc9db --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -0,0 +1,769 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.XdsClient.ResourceUpdate; +import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.StatusOr; +import io.grpc.SynchronizationContext; +import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.client.XdsLogger; +import io.grpc.xds.client.XdsResourceType; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * This class acts as a layer of indirection between the XdsClient and the NameResolver. It + * maintains the watchers for the xds resources and when an update is received, it either requests + * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher. Each instance + * applies to a single data plane authority. + */ +final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry { + public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance(); + public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance(); + private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++ + private final XdsClient xdsClient; + private final XdsConfigWatcher xdsConfigWatcher; + private final SynchronizationContext syncContext; + private final String dataPlaneAuthority; + + private final InternalLogId logId; + private final XdsLogger logger; + private XdsConfig lastXdsConfig = null; + private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); + + XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, + SynchronizationContext syncContext, String dataPlaneAuthority, + String listenerName) { + logId = InternalLogId.allocate("xds-dependency-manager", listenerName); + logger = XdsLogger.withLogId(logId); + this.xdsClient = checkNotNull(xdsClient, "xdsClient"); + this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority"); + + // start the ball rolling + syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); + } + + public static String toContextStr(String typeName, String resourceName) { + return typeName + " resource: " + resourceName; + } + + @Override + public Closeable subscribeToCluster(String clusterName) { + + checkNotNull(clusterName, "clusterName"); + ClusterSubscription subscription = new ClusterSubscription(clusterName); + + syncContext.execute(() -> { + addClusterWatcher(clusterName, subscription, 1); + maybePublishConfig(); + }); + + return subscription; + } + + private void addWatcher(XdsWatcherBase watcher) { + syncContext.throwIfNotInThisSynchronizationContext(); + XdsResourceType type = watcher.type; + String resourceName = watcher.resourceName; + + @SuppressWarnings("unchecked") + TypeWatchers typeWatchers = (TypeWatchers)resourceWatchers.get(type); + if (typeWatchers == null) { + typeWatchers = new TypeWatchers<>(type); + resourceWatchers.put(type, typeWatchers); + } + + typeWatchers.add(resourceName, watcher); + xdsClient.watchXdsResource(type, resourceName, watcher, syncContext); + } + + private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) { + if (watcher == null) { + return; + } + watcher.parentContexts.remove(parentContext); + if (watcher.parentContexts.isEmpty()) { + cancelWatcher(watcher); + } + } + + private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) { + if (watcher == null) { + return; + } + watcher.parentContexts.remove(parentContext); + if (watcher.parentContexts.isEmpty()) { + cancelWatcher(watcher); + } + } + + + + private void cancelWatcher(XdsWatcherBase watcher) { + syncContext.throwIfNotInThisSynchronizationContext(); + + if (watcher == null) { + return; + } + + if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) { + throwIfParentContextsNotEmpty(watcher); + } + + XdsResourceType type = watcher.type; + String resourceName = watcher.resourceName; + + @SuppressWarnings("unchecked") + TypeWatchers typeWatchers = (TypeWatchers)resourceWatchers.get(type); + if (typeWatchers == null) { + logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher); + return; + } + + typeWatchers.watchers.remove(resourceName); + xdsClient.cancelXdsResourceWatch(type, resourceName, watcher); + + } + + private static void throwIfParentContextsNotEmpty(XdsWatcherBase watcher) { + if (watcher instanceof CdsWatcher) { + CdsWatcher cdsWatcher = (CdsWatcher) watcher; + if (!cdsWatcher.parentContexts.isEmpty()) { + String msg = String.format("CdsWatcher %s has parent contexts %s", + cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet()); + throw new IllegalStateException(msg); + } + } else if (watcher instanceof EdsWatcher) { + EdsWatcher edsWatcher = (EdsWatcher) watcher; + if (!edsWatcher.parentContexts.isEmpty()) { + String msg = String.format("CdsWatcher %s has parent contexts %s", + edsWatcher.resourceName(), edsWatcher.parentContexts); + throw new IllegalStateException(msg); + } + } + } + + public void shutdown() { + syncContext.execute(() -> { + for (TypeWatchers watchers : resourceWatchers.values()) { + shutdownWatchersForType(watchers); + } + resourceWatchers.clear(); + }); + } + + private void shutdownWatchersForType(TypeWatchers watchers) { + for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { + xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), + watcherEntry.getValue()); + } + } + + private void releaseSubscription(ClusterSubscription subscription) { + checkNotNull(subscription, "subscription"); + String clusterName = subscription.getClusterName(); + syncContext.execute(() -> { + XdsWatcherBase cdsWatcher = + resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName); + if (cdsWatcher == null) { + return; // already released while waiting for the syncContext + } + cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription); + maybePublishConfig(); + }); + } + + private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) { + checkNotNull(root, "root"); + + cancelCdsWatcher(root, parentContext); + + if (!root.hasDataValue() || !root.parentContexts.isEmpty()) { + return; + } + + XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue(); + switch (cdsUpdate.clusterType()) { + case EDS: + String edsServiceName = cdsUpdate.edsServiceName(); + EdsWatcher edsWatcher = + (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName); + cancelEdsWatcher(edsWatcher, root); + break; + case AGGREGATE: + for (String cluster : cdsUpdate.prioritizedClusterNames()) { + CdsWatcher clusterWatcher = + (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster); + if (clusterWatcher != null) { + cancelClusterWatcherTree(clusterWatcher, root); + } + } + break; + case LOGICAL_DNS: + // no eds needed + break; + default: + throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType()); + } + } + + /** + * Check if all resources have results, and if so, generate a new XdsConfig and send it to all + * the watchers. + */ + private void maybePublishConfig() { + syncContext.throwIfNotInThisSynchronizationContext(); + boolean waitingOnResource = resourceWatchers.values().stream() + .flatMap(typeWatchers -> typeWatchers.watchers.values().stream()) + .anyMatch(XdsWatcherBase::missingResult); + if (waitingOnResource) { + return; + } + + XdsConfig newConfig = buildConfig(); + if (Objects.equals(newConfig, lastXdsConfig)) { + return; + } + lastXdsConfig = newConfig; + xdsConfigWatcher.onUpdate(lastXdsConfig); + } + + @VisibleForTesting + XdsConfig buildConfig() { + XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); + + // Iterate watchers and build the XdsConfig + + // Will only be 1 listener and 1 route resource + VirtualHost activeVirtualHost = getActiveVirtualHost(); + for (XdsWatcherBase xdsWatcherBase : + resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) { + XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue(); + builder.setListener(ldsUpdate); + if (activeVirtualHost == null) { + activeVirtualHost = RoutingUtils.findVirtualHostForHostName( + ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority); + } + + if (ldsUpdate.httpConnectionManager() != null + && ldsUpdate.httpConnectionManager().virtualHosts() != null) { + RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts()); + builder.setRoute(rdsUpdate); + } + } + + resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream() + .map(watcher -> (RdsWatcher) watcher) + .forEach(watcher -> builder.setRoute(watcher.getData().getValue())); + + builder.setVirtualHost(activeVirtualHost); + + Map> edsWatchers = + resourceWatchers.get(ENDPOINT_RESOURCE).watchers; + Map> cdsWatchers = + resourceWatchers.get(CLUSTER_RESOURCE).watchers; + + // Iterate CDS watchers + for (XdsWatcherBase watcher : cdsWatchers.values()) { + CdsWatcher cdsWatcher = (CdsWatcher) watcher; + String clusterName = cdsWatcher.resourceName(); + StatusOr cdsUpdate = cdsWatcher.getData(); + if (cdsUpdate.hasValue()) { + XdsConfig.XdsClusterConfig clusterConfig; + String edsName = cdsUpdate.getValue().edsServiceName(); + EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName); + + // Only EDS type clusters have endpoint data + StatusOr data = + edsWatcher != null ? edsWatcher.getData() : null; + clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data); + builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig)); + } else { + builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus())); + } + } + + return builder.build(); + } + + @Override + public String toString() { + return logId.toString(); + } + + private static class TypeWatchers { + // Key is resource name + final Map> watchers = new HashMap<>(); + final XdsResourceType resourceType; + + TypeWatchers(XdsResourceType resourceType) { + this.resourceType = resourceType; + } + + public void add(String resourceName, XdsWatcherBase watcher) { + watchers.put(resourceName, watcher); + } + } + + public interface XdsConfigWatcher { + + void onUpdate(XdsConfig config); + + // These 2 methods are invoked when there is an error or + // does-not-exist on LDS or RDS only. The context will be a + // human-readable string indicating the scope in which the error + // occurred (e.g., the resource type and name). + void onError(String resourceContext, Status status); + + void onResourceDoesNotExist(String resourceContext); + } + + private class ClusterSubscription implements Closeable { + String clusterName; + + public ClusterSubscription(String clusterName) { + this.clusterName = clusterName; + } + + public String getClusterName() { + return clusterName; + } + + @Override + public void close() throws IOException { + releaseSubscription(this); + } + } + + private abstract static class XdsWatcherBase + implements ResourceWatcher { + private final XdsResourceType type; + private final String resourceName; + @Nullable + private StatusOr data; + + + private XdsWatcherBase(XdsResourceType type, String resourceName) { + this.type = checkNotNull(type, "type"); + this.resourceName = checkNotNull(resourceName, "resourceName"); + } + + @Override + public void onError(Status error) { + checkNotNull(error, "error"); + setDataAsStatus(error); + } + + protected void handleDoesNotExist(String resourceName) { + checkArgument(this.resourceName.equals(resourceName), "Resource name does not match"); + setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString())); + } + + boolean missingResult() { + return data == null; + } + + @Nullable + StatusOr getData() { + return data; + } + + boolean hasDataValue() { + return data != null && data.hasValue(); + } + + String resourceName() { + return resourceName; + } + + protected void setData(T data) { + checkNotNull(data, "data"); + this.data = StatusOr.fromValue(data); + } + + protected void setDataAsStatus(Status status) { + checkNotNull(status, "status"); + this.data = StatusOr.fromStatus(status); + } + + String toContextString() { + return toContextStr(type.typeName(), resourceName); + } + } + + private class LdsWatcher extends XdsWatcherBase { + String rdsName; + + private LdsWatcher(String resourceName) { + super(XdsListenerResource.getInstance(), resourceName); + } + + @Override + public void onChanged(XdsListenerResource.LdsUpdate update) { + checkNotNull(update, "update"); + + HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); + List virtualHosts = httpConnectionManager.virtualHosts(); + String rdsName = httpConnectionManager.rdsName(); + VirtualHost activeVirtualHost = getActiveVirtualHost(); + + boolean changedRdsName = !Objects.equals(rdsName, this.rdsName); + if (changedRdsName) { + cleanUpRdsWatcher(); + } + + if (virtualHosts != null) { + // No RDS watcher since we are getting RDS updates via LDS + updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null); + this.rdsName = null; + } else if (changedRdsName) { + cleanUpRdsWatcher(); + this.rdsName = rdsName; + addWatcher(new RdsWatcher(rdsName)); + logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + } + + setData(update); + maybePublishConfig(); + } + + @Override + public void onError(Status error) { + super.onError(checkNotNull(error, "error")); + xdsConfigWatcher.onError(toContextString(), error); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(resourceName); + xdsConfigWatcher.onResourceDoesNotExist(toContextString()); + } + + private void cleanUpRdsWatcher() { + RdsWatcher oldRdsWatcher = getRdsWatcher(); + if (oldRdsWatcher != null) { + cancelWatcher(oldRdsWatcher); + logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName); + + // Cleanup clusters (as appropriate) that had the old rds watcher as a parent + if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue() + || resourceWatchers.get(CLUSTER_RESOURCE) == null) { + return; + } + for (XdsWatcherBase watcher : + resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) { + cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher); + } + } + } + + private RdsWatcher getRdsWatcher() { + TypeWatchers watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) { + return null; + } + + return (RdsWatcher) watchers.watchers.get(rdsName); + } + } + + private class RdsWatcher extends XdsWatcherBase { + + public RdsWatcher(String resourceName) { + super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName")); + } + + @Override + public void onChanged(RdsUpdate update) { + checkNotNull(update, "update"); + RdsUpdate oldData = hasDataValue() ? getData().getValue() : null; + VirtualHost oldVirtualHost = + (oldData != null) + ? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority) + : null; + setData(update); + updateRoutes(update.virtualHosts, this, oldVirtualHost, true); + maybePublishConfig(); + } + + @Override + public void onError(Status error) { + super.onError(checkNotNull(error, "error")); + xdsConfigWatcher.onError(toContextString(), error); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(checkNotNull(resourceName, "resourceName")); + xdsConfigWatcher.onResourceDoesNotExist(toContextString()); + } + + ImmutableList getCdsNames() { + if (!hasDataValue() || getData().getValue().virtualHosts == null) { + return ImmutableList.of(); + } + + return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost())); + } + } + + private class CdsWatcher extends XdsWatcherBase { + Map parentContexts = new HashMap<>(); + + CdsWatcher(String resourceName, Object parentContext, int depth) { + super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName")); + this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth); + } + + @Override + public void onChanged(XdsClusterResource.CdsUpdate update) { + checkNotNull(update, "update"); + switch (update.clusterType()) { + case EDS: + setData(update); + if (!addEdsWatcher(update.edsServiceName(), this)) { + maybePublishConfig(); + } + break; + case LOGICAL_DNS: + setData(update); + maybePublishConfig(); + // no eds needed + break; + case AGGREGATE: + Object parentContext = this; + int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1; + if (depth > MAX_CLUSTER_RECURSION_DEPTH) { + logger.log(XdsLogger.XdsLogLevel.WARNING, + "Cluster recursion depth limit exceeded for cluster {0}", resourceName()); + Status error = Status.UNAVAILABLE.withDescription( + "aggregate cluster graph exceeds max depth"); + setDataAsStatus(error); + } + if (hasDataValue()) { + Set oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames()); + Set newNames = new HashSet<>(update.prioritizedClusterNames()); + + + Set deletedClusters = Sets.difference(oldNames, newNames); + deletedClusters.forEach((cluster) + -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); + + if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { + setData(update); + Set addedClusters = Sets.difference(newNames, oldNames); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth)); + + if (addedClusters.isEmpty()) { + maybePublishConfig(); + } + } else { // data was set to error status above + maybePublishConfig(); + } + + } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) { + setData(update); + update.prioritizedClusterNames() + .forEach(name -> addClusterWatcher(name, parentContext, depth)); + maybePublishConfig(); + } + break; + default: + Status error = Status.UNAVAILABLE.withDescription( + "aggregate cluster graph exceeds max depth"); + setDataAsStatus(error); + maybePublishConfig(); + } + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(checkNotNull(resourceName, "resourceName")); + maybePublishConfig(); + } + } + + // Returns true if the watcher was added, false if it already exists + private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) { + TypeWatchers typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance()); + if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) { + addWatcher(new EdsWatcher(edsServiceName, parentContext)); + return true; + } + + EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName); + watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence + return false; + } + + private void addClusterWatcher(String clusterName, Object parentContext, int depth) { + TypeWatchers clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE); + if (clusterWatchers != null) { + CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName); + if (watcher != null) { + watcher.parentContexts.put(parentContext, depth); + return; + } + } + + addWatcher(new CdsWatcher(clusterName, parentContext, depth)); + } + + private class EdsWatcher extends XdsWatcherBase { + private final Set parentContexts = new HashSet<>(); + + private EdsWatcher(String resourceName, CdsWatcher parentContext) { + super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName")); + parentContexts.add(checkNotNull(parentContext, "parentContext")); + } + + @Override + public void onChanged(XdsEndpointResource.EdsUpdate update) { + setData(checkNotNull(update, "update")); + maybePublishConfig(); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + handleDoesNotExist(checkNotNull(resourceName, "resourceName")); + maybePublishConfig(); + } + + void addParentContext(CdsWatcher parentContext) { + parentContexts.add(checkNotNull(parentContext, "parentContext")); + } + } + + private void updateRoutes(List virtualHosts, Object newParentContext, + VirtualHost oldVirtualHost, boolean sameParentContext) { + VirtualHost virtualHost = + RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority); + if (virtualHost == null) { + String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority; + logger.log(XdsLogger.XdsLogLevel.WARNING, error); + cleanUpRoutes(); + xdsConfigWatcher.onError( + "xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error)); + return; + } + + Set newClusters = getClusterNamesFromVirtualHost(virtualHost); + Set oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost); + + if (sameParentContext) { + // Calculate diffs. + Set addedClusters = Sets.difference(newClusters, oldClusters); + Set deletedClusters = Sets.difference(oldClusters, newClusters); + + deletedClusters.forEach(watcher -> + cancelClusterWatcherTree(getCluster(watcher), newParentContext)); + addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + } else { + newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1)); + } + } + + private static Set getClusterNamesFromVirtualHost(VirtualHost virtualHost) { + if (virtualHost == null) { + return Collections.emptySet(); + } + + // Get all cluster names to which requests can be routed through the virtual host. + Set clusters = new HashSet<>(); + for (VirtualHost.Route route : virtualHost.routes()) { + VirtualHost.Route.RouteAction action = route.routeAction(); + if (action == null) { + continue; + } + if (action.cluster() != null) { + clusters.add(action.cluster()); + } else if (action.weightedClusters() != null) { + for (ClusterWeight weighedCluster : action.weightedClusters()) { + clusters.add(weighedCluster.name()); + } + } + } + + return clusters; + } + + @Nullable + private VirtualHost getActiveVirtualHost() { + TypeWatchers rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (rdsWatchers == null) { + return null; + } + + RdsWatcher activeRdsWatcher = + (RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null); + if (activeRdsWatcher == null || activeRdsWatcher.missingResult() + || !activeRdsWatcher.getData().hasValue()) { + return null; + } + + return RoutingUtils.findVirtualHostForHostName( + activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority); + } + + // Must be in SyncContext + private void cleanUpRoutes() { + // Remove RdsWatcher & CDS Watchers + TypeWatchers rdsResourceWatcher = + resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) { + return; + } + + XdsWatcherBase watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get(); + cancelWatcher(watcher); + + // Remove CdsWatchers pointed to by the RdsWatcher + RdsWatcher rdsWatcher = (RdsWatcher) watcher; + for (String cName : rdsWatcher.getCdsNames()) { + CdsWatcher cdsWatcher = getCluster(cName); + if (cdsWatcher != null) { + cancelClusterWatcherTree(cdsWatcher, rdsWatcher); + } + } + } + + private CdsWatcher getCluster(String clusterName) { + return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName); + } +} diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 4304d1d9e6f..034779ed023 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -582,8 +582,7 @@ private void handleResourceUpdate( String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, - args.nonce); + controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce); } else { errorDetail = Joiner.on('\n').join(errors); logger.log(XdsLogLevel.WARNING, diff --git a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java index ac1c4829c74..39761912ea5 100644 --- a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java +++ b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.protobuf.Any; -import com.google.protobuf.BoolValue; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -45,7 +44,6 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.NonForwardingAction; import io.envoyproxy.envoy.config.route.v3.Route; -import io.envoyproxy.envoy.config.route.v3.RouteAction; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.config.route.v3.RouteMatch; import io.envoyproxy.envoy.config.route.v3.VirtualHost; @@ -239,24 +237,7 @@ void setEdsConfig(String edsName, ClusterLoadAssignment clusterLoadAssignment) { * Builds a new default RDS configuration. */ static RouteConfiguration buildRouteConfiguration(String authority) { - return buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); - } - - static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, - String clusterName) { - VirtualHost.Builder vhBuilder = VirtualHost.newBuilder() - .setName(rdsName) - .addDomains(authority) - .addRoutes( - Route.newBuilder() - .setMatch( - RouteMatch.newBuilder().setPrefix("/").build()) - .setRoute( - RouteAction.newBuilder().setCluster(clusterName) - .setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build()) - .build())); - VirtualHost virtualHost = vhBuilder.build(); - return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + return XdsTestUtils.buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); } /** diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 7cf6280711f..97c2695f209 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -205,7 +205,7 @@ private static void setAdsConfig(ControlPlaneRule controlPlane, String serverNam ControlPlaneRule.buildClientListener(MAIN_SERVER, serverName)); controlPlane.setRdsConfig(rdsName, - ControlPlaneRule.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); + XdsTestUtils.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); controlPlane.setCdsConfig(clusterName, ControlPlaneRule.buildCluster(clusterName, edsName)); controlPlane.setEdsConfig(edsName, diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java new file mode 100644 index 00000000000..96aeb0f41fb --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -0,0 +1,784 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE; +import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; +import static io.grpc.xds.XdsTestUtils.CLUSTER_NAME; +import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME; +import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT; +import static io.grpc.xds.XdsTestUtils.RDS_NAME; +import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster; +import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.protobuf.Message; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.grpc.BindableService; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusOr; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsListenerResource.LdsUpdate; +import io.grpc.xds.client.CommonBootstrapperTestUtils; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsResourceType; +import io.grpc.xds.client.XdsTransportFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** Unit tests for {@link XdsDependencyManager}. */ +@RunWith(JUnit4.class) +public class XdsDependencyManagerTest { + private static final Logger log = Logger.getLogger(XdsDependencyManagerTest.class.getName()); + public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName(); + public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName(); + + @Mock + private XdsClientMetricReporter xdsClientMetricReporter; + + private final SynchronizationContext syncContext = + new SynchronizationContext(mock(Thread.UncaughtExceptionHandler.class)); + + private ManagedChannel channel; + private XdsClientImpl xdsClient; + private XdsDependencyManager xdsDependencyManager; + private TestWatcher xdsConfigWatcher; + private Server xdsServer; + + private final FakeClock fakeClock = new FakeClock(); + private final String serverName = InProcessServerBuilder.generateName(); + private final Queue loadReportCalls = new ArrayDeque<>(); + private final AtomicBoolean adsEnded = new AtomicBoolean(true); + private final AtomicBoolean lrsEnded = new AtomicBoolean(true); + private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService(); + private final BindableService lrsService = + XdsTestUtils.createLrsService(lrsEnded, loadReportCalls); + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + private TestWatcher testWatcher; + private XdsConfig defaultXdsConfig; // set in setUp() + + @Captor + private ArgumentCaptor xdsConfigCaptor; + @Captor + private ArgumentCaptor statusCaptor; + + @Before + public void setUp() throws Exception { + xdsServer = cleanupRule.register(InProcessServerBuilder + .forName(serverName) + .addService(controlPlaneService) + .addService(lrsService) + .directExecutor() + .build() + .start()); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName); + + channel = cleanupRule.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + XdsTransportFactory xdsTransportFactory = + ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel); + + xdsClient = CommonBootstrapperTestUtils.createXdsClient( + Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock, + new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); + + testWatcher = new TestWatcher(); + xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher)); + defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName); + } + + @After + public void tearDown() throws InterruptedException { + if (xdsDependencyManager != null) { + xdsDependencyManager.shutdown(); + } + xdsClient.shutdown(); + channel.shutdown(); // channel not owned by XdsClient + + xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + + assertThat(adsEnded.get()).isTrue(); + assertThat(lrsEnded.get()).isTrue(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + @Test + public void verify_basic_config() { + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + testWatcher.verifyStats(1, 0, 0); + } + + @Test + public void verify_config_update() { + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + testWatcher.verifyStats(1, 0, 0); + assertThat(testWatcher.lastConfig).isEqualTo(defaultXdsConfig); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT + 2); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(ArgumentMatchers.notNull()); + testWatcher.verifyStats(2, 0, 0); + assertThat(testWatcher.lastConfig).isNotEqualTo(defaultXdsConfig); + } + + @Test + public void verify_simple_aggregate() { + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + + List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); + String rootName = "root_c"; + + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, rootName); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + + XdsTestUtils.setAggregateCdsConfig(controlPlaneService, serverName, rootName, childNames); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + Map> lastConfigClusters = + testWatcher.lastConfig.getClusters(); + assertThat(lastConfigClusters).hasSize(childNames.size() + 1); + StatusOr rootC = lastConfigClusters.get(rootName); + XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); + assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE); + assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames); + + for (String childName : childNames) { + assertThat(lastConfigClusters).containsKey(childName); + XdsClusterResource.CdsUpdate childResource = + lastConfigClusters.get(childName).getValue().getClusterResource(); + assertThat(childResource.clusterType()).isEqualTo(EDS); + assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName)); + + StatusOr endpoint = + lastConfigClusters.get(childName).getValue().getEndpoint(); + assertThat(endpoint.hasValue()).isTrue(); + assertThat(endpoint.getValue().clusterName).isEqualTo(getEdsNameForCluster(childName)); + } + } + + @Test + public void testComplexRegisteredAggregate() throws IOException { + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + + // Do initialization + String rootName1 = "root_c"; + List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); + XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames); + + String rootName2 = "root_2"; + List childNames2 = Arrays.asList("clusterA", "clusterX"); + XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2); + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + Closeable subscription2 = xdsDependencyManager.subscribeToCluster(rootName2); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + testWatcher.verifyStats(3, 0, 0); + ImmutableSet.Builder builder = ImmutableSet.builder(); + Set expectedClusters = builder.add(rootName1).add(rootName2).add(CLUSTER_NAME) + .addAll(childNames).addAll(childNames2).build(); + assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters); + + // Close 1 subscription shouldn't affect the other or RDS subscriptions + subscription1.close(); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + builder = ImmutableSet.builder(); + Set expectedClusters2 = + builder.add(rootName2).add(CLUSTER_NAME).addAll(childNames2).build(); + assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters2); + + subscription2.close(); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + } + + @Test + public void testDelayedSubscription() { + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + + String rootName1 = "root_c"; + List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); + + Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); + assertThat(subscription1).isNotNull(); + fakeClock.forwardTime(16, TimeUnit.SECONDS); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture()); + assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).toString()).isEqualTo( + StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + "No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString()); + + XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture()); + assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue(); + } + + @Test + public void testMissingCdsAndEds() { + // update config so that agg cluster references 2 existing & 1 non-existing cluster + List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); + Cluster cluster = XdsTestUtils.buildAggCluster(CLUSTER_NAME, childNames); + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + + clusterMap.put(CLUSTER_NAME, cluster); + for (int i = 0; i < childNames.size() - 1; i++) { + String edsName = XdsTestUtils.EDS_NAME + i; + Cluster child = ControlPlaneRule.buildCluster(childNames.get(i), edsName); + clusterMap.put(childNames.get(i), child); + } + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + + // Update config so that one of the 2 "valid" clusters has an EDS resource, the other does not + // and there is an EDS that doesn't have matching clusters + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, XdsTestUtils.EDS_NAME + 0); + edsMap.put(XdsTestUtils.EDS_NAME + 0, clusterLoadAssignment); + clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, "garbageEds"); + edsMap.put("garbageEds", clusterLoadAssignment); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + fakeClock.forwardTime(16, TimeUnit.SECONDS); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + + List> returnedClusters = new ArrayList<>(); + for (String childName : childNames) { + returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName)); + } + + // Check that missing cluster reported Status and the other 2 are present + Status expectedClusterStatus = Status.UNAVAILABLE.withDescription( + "No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2))); + StatusOr missingCluster = returnedClusters.get(2); + assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString()); + assertThat(returnedClusters.get(0).hasValue()).isTrue(); + assertThat(returnedClusters.get(1).hasValue()).isTrue(); + + // Check that missing EDS reported Status, the other one is present and the garbage EDS is not + Status expectedEdsStatus = Status.UNAVAILABLE.withDescription( + "No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1)); + assertThat(returnedClusters.get(0).getValue().getEndpoint().hasValue()).isTrue(); + assertThat(returnedClusters.get(1).getValue().getEndpoint().hasValue()).isFalse(); + assertThat(returnedClusters.get(1).getValue().getEndpoint().getStatus().toString()) + .isEqualTo(expectedEdsStatus.toString()); + + verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any()); + testWatcher.verifyStats(1, 0, 0); + } + + @Test + public void testMissingLds() { + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, "badLdsName"); + + fakeClock.forwardTime(16, TimeUnit.SECONDS); + verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist( + toContextStr(XdsListenerResource.getInstance().typeName(), "badLdsName")); + + testWatcher.verifyStats(0, 0, 1); + } + + @Test + public void testMissingRds() { + Listener serverListener = ControlPlaneRule.buildServerListener(); + Listener clientListener = + ControlPlaneRule.buildClientListener(serverName, serverName, "badRdsName"); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + fakeClock.forwardTime(16, TimeUnit.SECONDS); + verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist( + toContextStr(XdsRouteConfigureResource.getInstance().typeName(), "badRdsName")); + + testWatcher.verifyStats(0, 0, 1); + } + + @Test + public void testUpdateToMissingVirtualHost() { + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + WrappedXdsClient wrappedXdsClient = new WrappedXdsClient(xdsClient, syncContext); + xdsDependencyManager = new XdsDependencyManager( + wrappedXdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + + // Update with a config that has a virtual host that doesn't match the server name + wrappedXdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts()); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onError(any(), statusCaptor.capture()); + assertThat(statusCaptor.getValue().getDescription()) + .isEqualTo("Failed to find virtual host matching hostname: " + serverName); + + testWatcher.verifyStats(1, 1, 0); + + wrappedXdsClient.shutdown(); + } + + private List buildUnmatchedVirtualHosts() { + io.grpc.xds.VirtualHost.Route route1 = + io.grpc.xds.VirtualHost.Route.forAction( + io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/GreetService/bye"), + io.grpc.xds.VirtualHost.Route.RouteAction.forCluster( + "cluster-bar.googleapis.com", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L), null, false), ImmutableMap.of()); + io.grpc.xds.VirtualHost.Route route2 = + io.grpc.xds.VirtualHost.Route.forAction( + io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/HelloService/hi"), + io.grpc.xds.VirtualHost.Route.RouteAction.forCluster( + "cluster-foo.googleapis.com", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L), null, false), + ImmutableMap.of()); + return Arrays.asList( + io.grpc.xds.VirtualHost.create("virtualhost-foo", Collections.singletonList("hello" + + ".googleapis.com"), + Collections.singletonList(route1), + ImmutableMap.of()), + io.grpc.xds.VirtualHost.create("virtualhost-bar", Collections.singletonList("hi" + + ".googleapis.com"), + Collections.singletonList(route2), + ImmutableMap.of())); + } + + @Test + public void testCorruptLds() { + String ldsResourceName = + "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1"; + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, ldsResourceName); + + Status expectedStatus = Status.INVALID_ARGUMENT.withDescription( + "Wrong configuration: xds server does not exist for resource " + ldsResourceName); + String context = toContextStr(XdsListenerResource.getInstance().typeName(), ldsResourceName); + verify(xdsConfigWatcher, timeout(1000)) + .onError(eq(context), argThat(new XdsTestUtils.StatusMatcher(expectedStatus))); + + fakeClock.forwardTime(16, TimeUnit.SECONDS); + testWatcher.verifyStats(0, 1, 0); + } + + @Test + public void testChangeRdsName_fromLds() { + // TODO implement + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + Listener serverListener = ControlPlaneRule.buildServerListener(); + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + + String newRdsName = "newRdsName1"; + + Listener clientListener = buildInlineClientListener(newRdsName, CLUSTER_NAME); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + assertThat(xdsConfigCaptor.getValue()).isNotEqualTo(defaultXdsConfig); + assertThat(xdsConfigCaptor.getValue().getVirtualHost().name()).isEqualTo(newRdsName); + } + + @Test + public void testMultipleParentsInCdsTree() throws IOException { + /* + * Configure Xds server with the following cluster tree and point RDS to root: + 2 aggregates under root A & B + B has EDS Cluster B1 && shared agg AB1; A has agg A1 && shared agg AB1 + A1 has shared EDS Cluster A11 && shared agg AB1 + AB1 has shared EDS Clusters A11 && AB11 + + As an alternate visualization, parents are: + A -> root, B -> root, A1 -> A, AB1 -> A|B|A1, B1 -> B, A11 -> A1|AB1, AB11 -> AB1 + */ + Cluster rootCluster = + XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB")); + Cluster clusterA = + XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1", "clusterAB1")); + Cluster clusterB = + XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterB1", "clusterAB1")); + Cluster clusterA1 = + XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterAB1")); + Cluster clusterAB1 = + XdsTestUtils.buildAggCluster("clusterAB1", Arrays.asList("clusterA11", "clusterAB11")); + + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + + clusterMap.put("root", rootCluster); + clusterMap.put("clusterA", clusterA); + clusterMap.put("clusterB", clusterB); + clusterMap.put("clusterA1", clusterA1); + clusterMap.put("clusterAB1", clusterAB1); + + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterAB11", "clusterB1"); + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "root"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + // Start the actual test + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + XdsConfig initialConfig = xdsConfigCaptor.getValue(); + + // Make sure that adding subscriptions that rds points at doesn't change the config + Closeable rootSub = xdsDependencyManager.subscribeToCluster("root"); + assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + Closeable clusterAB11Sub = xdsDependencyManager.subscribeToCluster("clusterAB11"); + assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + + // Make sure that closing subscriptions that rds points at doesn't change the config + rootSub.close(); + assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + clusterAB11Sub.close(); + assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + + // Make an explicit root subscription and then change RDS to point to A11 + rootSub = xdsDependencyManager.subscribeToCluster("root"); + RouteConfiguration newRouteConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA11"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig)); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(8); + + // Now that it is released, we should only have A11 + rootSub.close(); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).containsExactly("clusterA11"); + } + + @Test + public void testMultipleCdsReferToSameEds() { + // Create the maps and Update the config to have 2 clusters that refer to the same EDS resource + String edsName = "sharedEds"; + + Cluster rootCluster = + XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB")); + Cluster clusterA = ControlPlaneRule.buildCluster("clusterA", edsName); + Cluster clusterB = ControlPlaneRule.buildCluster("clusterB", edsName); + + Map clusterMap = new HashMap<>(); + clusterMap.put("root", rootCluster); + clusterMap.put("clusterA", clusterA); + clusterMap.put("clusterB", clusterB); + + Map edsMap = new HashMap<>(); + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, edsName); + edsMap.put(edsName, clusterLoadAssignment); + + RouteConfiguration routeConfig = + XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "root"); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + // Start the actual test + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + XdsConfig initialConfig = xdsConfigCaptor.getValue(); + assertThat(initialConfig.getClusters().keySet()) + .containsExactly("root", "clusterA", "clusterB"); + + XdsEndpointResource.EdsUpdate edsForA = + initialConfig.getClusters().get("clusterA").getValue().getEndpoint().getValue(); + assertThat(edsForA.clusterName).isEqualTo(edsName); + XdsEndpointResource.EdsUpdate edsForB = + initialConfig.getClusters().get("clusterB").getValue().getEndpoint().getValue(); + assertThat(edsForB.clusterName).isEqualTo(edsName); + assertThat(edsForA).isEqualTo(edsForB); + edsForA.localityLbEndpointsMap.values().forEach( + localityLbEndpoints -> assertThat(localityLbEndpoints.endpoints()).hasSize(1)); + } + + @Test + public void testChangeRdsName_FromLds_complexTree() { + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + + // Create the same tree as in testMultipleParentsInCdsTree + Cluster rootCluster = + XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB")); + Cluster clusterA = + XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1", "clusterAB1")); + Cluster clusterB = + XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterB1", "clusterAB1")); + Cluster clusterA1 = + XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterAB1")); + Cluster clusterAB1 = + XdsTestUtils.buildAggCluster("clusterAB1", Arrays.asList("clusterA11", "clusterAB11")); + + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + + clusterMap.put("root", rootCluster); + clusterMap.put("clusterA", clusterA); + clusterMap.put("clusterB", clusterB); + clusterMap.put("clusterA1", clusterA1); + clusterMap.put("clusterAB1", clusterAB1); + + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterAB11", "clusterB1"); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any()); + + // Do the test + String newRdsName = "newRdsName1"; + Listener clientListener = buildInlineClientListener(newRdsName, "root"); + Listener serverListener = ControlPlaneRule.buildServerListener(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + XdsConfig config = xdsConfigCaptor.getValue(); + assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName); + assertThat(config.getClusters().size()).isEqualTo(8); + } + + @Test + public void testChangeAggCluster() { + InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); + + xdsDependencyManager = new XdsDependencyManager( + xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any()); + + // Setup initial config A -> A1 -> (A11, A12) + Cluster rootCluster = + XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA")); + Cluster clusterA = + XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1")); + Cluster clusterA1 = + XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterA12")); + + Map clusterMap = new HashMap<>(); + Map edsMap = new HashMap<>(); + + clusterMap.put("root", rootCluster); + clusterMap.put("clusterA", clusterA); + clusterMap.put("clusterA1", clusterA1); + + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterA12"); + Listener clientListener = buildInlineClientListener(RDS_NAME, "root"); + Listener serverListener = ControlPlaneRule.buildServerListener(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + inOrder.verify(xdsConfigWatcher).onUpdate(any()); + + // Update the cluster to A -> A2 -> (A21, A22) + Cluster clusterA2 = + XdsTestUtils.buildAggCluster("clusterA2", Arrays.asList("clusterA21", "clusterA22")); + clusterA = + XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA2")); + clusterMap.clear(); + edsMap.clear(); + clusterMap.put("root", rootCluster); + clusterMap.put("clusterA", clusterA); + clusterMap.put("clusterA2", clusterA2); + XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA21", "clusterA22"); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + + // Verify that the config is updated as expected + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + XdsConfig config = xdsConfigCaptor.getValue(); + assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA", "clusterA2", + "clusterA21", "clusterA22"); + } + + private Listener buildInlineClientListener(String rdsName, String clusterName) { + return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); + } + + + private static String toContextStr(String type, String resourceName) { + return type + " resource: " + resourceName; + } + + private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher { + XdsConfig lastConfig; + int numUpdates = 0; + int numError = 0; + int numDoesNotExist = 0; + + @Override + public void onUpdate(XdsConfig config) { + log.fine("Config changed: " + config); + lastConfig = config; + numUpdates++; + } + + @Override + public void onError(String resourceContext, Status status) { + log.fine(String.format("Error %s for %s: ", status, resourceContext)); + numError++; + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.fine("Resource does not exist: " + resourceName); + numDoesNotExist++; + } + + private List getStats() { + return Arrays.asList(numUpdates, numError, numDoesNotExist); + } + + private void verifyStats(int updt, int err, int notExist) { + assertThat(getStats()).isEqualTo(Arrays.asList(updt, err, notExist)); + } + } + + private static class WrappedXdsClient extends XdsClient { + private final XdsClient delegate; + private final SynchronizationContext syncContext; + private ResourceWatcher ldsWatcher; + + WrappedXdsClient(XdsClient delegate, SynchronizationContext syncContext) { + this.delegate = delegate; + this.syncContext = syncContext; + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + @SuppressWarnings("unchecked") + public void watchXdsResource( + XdsResourceType type, String resourceName, ResourceWatcher watcher, + Executor executor) { + if (type.equals(XdsListenerResource.getInstance())) { + ldsWatcher = (ResourceWatcher) watcher; + } + delegate.watchXdsResource(type, resourceName, watcher, executor); + } + + + + @Override + public void cancelXdsResourceWatch(XdsResourceType type, + String resourceName, + ResourceWatcher watcher) { + delegate.cancelXdsResourceWatch(type, resourceName, watcher); + } + + void deliverLdsUpdate(long httpMaxStreamDurationNano, + List virtualHosts) { + syncContext.execute(() -> { + LdsUpdate ldsUpdate = LdsUpdate.forApiListener( + io.grpc.xds.HttpConnectionManager.forVirtualHosts( + httpMaxStreamDurationNano, virtualHosts, null)); + ldsWatcher.onChanged(ldsUpdate); + }); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java index 98f5fcbfef9..a54893c9075 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java @@ -106,7 +106,7 @@ public void setXdsConfig(final String type, final Map copyResources = new HashMap<>(resources); xdsResources.put(type, copyResources); - String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement()); + String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndIncrement()); for (Map.Entry, Set> entry : subscribers.get(type).entrySet()) { @@ -119,6 +119,11 @@ public void run() { }); } + ImmutableMap getCurrentConfig(String type) { + HashMap hashMap = xdsResources.get(type); + return (hashMap != null) ? ImmutableMap.copyOf(hashMap) : ImmutableMap.of(); + } + @Override public StreamObserver streamAggregatedResources( final StreamObserver responseObserver) { @@ -159,7 +164,7 @@ public void run() { DiscoveryResponse response = generateResponse(resourceType, String.valueOf(xdsVersions.get(resourceType)), - String.valueOf(xdsNonces.get(resourceType).get(responseObserver)), + String.valueOf(xdsNonces.get(resourceType).get(responseObserver).addAndGet(1)), requestedResourceNames); responseObserver.onNext(response); subscribers.get(resourceType).put(responseObserver, requestedResourceNames); diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java new file mode 100644 index 00000000000..7f5ec0b27c6 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -0,0 +1,423 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.BoolValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig; +import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.Status; +import io.grpc.StatusOr; +import io.grpc.internal.JsonParser; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.Endpoints.LbEndpoint; +import io.grpc.xds.Endpoints.LocalityLbEndpoints; +import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.Locality; +import io.grpc.xds.client.XdsResourceType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +public class XdsTestUtils { + private static final Logger log = Logger.getLogger(XdsTestUtils.class.getName()); + static final String RDS_NAME = "route-config.googleapis.com"; + static final String CLUSTER_NAME = "cluster0"; + static final String EDS_NAME = "eds-service-0"; + static final String SERVER_LISTENER = "grpc/server?udpa.resource.listening_address="; + static final String HTTP_CONNECTION_MANAGER_TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + public static final String ENDPOINT_HOSTNAME = "data-host"; + public static final int ENDPOINT_PORT = 1234; + + static BindableService createLrsService(AtomicBoolean lrsEnded, + Queue loadReportCalls) { + return new LoadReportingServiceGrpc.LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCall(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + static boolean matchErrorDetail( + com.google.rpc.Status errorDetail, int expectedCode, List expectedMessages) { + if (expectedCode != errorDetail.getCode()) { + return false; + } + List errors = Splitter.on('\n').splitToList(errorDetail.getMessage()); + if (errors.size() != expectedMessages.size()) { + return false; + } + for (int i = 0; i < errors.size(); i++) { + if (!errors.get(i).startsWith(expectedMessages.get(i))) { + return false; + } + } + return true; + } + + static void setAdsConfig(XdsTestControlPlaneService service, String serverName) { + setAdsConfig(service, serverName, RDS_NAME, CLUSTER_NAME, EDS_NAME, ENDPOINT_HOSTNAME, + ENDPOINT_PORT); + } + + static void setAdsConfig(XdsTestControlPlaneService service, String serverName, String rdsName, + String clusterName, String edsName, String endpointHostname, + int endpointPort) { + + Listener serverListener = ControlPlaneRule.buildServerListener(); + Listener clientListener = ControlPlaneRule.buildClientListener(serverName, serverName, rdsName); + service.setXdsConfig(ADS_TYPE_URL_LDS, + ImmutableMap.of(SERVER_LISTENER, serverListener, serverName, clientListener)); + + RouteConfiguration routeConfig = + buildRouteConfiguration(serverName, rdsName, clusterName); + service.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(rdsName, routeConfig));; + + Cluster cluster = ControlPlaneRule.buildCluster(clusterName, edsName); + service.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(clusterName, cluster)); + + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, endpointHostname, endpointPort, edsName); + service.setXdsConfig(ADS_TYPE_URL_EDS, + ImmutableMap.of(edsName, clusterLoadAssignment)); + + log.log(Level.FINE, String.format("Set ADS config for %s with address %s:%d", + serverName, endpointHostname, endpointPort)); + + } + + static String getEdsNameForCluster(String clusterName) { + return "eds_" + clusterName; + } + + static void setAggregateCdsConfig(XdsTestControlPlaneService service, String serverName, + String clusterName, List children) { + Map clusterMap = new HashMap<>(); + + ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(children).build(); + Cluster.CustomClusterType type = + Cluster.CustomClusterType.newBuilder() + .setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME) + .setTypedConfig(Any.pack(rootConfig)) + .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type); + builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN); + Cluster cluster = builder.build(); + clusterMap.put(clusterName, cluster); + + for (String child : children) { + Cluster childCluster = ControlPlaneRule.buildCluster(child, getEdsNameForCluster(child)); + clusterMap.put(child, childCluster); + } + + service.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + + Map edsMap = new HashMap<>(); + for (String child : children) { + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, getEdsNameForCluster(child)); + edsMap.put(getEdsNameForCluster(child), clusterLoadAssignment); + } + service.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + } + + static void addAggregateToExistingConfig(XdsTestControlPlaneService service, String rootName, + List children) { + Map clusterMap = new HashMap<>(service.getCurrentConfig(ADS_TYPE_URL_CDS)); + if (clusterMap.containsKey(rootName)) { + throw new IllegalArgumentException("Root cluster " + rootName + " already exists"); + } + ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(children).build(); + Cluster.CustomClusterType type = + Cluster.CustomClusterType.newBuilder() + .setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME) + .setTypedConfig(Any.pack(rootConfig)) + .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(rootName).setClusterType(type); + builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN); + Cluster cluster = builder.build(); + clusterMap.put(rootName, cluster); + + for (String child : children) { + if (clusterMap.containsKey(child)) { + continue; + } + Cluster childCluster = ControlPlaneRule.buildCluster(child, getEdsNameForCluster(child)); + clusterMap.put(child, childCluster); + } + + service.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); + + Map edsMap = new HashMap<>(service.getCurrentConfig(ADS_TYPE_URL_EDS)); + for (String child : children) { + if (edsMap.containsKey(getEdsNameForCluster(child))) { + continue; + } + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + child, ENDPOINT_HOSTNAME, ENDPOINT_PORT, getEdsNameForCluster(child)); + edsMap.put(getEdsNameForCluster(child), clusterLoadAssignment); + } + service.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); + } + + static XdsConfig getDefaultXdsConfig(String serverHostName) + throws XdsResourceType.ResourceInvalidException, IOException { + XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); + + Filter.NamedFilterConfig routerFilterConfig = new Filter.NamedFilterConfig( + serverHostName, RouterFilter.ROUTER_CONFIG); + + HttpConnectionManager httpConnectionManager = HttpConnectionManager.forRdsName( + 0L, RDS_NAME, Collections.singletonList(routerFilterConfig)); + XdsListenerResource.LdsUpdate ldsUpdate = + XdsListenerResource.LdsUpdate.forApiListener(httpConnectionManager); + + RouteConfiguration routeConfiguration = + buildRouteConfiguration(serverHostName, RDS_NAME, CLUSTER_NAME); + Bootstrapper.ServerInfo serverInfo = null; + XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, "0", "0", null, null, null); + XdsRouteConfigureResource.RdsUpdate rdsUpdate = + XdsRouteConfigureResource.getInstance().doParse(args, routeConfiguration); + + // Take advantage of knowing that there is only 1 virtual host in the route configuration + assertThat(rdsUpdate.virtualHosts).hasSize(1); + VirtualHost virtualHost = rdsUpdate.virtualHosts.get(0); + + // Need to create endpoints to create locality endpoints map to create edsUpdate + Map lbEndpointsMap = new HashMap<>(); + LbEndpoint lbEndpoint = + LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME); + lbEndpointsMap.put( + Locality.create("", "", ""), + LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0)); + + // Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder + XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate( + EDS_NAME, lbEndpointsMap, Collections.emptyList()); + XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds( + CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null) + .lbPolicyConfig(getWrrLbConfigAsMap()).build(); + XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig( + CLUSTER_NAME, cdsUpdate, StatusOr.fromValue(edsUpdate)); + + builder + .setListener(ldsUpdate) + .setRoute(rdsUpdate) + .setVirtualHost(virtualHost) + .addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig)); + + return builder.build(); + } + + @SuppressWarnings("unchecked") + private static ImmutableMap getWrrLbConfigAsMap() throws IOException { + String lbConfigStr = "{\"wrr_locality_experimental\" : " + + "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}"; + + return ImmutableMap.copyOf((Map) JsonParser.parse(lbConfigStr)); + } + + static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, + String clusterName) { + io.envoyproxy.envoy.config.route.v3.VirtualHost.Builder vhBuilder = + io.envoyproxy.envoy.config.route.v3.VirtualHost.newBuilder() + .setName(rdsName) + .addDomains(authority) + .addRoutes( + Route.newBuilder() + .setMatch( + RouteMatch.newBuilder().setPrefix("/").build()) + .setRoute( + RouteAction.newBuilder().setCluster(clusterName) + .setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build()) + .build())); + io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHost = vhBuilder.build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); + } + + static Cluster buildAggCluster(String name, List childNames) { + ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(childNames).build(); + Cluster.CustomClusterType type = + Cluster.CustomClusterType.newBuilder() + .setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME) + .setTypedConfig(Any.pack(rootConfig)) + .build(); + Cluster.Builder builder = + Cluster.newBuilder().setName(name).setClusterType(type); + builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN); + Cluster cluster = builder.build(); + return cluster; + } + + static void addEdsClusters(Map clusterMap, Map edsMap, + String... clusterNames) { + for (String clusterName : clusterNames) { + String edsName = getEdsNameForCluster(clusterName); + Cluster cluster = ControlPlaneRule.buildCluster(clusterName, edsName); + clusterMap.put(clusterName, cluster); + + ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment( + clusterName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, edsName); + edsMap.put(edsName, clusterLoadAssignment); + } + } + + static Listener buildInlineClientListener(String rdsName, String clusterName, String serverName) { + HttpFilter + httpFilter = HttpFilter.newBuilder() + .setName(serverName) + .setTypedConfig(Any.pack(Router.newBuilder().build())) + .setIsOptional(true) + .build(); + ApiListener.Builder clientListenerBuilder = + ApiListener.newBuilder().setApiListener(Any.pack( + io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3 + .HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration(serverName, rdsName, clusterName)) + .addAllHttpFilters(Collections.singletonList(httpFilter)) + .build(), + HTTP_CONNECTION_MANAGER_TYPE_URL)); + return Listener.newBuilder() + .setName(serverName) + .setApiListener(clientListenerBuilder.build()).build(); + + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } + + static class LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCall(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } + + static class StatusMatcher implements ArgumentMatcher { + private final Status expectedStatus; + + StatusMatcher(Status expectedStatus) { + this.expectedStatus = expectedStatus; + } + + @Override + public boolean matches(Status status) { + return status != null && expectedStatus.getCode().equals(status.getCode()) + && expectedStatus.getDescription().equals(status.getDescription()); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java index f3de4549ba9..485970741c1 100644 --- a/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java @@ -34,9 +34,15 @@ import javax.annotation.Nullable; public class CommonBootstrapperTestUtils { + public static final String SERVER_URI = "trafficdirector.googleapis.com"; private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create(); private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com"; private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; + public static final String LDS_RESOURCE = "listener.googleapis.com"; + public static final String RDS_RESOURCE = "route-configuration.googleapis.com"; + public static final String CDS_RESOURCE = "cluster.googleapis.com"; + public static final String EDS_RESOURCE = "cluster-load-assignment.googleapis.com"; + private static final String FILE_WATCHER_CONFIG = "{\"path\": \"/etc/secret/certs\"}"; private static final String MESHCA_CONFIG = "{\n"