Skip to content

xds: Support localities in multiple priorities #9683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ private void handleEndpointResourceUpdate() {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> priorities = new ArrayList<>(); // totally ordered priority list
Map<Locality, Integer> localityWeights = new HashMap<>();

Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
Expand All @@ -229,7 +228,6 @@ private void handleEndpointResourceUpdate() {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
localityWeights.putAll(state.result.localityWeights);
} else {
endpointNotFound = state.status;
}
Expand Down Expand Up @@ -260,9 +258,6 @@ private void handleEndpointResourceUpdate() {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(childConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
Collections.unmodifiableMap(localityWeights)).build())
.build());
}

Expand Down Expand Up @@ -396,7 +391,6 @@ public void run() {
}
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.localityLbEndpointsMap;
Map<Locality, Integer> localityWeights = new HashMap<>();
List<DropOverload> dropOverloads = update.dropPolicies;
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
Expand All @@ -415,6 +409,8 @@ public void run() {
Attributes attr =
endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
localityLbInfo.localityWeight())
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
Expand All @@ -429,7 +425,6 @@ public void run() {
"Discard locality {0} with 0 healthy endpoints", locality);
continue;
}
localityWeights.put(locality, localityLbInfo.localityWeight());
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
}
Expand All @@ -450,7 +445,7 @@ public void run() {
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
sortedPriorityNames, localityWeights);
sortedPriorityNames);
handleEndpointResourceUpdate();
}
}
Expand Down Expand Up @@ -690,23 +685,18 @@ private static class ClusterResolutionResult {
private final Map<String, PriorityChildConfig> priorityChildConfigs;
// List of priority names ordered in descending priorities.
private final List<String> priorities;
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
// clusters that support the concept.
private final Map<Locality, Integer> localityWeights;

ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
PriorityChildConfig config) {
this(addresses, Collections.singletonMap(priority, config),
Collections.singletonList(priority), Collections.emptyMap());
Collections.singletonList(priority));
}

ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
Map<String, PriorityChildConfig> configs, List<String> priorities,
Map<Locality, Integer> localityWeights) {
Map<String, PriorityChildConfig> configs, List<String> priorities) {
this.addresses = addresses;
this.priorityChildConfigs = configs;
this.priorities = priorities;
this.localityWeights = localityWeights;
}
}

Expand Down
5 changes: 2 additions & 3 deletions xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.util.Map;

/**
* Internal attributes used for xDS implementation. Do not use.
Expand Down Expand Up @@ -58,8 +57,8 @@ public final class InternalXdsAttributes {
* Map from localities to their weights.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<Map<Locality, Integer>> ATTR_LOCALITY_WEIGHTS =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights");
static final Attributes.Key<Integer> ATTR_LOCALITY_WEIGHT =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeight");

/**
* Name of the cluster that provides this EquivalentAddressGroup.
Expand Down
46 changes: 29 additions & 17 deletions xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;

import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
Expand Down Expand Up @@ -68,15 +70,33 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// to produce the weighted target LB config.
WrrLocalityConfig wrrLocalityConfig
= (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
Map<Locality, Integer> localityWeights = resolvedAddresses.getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);

// Not having locality weights is a misconfiguration, and we have to return with an error.
if (localityWeights == null) {
Status unavailable =
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided");
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return false;

// A map of locality weights is built up from the locality weight attributes in each address.
Map<Locality, Integer> localityWeights = new HashMap<>();
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
Attributes eagAttrs = eag.getAttributes();
Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY);
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);

if (locality == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
return false;
}
if (localityWeight == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription(
"wrr_locality error: no weight provided for locality " + locality)));
return false;
}

if (!localityWeights.containsKey(locality)) {
localityWeights.put(locality, localityWeight);
} else if (!localityWeights.get(locality).equals(localityWeight)) {
logger.log(XdsLogLevel.WARNING,
"Locality {0} has both weights {1} and {2}, using weight {1}", locality,
localityWeights.get(locality), localityWeight);
}
}

// Weighted target LB expects a WeightedPolicySelection for each locality as it will create a
Expand All @@ -88,14 +108,6 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
wrrLocalityConfig.childPolicy));
}

// Remove the locality weights attribute now that we have consumed it. This is done simply for
// ease of debugging for the unsupported (and unlikely) scenario where WrrLocalityConfig has
// another wrr_locality as the child policy. The missing locality weight attribute would make
// the child wrr_locality fail early.
resolvedAddresses = resolvedAddresses.toBuilder()
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS).build()).build();

switchLb.switchTo(lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME));
switchLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
Expand Down
35 changes: 21 additions & 14 deletions xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() {
"least_request_experimental");

assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}

@Test
Expand Down Expand Up @@ -410,8 +410,8 @@ public void edsClustersWithOutlierDetection() {
"least_request_experimental");

assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}


Expand Down Expand Up @@ -507,11 +507,20 @@ public void onlyEdsClusters_receivedEndpoints() {
assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo(
"round_robin");

Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights).containsEntry(locality1, 70);
assertThat(localityWeights).containsEntry(locality2, 10);
assertThat(localityWeights).containsEntry(locality3, 20);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality1) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(70);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality2) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(10);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality3) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(20);
}
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -687,9 +696,9 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights.keySet()).containsExactly(locality2);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2);
}
}

@Test
Expand Down Expand Up @@ -1315,7 +1324,6 @@ private final class FakeLoadBalancer extends LoadBalancer {
private final Helper helper;
private List<EquivalentAddressGroup> addresses;
private Object config;
private Attributes attributes;
private Status upstreamError;
private boolean shutdown;

Expand All @@ -1328,7 +1336,6 @@ private final class FakeLoadBalancer extends LoadBalancer {
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
addresses = resolvedAddresses.getAddresses();
config = resolvedAddresses.getLoadBalancingPolicyConfig();
attributes = resolvedAddresses.getAttributes();
return true;
}

Expand Down
Loading