Skip to content

A75 Aggregate cluster fixes #12186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 54 additions & 64 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
Expand All @@ -33,6 +35,7 @@
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsConfig.Subscription;
Expand All @@ -41,10 +44,10 @@
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

/**
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
Expand All @@ -60,14 +63,9 @@
private Subscription clusterSubscription;
private LoadBalancer childLb;

CdsLoadBalancer2(Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry());
}

@VisibleForTesting
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.lbRegistry = lbRegistry;
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
Expand All @@ -91,7 +89,7 @@
if (clusterSubscription == null) {
// Should be impossible, because XdsDependencyManager wouldn't have generated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to find non-dynamic root cluster"));
errorPrefix() + "Unable to find non-dynamic cluster"));

Check warning on line 92 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L92

Added line #L92 was not covered by tests
}
// The dynamic cluster must not have loaded yet
return Status.OK;
Expand All @@ -100,42 +98,25 @@
return fail(clusterConfigOr.getStatus());
}
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
List<String> leafNames;
if (clusterConfig.getChildren() instanceof AggregateConfig) {
leafNames = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
} else if (clusterConfig.getChildren() instanceof EndpointConfig) {
leafNames = ImmutableList.of(clusterName);
} else {
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected cluster children type: "
+ clusterConfig.getChildren().getClass()));
}
if (leafNames.isEmpty()) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.UNAVAILABLE.withDescription(
errorPrefix() + "Zero leaf clusters for root cluster " + clusterName));
}

Status noneFoundError = Status.INTERNAL
.withDescription(errorPrefix() + "No leaves and no error; this is a bug");
List<DiscoveryMechanism> instances = new ArrayList<>();
for (String leafName : leafNames) {
StatusOr<XdsClusterConfig> leafConfigOr = xdsConfig.getClusters().get(leafName);
if (!leafConfigOr.hasValue()) {
noneFoundError = leafConfigOr.getStatus();
continue;
}
if (!(leafConfigOr.getValue().getChildren() instanceof EndpointConfig)) {
noneFoundError = Status.INTERNAL.withDescription(
errorPrefix() + "Unexpected child " + leafName + " cluster children type: "
+ leafConfigOr.getValue().getChildren().getClass());
continue;
NameResolver.ConfigOrError configOrError;
Object childConfig;
if (clusterConfig.getChildren() instanceof EndpointConfig) {
// The LB policy config is provided in service_config.proto/JSON format.
configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()),
lbRegistry);
if (configOrError.getError() != null) {
// Should be impossible, because XdsClusterResource validated this
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));

Check warning on line 113 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L112-L113

Added lines #L112 - L113 were not covered by tests
}
CdsUpdate result = leafConfigOr.getValue().getClusterResource();
CdsUpdate result = clusterConfig.getClusterResource();
DiscoveryMechanism instance;
if (result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
leafName,
clusterName,
result.edsServiceName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
Expand All @@ -144,38 +125,47 @@
result.outlierDetection());
} else {
instance = DiscoveryMechanism.forLogicalDns(
leafName,
clusterName,
result.dnsHostName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata());
}
instances.add(instance);
}
if (instances.isEmpty()) {
return fail(noneFoundError);
}

// The LB policy config is provided in service_config.proto/JSON format.
NameResolver.ConfigOrError configOrError =
GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
Arrays.asList(clusterConfig.getClusterResource().lbPolicyConfig()), lbRegistry);
if (configOrError.getError() != null) {
// Should be impossible, because XdsClusterResource validated this
childConfig = new ClusterResolverConfig(
instance,
configOrError.getConfig(),
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
LoadBalancerProvider priorityLbProvider = lbRegistry.getProvider(PRIORITY_POLICY_NAME);
if (childLb == null) {
childLb = priorityLbProvider.newLoadBalancer(helper);
}
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
for (String childCluster: requireNonNull(
clusterConfig.getClusterResource().prioritizedClusterNames())) {
priorityChildConfigs.put(childCluster,
new PriorityChildConfig(
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(CDS_POLICY_NAME),
new CdsConfig(childCluster)),
false));
}
childConfig = new PriorityLoadBalancerProvider.PriorityLbConfig(
Collections.unmodifiableMap(priorityChildConfigs),
Collections.unmodifiableList(requireNonNull(
clusterConfig.getClusterResource().prioritizedClusterNames())));
} else {
return fail(Status.INTERNAL.withDescription(
errorPrefix() + "Unable to parse the LB config: " + configOrError.getError()));
errorPrefix() + "Unexpected cluster children type: "
+ clusterConfig.getChildren().getClass()));

Check warning on line 164 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L163-L164

Added lines #L163 - L164 were not covered by tests
}

ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
configOrError.getConfig(),
clusterConfig.getClusterResource().isHttp11ProxyAvailable());
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
return childLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build());
}

@Override
Expand Down
18 changes: 17 additions & 1 deletion xds/src/main/java/io/grpc/xds/CdsLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
Expand Down Expand Up @@ -51,9 +52,24 @@ public String getPolicyName() {
return XdsLbPolicies.CDS_POLICY_NAME;
}

private final LoadBalancerRegistry loadBalancerRegistry;

public CdsLoadBalancerProvider() {
this.loadBalancerRegistry = null;
}

public CdsLoadBalancerProvider(LoadBalancerRegistry loadBalancerRegistry) {
this.loadBalancerRegistry = loadBalancerRegistry;
}

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new CdsLoadBalancer2(helper);
LoadBalancerRegistry loadBalancerRegistry = this.loadBalancerRegistry;
if (loadBalancerRegistry == null) {
loadBalancerRegistry = LoadBalancerRegistry.getDefaultRegistry();
}

return new CdsLoadBalancer2(helper, loadBalancerRegistry);
}

@Override
Expand Down
74 changes: 31 additions & 43 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@
*/
private final class ClusterResolverLbState extends LoadBalancer {
private final Helper helper;
private final List<String> clusters = new ArrayList<>();
private final Map<String, ClusterState> clusterStates = new HashMap<>();
private String cluster;
private Object endpointLbConfig;
private ResolvedAddresses resolvedAddresses;
private LoadBalancer childLb;
Expand All @@ -185,21 +185,20 @@
ClusterResolverConfig config =
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
endpointLbConfig = config.lbConfig;
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
clusters.add(instance.cluster);
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
clusterStates.put(instance.cluster, state);
state.start();
}
DiscoveryMechanism instance = config.discoveryMechanism;
cluster = instance.cluster;
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
clusterStates.put(instance.cluster, state);
state.start();
return Status.OK;
}

Expand Down Expand Up @@ -229,24 +228,22 @@
List<String> priorities = new ArrayList<>(); // totally ordered priority list

Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
if (!state.resolved && state.status.isOk()) {
return;
}
if (state.result != null) {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
} else {
endpointNotFound = state.status;
}
ClusterState state = clusterStates.get(cluster);
// Propagate endpoints to the child LB policy only after all clusters have been resolved.
if (!state.resolved && state.status.isOk()) {
return;

Check warning on line 234 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L234

Added line #L234 was not covered by tests
}
if (state.result != null) {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
} else {
endpointNotFound = state.status;
}
if (addresses.isEmpty()) {
if (endpointNotFound.isOk()) {
endpointNotFound = Status.UNAVAILABLE.withDescription(
"No usable endpoint from cluster(s): " + clusters);
"No usable endpoint from cluster(s): " + cluster);
} else {
endpointNotFound =
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
Expand Down Expand Up @@ -274,22 +271,13 @@
}

private void handleEndpointResolutionError() {
boolean allInError = true;
Status error = null;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
if (state.status.isOk()) {
allInError = false;
} else {
error = state.status;
}
}
if (allInError) {
ClusterState state = clusterStates.get(cluster);
if (!state.status.isOk()) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
childLb.handleNameResolutionError(state.status);
} else {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(state.status)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -70,15 +69,15 @@
}

static final class ClusterResolverConfig {
// Ordered list of clusters to be resolved.
final List<DiscoveryMechanism> discoveryMechanisms;
// Cluster to be resolved.
final DiscoveryMechanism discoveryMechanism;
// GracefulSwitch configuration
final Object lbConfig;
private final boolean isHttp11ProxyAvailable;

ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
ClusterResolverConfig(DiscoveryMechanism discoveryMechanism, Object lbConfig,
boolean isHttp11ProxyAvailable) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.discoveryMechanism = checkNotNull(discoveryMechanism, "discoveryMechanism");
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
}
Expand All @@ -89,7 +88,7 @@

@Override
public int hashCode() {
return Objects.hash(discoveryMechanisms, lbConfig);
return Objects.hash(discoveryMechanism, lbConfig);

Check warning on line 91 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java#L91

Added line #L91 was not covered by tests
}

@Override
Expand All @@ -101,14 +100,14 @@
return false;
}
ClusterResolverConfig that = (ClusterResolverConfig) o;
return discoveryMechanisms.equals(that.discoveryMechanisms)
return discoveryMechanism.equals(that.discoveryMechanism)
&& lbConfig.equals(that.lbConfig);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("discoveryMechanisms", discoveryMechanisms)
.add("discoveryMechanism", discoveryMechanism)

Check warning on line 110 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java#L110

Added line #L110 was not covered by tests
.add("lbConfig", lbConfig)
.toString();
}
Expand Down
Loading