Skip to content

util: Use acceptResolvedAddresses() for MultiChildLb children #11894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 22 additions & 52 deletions util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@
*/
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
try {
resolvingAddresses = true;

// process resolvedAddresses to update children
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);

// Handle error case
if (newChildAddresses.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return unavailableStatus;
}

// Update the picker and our connectivity state
updateOverallBalancingState();

// shutdown removed children
shutdownRemoved(acceptRetVal.removedChildren);
return acceptRetVal.status;
return updateChildrenWithResolvedAddresses(newChildAddresses);
} finally {
resolvingAddresses = false;
}
Expand Down Expand Up @@ -149,31 +150,7 @@
childLbStates.clear();
}

/**
* This does the work to update the child map and calculate which children have been removed.
* You must call {@link #updateOverallBalancingState} to update the picker
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
*/
protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);

Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);

// Handle error case
if (newChildAddresses.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return new AcceptResolvedAddrRetVal(unavailableStatus, null);
}

List<ChildLbState> removed = updateChildrenWithResolvedAddresses(newChildAddresses);
return new AcceptResolvedAddrRetVal(Status.OK, removed);
}

/** Returns removed children. */
private List<ChildLbState> updateChildrenWithResolvedAddresses(
private Status updateChildrenWithResolvedAddresses(
Map<Object, ResolvedAddresses> newChildAddresses) {
// Create a map with the old values
Map<Object, ChildLbState> oldStatesMap =
Expand All @@ -183,6 +160,7 @@
}

// Move ChildLbStates from the map to a new list (preserving the new map's order)
Status status = Status.OK;
List<ChildLbState> newChildLbStates = new ArrayList<>(newChildAddresses.size());
for (Map.Entry<Object, ResolvedAddresses> entry : newChildAddresses.entrySet()) {
ChildLbState childLbState = oldStatesMap.remove(entry.getKey());
Expand All @@ -191,21 +169,23 @@
}
newChildLbStates.add(childLbState);
if (entry.getValue() != null) {
childLbState.lb.handleResolvedAddresses(entry.getValue()); // update child LB
// update child LB
Status newStatus = childLbState.lb.acceptResolvedAddresses(entry.getValue());
if (!newStatus.isOk()) {
status = newStatus;

Check warning on line 175 in util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java#L175

Added line #L175 was not covered by tests
}
}
}

childLbStates = newChildLbStates;
// Remaining entries in map are orphaned
return new ArrayList<>(oldStatesMap.values());
}
// Update the picker and our connectivity state
updateOverallBalancingState();

protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
// subchannel that has been shutdown.
for (ChildLbState childLbState : removedChildren) {
// Remaining entries in map are orphaned
for (ChildLbState childLbState : oldStatesMap.values()) {
childLbState.shutdown();
}
return status;
}

@Nullable
Expand Down Expand Up @@ -406,14 +386,4 @@
return addrs.toString();
}
}

protected static class AcceptResolvedAddrRetVal {
public final Status status;
public final List<ChildLbState> removedChildren;

public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
this.status = status;
this.removedChildren = removedChildren;
}
}
}
90 changes: 37 additions & 53 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,62 +87,46 @@
return addressValidityStatus;
}

try {
resolvingAddresses = true;
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
}

// Now do the ringhash specific logic with weights and building the ring
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");
// Now do the ringhash specific logic with weights and building the ring
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");

Check warning on line 93 in xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java#L93

Added line #L93 was not covered by tests
}
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
// Support two ways of server weighing: either multiple instances of the same address
// or each address contains a per-address weight attribute. If a weight is not provided,
// each occurrence of the address will be counted a weight value of one.
if (weight == null) {
weight = 1L;

Check warning on line 103 in xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java#L103

Added line #L103 was not covered by tests
}
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
// Support two ways of server weighing: either multiple instances of the same address
// or each address contains a per-address weight attribute. If a weight is not provided,
// each occurrence of the address will be counted a weight value of one.
if (weight == null) {
weight = 1L;
}
totalWeight += weight;
EquivalentAddressGroup addrKey = stripAttrs(eag);
if (serverWeights.containsKey(addrKey)) {
serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
} else {
serverWeights.put(addrKey, weight);
}
totalWeight += weight;
EquivalentAddressGroup addrKey = stripAttrs(eag);
if (serverWeights.containsKey(addrKey)) {
serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);

Check warning on line 108 in xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java#L108

Added line #L108 was not covered by tests
} else {
serverWeights.put(addrKey, weight);
}
// Calculate scale
long minWeight = Collections.min(serverWeights.values());
double normalizedMinWeight = (double) minWeight / totalWeight;
// Scale up the number of hashes per host such that the least-weighted host gets a whole
// number of hashes on the the ring. Other hosts might not end up with whole numbers, and
// that's fine (the ring-building algorithm can handle this). This preserves the original
// implementation's behavior: when weights aren't provided, all hosts should get an equal
// number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
// back down to fit.
double scale = Math.min(
Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
(double) config.maxRingSize);

// Build the ring
ring = buildRing(serverWeights, totalWeight, scale);

// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();

shutdownRemoved(acceptRetVal.removedChildren);
} finally {
this.resolvingAddresses = false;
}

return Status.OK;
// Calculate scale
long minWeight = Collections.min(serverWeights.values());
double normalizedMinWeight = (double) minWeight / totalWeight;
// Scale up the number of hashes per host such that the least-weighted host gets a whole
// number of hashes on the the ring. Other hosts might not end up with whole numbers, and
// that's fine (the ring-building algorithm can handle this). This preserves the original
// implementation's behavior: when weights aren't provided, all hosts should get an equal
// number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
// back down to fit.
double scale = Math.min(
Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
(double) config.maxRingSize);

// Build the ring
ring = buildRing(serverWeights, totalWeight, scale);

return super.acceptResolvedAddresses(resolvedAddresses);
}


Expand Down
28 changes: 7 additions & 21 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,31 +170,17 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
AcceptResolvedAddrRetVal acceptRetVal;
try {
resolvingAddresses = true;
acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
}

if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
weightUpdateTimer.cancel();
}
updateWeightTask.run();

createAndApplyOrcaListeners();
if (weightUpdateTimer != null && weightUpdateTimer.isPending()) {
weightUpdateTimer.cancel();
}
updateWeightTask.run();

// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();
Status status = super.acceptResolvedAddresses(resolvedAddresses);

shutdownRemoved(acceptRetVal.removedChildren);
} finally {
resolvingAddresses = false;
}
createAndApplyOrcaListeners();

return acceptRetVal.status;
return status;
}

/**
Expand Down