Conversation

Owner

leynos commented Jul 6, 2025

Summary

  • add leaky-bucket rate limiting to PushHandle
  • expose PushQueues::bounded_with_rate for custom limits (usage sketch below)
  • document usage in resilience guide and roadmap
  • test that pushes are throttled
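A minimal usage sketch of the new constructor (the signature matches the reviewer diagrams below; the frame type, capacities, and rate values are illustrative assumptions rather than code from this PR):

```rust
use wireframe::push::PushQueues;

#[tokio::main]
async fn main() {
    // Queues bounded to 8 frames per priority, throttled to 5 pushes per second.
    // Passing `None` instead of `Some(5)` would disable rate limiting entirely.
    let (_queues, handle) = PushQueues::<u8>::bounded_with_rate(8, 8, Some(5));

    // The sixth push within the same second awaits a token from the leaky bucket.
    for frame in 0u8..6 {
        handle
            .push_high_priority(frame)
            .await
            .expect("push should succeed once a token is available");
    }
}
```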

Testing

  • make fmt
  • make lint
  • make test

https://chatgpt.com/codex/tasks/task_e_6869c6cd49608322abbf4694a14283bf

Summary by Sourcery

Implement configurable per-connection push rate limiting using a leaky-bucket algorithm, refactor push logic to enforce limits, and provide corresponding documentation and tests.

New Features:

  • Support per-connection push rate limiting via an optional leaky-bucket RateLimiter.
  • Expose PushQueues::bounded_with_rate for custom limits and bounded_unlimited to disable rate limiting.

Enhancements:

  • Integrate RateLimiter into PushHandleInner and centralize frame sending through a new push_with_priority helper.
  • Introduce a DEFAULT_PUSH_RATE constant and add the leaky-bucket crate dependency.

Build:

  • Update Cargo.toml to include the leaky-bucket dependency.

Documentation:

  • Document rate limiting usage in the resilience guide and roadmap, marking the feature complete in the roadmap.

Tests:

  • Add tests verifying throttling behavior when limits are exceeded, token refill after delay, shared limits across priorities, and unlimited queue behavior.

Contributor

sourcery-ai bot commented Jul 6, 2025

Reviewer's Guide

This PR integrates a leaky-bucket rate limiter into the push subsystem by refactoring PushHandle to await rate tokens, exposing new constructors for custom and unlimited rate limits, updating default behavior, documenting the feature, and adding tests to verify throttling.

Sequence diagram for push operation with rate limiting

sequenceDiagram
    participant Producer
    participant PushHandle
    participant RateLimiter
    participant mpsc_Channel
    Producer->>PushHandle: push_high_priority(frame)
    alt RateLimiter configured
        PushHandle->>RateLimiter: acquire(1)
        RateLimiter-->>PushHandle: token granted
    end
    PushHandle->>mpsc_Channel: send(frame)
    mpsc_Channel-->>PushHandle: result
    PushHandle-->>Producer: Result

Class diagram for PushHandle and PushQueues with rate limiting

classDiagram
    class PushHandleInner {
        mpsc::Sender<F> high_prio_tx
        mpsc::Sender<F> low_prio_tx
        Option<RateLimiter> limiter
    }
    class PushHandle {
        +push_high_priority(frame: F) Result<(), PushError>
        +push_low_priority(frame: F) Result<(), PushError>
        +push_with_priority(frame: F, priority: PushPriority) Result<(), PushError>
        -Arc<PushHandleInner<F>>
    }
    class PushQueues {
        +bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>)
        +bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>)
        +bounded_with_rate(high_capacity: usize, low_capacity: usize, rate: Option<usize>) -> (Self, PushHandle<F>)
    }
    PushQueues -- PushHandle : creates
    PushHandle -- PushHandleInner : wraps
    PushHandleInner o-- RateLimiter : optional
    class RateLimiter {
        +acquire(n: usize)
    }
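A rough Rust sketch of the push_with_priority helper implied by the diagram above (the inner field name, the trait bound, and the mapping of a closed channel to PushError::Closed are assumptions, not the PR's exact code):

```rust
impl<F: FrameLike> PushHandle<F> {
    /// Send a frame on the queue matching `priority`, awaiting a rate token first.
    pub async fn push_with_priority(&self, frame: F, priority: PushPriority) -> Result<(), PushError> {
        // Await a token only when a limiter is configured; unlimited queues skip this.
        if let Some(limiter) = &self.inner.limiter {
            limiter.acquire(1).await;
        }
        let tx = match priority {
            PushPriority::High => &self.inner.high_prio_tx,
            PushPriority::Low => &self.inner.low_prio_tx,
        };
        // Assumed error mapping: a closed receiver surfaces as PushError::Closed.
        tx.send(frame).await.map_err(|_| PushError::Closed)
    }
}
```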

Class diagram for PushQueues constructors with rate limiting options

classDiagram
    class PushQueues {
        +bounded(high_capacity, low_capacity)
        +bounded_unlimited(high_capacity, low_capacity)
        +bounded_with_rate(high_capacity, low_capacity, rate: Option<usize>)
    }
    PushQueues : bounded() uses DEFAULT_PUSH_RATE
    PushQueues : bounded_unlimited() disables rate limiting
    PushQueues : bounded_with_rate() allows custom rate or disables
    PushQueues ..> PushHandle : returns
    PushQueues ..> RateLimiter : optional, via PushHandleInner

File-Level Changes

  • Integrate leaky-bucket rate limiter into PushHandle (src/push.rs)
    • Add limiter: Option<RateLimiter> to PushHandleInner
    • Introduce push_with_priority helper that awaits the limiter before sending
    • Refactor push_high_priority and push_low_priority to use the helper
  • Expose custom and unlimited rate configurations (src/push.rs; see the constructor sketch after this list)
    • Add DEFAULT_PUSH_RATE constant
    • Implement bounded_with_rate and bounded_unlimited constructors
    • Alias existing bounded to use the default rate
  • Add leaky-bucket crate dependency (Cargo.toml)
    • Include leaky-bucket = "1.1" in Cargo.toml
  • Update documentation to cover rate limiting (docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/asynchronous-outbound-messaging-roadmap.md)
    • Revise resilience guide example to use bounded_with_rate
    • Mark rate limiting as completed in the roadmap
  • Add tests verifying throttling behavior (tests/push.rs)
    • Test blocking when rate exceeded and recovery after refill
    • Ensure shared limiter across priorities
    • Verify unlimited queues do not block
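A compact sketch of how the new constructors relate (DEFAULT_PUSH_RATE comes from the diff; bounded_unlimited was later renamed bounded_no_rate_limit during review; the generic bounds shown here are assumptions):

```rust
/// Default maximum pushes allowed per second when no custom rate is specified.
const DEFAULT_PUSH_RATE: usize = 100;

impl<F: FrameLike> PushQueues<F> {
    /// Bounded queues throttled to the default push rate.
    #[must_use]
    pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
        Self::bounded_with_rate(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE))
    }

    /// Bounded queues with no rate limiting (named `bounded_unlimited` in the original diff).
    #[must_use]
    pub fn bounded_no_rate_limit(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
        Self::bounded_with_rate(high_capacity, low_capacity, None)
    }
}
```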

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Contributor

coderabbitai bot commented Jul 6, 2025

Summary by CodeRabbit

  • New Features

    • Introduced optional per-connection rate limiting for push queues, allowing configuration of push rates or disabling rate limiting entirely.
    • Added new constructors for push queues to support default, custom, or unlimited rate limits.
  • Documentation

    • Updated examples and guides to reflect the new rate-limiting functionality.
    • Marked the per-connection rate limiting task as completed in the project roadmap.
  • Tests

    • Added comprehensive tests to verify correct behaviour of rate-limited and unlimited push queues under various scenarios.


Walkthrough

The changes introduce per-connection rate limiting for outbound push queues using a leaky bucket algorithm. This includes updating documentation, adding the leaky-bucket crate dependency, modifying push queue constructors and methods to enforce rate limits, and adding tests to verify rate limiting behaviour.

Changes

  • Cargo.toml: Added leaky-bucket crate dependency.
  • docs/asynchronous-outbound-messaging-roadmap.md: Marked the "Per-connection rate limiting on pushes via a token bucket" task as completed.
  • docs/hardening-wireframe-a-guide-to-production-resilience.md: Updated example to use internal PushQueues with optional rate limiting parameter instead of external limiter.
  • src/push.rs: Added optional leaky bucket rate limiting to push queues; new constructors, updated push methods, constants added.
  • tests/push.rs: Added async tests verifying rate limiter blocks and allows pushes as expected, including priority and unlimited cases.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant PushHandle
    participant RateLimiter
    participant PushQueue

    User->>PushHandle: push_high_priority(frame)
    PushHandle->>RateLimiter: acquire(1).await
    RateLimiter-->>PushHandle: token granted (or waits)
    PushHandle->>PushQueue: send frame
    PushQueue-->>User: Result

Possibly related PRs

Poem

A bucket with leaks, but tokens in tow,
Now outbound frames must wait their turn to go.
Push queues with patience, tests running tight,
Rate limits enforce the traffic just right.
🐇✨

Slow and steady, the packets flow—
Wireframe’s resilience continues to grow!


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4bf21ab and 6c0d47b.

📒 Files selected for processing (2)
  • src/push.rs (7 hunks)
  • tests/push.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)

**/*.rs: Comment why, not what. Explain assumptions, edge cases, trade-offs, or complexity. Don't echo the obvious.
Comments must use en-GB-oxendict spelling and grammar.
Function documentation must include clear examples.
Every module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.
Do not use return in single-line functions.
Use predicate functions for conditional criteria with more than two branches.
Lints must not be silenced except as a last resort.
Lint rule suppressions must be tightly scoped and include a clear reason.
Prefer expect over allow.
Prefer .expect() over .unwrap().
Clippy warnings MUST be disallowed.
Fix any warnings emitted during tests in the code itself rather than silencing them.
Where a function is too long, extract meaningfully named helper functions adhering to separation of concerns and CQRS.
Where a function has too many parameters, group related parameters in meaningfully named structs.
Where a function is returning a large error consider using Arc to reduce the amount of data returned.
Write unit and behavioural tests for new functionality. Run both before and after making any change.
Prefer immutable data and avoid unnecessary mut bindings.
Handle errors with the Result type instead of panicking where feasible.
Avoid unsafe code unless absolutely necessary and document any usage clearly.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • tests/push.rs
  • src/push.rs

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments must use en-GB-oxendict spelling and grammar.

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • tests/push.rs
  • src/push.rs
⏰ Context from checks skipped due to timeout of 240000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build-test
🔇 Additional comments (12)
tests/push.rs (6)

5-5: LGTM: rstest import supports parameterised testing.

Good addition that enables the parameterised test approach used in the rate limiting tests, following the coding guidelines for using rstest fixtures.


53-86: Well-structured parameterised test eliminates duplication.

This test effectively consolidates the previous duplicated rate limiting tests using rstest parameterisation. The logic correctly verifies that rate limiting blocks excess pushes and allows them after token refill. The time control usage is appropriate for deterministic testing.
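Illustrative shape of such a parameterised test (the case values, frame type, and timeout are invented for the sketch, not copied from tests/push.rs):

```rust
use rstest::rstest;
use tokio::time::{self, Duration};
use wireframe::push::{PushPriority, PushQueues};

#[rstest]
#[case::high(PushPriority::High)]
#[case::low(PushPriority::Low)]
#[tokio::test]
async fn rate_limiter_blocks_then_recovers(#[case] priority: PushPriority) {
    time::pause(); // deterministic control over token refill
    let (_queues, handle) = PushQueues::<u8>::bounded_with_rate(4, 4, Some(1));

    // The first push consumes the only token available this second.
    handle.push_with_priority(1u8, priority).await.expect("first push");

    // A second push must block while the bucket is empty.
    let blocked = time::timeout(Duration::from_millis(10), handle.push_with_priority(2u8, priority)).await;
    assert!(blocked.is_err(), "second push should block until tokens refill");

    // After advancing past the one-second refill interval, the push succeeds.
    time::advance(Duration::from_secs(1)).await;
    handle.push_with_priority(3u8, priority).await.expect("push after refill");
}
```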


88-99: Good coverage of basic rate limiting flow.

This test verifies the positive case where pushes are allowed after waiting for token refill. The logic is clear and focused on a single concern.


101-119: Excellent test for cross-priority rate limiting.

This test validates an important aspect - that the rate limiter is shared across priority queues rather than being per-priority. The explicit priority assertions provide good verification.


121-132: Good validation of unlimited queue behaviour.

This test ensures that queues created without rate limiting behave as expected and don't introduce unexpected blocking. The use of bounded_no_rate_limit properly exercises that code path.


134-156: Thorough test of burst capacity behaviour.

This test validates the important burst capacity feature where multiple pushes within the capacity limit succeed immediately, but excess pushes are blocked. The iteration and frame verification are well-implemented.

src/push.rs (6)

9-12: Good import additions for rate limiting functionality.

The new imports properly support the rate limiting features - Duration for time-based operations and Arc/Weak for shared ownership patterns already used in the codebase.


14-14: Appropriate external dependency for rate limiting.

Using the leaky-bucket crate provides a well-tested implementation of the token bucket algorithm, which is suitable for this use case.


25-28: Sensible rate limiting constants with clear documentation.

The default rate of 100 pushes per second is reasonable for most use cases, and the maximum of 10,000 provides a sensible upper bound to prevent misconfiguration.


71-71: Clean integration of optional rate limiter.

The Option<RateLimiter> field allows for optional rate limiting without affecting the existing API when rate limiting is disabled.


225-227: Good default behaviour with rate limiting enabled.

Enabling rate limiting by default with a sensible rate provides protection against abuse whilst maintaining backwards compatibility through the same method signature.


285-292: Well-configured rate limiter implementation.

The leaky-bucket configuration is sensible:

  • Initial tokens equal to the rate allows for bursts
  • Refill rate matches the specified rate
  • 1-second interval is appropriate for per-second rate limiting
  • Maximum capacity prevents excessive token accumulation

This provides good burst handling whilst enforcing the overall rate limit.
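For reference, the builder configuration being described is the one quoted later in this thread from src/push.rs:

```rust
// One shared bucket per connection: start full to allow an initial burst of `r`,
// refill `r` tokens each second, and cap the bucket at `r` tokens.
let limiter = rate.map(|r| {
    RateLimiter::builder()
        .initial(r)
        .refill(r)
        .interval(Duration::from_secs(1))
        .max(r)
        .build()
});
```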


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai auto-generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Contributor

sourcery-ai bot left a comment

Hey @leynos - I've reviewed your changes - here's some feedback:

  • You only test rate limiting for high‐priority pushes—add a test to verify that low‐priority pushes (and the combined limit across both queues) are also properly rate-limited.
  • Consider making the DEFAULT_PUSH_RATE configurable at runtime or providing an option to disable rate limiting altogether, rather than hard-coding it in the constant.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- You only test rate limiting for high‐priority pushes—add a test to verify that low‐priority pushes (and the combined limit across both queues) are also properly rate-limited.
- Consider making the DEFAULT_PUSH_RATE configurable at runtime or providing an option to disable rate limiting altogether, rather than hard-coding it in the constant.

## Individual Comments

### Comment 1
<location> `src/push.rs:25` </location>
<code_context>

 impl<T> FrameLike for T where T: Send + 'static {}

+/// Default maximum pushes allowed per second when no custom rate is specified.
+const DEFAULT_PUSH_RATE: usize = 100;
+
</code_context>

<issue_to_address>
The module is missing a `//!` comment at the top as required by the review instructions.

Please add a `//!` module-level comment at the very top of the file to describe the module's purpose, as per the review instructions.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Owner Author

leynos commented Jul 6, 2025

@coderabbitai resume

Contributor

coderabbitai bot commented Jul 6, 2025

✅ Actions performed

Reviews resumed.

codescene-delta-analysis bot left a comment

Gates Failed
Enforce advisory code health rules (2 files with Code Duplication)

Gates Passed
5 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules:
  • push.rs: 1 advisory rule, Code Health 10.00 → 9.39
  • push.rs: 1 advisory rule, Code Health 10.00 → 9.39

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

codescene-delta-analysis bot left a comment

Gates Failed
Enforce advisory code health rules (1 file with Code Duplication)

Gates Passed
5 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules:
  • push.rs: 1 advisory rule, Code Health 10.00 → 9.39

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Owner Author

leynos commented Jul 6, 2025

@sourcery-ai review

Contributor

sourcery-ai bot left a comment

Hey @leynos - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/push.rs:260` </location>
<code_context>
+    ) -> (Self, PushHandle<F>) {
         let (high_tx, high_rx) = mpsc::channel(high_capacity);
         let (low_tx, low_rx) = mpsc::channel(low_capacity);
+        let limiter = rate.map(|r| {
+            RateLimiter::builder()
+                .initial(r)
+                .refill(r)
+                .interval(Duration::from_secs(1))
+                .max(r)
+                .build()
+        });
         let inner = PushHandleInner {
</code_context>

<issue_to_address>
No validation for zero or extremely high rate values.

Validating the rate parameter and handling out-of-bounds values will prevent unexpected limiter behavior.
</issue_to_address>

### Comment 2
<location> `src/push.rs:229` </location>
<code_context>
+
+    /// Create queues without any rate limiting.
+    #[must_use]
+    pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
+        Self::bounded_with_rate(high_capacity, low_capacity, None)
+    }
</code_context>

<issue_to_address>
The name 'bounded_unlimited' may be confusing.

Consider renaming to 'bounded_no_rate_limit' or 'bounded_without_rate_limit' for better clarity.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    /// Create queues without any rate limiting.
    #[must_use]
    pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
        Self::bounded_with_rate(high_capacity, low_capacity, None)
    }
=======
    /// Create queues with no rate limiting.
    #[must_use]
    pub fn bounded_no_rate_limit(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
        Self::bounded_with_rate(high_capacity, low_capacity, None)
    }
>>>>>>> REPLACE

</suggested_fix>

### Comment 3
<location> `tests/push.rs:9` </location>
<code_context>
+use tokio::time::{self, Duration};
 use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};

 #[tokio::test]
</code_context>

<issue_to_address>
Consider adding a test for burst pushes within the same interval.

Adding such a test would verify that the limiter correctly permits bursts up to its capacity and blocks excess pushes within the same interval.

Suggested implementation:

```rust
use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};

#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
    // Assume a rate limiter with a burst capacity of 3 per interval.
    // Adjust these values if your PushQueues or PushPolicy is configured differently.
    let burst_capacity = 3;
    let interval = Duration::from_millis(100);

    let mut queues = PushQueues::new(PushPolicy {
        high_rate: burst_capacity as u32,
        high_interval: interval,
        low_rate: burst_capacity as u32,
        low_interval: interval,
    });

    // Fill the burst capacity
    for i in 0..burst_capacity {
        let res = queues.push(PushPriority::High, i).await;
        assert!(res.is_ok(), "Push within burst capacity should succeed");
    }

    // This push should be blocked or error, as it exceeds the burst capacity within the interval
    let res = queues.push(PushPriority::High, 99).await;
    assert!(
        matches!(res, Err(PushError::RateLimited)),
        "Push exceeding burst capacity should be rate limited"
    );

    // Advance time to the next interval and ensure we can push again
    time::advance(interval).await;
    let res = queues.push(PushPriority::High, 100).await;
    assert!(res.is_ok(), "Push after interval should succeed");
}

#[tokio::test]
    let res = handle.push_low_priority(24u8).await;
    assert!(matches!(res, Err(PushError::Closed)));
}

```

- If your `PushQueues::new` or `PushPolicy` constructors differ, adjust the instantiation accordingly.
- If the error variant for rate limiting is not `PushError::RateLimited`, replace it with the correct one.
- If your push method signature or usage is different, adapt the test to match your API.
</issue_to_address>

### Comment 4
<location> `src/push.rs:25` </location>
<code_context>

 impl<T> FrameLike for T where T: Send + 'static {}

+/// Default maximum pushes allowed per second when no custom rate is specified.
+const DEFAULT_PUSH_RATE: usize = 100;
+
</code_context>

<issue_to_address>
Module-level `//!` doc comment is missing at the top of the file.

Please add a `//!` doc comment at the very top of the module to describe its purpose, as required by the review instructions.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +260 to +266
let limiter = rate.map(|r| {
    RateLimiter::builder()
        .initial(r)
        .refill(r)
        .interval(Duration::from_secs(1))
        .max(r)
        .build()

issue: No validation for zero or extremely high rate values.

Validating the rate parameter and handling out-of-bounds values will prevent unexpected limiter behavior.
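One possible shape for that validation, sketched with an assumed MAX_PUSH_RATE constant and a clamp-rather-than-error policy (neither is part of this PR; the review notes only mention 10,000 as a sensible upper bound):

```rust
use std::time::Duration;

use leaky_bucket::RateLimiter;

/// Illustrative upper bound on the configurable push rate.
const MAX_PUSH_RATE: usize = 10_000;

/// Clamp a requested rate into [1, MAX_PUSH_RATE] so zero or absurdly large
/// values cannot misconfigure the limiter, then build the shared bucket.
fn build_limiter(rate: Option<usize>) -> Option<RateLimiter> {
    rate.map(|r| {
        let r = r.clamp(1, MAX_PUSH_RATE);
        RateLimiter::builder()
            .initial(r)
            .refill(r)
            .interval(Duration::from_secs(1))
            .max(r)
            .build()
    })
}
```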

src/push.rs Outdated
Comment on lines 227 to 231
/// Create queues without any rate limiting.
#[must_use]
pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
    Self::bounded_with_rate(high_capacity, low_capacity, None)
}

suggestion: The name 'bounded_unlimited' may be confusing.

Consider renaming to 'bounded_no_rate_limit' or 'bounded_without_rate_limit' for better clarity.

Suggested change
/// Create queues without any rate limiting.
#[must_use]
pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
    Self::bounded_with_rate(high_capacity, low_capacity, None)
}

/// Create queues with no rate limiting.
#[must_use]
pub fn bounded_no_rate_limit(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
    Self::bounded_with_rate(high_capacity, low_capacity, None)
}

tests/push.rs Outdated
Comment on lines 49 to 58
    let res = handle.push_low_priority(24u8).await;
    assert!(matches!(res, Err(PushError::Closed)));
}

#[rstest]
#[case::high(PushPriority::High, "second push should block")]
#[case::low(PushPriority::Low, "")]
#[tokio::test]
async fn rate_limiter_blocks_when_exceeded(
    #[case] priority: PushPriority,

suggestion (testing): Consider adding a test for burst pushes within the same interval.

Adding such a test would verify that the limiter correctly permits bursts up to its capacity and blocks excess pushes within the same interval.

Suggested implementation:

use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};

#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
    // Assume a rate limiter with a burst capacity of 3 per interval.
    // Adjust these values if your PushQueues or PushPolicy is configured differently.
    let burst_capacity = 3;
    let interval = Duration::from_millis(100);

    let mut queues = PushQueues::new(PushPolicy {
        high_rate: burst_capacity as u32,
        high_interval: interval,
        low_rate: burst_capacity as u32,
        low_interval: interval,
    });

    // Fill the burst capacity
    for i in 0..burst_capacity {
        let res = queues.push(PushPriority::High, i).await;
        assert!(res.is_ok(), "Push within burst capacity should succeed");
    }

    // This push should be blocked or error, as it exceeds the burst capacity within the interval
    let res = queues.push(PushPriority::High, 99).await;
    assert!(
        matches!(res, Err(PushError::RateLimited)),
        "Push exceeding burst capacity should be rate limited"
    );

    // Advance time to the next interval and ensure we can push again
    time::advance(interval).await;
    let res = queues.push(PushPriority::High, 100).await;
    assert!(res.is_ok(), "Push after interval should succeed");
}

#[tokio::test]
    let res = handle.push_low_priority(24u8).await;
    assert!(matches!(res, Err(PushError::Closed)));
}
  • If your PushQueues::new or PushPolicy constructors differ, adjust the instantiation accordingly.
  • If the error variant for rate limiting is not PushError::RateLimited, replace it with the correct one.
  • If your push method signature or usage is different, adapt the test to match your API.


impl<T> FrameLike for T where T: Send + 'static {}

/// Default maximum pushes allowed per second when no custom rate is specified.

issue (review_instructions): Module-level //! doc comment is missing at the top of the file.

Please add a //! doc comment at the very top of the module to describe its purpose, as required by the review instructions.

Review instructions:

Path patterns: **/*.rs

Instructions:
Every module must begin with a //! comment.

Owner Author

leynos commented Jul 6, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jul 6, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Owner Author

leynos commented Jul 7, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jul 7, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

coderabbitai bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20ae9ef and 4bf21ab.

📒 Files selected for processing (3)
  • docs/hardening-wireframe-a-guide-to-production-resilience.md (1 hunks)
  • src/push.rs (7 hunks)
  • tests/push.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)

**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")

  • Use en-GB-oxendict spelling and grammar.
  • Paragraphs and bullets must be wrapped to 80 columns, except where a long URL would prevent this (in which case, silence MD013 for that line)
  • Code blocks should be wrapped to 120 columns.
  • Headings must not be wrapped.

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • docs/hardening-wireframe-a-guide-to-production-resilience.md

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments must use en-GB-oxendict spelling and grammar.

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • tests/push.rs
  • src/push.rs
🔇 Additional comments (9)
docs/hardening-wireframe-a-guide-to-production-resilience.md (1)

242-260: Documentation accurately reflects the new rate limiting API.

The updated examples correctly demonstrate the new PushQueues constructors with optional rate limiting. The code samples align with the implementation and provide clear usage patterns for both rate-limited and unlimited queues.

tests/push.rs (3)

5-5: Good addition of rstest import for parameterised testing.

This addresses the previous code duplication concerns by enabling parameterised tests, which aligns with the coding guidelines preference for using rstest fixtures.


142-163: Excellent test coverage for burst capacity behaviour.

This test effectively validates the leaky bucket's burst capacity and rate limiting behaviour. The test correctly verifies that pushes within the burst limit succeed immediately, whilst excess pushes are blocked until tokens refill.


128-139: Unlimited queues correctly skip rate limiting

The bounded_no_rate_limit constructor forwards rate=None to bounded_with_rate, and push_with_priority only awaits when the limiter is Some(_). With no limiter present, neither push_high_priority nor push_low_priority will ever await a token, so the test’s use of time::pause() and a timeout accurately verifies that unlimited queues do not block. No changes are needed.
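A minimal sketch of that kind of check (the frame type, capacities, and timeout are illustrative assumptions):

```rust
use tokio::time::{self, Duration};
use wireframe::push::PushQueues;

#[tokio::test]
async fn unlimited_queue_never_awaits_tokens() {
    time::pause();
    let (_queues, handle) = PushQueues::<u8>::bounded_no_rate_limit(4, 4);

    // With no limiter configured the push resolves without consuming virtual time.
    let res = time::timeout(Duration::from_millis(5), handle.push_high_priority(1u8)).await;
    assert!(res.is_ok(), "unlimited queues must not block on rate tokens");
}
```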

src/push.rs (5)

1-8: Excellent module documentation.

The module-level doc comment clearly explains the purpose and utility of the push queues, addressing the previous review requirement for //! comments.


25-28: Well-defined constants for rate limiting.

The constants provide sensible defaults and limits. The 10,000 pushes per second maximum is reasonable for preventing resource exhaustion whilst allowing high throughput.


81-94: Excellent refactoring to eliminate code duplication.

The push_with_priority helper method successfully eliminates the duplication between push_high_priority and push_low_priority whilst maintaining clear separation of concerns. The rate limiting integration is clean and non-intrusive.


240-245: Consider method name consistency.

The method name bounded_no_rate_limit clearly conveys its purpose and is an improvement over the previously suggested bounded_unlimited which could be confusing.


285-292: Confirm rate limiter configuration semantics

The RateLimiter::builder() is currently set up as follows in src/push.rs (lines 285–292):

RateLimiter::builder()
    .initial(r)  // r tokens available immediately on creation
    .refill(r)   // r tokens added every interval (1 sec)
    .interval(Duration::from_secs(1))
    .max(r)      // bucket capacity capped at r tokens
    .build()

This means you start with r tokens, allow bursts up to r tokens, and replenish at r tokens/second. Please verify that:

  • An initial burst equal to r is intended
  • A maximum burst capacity of r aligns with your throughput requirements
  • A refill rate of r per second matches the desired sustained rate

If any of these differ from the design (e.g. starting empty or allowing a larger burst), adjust initial and/or max accordingly.

leynos merged commit 55cebdb into main Jul 7, 2025
5 checks passed
leynos deleted the codex/implement-per-connection-rate-limiting branch July 7, 2025 18:50