Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ after formatting. Line numbers below refer to that file.
server::WireframeServer,
};

async fn handler() {}
use wireframe::app::Envelope;
async fn handler(_env: &Envelope) {}

#[tokio::main]
async fn main() -> std::io::Result<()> {
let factory = || {
WireframeApp::new()
.unwrap()
.route(1, Box::new(|| Box::pin(handler())))
.route(1, Box::new(|env| Box::pin(handler(env))))
.unwrap()
};

Expand Down
158 changes: 146 additions & 12 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Application builder configuring routes and middleware.
//!
//! `WireframeApp` stores registered routes, services, and middleware
//! for a [`WireframeServer`]. Most builder methods return [`Result<Self>`]
//! so callers can chain registrations ergonomically.
//! This module defines [`WireframeApp`], an Actix-inspired builder for
//! managing connection state, routing, and middleware in a `WireframeServer`.
//! It exposes convenience methods to register handlers and lifecycle hooks.

use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin, sync::Arc};

Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct WireframeApp<S: Serializer = BincodeSerializer, C: Send + 'static = (
///
/// A `Service` is a boxed function returning a [`Future`], enabling
/// asynchronous execution of message handlers.
pub type Service = Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub type Service = Box<dyn Fn(&Envelope) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

/// Trait representing middleware components.
pub trait Middleware: Send + Sync {}
Expand Down Expand Up @@ -81,6 +81,21 @@ impl From<io::Error> for SendError {
fn from(e: io::Error) -> Self { SendError::Io(e) }
}

/// Basic envelope type used by [`handle_connection`].
///
/// Incoming frames are deserialized into an `Envelope` containing the
/// message identifier and raw payload bytes.
#[derive(bincode::Decode, bincode::Encode)]
pub struct Envelope {
id: u32,
msg: Vec<u8>,
}

/// Number of idle polls before terminating a connection.
const MAX_IDLE_POLLS: u32 = 50; // ~5s with 100ms timeout
/// Maximum consecutive deserialization failures before closing a connection.
const MAX_DESER_FAILURES: u32 = 10;

/// Result type used throughout the builder API.
pub type Result<T> = std::result::Result<T, WireframeError>;

Expand Down Expand Up @@ -171,8 +186,8 @@ where
/// # Type Parameters
///
/// This method changes the connection state type parameter from `C` to `C2`.
/// This means that any subsequent builder methods will operate on the new connection state type `C2`.
/// Be aware of this type transition when chaining builder methods.
/// This means that any subsequent builder methods will operate on the new connection state type
/// `C2`. Be aware of this type transition when chaining builder methods.
///
/// # Errors
///
Expand Down Expand Up @@ -271,7 +286,7 @@ where
/// This placeholder immediately closes the connection after the
/// preamble phase. A warning is logged so tests and callers are
/// aware of the current limitation.
pub async fn handle_connection<W>(&self, _stream: W)
pub async fn handle_connection<W>(&self, mut stream: W)
where
W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
Expand All @@ -281,14 +296,133 @@ where
None
};

log::warn!(
"`WireframeApp::handle_connection` called, but connection handling is not \
implemented; closing stream"
);
tokio::task::yield_now().await;
if let Err(e) = self.process_stream(&mut stream).await {
log::warn!("connection terminated with error: {e}");
}

if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) {
teardown(state).await;
}
}

async fn process_stream<W>(&self, stream: &mut W) -> io::Result<()>
where
W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let mut buf = BytesMut::with_capacity(1024);
let mut idle = 0u32;
let mut deser_failures = 0u32;

loop {
if let Some(frame) = self.frame_processor.decode(&mut buf)? {
self.handle_frame(stream, &frame, &mut deser_failures)
.await?;
idle = 0;
continue;
}

if self.read_and_update(stream, &mut buf, &mut idle).await? {
break;
}
}

Ok(())
}

async fn read_and_update<W>(
&self,
stream: &mut W,
buf: &mut BytesMut,
idle: &mut u32,
) -> io::Result<bool>
where
W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
match self.read_into(stream, buf).await {
Ok(Some(0)) => Ok(true),
Ok(Some(_)) => {
*idle = 0;
Ok(false)
}
Ok(None) => {
*idle += 1;
Ok(*idle >= MAX_IDLE_POLLS)
}
Err(e) if Self::is_transient_error(&e) => Ok(false),
Err(e) if Self::is_fatal_error(&e) => Ok(true),
Err(e) => Err(e),
}
}

fn is_transient_error(e: &io::Error) -> bool {
matches!(
e.kind(),
io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
)
}

fn is_fatal_error(e: &io::Error) -> bool {
matches!(
e.kind(),
io::ErrorKind::UnexpectedEof
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::BrokenPipe
)
}

async fn read_into<W>(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result<Option<usize>>
where
W: tokio::io::AsyncRead + Unpin,
{
use tokio::{
io::AsyncReadExt,
time::{Duration, timeout},
};

const READ_TIMEOUT: Duration = Duration::from_millis(100);

match timeout(READ_TIMEOUT, stream.read_buf(buf)).await {
Ok(Ok(n)) => Ok(Some(n)),
Ok(Err(e)) => Err(e),
Err(_) => Ok(None),
}
}

async fn handle_frame<W>(
&self,
stream: &mut W,
frame: &[u8],
deser_failures: &mut u32,
) -> io::Result<()>
where
W: tokio::io::AsyncWrite + Unpin,
{
match self.serializer.deserialize::<Envelope>(frame) {
Ok((env, _)) => {
*deser_failures = 0;
if let Some(handler) = self.routes.get(&env.id) {
handler(&env).await;
} else {
log::warn!("no handler for message id {}", env.id);
}

if let Err(e) = self.send_response(stream, &env).await {
log::warn!("failed to send response: {e}");
}
}
Err(e) => {
*deser_failures += 1;
log::warn!("failed to deserialize message: {e}");
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"too many deserialization failures",
));
}
}
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion tests/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn teardown_without_setup_does_not_run() {

let app = WireframeApp::new()
.unwrap()
.on_connection_teardown(move |_| {
.on_connection_teardown(move |()| {
let teardown_clone = teardown_clone.clone();
async move {
teardown_clone.fetch_add(1, Ordering::SeqCst);
Expand Down