Skip to content

feat(router): implement RouterPermitLimiter for managing append permits#3166

Merged
Gezi-lzq merged 5 commits into
mainfrom
routerv2-limit
Jan 20, 2026
Merged

feat(router): implement RouterPermitLimiter for managing append permits#3166
Gezi-lzq merged 5 commits into
mainfrom
routerv2-limit

Conversation

@Gezi-lzq
Copy link
Copy Markdown
Contributor

No description provided.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request implements a RouterPermitLimiter class to manage append permits for controlling resource usage in router operations. The limiter provides three acquisition strategies: blocking acquire, entry-based acquire, and non-blocking acquireUpTo.

Changes:

  • Added RouterPermitLimiter class with permit acquisition/release logic and metrics tracking
  • Integrated permit limiting into RouterOutV2 for outbound append operations
  • Integrated permit limiting into RouterInV2 for inbound append operations with dynamic permit adjustment
  • Added comprehensive unit tests for the permit limiter functionality

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.

File Description
core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java New permit limiter class with blocking/non-blocking acquisition methods and statistics logging
core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java Unit tests covering permit acquisition, clamping behavior, and interrupt handling
core/src/main/java/kafka/automq/zerozone/RouterOutV2.java Integrated permit limiter to control outbound append operations with acquisition before append and release after completion
core/src/main/java/kafka/automq/zerozone/RouterInV2.java Integrated permit limiter with placeholder permits and dynamic adjustment based on actual data size
Comments suppressed due to low confidence (4)

core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:144

  • The test testAcquirePreservesInterrupt has a race condition. The test sets the interrupt flag before calling acquire (line 138), then expects the interrupt flag to still be set after acquire completes (line 141). However, there's a 10ms sleep in the releaser thread (line 131) before releasing the permit. The acquire method may succeed immediately if there's a timing issue, or the interrupt handling may not behave as expected in a test environment. Consider using a more deterministic approach, such as a CountDownLatch to coordinate between threads.
        });
        releaser.start();

        Thread.currentThread().interrupt();
        limiter.acquire(1);

        assertTrue(Thread.currentThread().isInterrupted());
        Thread.interrupted();
        releaser.join();
    }
}

core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:142

  • The test calls Thread.interrupted() at line 142 which clears the interrupt flag, but this call's return value is not verified. Consider using assertTrue(Thread.interrupted()) to both clear the flag and verify it was set, making the test more explicit about validating the interrupt preservation behavior.
    core/src/test/java/kafka/automq/zerozone/RouterPermitLimiterTest.java:145
  • The test suite lacks direct test coverage for the release method. While it's used indirectly in the interrupt test, there are no explicit tests verifying that: 1) releasing permits increases the available count, 2) releasing zero or negative permits is handled correctly, 3) permits can be acquired after being released. Consider adding dedicated tests for the release functionality.
public class RouterPermitLimiterTest {

    @Test
    public void testAcquireClampsToMaxPermits() {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(10);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterAcquireFailTimeNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 10, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        int acquired = limiter.acquire(100);

        assertEquals(10, acquired);
        assertEquals(0, semaphore.availablePermits());
    }

    @Test
    public void testAcquireUpToUsesAvailablePermits() {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(5);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterAcquireUpToFailTimeNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 10, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        int acquired = limiter.acquireUpTo(8);

        assertEquals(5, acquired);
        assertEquals(0, semaphore.availablePermits());
    }

    @Test
    public void testAcquireUpToClampsToMaxPermits() {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(10);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterAcquireUpToClampNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 3, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        int acquired = limiter.acquireUpTo(8);

        assertEquals(3, acquired);
        assertEquals(7, semaphore.availablePermits());
    }

    @Test
    public void testAcquireUpToReturnsZeroWhenNoPermits() {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(0);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterAcquireUpToNoPermitsNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 5, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        int acquired = limiter.acquireUpTo(4);

        assertEquals(0, acquired);
        assertEquals(0, semaphore.availablePermits());
    }

    @Test
    public void testAcquireUpToIgnoresNonPositiveSize() {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(5);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterAcquireUpToNonPositiveNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 5, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        assertEquals(0, limiter.acquireUpTo(0));
        assertEquals(0, limiter.acquireUpTo(-1));
        assertEquals(5, semaphore.availablePermits());
    }

    @Test
    public void testAcquirePreservesInterrupt() throws Exception {
        Time time = new MockTime();
        Semaphore semaphore = new Semaphore(0);
        Histogram hist = new KafkaMetricsGroup(RouterPermitLimiterTest.class)
            .newHistogram("RouterPermitLimiterInterruptAcquireFailTimeNanos");
        RouterPermitLimiter limiter = new RouterPermitLimiter("[TEST]", time, 1, semaphore, hist, LoggerFactory.getLogger(RouterPermitLimiterTest.class));

        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ignored) {
            }
            semaphore.release(1);
        });
        releaser.start();

        Thread.currentThread().interrupt();
        limiter.acquire(1);

        assertTrue(Thread.currentThread().isInterrupted());
        Thread.interrupted();
        releaser.join();
    }
}

core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java:139

  • The tryPermitStatistics method, which handles logging and metrics clearing every 60 seconds, has no test coverage. While it's a private method called from acquire, there are no tests verifying: 1) statistics are logged at the correct interval, 2) the histogram is cleared after logging, 3) the compareAndSet logic prevents concurrent logging. Consider adding tests that use MockTime to advance time and verify this behavior.
                maxPermits
            );
            acquireFailTimeHist.clear();
        }
    }
}


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterInV2.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/java/kafka/automq/zerozone/RouterOutV2.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterOutV2.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/RouterInV2.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterInV2.java Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Comment thread core/src/main/java/kafka/automq/zerozone/RouterPermitLimiter.java
Comment thread core/src/main/java/kafka/automq/zerozone/RouterInV2.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/RouterInV2.java Outdated
superhx
superhx previously approved these changes Jan 20, 2026
@Gezi-lzq Gezi-lzq merged commit 4420b19 into main Jan 20, 2026
6 checks passed
@Gezi-lzq Gezi-lzq deleted the routerv2-limit branch January 20, 2026 08:04
Gezi-lzq added a commit that referenced this pull request Jan 20, 2026
…ts (#3166)

* feat(router): implement RouterPermitLimiter for managing append permits

* fix(router): update permit handling in RouterPermitLimiter and related classes

* fix(router): refactor permit handling to use environment variables for append permit size

* fix(zerozone): optimize permit acquisition logic in RouterInV2

* fix(router): remove unused Systems import from RouterPermitLimiter
Gezi-lzq added a commit that referenced this pull request Jan 20, 2026
…ts (#3166) (#3168)

* feat(router): implement RouterPermitLimiter for managing append permits

* fix(router): update permit handling in RouterPermitLimiter and related classes

* fix(router): refactor permit handling to use environment variables for append permit size

* fix(zerozone): optimize permit acquisition logic in RouterInV2

* fix(router): remove unused Systems import from RouterPermitLimiter
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants