Skip to content

Commit

Permalink
loadbalancer-experimental: thread the LoadBalancingPolicy into the De…
Browse files Browse the repository at this point in the history
…faultLoadBalancer (#2970)

Motivation:

We use `lbDescription` all over the place for observing the
load balancer, but we use the `targetResource` for the HostSelector.

Modifications:

- Change the HostSelector types to use `lbDescription`.
- Thread the LoadBalancingPolicy into the DefaultLoadBalancer so we
  can feed it the `lbDescription`.
  • Loading branch information
bryce-anderson authored Jun 14, 2024
1 parent 411ddc6 commit febb582
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnectio

private static final double ACCEPTABLE_PERCENT_ERROR = 0.01;

private final String targetResource;
private final String lbDescription;
private final List<? extends Host<ResolvedAddress, C>> hosts;
BaseHostSelector(final List<? extends Host<ResolvedAddress, C>> hosts, final String targetResource) {
BaseHostSelector(final List<? extends Host<ResolvedAddress, C>> hosts, final String lbDescription) {
this.hosts = hosts;
this.targetResource = requireNonNull(targetResource, "targetResource");
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
}

protected abstract Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context,
Expand Down Expand Up @@ -64,14 +64,14 @@ protected final List<? extends Host<ResolvedAddress, C>> hosts() {
return hosts;
}

protected final String getTargetResource() {
return targetResource;
protected final String lbDescription() {
return lbDescription;
}

protected final Single<C> noActiveHostsFailure(List<? extends Host<ResolvedAddress, C>> usedHosts) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
getTargetResource() + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
return failed(Exceptions.StacklessNoActiveHostException.newInstance(
lbDescription() + ": Failed to pick an active host. Either all are busy, expired, or unhealthy: " +
usedHosts, this.getClass(), "selectConnection(...)"));
}

// This method assumes the host is considered healthy.
Expand All @@ -93,7 +93,7 @@ protected final Single<C> noActiveHostsFailure(List<? extends Host<ResolvedAddre

private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
lbDescription() + ": No hosts are available to connect.",
this.getClass(), "selectConnection(...)"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,7 +97,6 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
// reads and writes are protected by `sequentialExecutor`.
private boolean isClosed;

private final String targetResource;
private final SequentialExecutor sequentialExecutor;
private final String lbDescription;
private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
Expand All @@ -116,11 +116,11 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
* Creates a new instance.
*
* @param id a (unique) ID to identify the created {@link DefaultLoadBalancer}.
* @param targetResourceName {@link String} representation of the target resource for which this instance
* @param targetResource {@link String} representation of the target resource for which this instance
* is performing load balancing.
* @param eventPublisher provides a stream of addresses to connect to.
* @param priorityStrategyFactory a build of the {@link HostPriorityStrategy} to use with the load balancer.
* @param hostSelector initial host selector to use with this load balancer.
* @param loadBalancingPolicy a factory of the initial host selector to use with this load balancer.
* @param connectionPoolStrategyFactory factory of the connection pool strategy to use with this load balancer.
* @param connectionFactory a function which creates new connections.
* @param loadBalancerObserverFactory factory used to build a {@link LoadBalancerObserver} to use with this
Expand All @@ -133,18 +133,19 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
*/
DefaultLoadBalancer(
final String id,
final String targetResourceName,
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final Function<String, HostPriorityStrategy> priorityStrategyFactory,
final HostSelector<ResolvedAddress, C> hostSelector,
final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final ConnectionPoolStrategy.ConnectionPoolStrategyFactory<C> connectionPoolStrategyFactory,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final LoadBalancerObserverFactory loadBalancerObserverFactory,
@Nullable final HealthCheckConfig healthCheckConfig,
final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
this.lbDescription = makeDescription(
requireNonNull(id, "id"), requireNonNull(targetResource, "targetResource"));
this.hostSelector = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy")
.buildSelector(Collections.emptyList(), lbDescription);
this.priorityStrategy = requireNonNull(
priorityStrategyFactory, "priorityStrategyFactory").apply(lbDescription);
this.connectionPoolStrategy = requireNonNull(connectionPoolStrategyFactory,
Expand Down Expand Up @@ -603,7 +604,7 @@ private final class ClosedHostSelector implements HostSelector<ResolvedAddress,
@Override
public Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
return failed(new IllegalStateException(lbDescription + ": LoadBalancer has closed"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.servicetalk.transport.api.ExecutionStrategy;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -161,8 +160,7 @@ public LoadBalancer<C> newLoadBalancer(
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, C> connectionFactory, String targetResource) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
DefaultHostPriorityStrategy::new,
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource),
DefaultHostPriorityStrategy::new, loadBalancingPolicy,
connectionPoolStrategyFactory, connectionFactory,
loadBalancerObserverFactory, healthCheckConfig, outlierDetectorFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public abstract class LoadBalancingPolicy<ResolvedAddress, C extends LoadBalance
/**
* Construct a {@link HostSelector}.
* @param hosts the set of {@link Host}s to select from.
* @param targetResource the name of the target resource, useful for debugging purposes.
* @param lbDescription a description of the associated {@link io.servicetalk.client.api.LoadBalancer},
* useful for debugging purposes.
* @return a {@link HostSelector}
*/
abstract HostSelector<ResolvedAddress, C> buildSelector(
List<Host<ResolvedAddress, C>> hosts, String targetResource);
List<Host<ResolvedAddress, C>> hosts, String lbDescription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ final class P2CLoadBalancingPolicy<ResolvedAddress, C extends LoadBalancedConnec

@Override
HostSelector<ResolvedAddress, C> buildSelector(
List<Host<ResolvedAddress, C>> hosts, String targetResource) {
return new P2CSelector<>(hosts, targetResource, ignoreWeights, maxEffort, failOpen, random);
List<Host<ResolvedAddress, C>> hosts, String lbDescription) {
return new P2CSelector<>(hosts, lbDescription, ignoreWeights, maxEffort, failOpen, random);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>
private final int maxEffort;
private final boolean failOpen;

P2CSelector(List<? extends Host<ResolvedAddress, C>> hosts, final String targetResource,
P2CSelector(List<? extends Host<ResolvedAddress, C>> hosts, final String lbDescription,
final boolean ignoreWeights, final int maxEffort, final boolean failOpen,
@Nullable final Random random) {
super(hosts, targetResource);
super(hosts, lbDescription);
this.ignoreWeights = ignoreWeights;
this.entrySelector = ignoreWeights ? new EqualWeightEntrySelector(hosts.size()) : buildAliasTable(hosts);
this.maxEffort = maxEffort;
Expand All @@ -62,7 +62,7 @@ final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<? extends Host<ResolvedAddress, C>> hosts) {
return new P2CSelector<>(hosts, getTargetResource(), ignoreWeights, maxEffort, failOpen, random);
return new P2CSelector<>(hosts, lbDescription(), ignoreWeights, maxEffort, failOpen, random);
}

@Override
Expand All @@ -72,8 +72,7 @@ protected Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMa
switch (size) {
case 0:
// We shouldn't get called if the load balancer doesn't have any hosts.
throw new AssertionError("Selector for " + getTargetResource() +
" received an empty host set");
throw new AssertionError(lbDescription() + ": Selector received an empty host set");
case 1:
// There is only a single host, so we don't need to do any of the looping or comparison logic.
Host<ResolvedAddress, C> host = hosts().get(0);
Expand Down Expand Up @@ -175,7 +174,7 @@ private EntrySelector buildAliasTable(List<? extends Host<?, ?>> hosts) {
final double pi = hosts.get(i).weight();
if (pi < 0) {
LOGGER.warn("{}: host at address {} has negative weight ({}). Using unweighted selection.",
getTargetResource(), hosts.get(i).address(), pi);
lbDescription(), hosts.get(i).address(), pi);
return new EqualWeightEntrySelector(hosts.size());
}
probs[i] = pi;
Expand Down Expand Up @@ -249,7 +248,7 @@ int secondEntry(Random random, int firstPick) {
} while (result == firstPick && iteration++ < maxEffort);
if (firstPick == result) {
LOGGER.debug("{}: failed to pick two unique indices after {} selection attempts",
getTargetResource(), maxEffort);
lbDescription(), maxEffort);
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ final class RoundRobinLoadBalancingPolicy<ResolvedAddress, C extends LoadBalance

@Override
HostSelector<ResolvedAddress, C>
buildSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
return new RoundRobinSelector<>(hosts, targetResource, failOpen, ignoreWeights);
buildSelector(final List<Host<ResolvedAddress, C>> hosts, final String lbDescription) {
return new RoundRobinSelector<>(hosts, lbDescription, failOpen, ignoreWeights);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection
private final boolean failOpen;
private final boolean ignoreWeights;

RoundRobinSelector(final List<? extends Host<ResolvedAddress, C>> hosts, final String targetResource,
RoundRobinSelector(final List<? extends Host<ResolvedAddress, C>> hosts, final String lbDescription,
final boolean failOpen, final boolean ignoreWeights) {
this(new AtomicInteger(), hosts, targetResource, failOpen, ignoreWeights);
this(new AtomicInteger(), hosts, lbDescription, failOpen, ignoreWeights);
}

private RoundRobinSelector(final AtomicInteger index, final List<? extends Host<ResolvedAddress, C>> hosts,
Expand Down Expand Up @@ -104,7 +104,7 @@ protected Single<C> selectConnection0(

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<? extends Host<ResolvedAddress, C>> hosts) {
return new RoundRobinSelector<>(index, hosts, getTargetResource(), failOpen, ignoreWeights);
return new RoundRobinSelector<>(index, hosts, lbDescription(), failOpen, ignoreWeights);
}

private static Scheduler buildScheduler(AtomicInteger index, List<? extends Host<?, ?>> hosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ TestableLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBalancer(
"test-service",
serviceDiscoveryPublisher,
hostPriorityStrategyFactory,
loadBalancingPolicy.buildSelector(new ArrayList<>(), "test-service"),
LinearSearchConnectionPoolStrategy.<TestLoadBalancedConnection>factory(DEFAULT_LINEAR_SEARCH_SPACE),
loadBalancingPolicy,
LinearSearchConnectionPoolStrategy.factory(DEFAULT_LINEAR_SEARCH_SPACE),
connectionFactory,
lbDescription -> NoopLoadBalancerObserver.instance(),
null,
Expand Down Expand Up @@ -371,7 +371,7 @@ public String toString() {

@Override
HostSelector<String, TestLoadBalancedConnection> buildSelector(
List<Host<String, TestLoadBalancedConnection>> hosts, String targetResource) {
List<Host<String, TestLoadBalancedConnection>> hosts, String lbDescription) {
return new TestSelector(hosts);
}

Expand Down

0 comments on commit febb582

Please sign in to comment.