Skip to content

Commit

Permalink
loadbalancer: selectors consider health first and have configurable f…
Browse files Browse the repository at this point in the history
…ail-open behavior (#2787)

Motivation:

The health status of a connection is a course grained
indicator of whether a host is likely to be able to serve
traffic and should be the first consideration to selectors
when picking hosts.
A second issue is that it's not obvious what the desired
behavior is when a healthy host cannot be found: it's
going to be user specific whether to fail closed or just
give it a try and see what happens.

Modifications:

- Switch the RR and P2C selectors to consider health
  first  when picking hosts.
- Add fail open behavior to both round robin and P2C:
  if a healthy host cannot be found we will try the first
  active candidate evaluated.

Results:

- Health is now considered first. This doesn't change
  much right now but will make L7 health status much
  more useful.
- Fail open is supported, although off by default.
  • Loading branch information
bryce-anderson authored Jan 3, 2024
1 parent e5784f8 commit 6b2b65e
Show file tree
Hide file tree
Showing 17 changed files with 513 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static java.util.Objects.requireNonNull;

abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnection>
Expand Down Expand Up @@ -51,10 +52,10 @@ public final int hostSetSize() {
}

@Override
public final boolean isUnHealthy() {
public final boolean isHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return allUnhealthy(hosts);
return anyHealthy(hosts);
}

protected final String getTargetResource() {
Expand All @@ -67,21 +68,36 @@ protected final Single<C> noActiveHostsFailure(List<Host<ResolvedAddress, C>> us
this.getClass(), "selectConnection(...)"));
}

// This method assumes the host is considered healthy.
protected final @Nullable Single<C> selectFromHost(Host<ResolvedAddress, C> host, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
// First see if we can get an existing connection regardless of health status.
if (!forceNewConnectionAndReserve) {
C c = host.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
}
// We can only create a new connection if the host is active. It's possible for it to think that
// it's healthy based on having connections but not being active but we weren't able to pick an
// existing connection.
return host.canMakeNewConnections() ?
host.newConnection(selector, forceNewConnectionAndReserve, contextMap) : null;
}

private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection(...)"));
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean anyHealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
if (host.isHealthy()) {
return true;
}
}
return allUnhealthy;
return usedHosts.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ final class DefaultHost<Addr, C extends LoadBalancedConnection> implements Host<
*/
private static final float RANDOM_SEARCH_FACTOR = 0.75f;

private static final Object[] EMPTY_ARRAY = new Object[0];
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHost.class);

private enum State {
Expand Down Expand Up @@ -312,13 +311,15 @@ private void markUnhealthy(final Throwable cause) {
}

@Override
public boolean isActiveAndHealthy() {
return connState.isActive();
public boolean isHealthy() {
final State state = connState.state;
return state != State.UNHEALTHY && state != State.CLOSED;
}

@Override
public boolean isUnhealthy() {
return connState.isUnhealthy();
public boolean canMakeNewConnections() {
final State state = connState.state;
return state != State.EXPIRED && state != State.CLOSED;
}

private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) {
Expand Down Expand Up @@ -380,8 +381,8 @@ private boolean addConnection(final C connection, final @Nullable HealthCheck cu
// remove the connection (previously considered as the last one) from the array
// in the next iteration.
&& connStateUpdater.compareAndSet(this, currentConnState, nextState.toClosed())) {
this.closeAsync().subscribe();
hostObserver.onExpiredHostRemoved(address);
closeAsync().subscribe();
hostObserver.onExpiredHostRemoved(address, nextState.connections.size());
break;
}
} else {
Expand Down Expand Up @@ -435,6 +436,8 @@ private Completable doClose(final boolean graceful) {
lbDescription, oldState.connections.size(), graceful ? "" : "un", address);
if (oldState.state == State.ACTIVE) {
hostObserver.onActiveHostRemoved(address, oldState.connections.size());
} else if (oldState.state == State.EXPIRED) {
hostObserver.onExpiredHostRemoved(address, oldState.connections.size());
}
final List<C> connections = oldState.connections;
return (connections.isEmpty() ? completed() :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
currentHosts, host);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
loadBalancerObserver.hostObserver().onExpiredHostRemoved(host.address());
sequentialUpdateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
Expand Down Expand Up @@ -472,7 +471,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
return result.beforeOnError(exn -> {
if (exn instanceof NoActiveHostException) {
if (currentHostSelector.isUnHealthy()) {
if (!currentHostSelector.isHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
Expand Down Expand Up @@ -561,7 +560,7 @@ public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddre
}

@Override
public boolean isUnHealthy() {
public boolean isHealthy() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends Listen
ResolvedAddress address();

/**
* Whether the host is both considered active by service discovery and healthy by the failure
* detection mechanisms.
* @return whether the host is both active and healthy
* Determine the health status of this host.
* @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not
* guarantee that the request will succeed.
*/
boolean isActiveAndHealthy();
boolean isHealthy();

/**
* Whether the host is considered unhealthy bo the failure detection mechanisms.
* @return whether the host is considered unhealthy.
* Determine whether the host is in a state where it can make new connections.
* @return whether the host is in a state where it can make new connections.
*/
boolean isUnhealthy();
boolean canMakeNewConnections();

/**
* Signal to the host that it has been re-discovered by the service-discovery mechanism and is expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddress, C>> hosts);

/**
* Whether the load balancer believes itself to unhealthy for serving traffic.
* Whether the load balancer believes itself to be healthy enough for serving traffic.
* <p>
* Note that this is both racy and best effort: just because a {@link HostSelector} is
* unhealthy doesn't guarantee that a request will fail nor does a healthy status indicate
* that this selector is guaranteed to successfully serve a request.
* @return whether the load balancer believes itself unhealthy enough and unlikely to successfully serve traffic.
* healthy doesn't guarantee that a request will succeed nor does an unhealthy status
* indicate that this selector is guaranteed to fail a request.
* @return whether the load balancer believes itself healthy enough and likely to successfully serve traffic.
*/
boolean isUnHealthy();
boolean isHealthy();

/**
* The size of the host candidate pool for this host selector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ interface HostObserver<ResolvedAddress> {
/**
* Callback for when a host is removed by service discovery.
* @param address the resolved address.
* @param connectionCount the number of connections that were associated with the host.
* @param connectionCount the number of open connections when the host was removed.
*/
void onActiveHostRemoved(ResolvedAddress address, int connectionCount);

Expand All @@ -79,8 +79,9 @@ interface HostObserver<ResolvedAddress> {
/**
* Callback for when an expired host is removed.
* @param address the resolved address.
* @param connectionCount the number of open connections when the host was removed.
*/
void onExpiredHostRemoved(ResolvedAddress address);
void onExpiredHostRemoved(ResolvedAddress address, int connectionCount);

/**
* Callback for when a host is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void onHostMarkedExpired(ResolvedAddress resolvedAddress, int connectionC
}

@Override
public void onExpiredHostRemoved(ResolvedAddress resolvedAddress) {
public void onExpiredHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ final class P2CLoadBalancingPolicy<ResolvedAddress, C extends LoadBalancedConnec
implements LoadBalancingPolicy<ResolvedAddress, C> {

private final int maxEffort;
private final boolean failOpen;
@Nullable
private final Random random;

private P2CLoadBalancingPolicy(final int maxEffort, @Nullable final Random random) {
private P2CLoadBalancingPolicy(final int maxEffort, final boolean failOpen, @Nullable final Random random) {
this.maxEffort = maxEffort;
this.failOpen = failOpen;
this.random = random;
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(List<Host<ResolvedAddress, C>> hosts, String targetResource) {
return new P2CSelector<>(hosts, targetResource, maxEffort, random);
return new P2CSelector<>(hosts, targetResource, maxEffort, failOpen, random);
}

@Override
Expand All @@ -70,6 +72,7 @@ public static final class Builder {
private static final int DEFAULT_MAX_EFFORT = 5;

private int maxEffort = DEFAULT_MAX_EFFORT;
private boolean failOpen;
@Nullable
private Random random;

Expand All @@ -87,6 +90,16 @@ public Builder maxEffort(final int maxEffort) {
return this;
}

/**
* Set whether the host selector should attempt to use an unhealthy {@link Host} as a last resort.
* @param failOpen whether the host selector should attempt to use an unhealthy {@link Host} as a last resort.
* @return this {@link Builder}.
*/
public Builder failOpen(final boolean failOpen) {
this.failOpen = failOpen;
return this;
}

// For testing purposes only.
Builder random(Random random) {
this.random = random;
Expand All @@ -100,7 +113,7 @@ Builder random(Random random) {
* @return the concrete {@link P2CLoadBalancingPolicy}.
*/
public <ResolvedAddress, C extends LoadBalancedConnection> P2CLoadBalancingPolicy<ResolvedAddress, C> build() {
return new P2CLoadBalancingPolicy<>(maxEffort, random);
return new P2CLoadBalancingPolicy<>(maxEffort, failOpen, random);
}
}
}
Loading

0 comments on commit 6b2b65e

Please sign in to comment.