feat(router): implement RouterPermitLimiter for managing append permits#3166
Conversation
b7990a9 to
e768d53
Compare
There was a problem hiding this comment.
Pull request overview
This pull request implements a RouterPermitLimiter class to manage append permits for controlling resource usage in router operations. The limiter provides three acquisition strategies: blocking acquire, entry-based acquire, and non-blocking acquireUpTo.
Changes:
- Added
RouterPermitLimiterclass with permit acquisition/release logic and metrics tracking - Integrated permit limiting into
RouterOutV2for outbound append operations - Integrated permit limiting into
RouterInV2for inbound append operations with dynamic permit adjustment - Added comprehensive unit tests for the permit limiter functionality
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java | New permit limiter class with blocking/non-blocking acquisition methods and statistics logging |
| core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java | Unit tests covering permit acquisition, clamping behavior, and interrupt handling |
| core/src/main/java/kafka/automq/zerozone/RouterOutV2.java | Integrated permit limiter to control outbound append operations with acquisition before append and release after completion |
| core/src/main/java/kafka/automq/zerozone/RouterInV2.java | Integrated permit limiter with placeholder permits and dynamic adjustment based on actual data size |
Comments suppressed due to low confidence (4)
core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:144
- The test
testAcquirePreservesInterrupthas a race condition. The test sets the interrupt flag before callingacquire(line 138), then expects the interrupt flag to still be set afteracquirecompletes (line 141). However, there's a 10ms sleep in the releaser thread (line 131) before releasing the permit. Theacquiremethod may succeed immediately if there's a timing issue, or the interrupt handling may not behave as expected in a test environment. Consider using a more deterministic approach, such as a CountDownLatch to coordinate between threads.
});
releaser.start();
Thread.currentThread().interrupt();
limiter.acquire(1);
assertTrue(Thread.currentThread().isInterrupted());
Thread.interrupted();
releaser.join();
}
}
core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:142
- The test calls
Thread.interrupted()at line 142 which clears the interrupt flag, but this call's return value is not verified. Consider usingassertTrue(Thread.interrupted())to both clear the flag and verify it was set, making the test more explicit about validating the interrupt preservation behavior.
core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:145 - The test suite lacks direct test coverage for the
releasemethod. While it's used indirectly in the interrupt test, there are no explicit tests verifying that: 1) releasing permits increases the available count, 2) releasing zero or negative permits is handled correctly, 3) permits can be acquired after being released. Consider adding dedicated tests for the release functionality.
public class RouterPermitLimiterTest {
@Test
public void testAcquireClampsToMaxPermits() {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(10);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterAcquireFailTimeNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 10, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
int acquired = limiter.acquire(100);
assertEquals(10, acquired);
assertEquals(0, semaphore.availablePermits());
}
@Test
public void testAcquireUpToUsesAvailablePermits() {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(5);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterAcquireUpToFailTimeNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 10, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
int acquired = limiter.acquireUpTo(8);
assertEquals(5, acquired);
assertEquals(0, semaphore.availablePermits());
}
@Test
public void testAcquireUpToClampsToMaxPermits() {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(10);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterAcquireUpToClampNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 3, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
int acquired = limiter.acquireUpTo(8);
assertEquals(3, acquired);
assertEquals(7, semaphore.availablePermits());
}
@Test
public void testAcquireUpToReturnsZeroWhenNoPermits() {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(0);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterAcquireUpToNoPermitsNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 5, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
int acquired = limiter.acquireUpTo(4);
assertEquals(0, acquired);
assertEquals(0, semaphore.availablePermits());
}
@Test
public void testAcquireUpToIgnoresNonPositiveSize() {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(5);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterAcquireUpToNonPositiveNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 5, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
assertEquals(0, limiter.acquireUpTo(0));
assertEquals(0, limiter.acquireUpTo(-1));
assertEquals(5, semaphore.availablePermits());
}
@Test
public void testAcquirePreservesInterrupt() throws Exception {
Time time = new MockTime();
Semaphore semaphore = new Semaphore(0);
Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
.newHistogram("RouterPermitLimiterInterruptAcquireFailTimeNanos");
RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 1, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));
Thread releaser = new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
semaphore.release(1);
});
releaser.start();
Thread.currentThread().interrupt();
limiter.acquire(1);
assertTrue(Thread.currentThread().isInterrupted());
Thread.interrupted();
releaser.join();
}
}
core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java:139
- The
tryPermitStatisticsmethod, which handles logging and metrics clearing every 60 seconds, has no test coverage. While it's a private method called fromacquire, there are no tests verifying: 1) statistics are logged at the correct interval, 2) the histogram is cleared after logging, 3) the compareAndSet logic prevents concurrent logging. Consider adding tests that use MockTime to advance time and verify this behavior.
maxPermits
);
acquireFailTimeHist.clear();
}
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e874e37 to
6e97c49
Compare
…ts (#3166) * feat(router): implement RouterPermitLimiter for managing append permits * fix(router): update permit handling in RouterPermitLimiter and related classes * fix(router): refactor permit handling to use environment variables for append permit size * fix(zerozone): optimize permit acquisition logic in RouterInV2 * fix(router): remove unused Systems import from RouterPermitLimiter
…ts (#3166) (#3168) * feat(router): implement RouterPermitLimiter for managing append permits * fix(router): update permit handling in RouterPermitLimiter and related classes * fix(router): refactor permit handling to use environment variables for append permit size * fix(zerozone): optimize permit acquisition logic in RouterInV2 * fix(router): remove unused Systems import from RouterPermitLimiter
No description provided.