Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,20 @@ classDiagram
class PushQueues~F~ {
+high_priority_rx: mpsc::Receiver<F>
+low_priority_rx: mpsc::Receiver<F>
+bounded(high_capacity: usize, low_capacity: usize): (PushQueues~F~, PushHandle~F~)
+builder(): PushQueuesBuilder~F~
+recv(): Option<(PushPriority, F)>
}
class PushQueuesBuilder~F~ {
+high_capacity(cap: usize): PushQueuesBuilder~F~
+low_capacity(cap: usize): PushQueuesBuilder~F~
+rate(rate: Option<usize>): PushQueuesBuilder~F~
+dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+build(): (PushQueues~F~, PushHandle~F~)
}

PushHandleInner <.. PushHandle~F~ : contains
PushQueues~F~ o-- PushHandle~F~ : bounded(high_capacity, low_capacity)
PushQueues~F~ o-- PushQueuesBuilder~F~ : builder()
PushQueuesBuilder~F~ o-- PushHandle~F~ : build()
PushHandle --> PushPriority
PushHandle --> PushPolicy
PushHandle --> PushError
Expand Down
72 changes: 49 additions & 23 deletions docs/efficiency-report.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,53 @@

## Executive Summary

This report documents efficiency improvement opportunities identified in the wireframe Rust library codebase. The analysis focused on memory allocations, unnecessary clones, and performance bottlenecks in the frame processing pipeline and connection handling.
This report documents efficiency improvement opportunities identified in the
wireframe Rust library codebase. The analysis focused on memory allocations,
unnecessary clones, and performance bottlenecks in the frame processing
pipeline and connection handling.

## Key Findings

### 1. Frame Processor Unnecessary Allocation (HIGH IMPACT)

**Location**: `src/frame/processor.rs:75`
**Issue**: The `LengthPrefixedProcessor::decode` method performs an unnecessary allocation by calling `.to_vec()` on a `BytesMut` returned from `split_to()`.
**Location**: `src/frame/processor.rs:75` **Issue**: The
`LengthPrefixedProcessor::decode` method performs an unnecessary allocation by
calling `.to_vec()` on a `BytesMut` returned from `split_to()`.

```rust
// Current inefficient code:
Ok(Some(src.split_to(len).to_vec()))
```

**Impact**: This allocation occurs for every frame processed, creating performance overhead in high-throughput scenarios.
**Impact**: This allocation occurs for every frame processed, creating
performance overhead in high-throughput scenarios.

**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type to work directly with `Bytes` to avoid the conversion entirely.
**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type
to work directly with `Bytes` to avoid the conversion entirely.

**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more efficient.
**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more
efficient.

### 2. Connection Actor Clone Operations (MEDIUM IMPACT)

**Location**: `src/connection.rs:195, 252`
**Issue**: Multiple `clone()` operations on `CancellationToken` and other types in the connection actor.
**Location**: `src/connection.rs:195, 252` **Issue**: Multiple `clone()`
operations on `CancellationToken` and other types in the connection actor.

```rust
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }
() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown,
```

**Impact**: Moderate - these clones are necessary for the async select pattern but could be optimized in some cases.
**Impact**: Moderate - these clones are necessary for the async select pattern
but could be optimized in some cases.

**Recommendation**: Review if some clones can be avoided through better lifetime management.
**Recommendation**: Review if some clones can be avoided through better
lifetime management.

### 3. Middleware Chain Building (MEDIUM IMPACT)

**Location**: `src/app.rs:599`
**Issue**: Handler cloning during middleware chain construction.
**Location**: `src/app.rs:599` **Issue**: Handler cloning during middleware
chain construction.

```rust
let mut service = HandlerService::new(id, handler.clone());
Expand All @@ -53,21 +62,24 @@ let mut service = HandlerService::new(id, handler.clone());

**Location**: `src/session.rs:47-55`

**Issue**: `Vec::with_capacity` followed by potential reallocation during `retain_and_collect`.
**Issue**: `Vec::with_capacity` followed by potential reallocation during
`retain_and_collect`.

```rust
let mut out = Vec::with_capacity(self.0.len());
```

**Impact**: Low to medium - depends on registry size and pruning frequency.

**Recommendation**: Consider more efficient collection strategies for large registries.
**Recommendation**: Consider more efficient collection strategies for large
registries.

### 5. Vector Initializations (LOW IMPACT)

**Location**: Various files

**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is known.
**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is
known.

**Impact**: Low - minor allocation optimizations.

Expand Down Expand Up @@ -96,35 +108,45 @@ let mut out = Vec::with_capacity(self.0.len());
## Implemented Optimizations

### Frame Processor Optimization
**Change**: Modified `LengthPrefixedProcessor::decode` to use `freeze().to_vec()` instead of direct `.to_vec()`.

**Change**: Modified `LengthPrefixedProcessor::decode` to use
`freeze().to_vec()` instead of direct `.to_vec()`.

**Before**:

```rust
Ok(Some(src.split_to(len).to_vec()))
```

**After**:

```rust
Ok(Some(src.split_to(len).freeze().to_vec()))
```

**Benefits**:

- Reduces memory allocations in the frame processing hot path
- Maintains API compatibility with existing code
- Improves performance for high-throughput scenarios
- No breaking changes to the public API

## Future Optimization Opportunities

1. **Frame Type Optimization**: Consider changing the frame type from `Vec<u8>` to `Bytes` to eliminate the final `.to_vec()` call entirely.
1. **Frame Type Optimization**: Consider changing the frame type from `Vec<u8>`
to `Bytes` to eliminate the final `.to_vec()` call entirely.

2. **Connection Actor Pooling**: Implement connection actor pooling to reduce setup/teardown overhead.
2. **Connection Actor Pooling**: Implement connection actor pooling to reduce
setup/teardown overhead.

3. **Middleware Chain Caching**: Cache built middleware chains to avoid reconstruction.
3. **Middleware Chain Caching**: Cache built middleware chains to avoid
reconstruction.

4. **Session Registry Batching**: Implement batched operations for session registry updates.
4. **Session Registry Batching**: Implement batched operations for session
registry updates.

5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where possible.
5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where
possible.

## Testing and Validation

Expand All @@ -138,6 +160,10 @@ All optimizations have been tested to ensure:

## Conclusion

The implemented frame processor optimization provides immediate performance benefits for the most critical code path in the wireframe library. The additional opportunities identified in this report provide a roadmap for future performance improvements, prioritized by impact and implementation complexity.
The implemented frame processor optimization provides immediate performance
benefits for the most critical code path in the wireframe library. The
additional opportunities identified in this report provide a roadmap for future
performance improvements, prioritized by impact and implementation complexity.

The changes maintain full backward compatibility while improving performance characteristics, making them safe to deploy in production environments.
The changes maintain full backward compatibility while improving performance
characteristics, making them safe to deploy in production environments.
13 changes: 11 additions & 2 deletions docs/hardening-wireframe-a-guide-to-production-resilience.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,20 @@ token-bucket algorithm is ideal.
use wireframe::push::{PushQueues, MAX_PUSH_RATE};

// Configure a connection to allow at most MAX_PUSH_RATE pushes per second.
let (queues, handle) = PushQueues::<Frame>::bounded_with_rate(8, 8, Some(MAX_PUSH_RATE))
let (queues, handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(Some(MAX_PUSH_RATE))
.build()
.expect("rate within supported bounds");

// Passing `None` disables rate limiting entirely:
let (_unlimited, _handle) = PushQueues::<Frame>::bounded_no_rate_limit(8, 8);
let (_unlimited, _handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(None)
.build()
.expect("failed to build unlimited queues");

// Inside PushHandle::push()
async fn push(&self, frame: Frame) -> Result<(), PushError> {
Expand Down
12 changes: 10 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ impl Default for FairnessConfig {
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, handle) = PushQueues::<u8>::bounded(8, 8);
/// let (queues, handle) = PushQueues::<u8>::builder()
/// .high_capacity(8)
/// .low_capacity(8)
/// .build()
/// .expect("failed to build PushQueues");
/// let shutdown = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown);
/// # drop(actor);
Expand Down Expand Up @@ -129,7 +133,11 @@ where
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, handle) = PushQueues::<u8>::bounded(4, 4);
/// let (queues, handle) = PushQueues::<u8>::builder()
/// .high_capacity(4)
/// .low_capacity(4)
/// .build()
/// .expect("failed to build PushQueues");
/// let token = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token);
/// # drop(actor);
Expand Down
Loading
Loading