Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,28 @@ let app = WireframeApp::new()?
.frame_processor(LengthPrefixedProcessor::new(LengthFormat::u16_le()));
```

## Push Queues

Push queues buffer frames before they are written to a connection. Configure
them with capacities, rate limits, and an optional dead-letter queue:

```rust,no_run
use tokio::sync::mpsc;
use wireframe::push::PushQueues;

# async fn demo() {
let (dlq_tx, _dlq_rx) = mpsc::channel(8);
let (_queues, _handle) = PushQueues::<u8>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(Some(100)) // pass None to disable rate limiting
.dlq(Some(dlq_tx)) // frames drop if the DLQ is absent or full
.build()
.expect("failed to build PushQueues");
# drop((_queues, _handle));
# }
```

## Connection Lifecycle

Protocol callbacks are consolidated under the `WireframeProtocol` trait,
Expand Down
12 changes: 10 additions & 2 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,20 @@ classDiagram
class PushQueues~F~ {
+high_priority_rx: mpsc::Receiver<F>
+low_priority_rx: mpsc::Receiver<F>
+bounded(high_capacity: usize, low_capacity: usize): (PushQueues~F~, PushHandle~F~)
+builder(): PushQueuesBuilder~F~
+recv(): Option<(PushPriority, F)>
}
class PushQueuesBuilder~F~ {
+high_capacity(cap: usize): PushQueuesBuilder~F~
+low_capacity(cap: usize): PushQueuesBuilder~F~
+rate(rate: Option<usize>): PushQueuesBuilder~F~
+dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+build(): (PushQueues~F~, PushHandle~F~)
}
Comment on lines +356 to +365
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix builder signature to return Result in public API diagram.

Docs elsewhere and code use build() -> Result<(PushQueues, PushHandle), PushConfigError>. Keep the surface consistent here.

-    class PushQueuesBuilder~F~ {
-        +high_capacity(cap: usize): PushQueuesBuilder~F~
-        +low_capacity(cap: usize): PushQueuesBuilder~F~
-        +rate(rate: Option<usize>): PushQueuesBuilder~F~
-        +dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
-        +build(): (PushQueues~F~, PushHandle~F~)
-    }
+    class PushQueuesBuilder~F~ {
+        +high_capacity(cap: usize): PushQueuesBuilder~F~
+        +low_capacity(cap: usize): PushQueuesBuilder~F~
+        +rate(rate: Option<usize>): PushQueuesBuilder~F~
+        +dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+        +build(): Result<(PushQueues~F~, PushHandle~F~), PushConfigError>
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
+builder(): PushQueuesBuilder~F~
+recv(): Option<(PushPriority, F)>
}
class PushQueuesBuilder~F~ {
+high_capacity(cap: usize): PushQueuesBuilder~F~
+low_capacity(cap: usize): PushQueuesBuilder~F~
+rate(rate: Option<usize>): PushQueuesBuilder~F~
+dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+build(): (PushQueues~F~, PushHandle~F~)
}
builder(): PushQueuesBuilder~F~
recv(): Option<(PushPriority, F)>
}
class PushQueuesBuilder~F~ {
high_capacity(cap: usize): PushQueuesBuilder~F~
low_capacity(cap: usize): PushQueuesBuilder~F~
rate(rate: Option<usize>): PushQueuesBuilder~F~
dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
build(): Result<(PushQueues~F~, PushHandle~F~), PushConfigError>
}
🤖 Prompt for AI Agents
In docs/asynchronous-outbound-messaging-design.md around lines 356 to 365 the
PushQueuesBuilder build() signature in the public API diagram incorrectly
returns (PushQueues~F~, PushHandle~F~); update it to return
Result<(PushQueues~F~, PushHandle~F~), PushConfigError> so the diagram matches
the rest of the docs and code; keep the generic and error type exactly as used
elsewhere (PushConfigError) and update any adjacent text if it repeats the old
signature.


PushHandleInner <.. PushHandle~F~ : contains
PushQueues~F~ o-- PushHandle~F~ : bounded(high_capacity, low_capacity)
PushQueues~F~ o-- PushQueuesBuilder~F~ : builder()
PushQueuesBuilder~F~ o-- PushHandle~F~ : build()
PushHandle --> PushPriority
PushHandle --> PushPolicy
PushHandle --> PushError
Expand Down
84 changes: 55 additions & 29 deletions docs/efficiency-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,86 @@

## Executive Summary

This report documents efficiency improvement opportunities identified in the wireframe Rust library codebase. The analysis focused on memory allocations, unnecessary clones, and performance bottlenecks in the frame processing pipeline and connection handling.
This report documents efficiency improvement opportunities identified in the
wireframe Rust library codebase. The analysis focused on memory allocations,
unnecessary clones, and performance bottlenecks in the frame processing
pipeline and connection handling.

## Key Findings

### 1. Frame Processor Unnecessary Allocation (HIGH IMPACT)

**Location**: `src/frame/processor.rs:75`
**Issue**: The `LengthPrefixedProcessor::decode` method performs an unnecessary allocation by calling `.to_vec()` on a `BytesMut` returned from `split_to()`.
**Location**: `src/frame/processor.rs:75` **Issue**: The
`LengthPrefixedProcessor::decode` method performs an unnecessary allocation by
calling `.to_vec()` on a `BytesMut` returned from `split_to()`.

```rust
// Current inefficient code:
Ok(Some(src.split_to(len).to_vec()))
```

**Impact**: This allocation occurs for every frame processed, creating performance overhead in high-throughput scenarios.
**Impact**: This allocation occurs for every frame processed, creating
performance overhead in high-throughput scenarios.

**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type to work directly with `Bytes` to avoid the conversion entirely.
**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type
to work directly with `Bytes` to avoid the conversion entirely.

**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more efficient.
**Status**: ✅ FIXED—Optimized to use `freeze().to_vec()` which is more
efficient.

### 2. Connection Actor Clone Operations (MEDIUM IMPACT)

**Location**: `src/connection.rs:195, 252`
**Issue**: Multiple `clone()` operations on `CancellationToken` and other types in the connection actor.
**Location**: `src/connection.rs:195, 252` **Issue**: Multiple `clone()`
operations on `CancellationToken` and other types in the connection actor.

```rust
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }
() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown,
```

**Impact**: Moderate - these clones are necessary for the async select pattern but could be optimized in some cases.
**Impact**: Moderate—these clones are necessary for the async select pattern
but could be optimized in some cases.

**Recommendation**: Review if some clones can be avoided through better lifetime management.
**Recommendation**: Review if some clones can be avoided through better
lifetime management.

### 3. Middleware Chain Building (MEDIUM IMPACT)

**Location**: `src/app.rs:599`
**Issue**: Handler cloning during middleware chain construction.
**Location**: `src/app.rs:599` **Issue**: Handler cloning during middleware
chain construction.

```rust
let mut service = HandlerService::new(id, handler.clone());
```

**Impact**: Moderate - occurs during application setup, not in hot path.
**Impact**: Moderateoccurs during application setup, not in the hot path.

**Recommendation**: Consider using `Arc` references more efficiently.

### 4. Session Registry Operations (LOW-MEDIUM IMPACT)

**Location**: `src/session.rs:47-55`

**Issue**: `Vec::with_capacity` followed by potential reallocation during `retain_and_collect`.
**Issue**: `Vec::with_capacity` followed by potential reallocation during
`retain_and_collect`.

```rust
let mut out = Vec::with_capacity(self.0.len());
```

**Impact**: Low to medium - depends on registry size and pruning frequency.
**Impact**: Low to mediumdepends on registry size and pruning frequency.

**Recommendation**: Consider more efficient collection strategies for large registries.
**Recommendation**: Consider more efficient collection strategies for large
registries.

### 5. Vector Initializations (LOW IMPACT)

**Location**: Various files

**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is known.
**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is
known.

**Impact**: Low - minor allocation optimizations.
**Impact**: Lowminor allocation optimizations.

**Recommendation**: Use `with_capacity` when the expected size is known.

Expand All @@ -79,52 +91,62 @@ let mut out = Vec::with_capacity(self.0.len());

- **Bottleneck**: Frame decode/encode operations in high-throughput scenarios
- **Critical Path**: `LengthPrefixedProcessor::decode` method
- **Optimization Priority**: High - affects every incoming frame
- **Optimization Priority**: Highaffects every incoming frame

### Connection Handling

- **Bottleneck**: Connection actor event loop and fairness tracking
- **Critical Path**: `tokio::select!` in connection actor
- **Optimization Priority**: Medium - affects per-connection performance
- **Optimization Priority**: Mediumaffects per-connection performance

### Message Routing

- **Bottleneck**: HashMap lookups for route resolution
- **Critical Path**: Route handler lookup in `WireframeApp`
- **Optimization Priority**: Low - HashMap lookups are already efficient
- **Optimization Priority**: LowHashMap lookups are already efficient

## Implemented Optimizations

### Frame Processor Optimization
**Change**: Modified `LengthPrefixedProcessor::decode` to use `freeze().to_vec()` instead of direct `.to_vec()`.

**Change**: Modified `LengthPrefixedProcessor::decode` to use
`freeze().to_vec()` instead of direct `.to_vec()`.

**Before**:

```rust
Ok(Some(src.split_to(len).to_vec()))
```

**After**:

```rust
Ok(Some(src.split_to(len).freeze().to_vec()))
```

**Benefits**:

- Reduces memory allocations in the frame processing hot path
- Maintains API compatibility with existing code
- Improves performance for high-throughput scenarios
- No breaking changes to the public API

## Future Optimization Opportunities

1. **Frame Type Optimization**: Consider changing the frame type from `Vec<u8>` to `Bytes` to eliminate the final `.to_vec()` call entirely.
1. **Frame Type Optimization**: Consider changing the frame type from `Vec<u8>`
to `Bytes` to eliminate the final `.to_vec()` call entirely.

2. **Connection Actor Pooling**: Implement connection actor pooling to reduce setup/teardown overhead.
2. **Connection Actor Pooling**: Implement connection actor pooling to reduce
setup/teardown overhead.

3. **Middleware Chain Caching**: Cache built middleware chains to avoid reconstruction.
3. **Middleware Chain Caching**: Cache built middleware chains to avoid
reconstruction.

4. **Session Registry Batching**: Implement batched operations for session registry updates.
4. **Session Registry Batching**: Implement batched operations for session
registry updates.

5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where possible.
5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where
possible.

## Testing and Validation

Expand All @@ -138,6 +160,10 @@ All optimizations have been tested to ensure:

## Conclusion

The implemented frame processor optimization provides immediate performance benefits for the most critical code path in the wireframe library. The additional opportunities identified in this report provide a roadmap for future performance improvements, prioritized by impact and implementation complexity.
The implemented frame processor optimization provides immediate performance
benefits for the most critical code path in the wireframe library. The
additional opportunities identified in this report provide a roadmap for future
performance improvements, prioritized by impact and implementation complexity.

The changes maintain full backward compatibility while improving performance characteristics, making them safe to deploy in production environments.
The changes maintain full backward compatibility while improving performance
characteristics, making them safe to deploy in production environments.
13 changes: 11 additions & 2 deletions docs/hardening-wireframe-a-guide-to-production-resilience.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,20 @@ token-bucket algorithm is ideal.
use wireframe::push::{PushQueues, MAX_PUSH_RATE};

// Configure a connection to allow at most MAX_PUSH_RATE pushes per second.
let (queues, handle) = PushQueues::<Frame>::bounded_with_rate(8, 8, Some(MAX_PUSH_RATE))
let (queues, handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(Some(MAX_PUSH_RATE))
.build()
.expect("rate within supported bounds");

// Passing `None` disables rate limiting entirely:
let (_unlimited, _handle) = PushQueues::<Frame>::bounded_no_rate_limit(8, 8);
let (_unlimited, _handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(None)
.build()
.expect("failed to build unlimited queues");

// Inside PushHandle::push()
async fn push(&self, frame: Frame) -> Result<(), PushError> {
Expand Down
4 changes: 2 additions & 2 deletions docs/multi-packet-and-streaming-responses-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ The public API is designed for clarity, performance, and ergonomic flexibility.
### 4.1 The `Response` Enum

The `Response` enum is the primary return type for all handlers. It is enhanced
to provide optimised paths for common response patterns.
to provide optimized paths for common response patterns.

```Rust
use futures_core::stream::Stream;
Expand All @@ -82,7 +82,7 @@ pub enum Response<F, E> {
/// A single frame reply. The most common case.
Single(F),

/// An optimised response for a small, known list of frames.
/// An optimized response for a small, known list of frames.
/// Avoids the overhead of boxing and dynamic dispatch for simple multi-part replies.
Vec(Vec<F>),

Expand Down
14 changes: 11 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ impl Default for FairnessConfig {
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, handle) = PushQueues::<u8>::bounded(8, 8);
/// let (queues, handle) = PushQueues::<u8>::builder()
/// .high_capacity(8)
/// .low_capacity(8)
/// .build()
/// .expect("failed to build PushQueues");
/// let shutdown = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown);
/// # drop(actor);
Expand All @@ -125,11 +129,15 @@ where
///
/// # Examples
///
/// ```
/// ```no_run
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, handle) = PushQueues::<u8>::bounded(4, 4);
/// let (queues, handle) = PushQueues::<u8>::builder()
/// .high_capacity(4)
/// .low_capacity(4)
/// .build()
/// .expect("failed to build PushQueues");
/// let token = CancellationToken::new();
Comment on lines +136 to 141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Mark the doctest as no_run for consistency and faster docs.

Prevent unintended execution in docs and match the prior example style.

-    /// ```
+    /// ```no_run
     /// use tokio_util::sync::CancellationToken;
     /// use wireframe::{connection::ConnectionActor, push::PushQueues};
     ///
     /// let (queues, handle) = PushQueues::<u8>::builder()
🤖 Prompt for AI Agents
In src/connection.rs around lines 136 to 141, the documentation example should
be marked as a non-executing doctest for consistency and faster docs; update the
code block to use the no_run doctest tag (i.e., add the no_run fence/attribute
to the opening triple-backtick of the example) so the example is not executed by
rustdoc and matches the prior example style.

/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token);
/// # drop(actor);
Expand Down
Loading