Skip to content

xds: remove retained resources logics for non State-of-The-World resources #9724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 3 additions & 54 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.Collection;
Expand Down Expand Up @@ -107,7 +105,6 @@ public void uncaughtException(Thread t, Throwable e) {
private final XdsLogger logger;
private volatile boolean isShutdown;

// TODO(zdapeng): rename to XdsClientImpl
XdsClientImpl(
XdsChannelFactory xdsChannelFactory,
Bootstrapper.BootstrapInfo bootstrapInfo,
Expand Down Expand Up @@ -398,7 +395,6 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
Set<String> invalidResources = result.invalidResources;
Set<String> retainedResources = result.retainedResources;
List<String> errors = result.errors;
String errorDetail = null;
if (errors.isEmpty()) {
Expand Down Expand Up @@ -432,16 +428,14 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
}

// Nothing else to do for incremental ADS resources.
if (xdsResourceType.dependentResource() == null) {
if (!xdsResourceType.isFullStateOfTheWorld()) {
continue;
}

// Handle State of the World ADS: invalid resources.
if (invalidResources.contains(resourceName)) {
// The resource is missing. Reuse the cached resource if possible.
if (subscriber.data != null) {
retainDependentResource(subscriber, retainedResources);
} else {
if (subscriber.data == null) {
// No cached data. Notify the watchers of an invalid update.
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
}
Expand All @@ -451,49 +445,6 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
// Retain any dependent resources if the resource deletion is ignored
// per bootstrap ignore_resource_deletion server feature.
if (!subscriber.absent) {
retainDependentResource(subscriber, retainedResources);
}
}

// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted.
if (xdsResourceType.dependentResource() != null) {
XdsResourceType<?> dependency = xdsResourceType.dependentResource();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> dependentSubscribers =
resourceSubscribers.get(dependency);
if (dependentSubscribers == null) {
return;
}
for (String resource : dependentSubscribers.keySet()) {
if (!retainedResources.contains(resource)) {
dependentSubscribers.get(resource).onAbsent();
}
}
}
}

private void retainDependentResource(
ResourceSubscriber<? extends ResourceUpdate> subscriber, Set<String> retainedResources) {
if (subscriber.data == null) {
return;
}
String resourceName = null;
if (subscriber.type == XdsListenerResource.getInstance()) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
resourceName = hcm.rdsName();
}
} else if (subscriber.type == XdsClusterResource.getInstance()) {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
resourceName = cdsUpdate.edsServiceName();
}

if (resourceName != null) {
retainedResources.add(resourceName);
}
}

Expand Down Expand Up @@ -657,9 +608,7 @@ void onAbsent() {
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
boolean isStateOfTheWorld = (type == XdsListenerResource.getInstance()
|| type == XdsClusterResource.getInstance());
if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) {
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
Expand Down
20 changes: 7 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ String typeUrlV2() {
return ADS_TYPE_URL_CDS_V2;
}

@Nullable
@Override
XdsResourceType<?> dependentResource() {
return XdsEndpointResource.getInstance();
boolean isFullStateOfTheWorld() {
return true;
}

@Override
Expand All @@ -101,8 +100,7 @@ Class<Cluster> unpackedClassName() {
}

@Override
CdsUpdate doParse(Args args, Message unpackedMessage,
Set<String> retainedResources, boolean isResourceV3)
CdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof Cluster)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
Expand All @@ -111,20 +109,20 @@ CdsUpdate doParse(Args args, Message unpackedMessage,
if (args.bootstrapInfo != null && args.bootstrapInfo.certProviders() != null) {
certProviderInstances = args.bootstrapInfo.certProviders().keySet();
}
return processCluster((Cluster) unpackedMessage, retainedResources, certProviderInstances,
return processCluster((Cluster) unpackedMessage, certProviderInstances,
args.serverInfo, args.loadBalancerRegistry);
}

@VisibleForTesting
static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEdsResources,
static CdsUpdate processCluster(Cluster cluster,
Set<String> certProviderInstances,
Bootstrapper.ServerInfo serverInfo,
LoadBalancerRegistry loadBalancerRegistry)
throws ResourceInvalidException {
StructOrError<CdsUpdate.Builder> structOrError;
switch (cluster.getClusterDiscoveryTypeCase()) {
case TYPE:
structOrError = parseNonAggregateCluster(cluster, retainedEdsResources,
structOrError = parseNonAggregateCluster(cluster,
certProviderInstances, serverInfo);
break;
case CLUSTER_TYPE:
Expand Down Expand Up @@ -178,8 +176,7 @@ private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cl
}

private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
Cluster cluster, Set<String> edsResources, Set<String> certProviderInstances,
Bootstrapper.ServerInfo serverInfo) {
Cluster cluster, Set<String> certProviderInstances, Bootstrapper.ServerInfo serverInfo) {
String clusterName = cluster.getName();
Bootstrapper.ServerInfo lrsServerInfo = null;
Long maxConcurrentRequests = null;
Expand Down Expand Up @@ -249,9 +246,6 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
// If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) {
edsServiceName = edsClusterConfig.getServiceName();
edsResources.add(edsServiceName);
} else {
edsResources.add(clusterName);
}
return StructOrError.fromStruct(CdsUpdate.forEds(
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
Expand Down
8 changes: 3 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsEndpointResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ String typeUrlV2() {
return ADS_TYPE_URL_EDS_V2;
}

@Nullable
@Override
XdsResourceType<?> dependentResource() {
return null;
boolean isFullStateOfTheWorld() {
return false;
}

@Override
Expand All @@ -90,8 +89,7 @@ Class<ClusterLoadAssignment> unpackedClassName() {
}

@Override
EdsUpdate doParse(Args args, Message unpackedMessage,
Set<String> retainedResources, boolean isResourceV3)
EdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof ClusterLoadAssignment)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
Expand Down
39 changes: 16 additions & 23 deletions xds/src/main/java/io/grpc/xds/XdsListenerResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,13 @@ String typeUrlV2() {
return ADS_TYPE_URL_LDS_V2;
}

@Nullable
@Override
XdsResourceType<?> dependentResource() {
return XdsRouteConfigureResource.getInstance();
boolean isFullStateOfTheWorld() {
return true;
}

@Override
LdsUpdate doParse(Args args, Message unpackedMessage, Set<String> retainedResources,
boolean isResourceV3)
LdsUpdate doParse(Args args, Message unpackedMessage, boolean isResourceV3)
throws ResourceInvalidException {
if (!(unpackedMessage instanceof Listener)) {
throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
Expand All @@ -114,15 +112,14 @@ LdsUpdate doParse(Args args, Message unpackedMessage, Set<String> retainedResour

if (listener.hasApiListener()) {
return processClientSideListener(
listener, retainedResources, args, enableFaultInjection && isResourceV3);
listener, args, enableFaultInjection && isResourceV3);
} else {
return processServerSideListener(
listener, retainedResources, args, enableRbac && isResourceV3);
listener, args, enableRbac && isResourceV3);
}
}

private LdsUpdate processClientSideListener(
Listener listener, Set<String> rdsResources, Args args, boolean parseHttpFilter)
private LdsUpdate processClientSideListener(Listener listener, Args args, boolean parseHttpFilter)
throws ResourceInvalidException {
// Unpack HttpConnectionManager from the Listener.
HttpConnectionManager hcm;
Expand All @@ -135,24 +132,22 @@ private LdsUpdate processClientSideListener(
"Could not parse HttpConnectionManager config from ApiListener", e);
}
return LdsUpdate.forApiListener(parseHttpConnectionManager(
hcm, rdsResources, args.filterRegistry, parseHttpFilter, true /* isForClient */));
hcm, args.filterRegistry, parseHttpFilter, true /* isForClient */));
}

private LdsUpdate processServerSideListener(
Listener proto, Set<String> rdsResources, Args args, boolean parseHttpFilter)
private LdsUpdate processServerSideListener(Listener proto, Args args, boolean parseHttpFilter)
throws ResourceInvalidException {
Set<String> certProviderInstances = null;
if (args.bootstrapInfo != null && args.bootstrapInfo.certProviders() != null) {
certProviderInstances = args.bootstrapInfo.certProviders().keySet();
}
return LdsUpdate.forTcpListener(parseServerSideListener(
proto, rdsResources, args.tlsContextManager, args.filterRegistry, certProviderInstances,
parseHttpFilter));
return LdsUpdate.forTcpListener(parseServerSideListener(proto, args.tlsContextManager,
args.filterRegistry, certProviderInstances, parseHttpFilter));
}

@VisibleForTesting
static EnvoyServerProtoData.Listener parseServerSideListener(
Listener proto, Set<String> rdsResources, TlsContextManager tlsContextManager,
Listener proto, TlsContextManager tlsContextManager,
FilterRegistry filterRegistry, Set<String> certProviderInstances, boolean parseHttpFilter)
throws ResourceInvalidException {
if (!proto.getTrafficDirection().equals(TrafficDirection.INBOUND)
Expand Down Expand Up @@ -190,13 +185,13 @@ static EnvoyServerProtoData.Listener parseServerSideListener(
Set<FilterChainMatch> uniqueSet = new HashSet<>();
for (io.envoyproxy.envoy.config.listener.v3.FilterChain fc : proto.getFilterChainsList()) {
filterChains.add(
parseFilterChain(fc, rdsResources, tlsContextManager, filterRegistry, uniqueSet,
parseFilterChain(fc, tlsContextManager, filterRegistry, uniqueSet,
certProviderInstances, parseHttpFilter));
}
FilterChain defaultFilterChain = null;
if (proto.hasDefaultFilterChain()) {
defaultFilterChain = parseFilterChain(
proto.getDefaultFilterChain(), rdsResources, tlsContextManager, filterRegistry,
proto.getDefaultFilterChain(), tlsContextManager, filterRegistry,
null, certProviderInstances, parseHttpFilter);
}

Expand All @@ -206,7 +201,7 @@ static EnvoyServerProtoData.Listener parseServerSideListener(

@VisibleForTesting
static FilterChain parseFilterChain(
io.envoyproxy.envoy.config.listener.v3.FilterChain proto, Set<String> rdsResources,
io.envoyproxy.envoy.config.listener.v3.FilterChain proto,
TlsContextManager tlsContextManager, FilterRegistry filterRegistry,
Set<FilterChainMatch> uniqueSet, Set<String> certProviderInstances, boolean parseHttpFilters)
throws ResourceInvalidException {
Expand Down Expand Up @@ -235,7 +230,7 @@ static FilterChain parseFilterChain(
+ filter.getName() + " failed to unpack message", e);
}
io.grpc.xds.HttpConnectionManager httpConnectionManager = parseHttpConnectionManager(
hcmProto, rdsResources, filterRegistry, parseHttpFilters, false /* isForClient */);
hcmProto, filterRegistry, parseHttpFilters, false /* isForClient */);

EnvoyServerProtoData.DownstreamTlsContext downstreamTlsContext = null;
if (proto.hasTransportSocket()) {
Expand Down Expand Up @@ -466,7 +461,7 @@ private static FilterChainMatch parseFilterChainMatch(

@VisibleForTesting
static io.grpc.xds.HttpConnectionManager parseHttpConnectionManager(
HttpConnectionManager proto, Set<String> rdsResources, FilterRegistry filterRegistry,
HttpConnectionManager proto, FilterRegistry filterRegistry,
boolean parseHttpFilter, boolean isForClient) throws ResourceInvalidException {
if (enableRbac && proto.getXffNumTrustedHops() != 0) {
throw new ResourceInvalidException(
Expand Down Expand Up @@ -541,8 +536,6 @@ static io.grpc.xds.HttpConnectionManager parseHttpConnectionManager(
throw new ResourceInvalidException(
"HttpConnectionManager contains invalid RDS: must specify ADS or self ConfigSource");
}
// Collect the RDS resource referenced by this HttpConnectionManager.
rdsResources.add(rds.getRouteConfigName());
return io.grpc.xds.HttpConnectionManager.forRdsName(
maxStreamDuration, rds.getRouteConfigName(), filterConfigs);
}
Expand Down
22 changes: 10 additions & 12 deletions xds/src/main/java/io/grpc/xds/XdsResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ abstract class XdsResourceType<T extends ResourceUpdate> {

abstract String typeUrlV2();

// Non-null for State of the World resources.
@Nullable
abstract XdsResourceType<?> dependentResource();
// Do not confuse with the SotW approach: it is the mechanism in which the client must specify all
// resource names it is interested in with each request. Different resource types may behave
// differently in this approach. For LDS and CDS resources, the server must return all resources
// that the client has subscribed to in each request. For RDS and EDS, the server may only return
// the resources that need an update.
abstract boolean isFullStateOfTheWorld();

static class Args {
final ServerInfo serverInfo;
Expand Down Expand Up @@ -125,7 +128,6 @@ ValidatedResourceUpdate<T> parse(Args args, List<Any> resources) {
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedResources = new HashSet<>();

for (int i = 0; i < resources.size(); i++) {
Any resource = resources.get(i);
Expand Down Expand Up @@ -156,7 +158,7 @@ ValidatedResourceUpdate<T> parse(Args args, List<Any> resources) {

T resourceUpdate;
try {
resourceUpdate = doParse(args, unpackedMessage, retainedResources, isResourceV3);
resourceUpdate = doParse(args, unpackedMessage, isResourceV3);
} catch (XdsClientImpl.ResourceInvalidException e) {
errors.add(String.format("%s response %s '%s' validation error: %s",
typeName(), unpackedClassName().getSimpleName(), cname, e.getMessage()));
Expand All @@ -168,12 +170,11 @@ ValidatedResourceUpdate<T> parse(Args args, List<Any> resources) {
parsedResources.put(cname, new ParsedResource<T>(resourceUpdate, resource));
}
return new ValidatedResourceUpdate<T>(parsedResources, unpackedResources, invalidResources,
errors, retainedResources);
errors);

}

abstract T doParse(Args args, Message unpackedMessage, Set<String> retainedResources,
boolean isResourceV3)
abstract T doParse(Args args, Message unpackedMessage, boolean isResourceV3)
throws ResourceInvalidException;

/**
Expand Down Expand Up @@ -231,19 +232,16 @@ static final class ValidatedResourceUpdate<T extends ResourceUpdate> {
Set<String> unpackedResources;
Set<String> invalidResources;
List<String> errors;
Set<String> retainedResources;

// validated resource update
public ValidatedResourceUpdate(Map<String, ParsedResource<T>> parsedResources,
Set<String> unpackedResources,
Set<String> invalidResources,
List<String> errors,
Set<String> retainedResources) {
List<String> errors) {
this.parsedResources = parsedResources;
this.unpackedResources = unpackedResources;
this.invalidResources = invalidResources;
this.errors = errors;
this.retainedResources = retainedResources;
}
}

Expand Down
Loading