[fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts #23930

Merged
63 commits
cccc564
[fix][broker] Fix rate limiting causing connections to time out
lhotari Feb 5, 2025
79c9cce
Fix test
lhotari Feb 5, 2025
9fa43a9
Add test that checks that clock leaping backward or forward would be …
lhotari Feb 5, 2025
f28af85
Improve DefaultMonotonicSnapshotClock so that requests don't get dela…
lhotari Feb 5, 2025
a99fe0c
Also test with small offsets
lhotari Feb 5, 2025
9072713
Improve test case
lhotari Feb 5, 2025
f719dbc
Use JMH blackhole in test
lhotari Feb 5, 2025
c60c4b8
Improve code coverage
lhotari Feb 6, 2025
696f9fb
Refactor: Split out logic for leap detection and monotonic tick updating
lhotari Feb 6, 2025
d6d60f7
Remove invalid test
lhotari Feb 6, 2025
0886054
Revert "Remove invalid test"
lhotari Feb 6, 2025
f4feda1
Add test mode to DefaultMonotonicSnapshotClock so that thread updates…
lhotari Feb 6, 2025
13aa2f7
Use test mode to fix test
lhotari Feb 6, 2025
8b36d71
Improve testability
lhotari Feb 6, 2025
d9fb30f
Add failing test case
lhotari Feb 6, 2025
8c851e2
Fix test
lhotari Feb 6, 2025
846cc89
Add failing test case for the negative tokens case
lhotari Feb 6, 2025
58f8b35
Don't handle leaps forward since those cannot be detected properly
lhotari Feb 6, 2025
91ae4c6
Remove separate test mode since it's not needed
lhotari Feb 7, 2025
487419e
Fix parameter
lhotari Feb 7, 2025
571b2c2
Enable batching of update requests by using a AtomicLong for the requ…
lhotari Feb 7, 2025
532998d
Removing remaining parts of the test mode
lhotari Feb 7, 2025
a21eee3
Remove synchronization from tick updater since the usual path is sing…
lhotari Feb 7, 2025
eb82ab9
Improve comments
lhotari Feb 7, 2025
d1dd4b7
Reorder methods
lhotari Feb 7, 2025
d5ea482
Improve javadoc
lhotari Feb 7, 2025
b19e0de
Reduce excessive logging in test
lhotari Feb 7, 2025
b46fbab
Reduce duplication in JMH test
lhotari Feb 7, 2025
73c2b8c
Add JMH benchmark for DefaultMonotonicSnapshotClock
lhotari Feb 7, 2025
a22da98
Add more instructions for running JMH benchmarks
lhotari Feb 7, 2025
4287838
Add unit tests for leap detection
lhotari Feb 7, 2025
12cb400
Move updating of requestCount outside of the synchronization blocks t…
lhotari Feb 7, 2025
31a1b30
Fix typo and improve comment
lhotari Feb 7, 2025
6e4a94f
Optimize token calculation performance and correctness
lhotari Feb 7, 2025
7463891
Improve eventual consistency test
lhotari Feb 7, 2025
b04752d
If condition when newTokens should be 0
lhotari Feb 7, 2025
ea4f7d5
Improve logic to update lastNanos so that races are prevented with CAS
lhotari Feb 7, 2025
ca7cae4
Add "getTokensUpdatesTokens" to AsyncTokenBucket to reduce eventual c…
lhotari Feb 7, 2025
543c978
Reduce test flakiness
lhotari Feb 7, 2025
d159198
Let MessageDispatchThrottlingTest#reset handle deletion
lhotari Feb 7, 2025
936cdb0
Reduce test flakiness for waiting to new rate to be applied
lhotari Feb 7, 2025
dae8220
Prevent NPEs in DispatchRateLimiter when limit has changed
lhotari Feb 7, 2025
d62f97e
Fix switchToConsistentTokensView behavior
lhotari Feb 7, 2025
1d68522
Revert using getTokensUpdatesTokens mode by default since eventual co…
lhotari Feb 7, 2025
38e9f7b
Rename getTokensUpdatesTokens to consistentTokensView
lhotari Feb 7, 2025
f5c3e57
Use consistent tokens view for SubscribeRateLimiter
lhotari Feb 7, 2025
b79ed85
Fix issue with restartBroker in tests
lhotari Feb 7, 2025
5910b3c
Ignore metadata change when broker isn't running
lhotari Feb 7, 2025
fda2337
Move dispatch throttling tests to broker-api group
lhotari Feb 7, 2025
759fafe
Use AssertJ for better error message
lhotari Feb 7, 2025
5e0a327
Improve test cleanup for retries
lhotari Feb 7, 2025
f024203
Use unique namespaces
lhotari Feb 7, 2025
9277208
Extract common base class to avoid test duplication
lhotari Feb 7, 2025
96781e9
Reduce flakiness
lhotari Feb 7, 2025
234cfc9
Refactor common config
lhotari Feb 7, 2025
8ef12ca
Fix flakiness
lhotari Feb 7, 2025
68ba451
Move MessagePublishThrottlingTest to broker-api test group
lhotari Feb 7, 2025
2d99778
Fix issue in lookupUrl change in test class
lhotari Feb 7, 2025
c67ceb0
Revisit startBroker method in test base class
lhotari Feb 7, 2025
6de794b
Revisit logic one more time in test class
lhotari Feb 7, 2025
f9aab2e
Refactor consistency settings in AsyncTokenBucket and add Javadocs
lhotari Feb 8, 2025
e4f4689
Attempt to fix flaky test MessageDispatchThrottlingTest
lhotari Feb 8, 2025
42fb876
Use consistent tokens view in flaky RGUsageMTAggrWaitForAllMsgsTest
lhotari Feb 8, 2025
Add test mode to DefaultMonotonicSnapshotClock so that thread updates don't cause problems in certain tests
lhotari committed Feb 6, 2025
commit f4feda1df2a37b94e253bfcfd5d531be32f8f0ca
@@ -37,18 +37,21 @@
  */
 public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class);
-    private final LongSupplier clockSource;
     private final TickUpdaterThread tickUpdaterThread;
-    private final long snapshotIntervalNanos;
     private volatile long snapshotTickNanos;
 
     public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier clockSource) {
+        this(snapshotIntervalNanos, clockSource, false);
+    }
+
+    public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier clockSource,
+                                         boolean updateOnlyWhenRequested) {
         if (snapshotIntervalNanos < TimeUnit.MILLISECONDS.toNanos(1)) {
             throw new IllegalArgumentException("snapshotIntervalNanos must be at least 1 millisecond");
         }
-        this.clockSource = Objects.requireNonNull(clockSource, "clockSource must not be null");
-        this.snapshotIntervalNanos = snapshotIntervalNanos;
-        tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos, clockSource, this::setSnapshotTickNanos);
+        tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos,
+                Objects.requireNonNull(clockSource, "clockSource must not be null"), this::setSnapshotTickNanos,
+                updateOnlyWhenRequested);
         tickUpdaterThread.start();
     }

@@ -84,14 +87,20 @@ private static class TickUpdaterThread extends Thread {
         private final long sleepMillis;
         private final int sleepNanos;
 
-        TickUpdaterThread(long snapshotIntervalNanos, LongSupplier clockSource, LongConsumer setSnapshotTickNanos) {
+        TickUpdaterThread(long snapshotIntervalNanos, LongSupplier clockSource, LongConsumer setSnapshotTickNanos,
+                          boolean updateOnlyWhenRequested) {
             super(DefaultMonotonicSnapshotClock.class.getSimpleName() + "-update-loop");
             // set as daemon thread so that it doesn't prevent the JVM from exiting
             setDaemon(true);
             // set the highest priority
             setPriority(MAX_PRIORITY);
-            this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
-            this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+            if (updateOnlyWhenRequested) {
+                this.sleepMillis = -1;
+                this.sleepNanos = -1;
+            } else {
+                this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
+                this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+            }
             tickUpdater = new MonotonicLeapDetectingTickUpdater(clockSource, setSnapshotTickNanos,
                     2 * snapshotIntervalNanos);
         }
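The non-test branch in this hunk splits `snapshotIntervalNanos` into the two arguments that `Object.wait(long, int)` expects: whole milliseconds plus a nanosecond remainder. A minimal standalone sketch of that arithmetic follows. The class and method names (`SleepIntervalSplit`, `millisPart`, `nanosPart`) are hypothetical and not part of the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the PR: mirrors the sleepMillis/sleepNanos arithmetic.
public class SleepIntervalSplit {

    // Whole-millisecond part of the interval (first argument of Object.wait(long, int)).
    public static long millisPart(long intervalNanos) {
        return TimeUnit.NANOSECONDS.toMillis(intervalNanos);
    }

    // Nanosecond remainder (second argument of Object.wait(long, int)).
    // For a non-negative interval this is always within 0..999_999.
    public static int nanosPart(long intervalNanos) {
        return (int) (intervalNanos - TimeUnit.MILLISECONDS.toNanos(millisPart(intervalNanos)));
    }

    public static void main(String[] args) {
        long interval = 1_500_000L; // 1.5 milliseconds
        System.out.println(millisPart(interval) + " ms + " + nanosPart(interval) + " ns");
    }
}
```

For an interval of 1,500,000 ns this gives 1 ms and 500,000 ns. The `-1` values assigned in the `updateOnlyWhenRequested` branch are outside the range this split can produce, which is what lets them act as a sentinel.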
@@ -112,8 +121,14 @@ public void run() {
                     tickUpdateDelayMonitorNotified = false;
                     // only wait if no explicit request has been made since the last update
                     if (requestCount == updatedForRequestCount) {
-                        // if no request has been made, sleep for the configured interval
-                        tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos);
+                        if (sleepMillis > 0 || sleepNanos > 0) {
+                            // if no request has been made, sleep for the configured interval
+                            tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos);
+                        } else {
+                            // when the sleepMillis is -1, the thread will wait indefinitely until notified.
+                            // this is used only in testing with a test clock source that is manually updated.
+                            tickUpdateDelayMonitor.wait();
+                        }
                         waitedSnapshotInterval = !tickUpdateDelayMonitorNotified;
                     }
                     updatedForRequestCount = requestCount;
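The branch added in this hunk is needed because `Object.wait(long, int)` throws `IllegalArgumentException` for negative arguments, so the `-1` sentinel has to be routed to the no-argument `wait()`. The sketch below shows the two waiting modes in isolation. It is a simplified illustration, not the PR's implementation: the class `WaitModes`, its `requested` flag, and the loop around `wait()` (a guard against spurious wakeups) are assumptions made for the example.

```java
// Hypothetical sketch, not from the PR: timed wait vs. wait-until-notified.
public class WaitModes {
    private final Object monitor = new Object();
    private boolean requested; // guarded by monitor

    // Called by other threads to wake the waiter before the interval elapses.
    public void requestUpdate() {
        synchronized (monitor) {
            requested = true;
            monitor.notifyAll();
        }
    }

    // Returns true if an explicit request was seen, false if the timed wait
    // ended without one (or the waiting thread was interrupted).
    public boolean awaitTick(long sleepMillis, int sleepNanos) {
        synchronized (monitor) {
            try {
                // only wait if no request is already pending
                if (!requested) {
                    if (sleepMillis > 0 || sleepNanos > 0) {
                        // timed mode: returns after the interval even without a request
                        monitor.wait(sleepMillis, sleepNanos);
                    } else {
                        // request-only mode: block until requestUpdate() is called
                        while (!requested) {
                            monitor.wait();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean sawRequest = requested;
            requested = false;
            return sawRequest;
        }
    }

    public static void main(String[] args) {
        WaitModes waiter = new WaitModes();
        System.out.println(waiter.awaitTick(5, 0));   // no request pending: timed wait elapses
        waiter.requestUpdate();
        System.out.println(waiter.awaitTick(-1, -1)); // request pending: returns without blocking
    }
}
```

In request-only mode a test can advance a manually controlled clock source and then trigger exactly one update, without a background timer firing in between.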
@@ -168,9 +183,9 @@ public void requestUpdate() {
 
         @Override
         public synchronized void start() {
-            super.start();
             // wait until the thread is started and the tick value has been updated
             synchronized (tickUpdatedMonitor) {
+                super.start();
                 try {
                     tickUpdatedMonitor.wait();
                 } catch (InterruptedException e) {
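Moving `super.start()` inside the `synchronized (tickUpdatedMonitor)` block presumably closes a missed-notification window. If the thread is started before the caller holds the monitor, it can publish the first tick and notify before the caller reaches `wait()`, and the caller then blocks with nothing left to wake it. The sketch below illustrates the pattern under that assumption. The class name and the `firstTickDone` flag are hypothetical, and the flag is an extra guard against spurious wakeups that does not appear in the diff.

```java
// Hypothetical sketch, not from the PR: start a thread while holding the monitor
// it will notify on, so the notification cannot happen before the caller waits.
public class StartAndAwaitFirstTick {
    private final Object tickUpdatedMonitor = new Object();
    private boolean firstTickDone; // guarded by tickUpdatedMonitor

    // Starts the updater and blocks until it has signalled its first update.
    public boolean startAndAwait() {
        Thread updater = new Thread(() -> {
            // cannot enter this block until the caller releases the monitor in wait()
            synchronized (tickUpdatedMonitor) {
                firstTickDone = true;
                tickUpdatedMonitor.notifyAll();
            }
        }, "updater-sketch");
        updater.setDaemon(true);
        synchronized (tickUpdatedMonitor) {
            // started while holding the monitor, as in the changed start() method
            updater.start();
            try {
                while (!firstTickDone) {
                    tickUpdatedMonitor.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return firstTickDone;
        }
    }

    public static void main(String[] args) {
        System.out.println(new StartAndAwaitFirstTick().startAndAwait());
    }
}
```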