Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts #23930

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
cccc564
[fix][broker] Fix rate limiting causing connections to time out
lhotari Feb 5, 2025
79c9cce
Fix test
lhotari Feb 5, 2025
9fa43a9
Add test that checks that clock leaping backward or forward would be …
lhotari Feb 5, 2025
f28af85
Improve DefaultMonotonicSnapshotClock so that requests don't get dela…
lhotari Feb 5, 2025
a99fe0c
Also test with small offsets
lhotari Feb 5, 2025
9072713
Improve test case
lhotari Feb 5, 2025
f719dbc
Use JMH blackhole in test
lhotari Feb 5, 2025
c60c4b8
Improve code coverage
lhotari Feb 6, 2025
696f9fb
Refactor: Split out logic for leap detection and monotonic tick updating
lhotari Feb 6, 2025
d6d60f7
Remove invalid test
lhotari Feb 6, 2025
0886054
Revert "Remove invalid test"
lhotari Feb 6, 2025
f4feda1
Add test mode to DefaultMonotonicSnapshotClock so that thread updates…
lhotari Feb 6, 2025
13aa2f7
Use test mode to fix test
lhotari Feb 6, 2025
8b36d71
Improve testability
lhotari Feb 6, 2025
d9fb30f
Add failing test case
lhotari Feb 6, 2025
8c851e2
Fix test
lhotari Feb 6, 2025
846cc89
Add failing test case for the negative tokens case
lhotari Feb 6, 2025
58f8b35
Don't handle leaps forward since those cannot be detected properly
lhotari Feb 6, 2025
91ae4c6
Remove separate test mode since it's not needed
lhotari Feb 7, 2025
487419e
Fix parameter
lhotari Feb 7, 2025
571b2c2
Enable batching of update requests by using a AtomicLong for the requ…
lhotari Feb 7, 2025
532998d
Removing remaining parts of the test mode
lhotari Feb 7, 2025
a21eee3
Remove synchronization from tick updater since the usual path is sing…
lhotari Feb 7, 2025
eb82ab9
Improve comments
lhotari Feb 7, 2025
d1dd4b7
Reorder methods
lhotari Feb 7, 2025
d5ea482
Improve javadoc
lhotari Feb 7, 2025
b19e0de
Reduce excessive logging in test
lhotari Feb 7, 2025
b46fbab
Reduce duplication in JMH test
lhotari Feb 7, 2025
73c2b8c
Add JMH benchmark for DefaultMonotonicSnapshotClock
lhotari Feb 7, 2025
a22da98
Add more instructions for running JMH benchmarks
lhotari Feb 7, 2025
4287838
Add unit tests for leap detection
lhotari Feb 7, 2025
12cb400
Move updating of requestCount outside of the synchronization blocks t…
lhotari Feb 7, 2025
31a1b30
Fix typo and improve comment
lhotari Feb 7, 2025
6e4a94f
Optimize token calculation performance and correctness
lhotari Feb 7, 2025
7463891
Improve eventual consistency test
lhotari Feb 7, 2025
b04752d
If condition when newTokens should be 0
lhotari Feb 7, 2025
ea4f7d5
Improve logic to update lastNanos so that races are prevented with CAS
lhotari Feb 7, 2025
ca7cae4
Add "getTokensUpdatesTokens" to AsyncTokenBucket to reduce eventual c…
lhotari Feb 7, 2025
543c978
Reduce test flakiness
lhotari Feb 7, 2025
d159198
Let MessageDispatchThrottlingTest#reset handle deletion
lhotari Feb 7, 2025
936cdb0
Reduce test flakiness for waiting to new rate to be applied
lhotari Feb 7, 2025
dae8220
Prevent NPEs in DispatchRateLimiter when limit has changed
lhotari Feb 7, 2025
d62f97e
Fix switchToConsistentTokensView behavior
lhotari Feb 7, 2025
1d68522
Revert using getTokensUpdatesTokens mode by default since eventual co…
lhotari Feb 7, 2025
38e9f7b
Rename getTokensUpdatesTokens to consistentTokensView
lhotari Feb 7, 2025
f5c3e57
Use consistent tokens view for SubscribeRateLimiter
lhotari Feb 7, 2025
b79ed85
Fix issue with restartBroker in tests
lhotari Feb 7, 2025
5910b3c
Ignore metadata change when broker isn't running
lhotari Feb 7, 2025
fda2337
Move dispatch throttling tests to broker-api group
lhotari Feb 7, 2025
759fafe
Use AssertJ for better error message
lhotari Feb 7, 2025
5e0a327
Improve test cleanup for retries
lhotari Feb 7, 2025
f024203
Use unique namespaces
lhotari Feb 7, 2025
9277208
Extract common base class to avoid test duplication
lhotari Feb 7, 2025
96781e9
Reduce flakiness
lhotari Feb 7, 2025
234cfc9
Refactor common config
lhotari Feb 7, 2025
8ef12ca
Fix flakiness
lhotari Feb 7, 2025
68ba451
Move MessagePublishThrottlingTest to broker-api test group
lhotari Feb 7, 2025
2d99778
Fix issue in lookupUrl change in test class
lhotari Feb 7, 2025
c67ceb0
Revisit startBroker method in test base class
lhotari Feb 7, 2025
6de794b
Revisit logic one more time in test class
lhotari Feb 7, 2025
f9aab2e
Refactor consistency settings in AsyncTokenBucket and add Javadocs
lhotari Feb 8, 2025
e4f4689
Attempt to fix flaky test MessageDispatchThrottlingTest
lhotari Feb 8, 2025
42fb876
Use consistent tokens view in flaky RGUsageMTAggrWaitForAllMsgsTest
lhotari Feb 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor consistency settings in AsyncTokenBucket and add Javadocs
  • Loading branch information
lhotari committed Feb 8, 2025
commit f9aab2e5f99d1d1e46030b355afa913204695f02
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
* connection or client from the throttling queue to unthrottle. Before unthrottling, the application should check
* for available tokens. If tokens are still not available, the application should continue with throttling and
* repeat the throttling loop.
* <p>By default, the AsyncTokenBucket is eventually consistent. This means that the token balance is updated
* with added tokens and consumed tokens at most once during each "increment", when time advances more than the
* configured resolution. There are settings for configuring consistency, please see {@link AsyncTokenBucketBuilder}
* for details.
* <p>This class does not produce side effects outside its own scope. It functions similarly to a stateful function,
* akin to a counter function. In essence, it is a sophisticated counter. It can serve as a foundational component for
* constructing higher-level asynchronous rate limiter implementations, which require side effects for throttling.
Expand Down Expand Up @@ -119,14 +123,28 @@ public static void resetToDefaultEventualConsistentTokensView() {
*/
private final LongAdder pendingConsumedTokens = new LongAdder();

private final boolean consistentTokensView;
/**
* By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from
* the total amount of tokens at most once during each "increment", when time advances more than the configured
* resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently.
* For high performance, it is recommended to keep this setting as false.
*/
private final boolean consistentConsumedTokens;
/**
* By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based
* on elapsed time at most once during each "increment", when time advances more than the configured
* resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently.
* For high performance, it is recommended to keep this setting as false.
*/
private final boolean consistentAddedTokens;

protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos,
boolean consistentTokensView) {
boolean consistentConsumedTokens, boolean consistentAddedTokens) {
this.clockSource = clockSource;
this.resolutionNanos = resolutionNanos;
this.lastNanos = Long.MIN_VALUE;
this.consistentTokensView = consistentTokensView;
this.consistentConsumedTokens = consistentConsumedTokens;
this.consistentAddedTokens = consistentAddedTokens;
}

public static FinalRateAsyncTokenBucketBuilder builder() {
Expand All @@ -144,40 +162,46 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() {
/**
* Consumes tokens and possibly updates the tokens balance. New tokens are calculated and added to the current
* tokens balance each time the update takes place. The update takes place once in every interval of the configured
* resolutionNanos or when the forceUpdateTokens parameter is true.
* resolutionNanos or when the forceConsistentTokens parameter is true.
* When the tokens balance isn't updated, the consumed tokens are added to the pendingConsumedTokens LongAdder
* counter which gets flushed the next time the tokens are updated. This makes the tokens balance
* eventually consistent. The reason for this design choice is to optimize performance by preventing CAS loop
* contention which could cause excessive CPU consumption.
*
* @param consumeTokens number of tokens to consume, can be 0 to update the tokens balance
* @param forceUpdateTokens if true, the tokens are updated even if the configured resolution hasn't passed
* @param forceConsistentTokens if true, the token balance is updated consistently
* @return the current number of tokens in the bucket or Long.MIN_VALUE when the number of tokens is unknown due
* to eventual consistency
*/
private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceUpdateTokens) {
private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolean forceConsistentTokens) {
if (consumeTokens < 0) {
throw new IllegalArgumentException("consumeTokens must be >= 0");
}
long currentNanos = clockSource.getTickNanos(forceUpdateTokens);
boolean requestConsistentTickNanosSnapshot =
consistentAddedTokens || consistentConsumedTokens || forceConsistentTokens || resolutionNanos == 0;
long currentNanos = clockSource.getTickNanos(requestConsistentTickNanosSnapshot);
long newTokens = 0;
// check if the tokens should be updated immediately
if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
if (shouldAddTokensImmediately(currentNanos, forceConsistentTokens)) {
// calculate the number of new tokens since the last update
newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
newTokens = calculateNewTokensSinceLastUpdate(currentNanos, forceConsistentTokens);
}
// update tokens if there are new tokens or if resolutionNanos is set to 0 which is currently used for testing
if (newTokens > 0 || resolutionNanos == 0) {
if (newTokens > 0 || resolutionNanos == 0 || consistentConsumedTokens || forceConsistentTokens) {
// flush the pendingConsumedTokens by calling "sumThenReset"
long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
// calculate the token delta by subtracting the consumed tokens from the new tokens
long tokenDelta = newTokens - currentPendingConsumedTokens;
// update the tokens and return the current token value
return TOKENS_UPDATER.updateAndGet(this,
// limit the tokens to the capacity of the bucket
currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity())
// subtract the consumed tokens from the capped tokens
- consumeTokens);
if (tokenDelta != 0 || consumeTokens != 0) {
// update the tokens and return the current token value
return TOKENS_UPDATER.updateAndGet(this,
// limit the tokens to the capacity of the bucket
currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity())
// subtract the consumed tokens from the capped tokens
- consumeTokens);
} else {
return tokens;
}
} else {
// eventual consistent fast path, tokens are not updated immediately

Expand All @@ -186,13 +210,8 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
pendingConsumedTokens.add(consumeTokens);
}

if (forceUpdateTokens) {
// return the current tokens balance without updating the tokens and resetting the pendingConsumedTokens
return tokens - pendingConsumedTokens.sum();
} else {
// return Long.MIN_VALUE if the current value of tokens is unknown due to the eventual consistency
return Long.MIN_VALUE;
}
// return Long.MIN_VALUE if the current value of tokens is unknown due to the eventual consistency
return Long.MIN_VALUE;
}
}

Expand All @@ -201,19 +220,19 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
*
* The tokens will be updated once every resolutionNanos nanoseconds.
* This method checks if the configured resolutionNanos has passed since the last update.
* If the forceUpdateTokens is true, the tokens will be updated immediately.
* If the forceConsistentTokens is true, the tokens will be updated immediately.
*
* @param currentNanos the current monotonic clock time in nanoseconds
* @param forceUpdateTokens if true, the tokens will be updated immediately
* @param currentNanos the current monotonic clock time in nanoseconds
* @param forceConsistentTokens if true, the tokens are added even if the configured resolution hasn't fully passed
* @return true if the tokens should be updated immediately, false otherwise
*/
private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUpdateTokens) {
private boolean shouldAddTokensImmediately(long currentNanos, boolean forceConsistentTokens) {
long currentIncrement = resolutionNanos != 0 ? currentNanos / resolutionNanos : 0;
long currentLastIncrement = lastIncrement;
return currentIncrement == 0
|| (currentIncrement > currentLastIncrement
&& LAST_INCREMENT_UPDATER.compareAndSet(this, currentLastIncrement, currentIncrement))
|| forceUpdateTokens;
|| consistentAddedTokens || forceConsistentTokens;
}

/**
Expand All @@ -223,11 +242,13 @@ private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUp
* @param currentNanos the current monotonic clock time in nanoseconds
* @return the number of new tokens to add since the last update
*/
private long calculateNewTokensSinceLastUpdate(long currentNanos) {
private long calculateNewTokensSinceLastUpdate(long currentNanos, boolean forceConsistentTokens) {
long previousLastNanos = lastNanos;
long newLastNanos;
// update lastNanos only if at least resolutionNanos/2 nanoseconds has passed since the last update
if (currentNanos >= previousLastNanos + resolutionNanos / 2) {
// unless consistency is needed
long minimumIncrementNanos = forceConsistentTokens || consistentAddedTokens ? 0L : resolutionNanos / 2;
if (currentNanos > previousLastNanos + minimumIncrementNanos) {
newLastNanos = currentNanos;
} else {
newLastNanos = previousLastNanos;
Expand Down Expand Up @@ -291,15 +312,14 @@ public boolean consumeTokensAndCheckIfContainsTokens(long consumeTokens) {
}

/**
* Returns the current token balance. When forceUpdateTokens is true, the tokens balance is updated before
* returning. If forceUpdateTokens is false, the tokens balance could be updated if the last updated happened
* Returns the current token balance. When forceConsistentTokens is true, the tokens balance is updated before
* returning. If forceConsistentTokens is false, the tokens balance could be updated if the last updated happened
* more than resolutionNanos nanoseconds ago.
*
* @param forceUpdateTokens if true, the tokens balance is updated before returning
* @return the current token balance
*/
protected long tokens(boolean forceUpdateTokens) {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, forceUpdateTokens);
private long tokens() {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, false);
if (currentTokens != Long.MIN_VALUE) {
// when currentTokens isn't Long.MIN_VALUE, the current tokens balance is known
return currentTokens;
Expand All @@ -319,7 +339,7 @@ public long calculateThrottlingDuration() {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true);
if (currentTokens == Long.MIN_VALUE) {
throw new IllegalArgumentException(
"Unexpected result from updateAndConsumeTokens with forceUpdateTokens set to true");
"Unexpected result from updateAndConsumeTokens with forceConsistentTokens set to true");
}
if (currentTokens > 0) {
return 0L;
Expand All @@ -334,10 +354,10 @@ public long calculateThrottlingDuration() {
/**
* Returns the current number of tokens in the bucket.
* The token balance is updated if the configured resolutionNanos has passed since the last update unless
* consistentTokensView is true.
* consistentConsumedTokens is true.
*/
public final long getTokens() {
return tokens(consistentTokensView);
return tokens();
}

public abstract long getRate();
Expand All @@ -346,25 +366,12 @@ public final long getTokens() {
* Checks if the bucket contains tokens.
* The token balance is updated before the comparison if the configured resolutionNanos has passed since the last
* update. It's possible that the returned result is not definite since the token balance is eventually consistent
* if consistentTokensView is false.
* if consistentConsumedTokens is false.
*
* @return true if the bucket contains tokens, false otherwise
*/
public boolean containsTokens() {
return containsTokens(consistentTokensView);
}

/**
* Checks if the bucket contains tokens.
* The token balance is updated before the comparison if the configured resolutionNanos has passed since the last
* update. The token balance is also updated when forceUpdateTokens is true.
* It's possible that the returned result is not definite since the token balance is eventually consistent.
*
* @param forceUpdateTokens if true, the token balance is updated before the comparison
* @return true if the bucket contains tokens, false otherwise
*/
protected boolean containsTokens(boolean forceUpdateTokens) {
return tokens(forceUpdateTokens) > 0;
return tokens() > 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
public abstract class AsyncTokenBucketBuilder<SELF extends AsyncTokenBucketBuilder<SELF>> {
protected MonotonicSnapshotClock clock = AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK;
protected long resolutionNanos = AsyncTokenBucket.defaultResolutionNanos;
protected boolean consistentTokensView;
protected boolean consistentConsumedTokens;
protected boolean consistentAddedTokens;

protected AsyncTokenBucketBuilder() {
}
Expand All @@ -32,18 +33,45 @@ protected SELF self() {
return (SELF) this;
}

/**
* Set the clock source for the token bucket. It's recommended to use the {@link DefaultMonotonicSnapshotClock}
* for most use cases.
*/
public SELF clock(MonotonicSnapshotClock clock) {
this.clock = clock;
return self();
}

/**
* By default, AsyncTokenBucket is eventually consistent. This means that the token balance is updated, when time
* advances more than the configured resolution. This setting determines the duration of the increment.
* Setting this value to 0 will make the token balance fully consistent. There's a performance trade-off
* when setting this value to 0.
*/
public SELF resolutionNanos(long resolutionNanos) {
this.resolutionNanos = resolutionNanos;
return self();
}

public SELF consistentTokensView(boolean consistentTokensView) {
this.consistentTokensView = consistentTokensView;
/**
* By default, AsyncTokenBucket is eventually consistent. This means that the consumed tokens are subtracted from
* the total amount of tokens at most once during each "increment", when time advances more than the configured
* resolution. This setting determines if the consumed tokens are subtracted from tokens balance consistently.
* For high performance, it is recommended to keep this setting as false.
*/
public SELF consistentConsumedTokens(boolean consistentConsumedTokens) {
this.consistentConsumedTokens = consistentConsumedTokens;
return self();
}

/**
* By default, AsyncTokenBucket is eventually consistent. This means that the added tokens are calculated based
* on elapsed time at most once during each "increment", when time advances more than the configured
* resolution. This setting determines if the added tokens are calculated and added to tokens balance consistently.
* For high performance, it is recommended to keep this setting as false.
*/
public SELF consistentAddedTokens(boolean consistentAddedTokens) {
this.consistentAddedTokens = consistentAddedTokens;
return self();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ public class DynamicRateAsyncTokenBucket extends AsyncTokenBucket {

protected DynamicRateAsyncTokenBucket(double capacityFactor, LongSupplier rateFunction,
MonotonicSnapshotClock clockSource, LongSupplier ratePeriodNanosFunction,
long resolutionNanos, boolean consistentTokensView,
double initialTokensFactor, double targetFillFactorAfterThrottling) {
super(clockSource, resolutionNanos, consistentTokensView);
long resolutionNanos, boolean consistentConsumedTokens,
boolean consistentAddedTokens, double initialTokensFactor,
double targetFillFactorAfterThrottling) {
super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
this.capacityFactor = capacityFactor;
this.rateFunction = rateFunction;
this.ratePeriodNanosFunction = ratePeriodNanosFunction;
this.targetFillFactorAfterThrottling = targetFillFactorAfterThrottling;
this.tokens = (long) (rateFunction.getAsLong() * initialTokensFactor);
tokens(false);
getTokens();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ public DynamicRateAsyncTokenBucketBuilder targetFillFactorAfterThrottling(
@Override
public AsyncTokenBucket build() {
return new DynamicRateAsyncTokenBucket(this.capacityFactor, this.rateFunction,
this.clock,
this.ratePeriodNanosFunction, this.resolutionNanos, this.consistentTokensView,
this.initialFillFactor,
targetFillFactorAfterThrottling);
this.clock, this.ratePeriodNanosFunction, this.resolutionNanos, this.consistentConsumedTokens,
this.consistentAddedTokens, this.initialFillFactor, targetFillFactorAfterThrottling);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ class FinalRateAsyncTokenBucket extends AsyncTokenBucket {
private final long targetAmountOfTokensAfterThrottling;

protected FinalRateAsyncTokenBucket(long capacity, long rate, MonotonicSnapshotClock clockSource,
long ratePeriodNanos, long resolutionNanos, boolean consistentTokensView,
long initialTokens) {
super(clockSource, resolutionNanos, consistentTokensView);
long ratePeriodNanos, long resolutionNanos, boolean consistentConsumedTokens,
boolean consistentAddedTokens, long initialTokens) {
super(clockSource, resolutionNanos, consistentConsumedTokens, consistentAddedTokens);
this.capacity = capacity;
this.rate = rate;
this.ratePeriodNanos = ratePeriodNanos != -1 ? ratePeriodNanos : ONE_SECOND_NANOS;
// The target amount of tokens is the amount of tokens made available in the resolution duration
this.targetAmountOfTokensAfterThrottling = Math.max(this.resolutionNanos * rate / ratePeriodNanos, 1);
this.tokens = initialTokens;
tokens(false);
getTokens();
}

@Override
Expand Down
Loading
Loading