Skip to content

Commit

Permalink
loadbalancer-experimental: Narrow ewma config params from long to int (
Browse files Browse the repository at this point in the history
…#2994)

Motivation:

We currently allow configuring a long for the parameter values that
affect the RequestTracker ewma, specifically the cancel, error, and
concurrent request penalties. There are no realistic scenarios where
a long is necessary vs an int.

Modifications:

Change the configuration parameters to an int.
  • Loading branch information
bryce-anderson authored Jul 5, 2024
1 parent efb24b3 commit d78f85a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@
import java.util.concurrent.locks.StampedLock;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Integer.MIN_VALUE;
import static java.lang.Math.ceil;
import static java.lang.Math.exp;
import static java.lang.Math.log;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand All @@ -38,16 +34,16 @@
* (2019) Algorithms for Unevenly Spaced Time Series: Moving Averages and Other Rolling Operators (4.2 pp. 10)</a>
*/
abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private static final long MAX_MS_TO_NS = NANOSECONDS.convert(MAX_VALUE, MILLISECONDS);
private static final long MAX_MS_TO_NS = NANOSECONDS.convert(Integer.MAX_VALUE, MILLISECONDS);

private final StampedLock lock = new StampedLock();
/**
* Mean lifetime, exponential decay. inverted tau
*/
private final double invTau;
private final long cancelPenalty;
private final long errorPenalty;
private final long concurrentRequestPenalty;
private final int cancelPenalty;
private final int errorPenalty;
private final int concurrentRequestPenalty;

/**
* Last inserted value to compute weight.
Expand All @@ -60,8 +56,8 @@ abstract class DefaultRequestTracker implements RequestTracker, ScoreSupplier {
private int concurrentCount;
private long concurrentStamp = Long.MIN_VALUE;

DefaultRequestTracker(final long halfLifeNanos, final long cancelPenalty, final long errorPenalty,
final long concurrentRequestPenalty) {
DefaultRequestTracker(final long halfLifeNanos, final int cancelPenalty, final int errorPenalty,
final int concurrentRequestPenalty) {
ensurePositive(halfLifeNanos, "halfLifeNanos");
this.invTau = Math.pow((halfLifeNanos / log(2)), -1);
this.cancelPenalty = cancelPenalty;
Expand Down Expand Up @@ -101,7 +97,7 @@ public void onRequestError(final long startTimeNanos, ErrorClass errorClass) {
onComplete(startTimeNanos, errorClass == ErrorClass.CANCELLED ? cancelPenalty : errorPenalty);
}

private void onComplete(final long startTimeNanos, long penalty) {
private void onComplete(final long startTimeNanos, int penalty) {
final long stamp = lock.writeLock();
try {
concurrentCount--;
Expand Down Expand Up @@ -146,32 +142,32 @@ public final int score() {
// maximum score to increase the likelihood this entity is selected. If there are concurrent requests we
// don't yet know the latency characteristics so we return the minimum score to decrease the
// likelihood this entity is selected.
return concurrentCount == 0 ? 0 : MIN_VALUE;
return concurrentCount == 0 ? 0 : Integer.MIN_VALUE;
}

if (concurrentCount > 0 && concurrentStamp != Long.MIN_VALUE) {
// If we have a request concurrent we should consider how long it has been concurrent so that sudden
// interruptions don't have to wait for timeouts before our scores can be adjusted.
currentEWMA = max(currentEWMA, nanoToMillis(currentTimeNanos - concurrentStamp));
currentEWMA = Integer.max(currentEWMA, nanoToMillis(currentTimeNanos - concurrentStamp));
}

// Add penalty for concurrent requests to account for "unaccounted" load.
// Penalty is the observed latency if known, else an arbitrarily high value which makes entities for which
// no latency data has yet been received (eg: request sent but not received), un-selectable.
final int concurrentPenalty = (int) min(MAX_VALUE,
(long) concurrentCount * concurrentRequestPenalty * currentEWMA);
int concurrentPenalty = safeMultiply(concurrentCount, safeMultiply(currentEWMA, concurrentRequestPenalty));
// Since we are measuring latencies and lower latencies are better, we turn the score as negative such that
// lower the latency, higher the score.
return MAX_VALUE - currentEWMA <= concurrentPenalty ? MIN_VALUE : -(currentEWMA + concurrentPenalty);
return Integer.MAX_VALUE - currentEWMA <= concurrentPenalty ?
Integer.MIN_VALUE : -(currentEWMA + concurrentPenalty);
}

private static int applyPenalty(int currentEWMA, int currentLatency, long penalty) {
private static int applyPenalty(int currentEWMA, int currentLatency, int penalty) {
// Relatively large latencies will have a bigger impact on the penalty, while smaller latencies (e.g. premature
// cancel/error) rely on the penalty.
return (int) min(MAX_VALUE, max(currentEWMA, currentLatency) * penalty);
return (int) Long.min(Integer.MAX_VALUE, Long.max(currentEWMA, currentLatency) * penalty);
}

private void updateEwma(long penalty, long startTimeNanos) {
private void updateEwma(int penalty, long startTimeNanos) {
assert lock.isWriteLocked();
// We capture the current time while holding the lock to exploit the monotonic time source
// properties which prevent the time duration from going negative. This will result in a latency penalty
Expand All @@ -190,18 +186,25 @@ private void updateEwma(long penalty, long startTimeNanos) {

// Peak EWMA from finagle for the score to be extremely sensitive to higher than normal latencies.
final int nextEWMA;
if (currentLatency > currentEWMA) {
nextEWMA = currentLatency;
} else {
final double tmp = (currentTimeNanos - lastTimeNanos) * invTau;
final double w = exp(-tmp);
nextEWMA = (int) ceil(currentEWMA * w + currentLatency * (1d - w));
}
nextEWMA = currentLatency > currentEWMA ? currentLatency :
calculateDecay(currentTimeNanos, lastTimeNanos, currentEWMA, currentLatency, invTau);
lastTimeNanos = currentTimeNanos;
ewma = nextEWMA;
}

private static int calculateDecay(long currentTimeNanos, long lastTimeNanos,
long currentEWMA, int currentLatency, double invTau) {
final double tmp = (currentTimeNanos - lastTimeNanos) * invTau;
final double w = exp(-tmp);
return (int) ceil(currentEWMA * w + currentLatency * (1d - w));
}

private static int nanoToMillis(long nanos) {
return (int) MILLISECONDS.convert(min(nanos, MAX_MS_TO_NS), NANOSECONDS);
return (int) Long.min(Integer.MAX_VALUE, MILLISECONDS.convert(Long.min(nanos, MAX_MS_TO_NS), NANOSECONDS));
}

private static int safeMultiply(int a, int b) {
long result = ((long) a) * b;
return (int) Long.min(Integer.MAX_VALUE, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public final class OutlierDetectorConfig {

// ServiceTalk specific settings
private final Duration ewmaHalfLife;
private final long ewmaCancellationPenalty;
private final long ewmaErrorPenalty;
private final long concurrentRequestPenalty;
private final int ewmaCancellationPenalty;
private final int ewmaErrorPenalty;
private final int concurrentRequestPenalty;
private final boolean cancellationIsError;
private final int failedConnectionsThreshold;
private final Duration failureDetectorIntervalJitter;
Expand All @@ -69,8 +69,8 @@ public final class OutlierDetectorConfig {
private final int failurePercentageRequestVolume;
private final Duration maxEjectionTime;

OutlierDetectorConfig(final Duration ewmaHalfLife, final long ewmaCancellationPenalty, final long ewmaErrorPenalty,
final long concurrentRequestPenalty, final boolean cancellationIsError, int failedConnectionsThreshold,
private OutlierDetectorConfig(final Duration ewmaHalfLife, final int ewmaCancellationPenalty, final int ewmaErrorPenalty,
final int concurrentRequestPenalty, final boolean cancellationIsError, int failedConnectionsThreshold,
final Duration failureDetectorIntervalJitter,
final Duration serviceDiscoveryResubscribeInterval, final Duration serviceDiscoveryResubscribeJitter,
// true xDS settings
Expand Down Expand Up @@ -126,7 +126,7 @@ public Duration ewmaHalfLife() {
* The latency of the cancelled request is multiplied by the provided penalty before incorporating it into the EWMA.
* @return the penalty factor for local cancellation of requests.
*/
public long ewmaCancellationPenalty() {
public int ewmaCancellationPenalty() {
return ewmaCancellationPenalty;
}

Expand All @@ -143,7 +143,7 @@ public boolean cancellationIsError() {
* The latency of the failed request is multiplied by the provided penalty before incorporating it into the EWMA.
* @return the penalty factor for requests that were classified as an error.
*/
public long ewmaErrorPenalty() {
public int ewmaErrorPenalty() {
return ewmaErrorPenalty;
}

Expand All @@ -153,7 +153,7 @@ public long ewmaErrorPenalty() {
* concurrent load the traffic distribution will be smoother for algorithms that consider load metrics.
* @return the penalty factory to use for concurrent load.
*/
public long concurrentRequestPenalty() {
public int concurrentRequestPenalty() {
return concurrentRequestPenalty;
}

Expand Down Expand Up @@ -366,9 +366,9 @@ public String toString() {
public static final class Builder {

static final Duration DEFAULT_EWMA_HALF_LIFE = Duration.ofSeconds(10);
static final long DEFAULT_CANCEL_PENALTY = 5L;
static final long DEFAULT_ERROR_PENALTY = 10L;
static final long DEFAULT_CONCURRENT_REQUEST_PENALTY = 1L;
static final int DEFAULT_CANCEL_PENALTY = 5;
static final int DEFAULT_ERROR_PENALTY = 10;
static final int DEFAULT_CONCURRENT_REQUEST_PENALTY = 1;
private boolean cancellationIsError = true;

// Default xDS outlier detector settings.
Expand All @@ -389,9 +389,9 @@ public static final class Builder {

// Non-xDS builder settings
private Duration ewmaHalfLife = DEFAULT_EWMA_HALF_LIFE;
private long ewmaCancellationPenalty = DEFAULT_CANCEL_PENALTY;
private long ewmaErrorPenalty = DEFAULT_ERROR_PENALTY;
private long concurrentRequestPenalty = DEFAULT_CONCURRENT_REQUEST_PENALTY;
private int ewmaCancellationPenalty = DEFAULT_CANCEL_PENALTY;
private int ewmaErrorPenalty = DEFAULT_ERROR_PENALTY;
private int concurrentRequestPenalty = DEFAULT_CONCURRENT_REQUEST_PENALTY;
private int failedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private Duration intervalJitter = DEFAULT_HEALTH_CHECK_JITTER;
private Duration serviceDiscoveryResubscribeInterval = DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL;
Expand Down Expand Up @@ -497,7 +497,7 @@ public Builder ewmaHalfLife(final Duration ewmaHalfLife) {
* @param ewmaCancellationPenalty the penalty factor for local cancellation of requests.
* @return {@code this}
*/
public Builder ewmaCancellationPenalty(final long ewmaCancellationPenalty) {
public Builder ewmaCancellationPenalty(final int ewmaCancellationPenalty) {
this.ewmaCancellationPenalty = ensureNonNegative(ewmaCancellationPenalty, "ewmaCancellationPenalty");
return this;
}
Expand All @@ -511,7 +511,7 @@ public Builder ewmaCancellationPenalty(final long ewmaCancellationPenalty) {
* @param ewmaErrorPenalty the penalty factor for requests that were classified as an error.
* @return {@code this}
*/
public Builder ewmaErrorPenalty(final long ewmaErrorPenalty) {
public Builder ewmaErrorPenalty(final int ewmaErrorPenalty) {
this.ewmaErrorPenalty = ensureNonNegative(ewmaErrorPenalty, "ewmaErrorPenalty");
return this;
}
Expand All @@ -537,7 +537,7 @@ public Builder cancellationIsError(final boolean cancellationIsError) {
* @param ewmaConcurrentRequestPenalty the penalty factory to apply for concurrent load.
* @return {@code this}
*/
public Builder ewmaConcurrentRequestPenalty(final long ewmaConcurrentRequestPenalty) {
public Builder ewmaConcurrentRequestPenalty(final int ewmaConcurrentRequestPenalty) {
this.concurrentRequestPenalty = ensureNonNegative(
ewmaConcurrentRequestPenalty, "ewmaConcurrentRequestPenalty");
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ abstract class XdsHealthIndicator<ResolvedAddress, C extends LoadBalancedConnect
private volatile Long evictedUntilNanos;

XdsHealthIndicator(final SequentialExecutor sequentialExecutor, final Executor executor,
final Duration ewmaHalfLife, final long cancellationPenalty, final long errorPenalty,
final long pendingRequestPenalty,
final Duration ewmaHalfLife, final int cancellationPenalty, final int errorPenalty,
final int pendingRequestPenalty,
final boolean cancellationIsError, final ResolvedAddress address, String lbDescription,
final HostObserver hostObserver) {
super(requireNonNull(ewmaHalfLife, "ewmaHalfLife").toNanos(),
Expand Down

0 comments on commit d78f85a

Please sign in to comment.