Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions physics/physics.dl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// DDlog definitions for the experimental physics engine.
//
// These types model spatial positions, individual blocks and
// the slopes that connect them. They will be extended as the
// engine evolves.

type Position = struct {
x: i32,
y: i32,
z: i32,
}

enum BlockType {
Air,
Solid,
Water,
}

type Block = struct {
pos: Position,
kind: BlockType,
}

type Slope = struct {
from: Position,
to: Position,
gradient: i32,
}

input relation Blocks(b: Block)
input relation Slopes(s: Slope)

// Derived relation listing all occupied positions.
output relation Occupied(p: Position)

// Use a single rule to avoid redundant facts if aggregation is added later.
Occupied(p) :-
Blocks(Block{pos=p, ..});
Slopes(Slope{from=p, ..});
Slopes(Slope{to=p, ..}).
63 changes: 26 additions & 37 deletions src/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,51 +56,17 @@ pub trait FromMessageRequest: Sized {
pub struct SharedState<T: Send + Sync>(Arc<T>);

impl<T: Send + Sync> SharedState<T> {
/// Construct a new [`SharedState`].
/// Construct a new [`SharedState`] wrapping the provided `Arc<T>`.
///
/// # Examples
///
/// ```ignore
/// ```
/// use std::sync::Arc;
/// use wireframe::extractor::SharedState;
///
/// let data = Arc::new(5u32);
/// let state = SharedState::new(Arc::clone(&data));
/// assert_eq!(*state, 5);
/// ```
#[must_use]

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn advance_consumes_bytes() {
let mut payload = Payload { data: b"hello" };
payload.advance(2);
assert_eq!(payload.data, b"llo");
payload.advance(10);
assert!(payload.data.is_empty());
}

#[test]
fn remaining_reports_length() {
let mut payload = Payload { data: b"abc" };
assert_eq!(payload.remaining(), 3);
payload.advance(1);
assert_eq!(payload.remaining(), 2);
}
}
/// Creates a new `SharedState` instance wrapping the provided `Arc<T>`.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// let state = Arc::new(42);
/// let shared = SharedState::new(state.clone());
/// assert_eq!(*shared, 42);
/// ```
#[must_use]
pub fn new(inner: Arc<T>) -> Self {
Self(inner)
}
Expand All @@ -117,6 +83,7 @@ impl<T: Send + Sync> std::ops::Deref for SharedState<T> {
///
/// ```
/// use std::sync::Arc;
/// use wireframe::extractor::SharedState;
/// let state = Arc::new(42);
/// let shared = SharedState::new(state.clone());
/// assert_eq!(*shared, 42);
Expand All @@ -125,3 +92,25 @@ impl<T: Send + Sync> std::ops::Deref for SharedState<T> {
&self.0
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn advance_consumes_bytes() {
let mut payload = Payload { data: b"hello" };
payload.advance(2);
assert_eq!(payload.data, b"llo");
payload.advance(10);
assert!(payload.data.is_empty());
}

#[test]
fn remaining_reports_length() {
let mut payload = Payload { data: b"abc" };
assert_eq!(payload.remaining(), 3);
payload.advance(1);
assert_eq!(payload.remaining(), 2);
}
}
60 changes: 6 additions & 54 deletions src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,91 +1,43 @@
use async_trait::async_trait;

/// Incoming request wrapper passed through middleware.
#[derive(Debug)]
pub struct ServiceRequest;

/// Response produced by a handler or middleware.
#[derive(Debug, Default)]
pub struct ServiceResponse;

/// Continuation used by middleware to call the next service in the chain.
pub struct Next<'a, S>
where
S: Service + ?Sized,
{
pub struct Next<'a, S: Service + ?Sized> {
service: &'a S,
}

impl<'a, S> Next<'a, S>
where
S: Service + ?Sized,
{
/// Creates a new `Next` instance wrapping a reference to the given service.
///
///
/// ```ignore
/// use wireframe::middleware::{ServiceRequest, ServiceResponse, Next, Service};
/// ```
/// Service produced by the middleware.
type Wrapped: Service;
async fn transform(&self, service: S) -> Self::Wrapped;
/// let service = MyService::default();
/// let next = Next::new(&service);
type Wrapped: Service;
async fn transform(&self, service: S) -> Self::Wrapped;
impl<'a, S: Service + ?Sized> Next<'a, S> {
pub fn new(service: &'a S) -> Self {
Self { service }
}

/// Call the next service with the given request.
/// Call the next service with the provided request.
///
/// # Errors
///
/// Asynchronously invokes the next service in the middleware chain with the given request.
///
/// Returns the response from the wrapped service, or propagates any error produced.
///
/// # Examples
///
/// ```
/// # use your_crate::{ServiceRequest, ServiceResponse, Next, Service};
/// # struct DummyService;
/// # #[async_trait::async_trait]
/// # impl Service for DummyService {
/// # type Error = std::convert::Infallible;
/// # async fn call(&self, _req: ServiceRequest) -> Result<ServiceResponse, Self::Error> {
/// # Ok(ServiceResponse::default())
/// # }
/// # }
/// # let service = DummyService;
/// let next = Next::new(&service);
/// let req = ServiceRequest {};
/// let res = tokio_test::block_on(next.call(req));
/// assert!(res.is_ok());
/// ```
pub async fn call(&self, req: ServiceRequest) -> Result<ServiceResponse, S::Error> {
/// Propagates any error returned by the wrapped service.
pub async fn call(&mut self, req: ServiceRequest) -> Result<ServiceResponse, S::Error> {
self.service.call(req).await
}
}

/// Trait representing an asynchronous service.
#[async_trait]
pub trait Service: Send + Sync {
/// Error type returned by the service.
type Error: std::error::Error + Send + Sync + 'static;

/// Handle the incoming request and produce a response.
async fn call(&self, req: ServiceRequest) -> Result<ServiceResponse, Self::Error>;
}

/// Factory for wrapping services with middleware.
#[async_trait]
pub trait Transform<S>: Send + Sync
where
S: Service,
{
/// Wrapped service produced by the middleware.
type Output: Service;

/// Create a new middleware service wrapping `service`.
async fn transform(&self, service: S) -> Self::Output;
}
109 changes: 16 additions & 93 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ use crate::app::WireframeApp;

/// Tokio-based server for `WireframeApp` instances.
///
/// `WireframeServer` spawns a worker task per thread. Each worker
/// receives its own `WireframeApp` from the provided factory
/// closure. The server listens for a shutdown signal using
/// `tokio::signal::ctrl_c` and notifies all workers to stop
/// accepting new connections.
/// Each worker receives its own application instance from the provided
/// factory. A Ctrl+C signal triggers graceful shutdown across all
/// workers.
pub struct WireframeServer<F>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
Expand All @@ -28,71 +26,28 @@ impl<F> WireframeServer<F>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
{
/// Constructs a new `WireframeServer` using the provided application factory closure.
///
/// The server is initialised with a default worker count equal to the number of CPU cores.
///
/// ```no_run
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let factory = || WireframeApp::new().unwrap();
/// let server = WireframeServer::new(factory);
/// ```
/// Create a new server using the given application factory.
#[must_use]
pub fn new(factory: F) -> Self {
Self {
factory,
listener: None,
workers: num_cpus::get().max(1),
}
}

/// Set the number of worker tasks to spawn for the server.
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let factory = || WireframeApp::new().unwrap();
/// WireframeServer::new(factory)
/// .workers(4)
/// .bind("127.0.0.1:0".parse().unwrap())?
/// .run()
/// .await
/// }
/// A new `WireframeServer` instance with the updated worker count.
///
/// # Examples
///
/// ```ignore
/// let server = WireframeServer::new(factory).workers(4);
/// Sets the number of worker tasks for the server, ensuring at least one worker.
///
/// # Examples
///
/// ```ignore
/// let server = WireframeServer::new(factory).workers(4);
/// ```
#[must_use]
pub fn workers(mut self, count: usize) -> Self {
self.workers = count.max(1);
self
}

/// Bind the server to the given address and create a listener.
/// Bind the server to the provided socket address.
///
/// # Errors
///
/// Binds the server to the specified socket address and prepares it for accepting TCP connections.
///
/// Returns an error if binding to the address or configuring the listener fails.
///
/// # Arguments
///
/// * `addr` - The socket address to bind the server to.
///
/// # Returns
///
/// An updated server instance with the listener configured, or an `io::Error` if binding fails.
///
/// # Examples
///
/// ```ignore
/// use std::net::SocketAddr;
/// let server = WireframeServer::new(factory);
/// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
/// let server = server.bind(addr).expect("Failed to bind address");
/// ```
/// Returns any I/O error produced while creating the TCP listener.
pub fn bind(mut self, addr: SocketAddr) -> io::Result<Self> {
let std_listener = StdTcpListener::bind(addr)?;
std_listener.set_nonblocking(true)?;
Expand All @@ -103,46 +58,17 @@ where

/// Run the server until a shutdown signal is received.
///
/// Each worker accepts connections concurrently and would
/// process them using its `WireframeApp`. Connection handling
/// logic is not yet implemented.
///
/// # Errors
///
/// Returns an [`io::Error`] if accepting a connection fails.
/// Returns any I/O error encountered while accepting connections.
///
/// # Panics
///
/// Runs the server, accepting TCP connections concurrently until shutdown.
///
/// Spawns the configured number of worker tasks, each accepting incoming connections using a shared listener and a separate `WireframeApp` instance. The server listens for a Ctrl+C signal to initiate graceful shutdown, signalling all workers to stop accepting new connections. Waits for all worker tasks to complete before returning.
///
/// # Panics
///
/// Panics if called before `bind` has been invoked.
///
/// # Returns
///
/// Returns `Ok(())` when the server shuts down gracefully, or an `io::Error` if accepting connections fails during runtime.
///
/// # Examples
///
/// ```ignore
/// # use std::net::SocketAddr;
/// # use mycrate::{WireframeServer, WireframeApp};
/// # async fn run_server() -> std::io::Result<()> {
/// let factory = || WireframeApp::new();
/// let server = WireframeServer::new(factory)
/// .workers(4)
/// .bind("127.0.0.1:8080".parse::<SocketAddr>().unwrap())?;
/// server.run().await
/// # }
/// ```
/// Panics if called before [`bind`] has configured the listener.
pub async fn run(self) -> io::Result<()> {
let listener = self.listener.expect("`bind` must be called before `run`");
let (shutdown_tx, _) = broadcast::channel(16);

// Spawn worker tasks using Tokio's runtime.
let mut handles = Vec::with_capacity(self.workers);
for _ in 0..self.workers {
let mut shutdown_rx = shutdown_tx.subscribe();
Expand All @@ -155,7 +81,6 @@ where
tokio::select! {
res = listener.accept() => match res {
Ok((_stream, _)) => {
// TODO: hand off stream to `app`
delay = Duration::from_millis(10);
}
Err(e) => {
Expand All @@ -171,7 +96,6 @@ where
}));
}

// Wait for Ctrl+C or workers finishing.
let join_all = futures::future::join_all(handles);
tokio::pin!(join_all);

Expand All @@ -182,7 +106,6 @@ where
_ = &mut join_all => {}
}

// Ensure all workers have exited before returning.
join_all.await;
Ok(())
}
Expand Down