-
Notifications
You must be signed in to change notification settings - Fork 0
Implement connection handling #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement connection handling #76
Conversation
Reviewer's GuideThis PR implements a full connection handling pipeline in handle_connection by introducing an Envelope type, refactoring handle_connection to call new processing methods, and adding process_stream, read_into, and handle_frame to continuously read, decode, route, and respond to incoming frames with proper logging and timeouts. Sequence diagram for the new connection handling pipelinesequenceDiagram
participant Client
participant WireframeApp
participant Handler
Client->>WireframeApp: Send frame
WireframeApp->>WireframeApp: process_stream()
WireframeApp->>WireframeApp: decode(frame)
WireframeApp->>WireframeApp: deserialize Envelope
alt Handler exists
WireframeApp->>Handler: handler().await
Handler-->>WireframeApp: (response)
else No handler
WireframeApp->>WireframeApp: log warning
end
WireframeApp->>WireframeApp: send_response()
WireframeApp-->>Client: Send response
Class diagram for Envelope and connection handling methodsclassDiagram
class Envelope {
+u32 id
+Vec<u8> msg
}
class WireframeApp {
+handle_connection(stream)
+process_stream(stream)
+read_into(stream, buf)
+handle_frame(stream, frame)
+send_response(stream, env)
+routes: HashMap<u32, Handler>
+serializer
+frame_processor
}
WireframeApp --> Envelope : deserializes
WireframeApp --> Handler : executes
WireframeApp --> send_response : calls
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
## Walkthrough
The connection handling logic in the application has been updated from a placeholder to a functional implementation. The `handle_connection` method now processes incoming streams, decodes frames, dispatches messages to handlers, sends responses, and manages connection teardown, all within an asynchronous processing loop.
## Changes
| File(s) | Change Summary |
|-------------------|------------------------------------------------------------------------------------------------------------------|
| src/app.rs | Replaced placeholder connection handler with an async processing loop: reads, decodes, dispatches, and responds. Updated method signature and added private async methods for stream processing, frame handling, and reading with timeout. Added `Envelope` struct and updated `Service` type alias. Introduced `MAX_IDLE_POLLS` constant. |
| tests/lifecycle.rs | Modified `on_connection_teardown` callback closure parameter from ignored to explicit empty tuple in a test. |
| docs/roadmap.md | Updated example code to reflect new handler signature accepting an `Envelope` reference and corresponding routing closure changes. |
## Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Client
participant App
participant FrameProcessor
participant Handler
Client->>App: Open connection (stream)
loop While stream open
App->>FrameProcessor: Read & decode frame
FrameProcessor-->>App: Decoded Envelope (msg_id, payload)
App->>Handler: Lookup & execute handler for msg_id
Handler-->>App: Handler response
App->>Client: Send response envelope
end
App->>App: Invoke teardown callback (if any) Possibly related PRs
Poem
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Prevent hotspot decline
(1 hotspot with Deep, Nested Complexity)
Enforce critical code health rules
(1 file with Deep, Nested Complexity)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Prevent hotspot decline | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 rule in this hotspot | 10.00 → 9.39 | Suppress |
Enforce critical code health rules | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 critical rule | 10.00 → 9.39 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- The route handler closure is invoked without passing in the deserialized payload (
env.msg
); consider updating the handler signature to accept the message contents. - The read loop uses a hard-coded 10 ms timeout before breaking, which could lead to busy loops; consider making this configurable or relying on async readiness notifications instead.
- Using
warn!
for expected conditions (like missing handlers or deserialization failures) may spam logs; consider lowering these toinfo!
ordebug!
levels instead.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The route handler closure is invoked without passing in the deserialized payload (`env.msg`); consider updating the handler signature to accept the message contents.
- The read loop uses a hard-coded 10 ms timeout before breaking, which could lead to busy loops; consider making this configurable or relying on async readiness notifications instead.
- Using `warn!` for expected conditions (like missing handlers or deserialization failures) may spam logs; consider lowering these to `info!` or `debug!` levels instead.
## Individual Comments
### Comment 1
<location> `src/app.rs:293` </location>
<code_context>
}
}
+
+ async fn process_stream<W>(&self, stream: &mut W) -> io::Result<()>
+ where
+ W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a module-level doc comment at the top of the file using `//!` to describe the purpose of this module.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🔭 Outside diff range comments (1)
src/app.rs (1)
274-290
: Sequential handler execution may throttle throughput
handler().await
runs in-line, so a slow handler blocks further frame processing for the same connection. Consider spawning each handler on a dedicated task (e.g.tokio::spawn
) or using buffered channels to decouple I/O from business logic.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
src/app.rs
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build-test
- GitHub Check: Analyze (actions)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Prevent hotspot decline
(1 hotspot with Deep, Nested Complexity)
Enforce critical code health rules
(1 file with Deep, Nested Complexity)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Prevent hotspot decline | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 rule in this hotspot | 10.00 → 9.39 | Suppress |
Enforce critical code health rules | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 critical rule | 10.00 → 9.39 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
@sourcery-ai review |
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- Defining
Envelope
insideprocess_stream
duplicates your message type each time—consider extracting it to a shared module so the serializer can reuse a single type. - Using a fixed 10 ms timeout before each
read_buf
may spin under low traffic—consider awaitingread_buf
directly or using a backoff to reduce CPU usage. - You currently invoke handlers as
handler().await
without passing the deserializedenv.msg
payload—update your route APIs to accept the message content so handlers can process it.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Defining `Envelope` inside `process_stream` duplicates your message type each time—consider extracting it to a shared module so the serializer can reuse a single type.
- Using a fixed 10 ms timeout before each `read_buf` may spin under low traffic—consider awaiting `read_buf` directly or using a backoff to reduce CPU usage.
- You currently invoke handlers as `handler().await` without passing the deserialized `env.msg` payload—update your route APIs to accept the message content so handlers can process it.
## Individual Comments
### Comment 1
<location> `src/app.rs:293` </location>
<code_context>
}
}
+
+ async fn process_stream<W>(&self, stream: &mut W) -> io::Result<()>
+ where
+ W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a module-level doc comment at the top of the file using `//!` to describe the purpose of this module.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- Consider externalizing the Envelope struct to a shared module instead of defining it inside process_stream to ensure consistent serialization across the codebase.
- The timeout on read_buf currently breaks out of the loop on any idle period—treating timeouts as continue (instead of break) will prevent the connection from closing prematurely when no data arrives for a short while.
- You’re discarding the deserialized payload when invoking handlers (handler().await); consider changing the handler signature to accept env.msg so each route can process the actual message content.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider externalizing the Envelope struct to a shared module instead of defining it inside process_stream to ensure consistent serialization across the codebase.
- The timeout on read_buf currently breaks out of the loop on any idle period—treating timeouts as continue (instead of break) will prevent the connection from closing prematurely when no data arrives for a short while.
- You’re discarding the deserialized payload when invoking handlers (handler().await); consider changing the handler signature to accept env.msg so each route can process the actual message content.
## Individual Comments
### Comment 1
<location> `src/app.rs:293` </location>
<code_context>
}
}
+
+ async fn process_stream<W>(&self, stream: &mut W) -> io::Result<()>
+ where
+ W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a module-level doc comment at the top of the file using `//!` to describe the purpose of this module.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- The 10 ms timeout in process_stream will break out on any idle period—consider using a longer (or configurable) timeout or looping until EOF to avoid premature disconnects.
- You’re redefining the Envelope struct with bincode derives inside process_stream on every call—move it (and its derives) to a shared module-level type for reuse and clarity.
- Handlers are currently invoked as handler().await and never receive the deserialized payload—consider passing
env.msg
(or a parsed type) into the handler so it can actually process the message.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The 10 ms timeout in process_stream will break out on any idle period—consider using a longer (or configurable) timeout or looping until EOF to avoid premature disconnects.
- You’re redefining the Envelope struct with bincode derives inside process_stream on every call—move it (and its derives) to a shared module-level type for reuse and clarity.
- Handlers are currently invoked as handler().await and never receive the deserialized payload—consider passing `env.msg` (or a parsed type) into the handler so it can actually process the message.
## Individual Comments
### Comment 1
<location> `src/app.rs:293` </location>
<code_context>
}
}
+
+ async fn process_stream<W>(&self, stream: &mut W) -> io::Result<()>
+ where
+ W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a module-level doc comment at the top of the file using `//!` to describe the purpose of this module.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
@sourcery-ai resolve |
@coderabbitai resolve |
✅ Actions performedComments resolved and changes approved. |
@sourcery-ai review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- The current read_into implementation treats a 100 ms timeout as EOF and breaks the loop, which will close idle connections prematurely—consider looping on timeouts instead of exiting.
- handle_frame always calls send_response even when there’s no handler or deserialization fails; you should make sending a response conditional or return an explicit error frame.
- Route handlers are invoked without access to the deserialized payload (
env.msg
); consider passing the message bytes or a parsed struct into the handler for actual message processing.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The current read_into implementation treats a 100 ms timeout as EOF and breaks the loop, which will close idle connections prematurely—consider looping on timeouts instead of exiting.
- handle_frame always calls send_response even when there’s no handler or deserialization fails; you should make sending a response conditional or return an explicit error frame.
- Route handlers are invoked without access to the deserialized payload (`env.msg`); consider passing the message bytes or a parsed struct into the handler for actual message processing.
## Individual Comments
### Comment 1
<location> `src/app.rs:341` </location>
<code_context>
+ where
+ W: tokio::io::AsyncWrite + Unpin,
+ {
+ match self.serializer.deserialize::<Envelope>(frame) {
+ Ok((env, _)) => {
+ if let Some(handler) = self.routes.get(&env.id) {
+ handler().await;
+ } else {
+ log::warn!("no handler for message id {}", env.id);
+ }
+
+ let _ = self.send_response(stream, &env).await;
+ }
+ Err(e) => log::warn!("failed to deserialize message: {e}"),
+ }
+
</code_context>
<issue_to_address>
Deserialization errors are only logged, not surfaced.
Since errors are only logged, repeated malformed frames could be exploited or cause resource issues. Consider adding a limit on deserialization errors or closing the connection after multiple failures.
</issue_to_address>
### Comment 2
<location> `src/app.rs:88` </location>
<code_context>
+///
+/// Incoming frames are deserialized into an `Envelope` containing the
+/// message identifier and raw payload bytes.
+#[derive(bincode::Decode, bincode::Encode)]
+struct Envelope {
+ id: u32,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a `//!` comment at the top of the module to describe its purpose, as required by the review instructions.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/app.rs:88` </location>
<code_context>
+///
+/// Incoming frames are deserialized into an `Envelope` containing the
+/// message identifier and raw payload bytes.
+#[derive(bincode::Decode, bincode::Encode)]
+struct Envelope {
+ id: u32,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a module-level doc comment at the top of the file using `//!` to describe the purpose of this module, as required by the review instructions.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @leynos - I've reviewed your changes - here's some feedback:
- In read_into, a timeout is treated as EOF and breaks the loop, which will close connections on short idle periods—consider distinguishing between an actual EOF and a temporary timeout to keep idle connections alive.
- The current handler lookup only allows zero-argument closures and ignores the payload; consider updating your routes to pass the deserialized message into the handler so handlers can operate on the Envelope content.
- Errors from send_response are silently dropped; it would be more robust to at least log failures when sending a response instead of ignoring them.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In read_into, a timeout is treated as EOF and breaks the loop, which will close connections on short idle periods—consider distinguishing between an actual EOF and a temporary timeout to keep idle connections alive.
- The current handler lookup only allows zero-argument closures and ignores the payload; consider updating your routes to pass the deserialized message into the handler so handlers can operate on the Envelope content.
- Errors from send_response are silently dropped; it would be more robust to at least log failures when sending a response instead of ignoring them.
## Individual Comments
### Comment 1
<location> `src/app.rs:88` </location>
<code_context>
+///
+/// Incoming frames are deserialized into an `Envelope` containing the
+/// message identifier and raw payload bytes.
+#[derive(bincode::Decode, bincode::Encode)]
+struct Envelope {
+ id: u32,
</code_context>
<issue_to_address>
The module is missing a containing item comment (`//!`).
Please add a `//!` comment at the top of the module to describe its purpose, as required by the review instructions.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Prevent hotspot decline
(1 hotspot with Bumpy Road Ahead)
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Prevent hotspot decline | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 rule in this hotspot | 10.00 → 9.84 | Suppress |
Enforce critical code health rules | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 critical rule | 10.00 → 9.84 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
@sourcery-ai resolve |
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/app.rs (3)
352-372
: Critical compilation error in response sendingLine 364 attempts to send
&env
(type&Envelope
) viasend_response
, butEnvelope
doesn't implement theMessage
trait required by the method signature.Either implement
Message
forEnvelope
or modify the response logic:-if let Err(e) = self.send_response(stream, &env).await { - log::warn!("failed to send response: {e}"); -} +// Option 1: Echo the original frame back +if let Err(e) = stream.write_all(frame).await { + log::warn!("failed to echo response: {e}"); +}
84-93
: Envelope struct missing Message trait implementationThe
Envelope
struct is used insend_response
(line 364) but doesn't implement the requiredMessage
trait, causing a compilation error.Implement the
Message
trait forEnvelope
:+use crate::message::Message; +impl Message for Envelope {} #[derive(bincode::Decode, bincode::Encode)] pub struct Envelope { id: u32, msg: Vec<u8>, }
368-368
: Consider limiting deserialization failuresRepeated deserialization errors are only logged, which could be exploited or cause resource issues. Consider implementing a failure threshold that closes the connection after multiple consecutive failures.
+let mut deser_failures = 0u32; +const MAX_DESER_FAILURES: u32 = 10; match self.serializer.deserialize::<Envelope>(frame) { Ok((env, _)) => { + deser_failures = 0; // ... existing handler logic } - Err(e) => log::warn!("failed to deserialize message: {e}"), + Err(e) => { + deser_failures += 1; + log::warn!("failed to deserialize message: {e}"); + if deser_failures >= MAX_DESER_FAILURES { + return Err(io::Error::new(io::ErrorKind::InvalidData, "too many deserialization failures")); + } + } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
docs/roadmap.md
(1 hunks)src/app.rs
(6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build-test
🔇 Additional comments (7)
src/app.rs (5)
3-5
: Well-documented module purposeThe comprehensive module documentation clearly describes the
WireframeApp
builder and its role in managing connection state, routing, and middleware. This addresses the previous review feedback about missing module-level documentation.
41-41
: Handler signature correctly updated to receive message payloadThe
Service
type now accepts&Envelope
parameter, ensuring handlers can access the deserialized message content. This resolves the previous issue where the message payload never reached the handler.
94-96
: Reasonable idle timeout durationThe 5-second timeout (50 polls × 100ms) is much more appropriate than the previous 10ms timeout, allowing for normal connection idle periods without premature termination.
287-304
: Robust connection lifecycle managementThe connection handling now properly implements setup, processing, and teardown phases with error handling, replacing the previous placeholder implementation.
334-350
: Well-implemented timeout-based readingThe
read_into
method properly distinguishes between successful reads, I/O errors, and timeouts, providing the foundation for graceful idle connection handling.docs/roadmap.md (2)
42-43
: Documentation updated to match new handler signatureThe example correctly imports
Envelope
and updates the handler function to accept the&Envelope
parameter, maintaining consistency with the implementation changes insrc/app.rs
.
50-50
: Route closure properly passes envelope parameterThe route registration correctly passes the
env
argument to the handler, demonstrating the updated Service signature where handlers receive the deserialized message envelope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gates Failed
Prevent hotspot decline
(1 hotspot with Bumpy Road Ahead)
Enforce critical code health rules
(1 file with Bumpy Road Ahead)
Gates Passed
3 Quality Gates Passed
See analysis details in CodeScene
Reason for failure
Prevent hotspot decline | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 rule in this hotspot | 10.00 → 9.84 | Suppress |
Enforce critical code health rules | Violations | Code Health Impact | |
---|---|---|---|
app.rs | 1 critical rule | 10.00 → 9.84 | Suppress |
Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.
@coderabbitai review |
✅ Actions performedReview triggered.
|
Summary
Testing
cargo clippy -- -D warnings
RUSTFLAGS="-D warnings" cargo test
https://chatgpt.com/codex/tasks/task_e_6852d86d03bc83229328f6b1d3da20f3
Summary by Sourcery
Implement full connection handling by adding a streaming loop that decodes frames into
Envelope
messages, dispatches to registered handlers based on message ID, and sends responses using the existing serializer.New Features:
Envelope
struct for message framing and routing.Enhancements:
process_stream
,read_into
, andhandle_frame
methods to manage stream I/O and message processing.Tests:
on_connection_teardown
callback signature in lifecycle tests to accept a unit parameter.Summary by CodeRabbit