-
Notifications
You must be signed in to change notification settings - Fork 0
Implement push rate limiter #185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideThis PR integrates a leaky-bucket rate limiter into the push subsystem by refactoring PushHandle to await rate tokens, exposing new constructors for custom and unlimited rate limits, updating default behavior, documenting the feature, and adding tests to verify throttling. Sequence diagram for push operation with rate limitingsequenceDiagram
participant Producer
participant PushHandle
participant RateLimiter
participant mpsc_Channel
Producer->>PushHandle: push_high_priority(frame)
alt RateLimiter configured
PushHandle->>RateLimiter: acquire(1)
RateLimiter-->>PushHandle: token granted
end
PushHandle->>mpsc_Channel: send(frame)
mpsc_Channel-->>PushHandle: result
PushHandle-->>Producer: Result
Class diagram for PushHandle and PushQueues with rate limitingclassDiagram
class PushHandleInner {
mpsc::Sender<F> high_prio_tx
mpsc::Sender<F> low_prio_tx
Option<RateLimiter> limiter
}
class PushHandle {
+push_high_priority(frame: F) Result<(), PushError>
+push_low_priority(frame: F) Result<(), PushError>
+push_with_priority(frame: F, priority: PushPriority) Result<(), PushError>
-Arc<PushHandleInner<F>>
}
class PushQueues {
+bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>)
+bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>)
+bounded_with_rate(high_capacity: usize, low_capacity: usize, rate: Option<usize>) -> (Self, PushHandle<F>)
}
PushQueues -- PushHandle : creates
PushHandle -- PushHandleInner : wraps
PushHandleInner o-- RateLimiter : optional
class RateLimiter {
+acquire(n: usize)
}
Class diagram for PushQueues constructors with rate limiting optionsclassDiagram
class PushQueues {
+bounded(high_capacity, low_capacity)
+bounded_unlimited(high_capacity, low_capacity)
+bounded_with_rate(high_capacity, low_capacity, rate: Option<usize>)
}
PushQueues : bounded() uses DEFAULT_PUSH_RATE
PushQueues : bounded_unlimited() disables rate limiting
PushQueues : bounded_with_rate() allows custom rate or disables
PushQueues ..> PushHandle : returns
PushQueues ..> RateLimiter : optional, via PushHandleInner
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Summary by CodeRabbit
Summary by CodeRabbit
WalkthroughThe changes introduce per-connection rate limiting for outbound push queues using a leaky bucket algorithm. This includes updating documentation, adding the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant PushHandle
participant RateLimiter
participant PushQueue
User->>PushHandle: push_high_priority(frame)
PushHandle->>RateLimiter: acquire(1).await
RateLimiter-->>PushHandle: token granted (or waits)
PushHandle->>PushQueue: send frame
PushQueue-->>User: Result
Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (2)
🧰 Additional context used📓 Path-based instructions (2)`**/*.rs`: Comment why, not what. Explain assumptions, edge cases, trade-offs, o...
📄 Source: CodeRabbit Inference Engine (AGENTS.md) List of files the instruction was applied to:
`**/*.rs`: * Seek to keep the cyclomatic complexity of functions no more than 12...
⚙️ Source: CodeRabbit Configuration File List of files the instruction was applied to:
⏰ Context from checks skipped due to timeout of 240000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
🔇 Additional comments (12)
✨ Finishing Touches
🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- You only test rate limiting for high‐priority pushes—add a test to verify that low‐priority pushes (and the combined limit across both queues) are also properly rate-limited.
- Consider making the DEFAULT_PUSH_RATE configurable at runtime or providing an option to disable rate limiting altogether, rather than hard-coding it in the constant.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- You only test rate limiting for high‐priority pushes—add a test to verify that low‐priority pushes (and the combined limit across both queues) are also properly rate-limited.
- Consider making the DEFAULT_PUSH_RATE configurable at runtime or providing an option to disable rate limiting altogether, rather than hard-coding it in the constant.
## Individual Comments
### Comment 1
<location> `src/push.rs:25` </location>
<code_context>
impl<T> FrameLike for T where T: Send + 'static {}
+/// Default maximum pushes allowed per second when no custom rate is specified.
+const DEFAULT_PUSH_RATE: usize = 100;
+
</code_context>
<issue_to_address>
The module is missing a `//!` comment at the top as required by the review instructions.
Please add a `//!` module-level comment at the very top of the file to describe the module's purpose, as per the review instructions.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Enforce advisory code health rules
(2 files with Code Duplication)
Gates Passed
5 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Enforce advisory code health rules | Violations | Code Health Impact | |
---|---|---|---|
push.rs | 1 advisory rule | 10.00 → 9.39 | Suppress |
push.rs | 1 advisory rule | 10.00 → 9.39 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Enforce advisory code health rules
(1 file with Code Duplication)
Gates Passed
5 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Enforce advisory code health rules | Violations | Code Health Impact | |
---|---|---|---|
push.rs | 1 advisory rule | 10.00 → 9.39 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
@sourcery-ai review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/push.rs:260` </location>
<code_context>
+ ) -> (Self, PushHandle<F>) {
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
+ let limiter = rate.map(|r| {
+ RateLimiter::builder()
+ .initial(r)
+ .refill(r)
+ .interval(Duration::from_secs(1))
+ .max(r)
+ .build()
+ });
let inner = PushHandleInner {
</code_context>
<issue_to_address>
No validation for zero or extremely high rate values.
Validating the rate parameter and handling out-of-bounds values will prevent unexpected limiter behavior.
</issue_to_address>
### Comment 2
<location> `src/push.rs:229` </location>
<code_context>
+
+ /// Create queues without any rate limiting.
+ #[must_use]
+ pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
+ Self::bounded_with_rate(high_capacity, low_capacity, None)
+ }
</code_context>
<issue_to_address>
The name 'bounded_unlimited' may be confusing.
Consider renaming to 'bounded_no_rate_limit' or 'bounded_without_rate_limit' for better clarity.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
/// Create queues without any rate limiting.
#[must_use]
pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
Self::bounded_with_rate(high_capacity, low_capacity, None)
}
=======
/// Create queues with no rate limiting.
#[must_use]
pub fn bounded_no_rate_limit(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
Self::bounded_with_rate(high_capacity, low_capacity, None)
}
>>>>>>> REPLACE
</suggested_fix>
### Comment 3
<location> `tests/push.rs:9` </location>
<code_context>
+use tokio::time::{self, Duration};
use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};
#[tokio::test]
</code_context>
<issue_to_address>
Consider adding a test for burst pushes within the same interval.
Adding such a test would verify that the limiter correctly permits bursts up to its capacity and blocks excess pushes within the same interval.
Suggested implementation:
```rust
use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};
#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
// Assume a rate limiter with a burst capacity of 3 per interval.
// Adjust these values if your PushQueues or PushPolicy is configured differently.
let burst_capacity = 3;
let interval = Duration::from_millis(100);
let mut queues = PushQueues::new(PushPolicy {
high_rate: burst_capacity as u32,
high_interval: interval,
low_rate: burst_capacity as u32,
low_interval: interval,
});
// Fill the burst capacity
for i in 0..burst_capacity {
let res = queues.push(PushPriority::High, i).await;
assert!(res.is_ok(), "Push within burst capacity should succeed");
}
// This push should be blocked or error, as it exceeds the burst capacity within the interval
let res = queues.push(PushPriority::High, 99).await;
assert!(
matches!(res, Err(PushError::RateLimited)),
"Push exceeding burst capacity should be rate limited"
);
// Advance time to the next interval and ensure we can push again
time::advance(interval).await;
let res = queues.push(PushPriority::High, 100).await;
assert!(res.is_ok(), "Push after interval should succeed");
}
#[tokio::test]
let res = handle.push_low_priority(24u8).await;
assert!(matches!(res, Err(PushError::Closed)));
}
```
- If your `PushQueues::new` or `PushPolicy` constructors differ, adjust the instantiation accordingly.
- If the error variant for rate limiting is not `PushError::RateLimited`, replace it with the correct one.
- If your push method signature or usage is different, adapt the test to match your API.
</issue_to_address>
### Comment 4
<location> `src/push.rs:25` </location>
<code_context>
impl<T> FrameLike for T where T: Send + 'static {}
+/// Default maximum pushes allowed per second when no custom rate is specified.
+const DEFAULT_PUSH_RATE: usize = 100;
+
</code_context>
<issue_to_address>
Module-level `//!` doc comment is missing at the top of the file.
Please add a `//!` doc comment at the very top of the module to describe its purpose, as required by the review instructions.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
let limiter = rate.map(|r| { | ||
RateLimiter::builder() | ||
.initial(r) | ||
.refill(r) | ||
.interval(Duration::from_secs(1)) | ||
.max(r) | ||
.build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: No validation for zero or extremely high rate values.
Validating the rate parameter and handling out-of-bounds values will prevent unexpected limiter behavior.
src/push.rs
Outdated
/// Create queues without any rate limiting. | ||
#[must_use] | ||
pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) { | ||
Self::bounded_with_rate(high_capacity, low_capacity, None) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The name 'bounded_unlimited' may be confusing.
Consider renaming to 'bounded_no_rate_limit' or 'bounded_without_rate_limit' for better clarity.
/// Create queues without any rate limiting. | |
#[must_use] | |
pub fn bounded_unlimited(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) { | |
Self::bounded_with_rate(high_capacity, low_capacity, None) | |
} | |
/// Create queues with no rate limiting. | |
#[must_use] | |
pub fn bounded_no_rate_limit(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) { | |
Self::bounded_with_rate(high_capacity, low_capacity, None) | |
} |
tests/push.rs
Outdated
let res = handle.push_low_priority(24u8).await; | ||
assert!(matches!(res, Err(PushError::Closed))); | ||
} | ||
|
||
#[rstest] | ||
#[case::high(PushPriority::High, "second push should block")] | ||
#[case::low(PushPriority::Low, "")] | ||
#[tokio::test] | ||
async fn rate_limiter_blocks_when_exceeded( | ||
#[case] priority: PushPriority, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Consider adding a test for burst pushes within the same interval.
Adding such a test would verify that the limiter correctly permits bursts up to its capacity and blocks excess pushes within the same interval.
Suggested implementation:
use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};
#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
// Assume a rate limiter with a burst capacity of 3 per interval.
// Adjust these values if your PushQueues or PushPolicy is configured differently.
let burst_capacity = 3;
let interval = Duration::from_millis(100);
let mut queues = PushQueues::new(PushPolicy {
high_rate: burst_capacity as u32,
high_interval: interval,
low_rate: burst_capacity as u32,
low_interval: interval,
});
// Fill the burst capacity
for i in 0..burst_capacity {
let res = queues.push(PushPriority::High, i).await;
assert!(res.is_ok(), "Push within burst capacity should succeed");
}
// This push should be blocked or error, as it exceeds the burst capacity within the interval
let res = queues.push(PushPriority::High, 99).await;
assert!(
matches!(res, Err(PushError::RateLimited)),
"Push exceeding burst capacity should be rate limited"
);
// Advance time to the next interval and ensure we can push again
time::advance(interval).await;
let res = queues.push(PushPriority::High, 100).await;
assert!(res.is_ok(), "Push after interval should succeed");
}
#[tokio::test]
let res = handle.push_low_priority(24u8).await;
assert!(matches!(res, Err(PushError::Closed)));
}
- If your
PushQueues::new
orPushPolicy
constructors differ, adjust the instantiation accordingly. - If the error variant for rate limiting is not
PushError::RateLimited
, replace it with the correct one. - If your push method signature or usage is different, adapt the test to match your API.
|
||
impl<T> FrameLike for T where T: Send + 'static {} | ||
|
||
/// Default maximum pushes allowed per second when no custom rate is specified. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (review_instructions): Module-level //!
doc comment is missing at the top of the file.
Please add a //!
doc comment at the very top of the module to describe its purpose, as required by the review instructions.
Review instructions:
Path patterns: **/*.rs
Instructions:
Every module must begin with a //!
comment.
@coderabbitai review |
✅ Actions performedReview triggered.
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
docs/hardening-wireframe-a-guide-to-production-resilience.md
(1 hunks)src/push.rs
(7 hunks)tests/push.rs
(2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
`**/*.md`: * Avoid 2nd person or 1st person pronouns ("I", "you", "we") * Use en...
**/*.md
: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-oxendic spelling and grammar.
- Paragraphs and bullets must be wrapped to 80 columns, except where a long URL would prevent this (in which case, silence MD013 for that line)
- Code blocks should be wrapped to 120 columns.
- Headings must not be wrapped.
⚙️ Source: CodeRabbit Configuration File
List of files the instruction was applied to:
docs/hardening-wireframe-a-guide-to-production-resilience.md
`**/*.rs`: * Seek to keep the cyclomatic complexity of functions no more than 12...
**/*.rs
: * Seek to keep the cyclomatic complexity of functions no more than 12.
Adhere to single responsibility and CQRS
Place function attributes after doc comments.
Do not use
return
in single-line functions.Move conditionals with >2 branches into a predicate function.
Avoid
unsafe
unless absolutely necessary.Every module must begin with a
//!
doc comment that explains the module's purpose and utility.Comments must use en-GB-oxendict spelling and grammar.
Lints must not be silenced except as a last resort.
#[allow]
is forbidden.- Only narrowly scoped
#[expect(lint, reason = "...")]
is allowed.- No lint groups, no blanket or file-wide suppression.
- Include
FIXME:
with link if a fix is expected.Use
rstest
fixtures for shared setup and to avoid repetition between tests.Replace duplicated tests with
#[rstest(...)]
parameterised cases.Prefer
mockall
for mocks/stubs.Prefer
.expect()
over.unwrap()
Ensure that any API or behavioural changes are reflected in the documentation in
docs/
Ensure that any completed roadmap steps are recorded in the appropriate roadmap in
docs/
⚙️ Source: CodeRabbit Configuration File
List of files the instruction was applied to:
tests/push.rs
src/push.rs
🔇 Additional comments (9)
docs/hardening-wireframe-a-guide-to-production-resilience.md (1)
242-260
: Documentation accurately reflects the new rate limiting API.The updated examples correctly demonstrate the new
PushQueues
constructors with optional rate limiting. The code samples align with the implementation and provide clear usage patterns for both rate-limited and unlimited queues.tests/push.rs (3)
5-5
: Good addition of rstest import for parameterised testing.This addresses the previous code duplication concerns by enabling parameterised tests, which aligns with the coding guidelines preference for using rstest fixtures.
142-163
: Excellent test coverage for burst capacity behaviour.This test effectively validates the leaky bucket's burst capacity and rate limiting behaviour. The test correctly verifies that pushes within the burst limit succeed immediately, whilst excess pushes are blocked until tokens refill.
128-139
: Unlimited queues correctly skip rate limitingThe
bounded_no_rate_limit
constructor forwardsrate=None
tobounded_with_rate
, andpush_with_priority
only awaits when the limiter isSome(_)
. With no limiter present, neitherpush_high_priority
norpush_low_priority
will ever await a token, so the test’s use oftime::pause()
and a timeout accurately verifies that unlimited queues do not block. No changes are needed.src/push.rs (5)
1-8
: Excellent module documentation.The module-level doc comment clearly explains the purpose and utility of the push queues, addressing the previous review requirement for
//!
comments.
25-28
: Well-defined constants for rate limiting.The constants provide sensible defaults and limits. The 10,000 pushes per second maximum is reasonable for preventing resource exhaustion whilst allowing high throughput.
81-94
: Excellent refactoring to eliminate code duplication.The
push_with_priority
helper method successfully eliminates the duplication betweenpush_high_priority
andpush_low_priority
whilst maintaining clear separation of concerns. The rate limiting integration is clean and non-intrusive.
240-245
: Consider method name consistency.The method name
bounded_no_rate_limit
clearly conveys its purpose and is an improvement over the previously suggestedbounded_unlimited
which could be confusing.
285-292
: Confirm rate limiter configuration semanticsThe
RateLimiter::builder()
is currently set up as follows in src/push.rs (lines 285–292):RateLimiter::builder() .initial(r) // r tokens available immediately on creation .refill(r) // r tokens added every interval (1 sec) .interval(Duration::from_secs(1)) .max(r) // bucket capacity capped at r tokens .build()This means you start with r tokens, allow bursts up to r tokens, and replenish at r tokens/second. Please verify that:
- An initial burst equal to r is intended
- A maximum burst capacity of r aligns with your throughput requirements
- A refill rate of r per second matches the desired sustained rate
If any of these differ from the design (e.g. starting empty or allowing a larger burst), adjust
initial
and/ormax
accordingly.
Summary
PushHandle
PushQueues::bounded_with_rate
for custom limitsTesting
make fmt
make lint
make test
https://chatgpt.com/codex/tasks/task_e_6869c6cd49608322abbf4694a14283bf
Summary by Sourcery
Implement configurable per-connection push rate limiting using a leaky-bucket algorithm, refactor push logic to enforce limits, and provide corresponding documentation and tests.
New Features:
Enhancements:
Build:
Documentation:
Tests: