Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer {
struct Ping;

#[derive(bincode::Decode, bincode::Encode)]
#[expect(dead_code, reason = "used only in documentation example")]
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Revert the lint attribute change.

This change violates the coding guidelines which explicitly forbid #[allow] attributes. Use the narrowly scoped #[expect(...)] with a descriptive reason instead.

Apply this diff to fix the lint attribute:

-#[allow(dead_code)]
+#[expect(dead_code, reason = "used only in documentation example")]
🤖 Prompt for AI Agents
In examples/metadata_routing.rs at line 63, the use of #[allow(dead_code)]
violates coding guidelines that forbid #[allow] attributes. Replace
#[allow(dead_code)] with a narrowly scoped #[expect(dead_code, reason = "explain
why this code is expected to be unused")] attribute, providing a clear
descriptive reason for the expectation.

struct Pong;

#[tokio::main]
Expand Down
129 changes: 89 additions & 40 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl Drop for ActiveConnection {

/// Return the current number of active connections.
#[must_use]
pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) }
pub fn active_connection_count() -> u64 {
ACTIVE_CONNECTIONS.load(Ordering::Relaxed)
}

use crate::{
hooks::{ConnectionContext, ProtocolHooks},
Expand Down Expand Up @@ -117,6 +119,18 @@ pub struct ConnectionActor<F, E> {
peer_addr: Option<SocketAddr>,
}

/// Context for processing frames during actor execution.
struct ProcessContext<'a, F> {
state: &'a mut ActorState,
out: &'a mut Vec<F>,
}

impl<'a, F> ProcessContext<'a, F> {
fn new(state: &'a mut ActorState, out: &'a mut Vec<F>) -> Self {
Self { state, out }
}
}

impl<F, E> ConnectionActor<F, E>
where
F: FrameLike,
Expand Down Expand Up @@ -188,14 +202,20 @@ where
}

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }
pub fn set_fairness(&mut self, fairness: FairnessConfig) {
self.fairness = fairness;
}

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) {
self.response = stream;
}

/// Get a clone of the shutdown token used by the actor.
#[must_use]
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown.clone()
}

/// Drive the actor until all sources are exhausted or shutdown is triggered.
///
Expand Down Expand Up @@ -283,11 +303,12 @@ where
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match self.next_event(state).await {
Event::Shutdown => self.process_shutdown(state),
Event::High(res) => self.process_high(res, state, out),
Event::Low(res) => self.process_low(res, state, out),
Event::Response(res) => self.process_response(res, state, out)?,
let mut ctx = ProcessContext::new(state, out);
match self.next_event(ctx.state).await {
Event::Shutdown => self.process_shutdown(ctx.state),
Event::High(res) => self.process_high(res, &mut ctx),
Event::Low(res) => self.process_low(res, &mut ctx),
Event::Response(res) => self.process_response(res, ctx.state, ctx.out)?,
Event::Idle => {}
}

Expand All @@ -301,23 +322,39 @@ where
}

/// Handle the result of polling the high-priority queue.
fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
if let Some(frame) = res {
self.process_frame_common(frame, out);
self.after_high(out, state);
} else {
Self::handle_closed_receiver(&mut self.high_rx, state);
self.reset_high_counter();
}
fn process_high(&mut self, res: Option<F>, ctx: &mut ProcessContext<F>) {
self.process_push(res, ctx, Self::after_high, |this, state| {
Self::handle_closed_receiver(&mut this.high_rx, state);
this.reset_high_counter();
});
}

/// Handle the result of polling the low-priority queue.
fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
fn process_low(&mut self, res: Option<F>, ctx: &mut ProcessContext<F>) {
self.process_push(
res,
ctx,
|this, _, _| this.after_low(),
|this, state| Self::handle_closed_receiver(&mut this.low_rx, state),
);
}

/// Helper to process push queue results with queue-specific callbacks.
fn process_push<OnSome, OnNone>(
&mut self,
res: Option<F>,
ctx: &mut ProcessContext<F>,
on_some: OnSome,
on_none: OnNone,
) where
OnSome: FnOnce(&mut Self, &mut Vec<F>, &mut ActorState),
OnNone: FnOnce(&mut Self, &mut ActorState),
{
if let Some(frame) = res {
self.process_frame_common(frame, out);
self.after_low();
self.process_frame_common(frame, ctx.out);
on_some(self, ctx.out, ctx.state);
} else {
Self::handle_closed_receiver(&mut self.low_rx, state);
on_none(self, ctx.state);
}
}

Expand Down Expand Up @@ -374,19 +411,19 @@ where
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority()
&& let Some(rx) = &mut self.low_rx
{
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
if self.should_yield_to_low_priority() {
if let Some(rx) = &mut self.low_rx {
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
}
}
}
}
Expand All @@ -405,7 +442,9 @@ where
}

/// Reset counters after processing a low-priority frame.
fn after_low(&mut self) { self.reset_high_counter(); }
fn after_low(&mut self) {
self.reset_high_counter();
}

/// Clear the burst counter and associated timestamp.
fn reset_high_counter(&mut self) {
Expand Down Expand Up @@ -448,11 +487,15 @@ where

/// Await cancellation on the provided shutdown token.
#[inline]
async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; }
async fn wait_shutdown(token: CancellationToken) {
token.cancelled_owned().await;
}

/// Receive the next frame from a push queue.
#[inline]
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> { rx.recv().await }
async fn recv_push(rx: &mut mpsc::Receiver<F>) -> Option<F> {
rx.recv().await
}

/// Poll `f` if `opt` is `Some`, returning `None` otherwise.
#[expect(
Expand Down Expand Up @@ -535,11 +578,17 @@ impl ActorState {
}

/// Returns `true` while the actor is actively processing sources.
fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) }
fn is_active(&self) -> bool {
matches!(self.run_state, RunState::Active)
}

/// Returns `true` once shutdown has begun.
fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) }
fn is_shutting_down(&self) -> bool {
matches!(self.run_state, RunState::ShuttingDown)
}

/// Returns `true` when all sources have finished.
fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) }
fn is_done(&self) -> bool {
matches!(self.run_state, RunState::Finished)
}
}
21 changes: 15 additions & 6 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ impl std::fmt::Display for PushConfigError {

impl std::error::Error for PushConfigError {}

/// Shared state for [`PushHandle`] clones.
///
/// Holds the per-priority send channels, optional rate limiter and
/// optional dead letter queue. Wrapped in an [`Arc`] so handles can be
/// cloned cheaply.
pub(crate) struct PushHandleInner<F> {
high_prio_tx: mpsc::Sender<F>,
low_prio_tx: mpsc::Sender<F>,
Expand All @@ -97,7 +102,9 @@ pub(crate) struct PushHandleInner<F> {
pub struct PushHandle<F>(Arc<PushHandleInner<F>>);

impl<F: FrameLike> PushHandle<F> {
pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { Self(arc) }
pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self {
Self(arc)
}

/// Internal helper to push a frame with the requested priority.
///
Expand Down Expand Up @@ -253,7 +260,9 @@ impl<F: FrameLike> PushHandle<F> {
}

/// Downgrade to a `Weak` reference for storage in a registry.
pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { Arc::downgrade(&self.0) }
pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> {
Arc::downgrade(&self.0)
}
}

/// Receiver ends of the push queues stored by the connection actor.
Expand Down Expand Up @@ -382,10 +391,10 @@ impl<F: FrameLike> PushQueues<F> {
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
if let Some(r) = rate
&& (r == 0 || r > MAX_PUSH_RATE)
{
return Err(PushConfigError::InvalidRate(r));
if let Some(r) = rate {
if r == 0 || r > MAX_PUSH_RATE {
return Err(PushConfigError::InvalidRate(r));
}
}
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
Expand Down
14 changes: 8 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ where
/// ```
#[inline]
#[must_use]
pub const fn worker_count(&self) -> usize { self.workers }
pub const fn worker_count(&self) -> usize {
self.workers
}

/// Get the socket address the server is bound to, if available.
#[must_use]
Expand Down Expand Up @@ -469,10 +471,10 @@ async fn process_stream<F, T>(
{
match read_preamble::<_, T>(&mut stream).await {
Ok((preamble, leftover)) => {
if let Some(handler) = on_success.as_ref()
&& let Err(e) = handler(&preamble, &mut stream).await
{
eprintln!("preamble callback error: {e}");
if let Some(handler) = on_success.as_ref() {
if let Err(e) = handler(&preamble, &mut stream).await {
eprintln!("preamble callback error: {e}");
}
}
let stream = RewindStream::new(leftover, stream);
// Hand the connection to the application for processing.
Expand Down Expand Up @@ -520,7 +522,7 @@ mod tests {

/// Test helper preamble carrying no data.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
#[expect(dead_code, reason = "test helper for unused preamble type")]
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Revert the lint attribute change.

This change violates the coding guidelines which explicitly forbid #[allow] attributes. Use the narrowly scoped #[expect(...)] with a descriptive reason instead.

Apply this diff to fix the lint attribute:

-#[allow(dead_code)]
+#[expect(dead_code, reason = "test helper struct")]
🤖 Prompt for AI Agents
In src/server.rs at line 525, revert the use of the #[allow(dead_code)]
attribute as it violates coding guidelines. Replace it with a narrowly scoped
#[expect(dead_code, reason = "...")] attribute, providing a clear descriptive
reason for why the code is expected to be dead. This maintains compliance with
guidelines while documenting the rationale.

struct EmptyPreamble;

#[fixture]
Expand Down