Conversation

leynos
Owner

@leynos leynos commented Jun 15, 2025

Summary

  • pass broadcast receiver by value when spawning workers
  • update worker_task signature to take ownership of shutdown receiver
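
The ownership change can be sketched as follows, using std threads and channels to stand in for the tokio tasks and broadcast channel in src/server.rs (the `worker_task` body here is illustrative, not the crate's actual implementation):

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// After the change, the worker owns its receiver instead of borrowing it,
// so the spawner no longer has to keep a receiver alive for each task.
fn worker_task(shutdown_rx: Receiver<()>) -> &'static str {
    match shutdown_rx.recv() {
        Ok(()) => "shutdown signal received",
        Err(_) => "channel closed",
    }
}

fn main() {
    let (shutdown_tx, shutdown_rx) = channel();
    // The receiver is moved into the spawned worker by value.
    let handle = thread::spawn(move || worker_task(shutdown_rx));
    shutdown_tx
        .send(())
        .expect("worker should still be listening");
    println!("{}", handle.join().expect("worker should not panic"));
}
```

With tokio, each worker would receive its own `shutdown_tx.subscribe()` result the same way: moved into the spawned future rather than borrowed.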

Testing

  • cargo clippy -- -D warnings
  • RUSTFLAGS="-D warnings" cargo test

https://chatgpt.com/codex/tasks/task_e_684f4b8d26a483228c8847d3d71f32c8

Summary by Sourcery

Refine shutdown handling by changing worker tasks to take ownership of their broadcast receiver, reducing channel capacity to 1, and updating recv logic to break on closed or lagged errors; add extensive tests for server creation, binding, worker configuration, preamble callbacks, shutdown behavior, extreme worker counts, multi-address binding, and debug guard; add rstest as a dev dependency.

Bug Fixes:

  • Correct shutdown receiver ownership in worker_task and handle closed or lagged recv errors to ensure proper shutdown.
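
The recv handling can be sketched with a hand-rolled enum mirroring tokio's `broadcast::error::RecvError` (the stand-in type and `shutdown_reason` helper are illustrative; the real code matches on the tokio type inside the worker's `select!` loop). Every outcome — a delivered signal, a closed channel, or a lagged receiver — causes the worker to break:

```rust
// Stand-in for tokio::sync::broadcast::error::RecvError.
#[derive(Debug)]
enum RecvError {
    Closed,
    Lagged(u64),
}

// Classify a recv outcome; since only a single shutdown message is ever
// sent, every case means the worker should stop accepting connections.
fn shutdown_reason(res: Result<(), RecvError>) -> &'static str {
    match res {
        Ok(()) => "signal received",
        Err(RecvError::Closed) => "channel closed",
        // A lagged receiver has missed messages; as shutdown is the only
        // message, treat lagging as shutdown too.
        Err(RecvError::Lagged(_)) => "receiver lagged",
    }
}

fn main() {
    for res in [Ok(()), Err(RecvError::Closed), Err(RecvError::Lagged(3))] {
        // In the worker loop, each of these breaks out of the select!.
        println!("break: {}", shutdown_reason(res));
    }
}
```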

Enhancements:

  • Reduce broadcast channel capacity to 1 for shutdown signaling.
  • Spawn worker tasks by passing each its shutdown receiver by value.

Build:

  • Add rstest as a dev-dependency in Cargo.toml.

Tests:

  • Add comprehensive tests covering server creation, binding, worker count, preamble callbacks, shutdown flows, extreme worker counts, multi-address binding, and debug compilation guard.

Summary by CodeRabbit

  • Tests
    • Added comprehensive unit and integration tests for server functionality, including worker management, shutdown behaviour, binding, and error handling.
  • Chores
    • Updated development dependencies to include the rstest crate.
    • Adjusted markdown linting configuration to disable a specific rule.
  • Refactor
    • Simplified server shutdown signal handling by updating worker task logic.

Contributor

sourcery-ai bot commented Jun 15, 2025

Reviewer's Guide

This PR optimizes shutdown signaling by reducing the broadcast channel capacity to 1, refactors worker tasks to own and pattern-match on their shutdown receivers rather than borrow them, and introduces a comprehensive rstest-based test suite covering server binding, callbacks, worker management, and graceful shutdown.

Sequence diagram for worker task shutdown signal handling

sequenceDiagram
    participant Main as Main Server Task
    participant Worker as Worker Task
    participant Broadcast as broadcast::Receiver

    Main->>Broadcast: send shutdown signal
    Worker->>Broadcast: recv()
    alt Shutdown signal received
        Worker->>Worker: break loop and exit
    else Channel closed or lagged
        Worker->>Worker: break loop and exit
    end

File-Level Changes

  • Reduce shutdown broadcast channel capacity (src/server.rs)
      • Set broadcast channel capacity to 1
      • Update comment explaining single shutdown message
  • Refactor worker_task shutdown receiver ownership and handling (src/server.rs)
      • Pass shutdown receiver by value in spawn call
      • Change worker_task signature to take owned receiver
      • Enhance recv branch to match on Ok, Closed, or Lagged
  • Add comprehensive rstest-based tests for server and shutdown behavior (src/server.rs, Cargo.toml)
      • Add tests module with unit and integration tests
      • Include fixtures and parametrized rstest tests covering binding and callbacks
      • Add rstest to dev-dependencies

Possibly linked issues

  • #0: The PR changes the worker task signature to take ownership of the shutdown receiver by value, fixing the issue.
  • #684: The PR implements the issue's suggested solution by changing the worker_task signature and passing the shutdown receiver by value.


Contributor

coderabbitai bot commented Jun 15, 2025

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes update the server worker shutdown logic to pass ownership of the shutdown receiver to each worker task, eliminating the need for mutable references. A comprehensive suite of unit and integration tests for the server and worker behaviour is introduced. Additionally, a development dependency on rstest is added and a markdownlint rule is disabled.

Changes

  • .markdownlint-cli2.jsonc: Disabled the "MD040" rule in the markdownlint configuration.
  • Cargo.toml: Added rstest as a development dependency under [dev-dependencies].
  • src/server.rs: Refactored worker task shutdown receiver to take ownership; reduced broadcast channel capacity; added extensive test module for server and workers.

Sequence Diagram(s)

sequenceDiagram
    participant Main
    participant Server
    participant Worker
    participant ShutdownChannel

    Main->>Server: run_with_shutdown()
    Server->>ShutdownChannel: subscribe()
    loop For each worker
        Server->>Worker: spawn(worker_task, owned shutdown_rx)
    end
    Main->>ShutdownChannel: send shutdown signal
    Worker->>ShutdownChannel: receives shutdown signal (owned receiver)
    Worker->>Worker: exits loop and shuts down

Poem

A server’s song with workers anew,
Each owns its shutdown, no more to queue.
Tests abound, both swift and deep,
Ensuring the code runs steady, not cheap.
With markdown rules trimmed and crates in tow,
The rabbit hops on—let the server flow! 🐇



Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @leynos - I've reviewed your changes - here's some feedback:

  • Verify that creating a fresh shutdown_tx.subscribe() for each worker still matches your intended shutdown semantics, since it differs from sharing a single receiver.
  • Consider inspecting each worker’s JoinHandle result after join_all to catch panics or errors instead of silently dropping them.
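
The second point might look like the following sketch, with std threads and a hypothetical `collect_worker_errors` helper standing in for the crate's tokio tasks and `join_all`:

```rust
use std::thread;

// Join every worker and surface failures instead of silently dropping them.
fn collect_worker_errors(
    handles: Vec<thread::JoinHandle<Result<(), String>>>,
) -> Vec<String> {
    let mut errors = Vec::new();
    for handle in handles {
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(e)) => errors.push(e),
            // `join` itself fails if the worker panicked.
            Err(_) => errors.push("worker panicked".to_string()),
        }
    }
    errors
}

fn main() {
    let handles = (0..3)
        .map(|i| {
            thread::spawn(move || {
                if i == 1 {
                    Err(format!("worker {i} failed"))
                } else {
                    Ok(())
                }
            })
        })
        .collect();
    let errors = collect_worker_errors(handles);
    assert_eq!(errors, vec!["worker 1 failed".to_string()]);
    println!("collected {} error(s)", errors.len());
}
```

In the tokio version, the same inspection can be done over the `Vec` of results that `join_all` yields, logging or propagating any `JoinError`.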


@leynos leynos linked an issue Jun 15, 2025 that may be closed by this pull request
Contributor

coderabbitai bot commented Jun 15, 2025

Note

Unit test generation is an Early Access feature. Expect some limitations and changes as we gather feedback and continue to improve it.


Generating unit tests... This may take up to 20 minutes.

Contributor

coderabbitai bot commented Jun 15, 2025

Here are the copyable unit test edits:

Copyable Edits

src/server.rs

Insert at line 412:

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, SocketAddr};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::net::TcpListener;
    use tokio::sync::broadcast;
    use tokio::time::{timeout, Duration};
    use bincode::{Decode, Encode};

    // Test preamble types
    #[derive(Debug, Clone, PartialEq, Encode, Decode)]
    struct TestPreamble {
        id: u32,
        message: String,
    }

    #[derive(Debug, Clone, PartialEq, Encode, Decode)]
    struct EmptyPreamble;

    // Helper to create a test factory
    fn test_factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static {
        || WireframeApp::default()
    }

    // Helper to get a free port
    fn get_free_port() -> SocketAddr {
        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
        let listener = std::net::TcpListener::bind(addr).unwrap();
        listener.local_addr().unwrap()
    }

    #[test]
    fn test_new_server_creation() {
        let server = WireframeServer::new(test_factory());
        assert!(server.worker_count() >= 1);
        assert!(server.local_addr().is_none());
    }

    #[test]
    fn test_new_server_default_worker_count() {
        let server = WireframeServer::new(test_factory());
        let expected_workers = std::thread::available_parallelism()
            .map_or(1, std::num::NonZeroUsize::get)
            .max(1);
        assert_eq!(server.worker_count(), expected_workers);
    }

    #[test]
    fn test_workers_configuration() {
        let server = WireframeServer::new(test_factory());

        // Test setting various worker counts
        let server = server.workers(4);
        assert_eq!(server.worker_count(), 4);

        let server = server.workers(100);
        assert_eq!(server.worker_count(), 100);

        // Test that zero workers gets clamped to 1
        let server = server.workers(0);
        assert_eq!(server.worker_count(), 1);
    }

    #[test]
    fn test_with_preamble_type_conversion() {
        let server = WireframeServer::new(test_factory());
        let server_with_preamble = server.with_preamble::<TestPreamble>();
        assert_eq!(
            server_with_preamble.worker_count(),
            std::thread::available_parallelism()
                .map_or(1, std::num::NonZeroUsize::get)
                .max(1)
        );
    }

    #[test]
    fn test_bind_success() {
        let server = WireframeServer::new(test_factory());
        let addr = get_free_port();
        let server = server.bind(addr).expect("Failed to bind to address");
        assert!(server.local_addr().is_some());
        let bound_addr = server.local_addr().unwrap();
        assert_eq!(bound_addr.ip(), addr.ip());
    }

    #[test]
    fn test_bind_invalid_address() {
        let server = WireframeServer::new(test_factory());
        // Port 1 is privileged, so binding normally fails for non-root users,
        // but may succeed when tests run as root (e.g. in some CI containers),
        // so only check that `bind` reports the outcome without panicking.
        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1);
        let _ = server.bind(addr);
    }

    #[test]
    fn test_local_addr_before_bind() {
        let server = WireframeServer::new(test_factory());
        assert!(server.local_addr().is_none());
    }

    #[test]
    fn test_local_addr_after_bind() {
        let server = WireframeServer::new(test_factory());
        let addr = get_free_port();
        let server = server.bind(addr).expect("Failed to bind");
        let local_addr = server.local_addr();
        assert!(local_addr.is_some());
        assert_eq!(local_addr.unwrap().ip(), addr.ip());
    }

    #[tokio::test]
    async fn test_preamble_success_callback() {
        use std::sync::Arc;
        let callback_counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = callback_counter.clone();

        let server = WireframeServer::new(test_factory())
            .with_preamble::<TestPreamble>()
            .on_preamble_decode_success(move |_preamble: &TestPreamble| {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            });

        assert_eq!(callback_counter.load(Ordering::SeqCst), 0);
        // We can't easily test the callback execution without setting up a full server,
        // but we can verify the callback is stored
        assert!(server.on_preamble_success.is_some());
    }

    #[tokio::test]
    async fn test_preamble_failure_callback() {
        use std::sync::Arc;
        let callback_counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = callback_counter.clone();

        let server = WireframeServer::new(test_factory())
            .with_preamble::<TestPreamble>()
            .on_preamble_decode_failure(move |_error: &DecodeError| {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            });

        assert_eq!(callback_counter.load(Ordering::SeqCst), 0);
        assert!(server.on_preamble_failure.is_some());
    }

    #[tokio::test]
    async fn test_method_chaining() {
        let addr = get_free_port();
        let callback_invoked = Arc::new(AtomicUsize::new(0));
        let counter_clone = callback_invoked.clone();

        let server = WireframeServer::new(test_factory())
            .workers(2)
            .with_preamble::<TestPreamble>()
            .on_preamble_decode_success(move |_: &TestPreamble| {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            })
            .on_preamble_decode_failure(|_: &DecodeError| {
                eprintln!("Preamble decode failed");
            })
            .bind(addr)
            .expect("Failed to bind");

        assert_eq!(server.worker_count(), 2);
        assert!(server.local_addr().is_some());
        assert!(server.on_preamble_success.is_some());
        assert!(server.on_preamble_failure.is_some());
    }

    #[tokio::test]
    #[should_panic(expected = "`bind` must be called before `run`")]
    async fn test_run_without_bind_panics() {
        let server = WireframeServer::new(test_factory());
        let _ = timeout(Duration::from_millis(100), server.run()).await;
    }

    #[tokio::test]
    #[should_panic(expected = "`bind` must be called before `run`")]
    async fn test_run_with_shutdown_without_bind_panics() {
        let server = WireframeServer::new(test_factory());
        let shutdown_future = async { tokio::time::sleep(Duration::from_millis(10)).await };
        let _ = timeout(Duration::from_millis(100), server.run_with_shutdown(shutdown_future)).await;
    }

    #[tokio::test]
    async fn test_run_with_immediate_shutdown() {
        let addr = get_free_port();
        let server = WireframeServer::new(test_factory())
            .workers(1)
            .bind(addr)
            .expect("Failed to bind");

        // Create a shutdown future that resolves immediately
        let shutdown_future = async {};

        let result = timeout(
            Duration::from_millis(1000),
            server.run_with_shutdown(shutdown_future)
        ).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_ok());
    }

    #[tokio::test]
    async fn test_server_graceful_shutdown_with_ctrl_c_simulation() {
        let addr = get_free_port();
        let server = WireframeServer::new(test_factory())
            .workers(2)
            .bind(addr)
            .expect("Failed to bind");

        // Simulate shutdown signal after a short delay
        let shutdown_future = async {
            tokio::time::sleep(Duration::from_millis(50)).await;
        };

        let start = std::time::Instant::now();
        let result = timeout(
            Duration::from_millis(1000),
            server.run_with_shutdown(shutdown_future)
        ).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(result.unwrap().is_ok());
        // Should complete relatively quickly due to immediate shutdown
        assert!(elapsed < Duration::from_millis(500));
    }

    #[tokio::test]
    async fn test_multiple_worker_creation() {
        let addr = get_free_port();
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_clone = call_count.clone();

        let factory = move || {
            call_count_clone.fetch_add(1, Ordering::SeqCst);
            WireframeApp::default()
        };

        let server = WireframeServer::new(factory)
            .workers(3)
            .bind(addr)
            .expect("Failed to bind");

        // Start server with quick shutdown
        let shutdown_future = async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        };

        let result = timeout(
            Duration::from_millis(1000),
            server.run_with_shutdown(shutdown_future)
        ).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_ok());

        // We can't guarantee exact timing, but factory should be called for workers
        // Note: The factory might not be called immediately, depending on implementation
        // but we test that the server starts and shuts down correctly
    }

    #[test]
    fn test_server_configuration_persistence() {
        let factory = test_factory();
        let addr = get_free_port();

        let server = WireframeServer::new(factory)
            .workers(5);

        assert_eq!(server.worker_count(), 5);

        let server = server.bind(addr).expect("Failed to bind");
        assert_eq!(server.worker_count(), 5); // Should persist after bind
        assert!(server.local_addr().is_some());
    }

    #[test]
    fn test_preamble_callbacks_reset_on_type_change() {
        let server = WireframeServer::new(test_factory())
            .on_preamble_decode_success(|_: &()| {})
            .on_preamble_decode_failure(|_: &DecodeError| {});

        assert!(server.on_preamble_success.is_some());
        assert!(server.on_preamble_failure.is_some());

        // Changing preamble type should reset callbacks
        let server = server.with_preamble::<TestPreamble>();
        assert!(server.on_preamble_success.is_none());
        assert!(server.on_preamble_failure.is_none());
    }

    #[tokio::test]
    async fn test_worker_task_shutdown_signal() {
        // This is a more complex test that would require setting up mock streams
        // For now, we'll test that the worker task function exists and compiles
        let (tx, rx) = broadcast::channel(1);
        let listener = Arc::new(TcpListener::bind("127.0.0.1:0").await.unwrap());
        let factory = test_factory();

        // Send shutdown signal immediately
        let _ = tx.send(());

        // The worker task should exit quickly due to shutdown signal
        let task = tokio::spawn(worker_task::<_, ()>(
            listener,
            factory,
            None,
            None,
            rx,
        ));

        let result = timeout(Duration::from_millis(100), task).await;
        assert!(result.is_ok());
    }

    // Edge case tests
    #[test]
    fn test_extreme_worker_counts() {
        let server = WireframeServer::new(test_factory());

        // Test very large worker count
        let server = server.workers(usize::MAX);
        assert_eq!(server.worker_count(), usize::MAX);

        // Test that 0 still gets clamped to 1
        let server = server.workers(0);
        assert_eq!(server.worker_count(), 1);
    }

    #[tokio::test]
    async fn test_bind_to_multiple_addresses() {
        let server = WireframeServer::new(test_factory());
        let addr1 = get_free_port();
        let addr2 = get_free_port();

        let server = server.bind(addr1).expect("Failed to bind first address");
        let first_local_addr = server.local_addr().unwrap();

        // Binding again should replace the previous binding
        let server = server.bind(addr2).expect("Failed to bind second address");
        let second_local_addr = server.local_addr().unwrap();

        assert_ne!(first_local_addr.port(), second_local_addr.port());
        assert_eq!(second_local_addr.ip(), addr2.ip());
    }

    #[test]
    fn test_server_debug_compilation_guard() {
        // This test ensures the compile error only applies in release mode
        // In debug mode (which tests run in), this should compile fine
        assert!(cfg!(debug_assertions));
    }
}


@codescene-delta-analysis codescene-delta-analysis bot left a comment


Gates Failed
  • Prevent hotspot decline (1 hotspot with Code Duplication)
  • Enforce advisory code health rules (1 file with Code Duplication)

Gates Passed
  • 4 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
  • Prevent hotspot decline: server.rs (1 rule in this hotspot), Code Health 10.00 → 9.39
  • Enforce advisory code health rules: server.rs (1 advisory rule), Code Health 10.00 → 9.39

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (6)
.markdownlint-cli2.jsonc (1)

10-10: Reconsider disabling MD040 globally
Rule MD040 ensures fenced code blocks specify a language identifier, which is valuable for readability and syntax highlighting. Disabling it for all markdown may hide missing identifiers in new content. Consider scoping this override to specific files or revisiting the need for a full global disable.
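
If the rule only needs to be relaxed in a few documents, markdownlint's inline directives can scope the exception to the affected files while keeping MD040 enforced everywhere else, for example:

```markdown
<!-- markdownlint-disable MD040 -->
~~~
captured output with no language tag
~~~
<!-- markdownlint-enable MD040 -->
```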

Cargo.toml (1)

15-17: rstest addition looks good – consider pinning to a patch release.

Adding rstest under [dev-dependencies] is appropriate for the new test-suite.
If you want fully reproducible CI builds, consider locking the patch level as well ("0.18.2" at the time of writing). Otherwise cargo update may silently pull in a newer minor patch with different MSRV or behaviour.

src/server.rs (4)

288-302: Nice ownership fix – broadcast capacity can be reduced.

Passing shutdown_tx.subscribe() into every worker resolves the previous lifetime/borrow issues – good call.
Minor: the broadcast channel is created with a capacity of 16, yet the server only ever sends a single shutdown message. You can safely lower this to 1 to save a tiny amount of memory and silence clippy’s large_capacity lint if it’s enabled.

-        let (shutdown_tx, _) = broadcast::channel(16);
+        let (shutdown_tx, _) = broadcast::channel(1);

332-334: shutdown_rx.recv() result is discarded – handle lagged errors explicitly?

broadcast::Receiver::recv() returns Result<(), RecvError>.
Dropping the result is usually fine here, but if the channel ever overflows (Lagged) the worker will exit immediately without processing further connections. Consider matching on the result to ignore Lagged but still continue waiting for a real shutdown signal:

-            _ = shutdown_rx.recv() => break,
+            res = shutdown_rx.recv() => {
+                if matches!(res, Ok(()) | Err(broadcast::error::RecvError::Closed)) {
+                    break;
+                }
+                // Ignore lag and continue.
+            }

684-686: Avoid underscore-prefixed parameter that is still used (Clippy).

The parameter is consumed by the function’s type system even though the body ignores it, so Clippy flags used_underscore_binding.
Simplest fix: rename it without the leading underscore.

-async fn test_multiple_worker_creation(
-        _factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
+async fn test_multiple_worker_creation(
+        factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,

No other changes are required.

🧰 Tools
🪛 GitHub Actions: CI

[error] 684-684: Clippy error: used underscore-prefixed binding _factory. Consider removing the underscore prefix or add #[allow(clippy::used_underscore_binding)].


733-736: Use an explicit &() pattern instead of _ to silence Clippy.

-            .on_preamble_decode_success(|_: &()| {})
+            .on_preamble_decode_success(|&()| {})

This makes the intent explicit and clears the match_over_unit lint.

🧰 Tools
🪛 GitHub Actions: CI

[error] 734-734: Clippy error: matching over () is more explicit. Suggestion: use () instead of _ in .on_preamble_decode_success(|_: &()| {}).

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d3cbf7a and 3b7a806.

📒 Files selected for processing (3)
  • .markdownlint-cli2.jsonc (1 hunks)
  • Cargo.toml (1 hunks)
  • src/server.rs (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: CI
src/server.rs

[error] 641-641: Clippy error: unneeded unit expression. Suggestion: remove the final () in let shutdown_future = async { () };.


[error] 684-684: Clippy error: used underscore-prefixed binding _factory. Consider removing the underscore prefix or add #[allow(clippy::used_underscore_binding)].


[error] 734-734: Clippy error: matching over () is more explicit. Suggestion: use () instead of _ in .on_preamble_decode_success(|_: &()| {}).

🔇 Additional comments (1)
.markdownlint-cli2.jsonc (1)

9-9: Proper JSONC syntax for additional config entries
Adding the comma after the closing brace of the MD013 block is required to introduce the next rule override. Everything looks correctly formatted.


@leynos
Owner Author

leynos commented Jun 16, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jun 16, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/server.rs (1)

660-660: Needless () in async block still triggers Clippy

This is the same lint that was highlighted previously.

-let shutdown_future = async { () };
+let shutdown_future = async {};
🧰 Tools
🪛 GitHub Actions: CI

[error] 660-660: Clippy lint 'unused_unit': unneeded unit expression '()' found. Remove the final '()' to fix this issue.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b7a806 and 8fd0032.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • Cargo.toml (1 hunks)
  • src/server.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • Cargo.toml
🧰 Additional context used
🪛 GitHub Actions: CI
src/server.rs

[error] 660-660: Clippy lint 'unused_unit': unneeded unit expression '()' found. Remove the final '()' to fix this issue.

@leynos
Owner Author

leynos commented Jun 16, 2025

@coderabbitai review

Contributor

coderabbitai bot commented Jun 16, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@leynos
Owner Author

leynos commented Jun 16, 2025

@sourcery-ai review

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/server.rs (1)

305-317: Potential double-await on join_all may panic at runtime

join_all is first polled to completion by
_ = &mut join_all => {} inside the tokio::select!, and then awaited again immediately afterwards in the for res in join_all.await loop.
The JoinAll future is not Fused; a second poll after it has returned Ready can panic (see futures::stream::FuturesUnordered internals).

A minimal fix is to await the future only once and reuse the result:

-tokio::select! {
-    () = shutdown => {
-        let _ = shutdown_tx.send(());
-    }
-    _ = &mut join_all => {}
-}
-
-for res in join_all.await {
+let results = tokio::select! {
+    () = shutdown => {
+        let _ = shutdown_tx.send(());
+        // `join_all` has not completed yet, so this is its only await
+        join_all.await
+    }
+    // capture the results so the future is never polled again
+    results = &mut join_all => results,
+};
+
+for res in results {
     if let Err(e) = res {
         eprintln!("worker task failed: {e}");
     }
 }

Capturing the results inside the select! branch guarantees the future is polled to completion exactly once.

🧹 Nitpick comments (2)
src/server.rs (2)

286-288: Capacity comment is correct, but using 1 reduces observability

A channel size of 1 works because only a single shutdown is broadcast, yet it also discards any accidental extra notifications, making debugging harder. Consider broadcast::channel(self.workers.max(1)) to keep one message per worker without impacting memory meaningfully.


357-368: Match arm can be collapsed for clarity

broadcast::Receiver::recv only returns Ok(()), Err(Closed) or Err(Lagged(_)).
Since every variant is a shutdown condition you can simplify:

-res = shutdown_rx.recv() => {
-    if matches!(
-        res,
-        Ok(())
-            | Err(
-                broadcast::error::RecvError::Closed
-                    | broadcast::error::RecvError::Lagged(_),
-            )
-    ) {
-        break;
-    }
-},
+_ = shutdown_rx.recv() => break,

This shortens the branch and avoids an unnecessary pattern match.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8fd0032 and 5f563d6.

📒 Files selected for processing (1)
  • src/server.rs (4 hunks)
🔇 Additional comments (2)
src/server.rs (2)

334-335: Ownership fix looks good

Taking ownership of shutdown_rx removes the previous borrow-checker gymnastics and aligns with how broadcast::Receiver is designed to be used. No further action required.


666-666: Clippy warning resolved – empty async block is fine

The previous needless unit expression has been removed (async {}), satisfying Clippy.

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @leynos - I've reviewed your changes and they look great!


Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@leynos
Owner Author

leynos commented Jun 16, 2025

@coderabbitai new github issue:

src/server.rs (1)

    305-317: Potential double-await on join_all may panic at runtime

    join_all is first polled to completion by
    _ = &mut join_all => {} inside the tokio::select!, and then awaited again immediately afterwards in the for res in join_all.await loop.
    The JoinAll future is not Fused; a second poll after it has returned Ready can panic (see futures::stream::FuturesUnordered internals).

    A minimal fix is to await the future only once and reuse the result:

    -tokio::select! {
    -    () = shutdown => {
    -        let _ = shutdown_tx.send(());
    -    }
    -    _ = &mut join_all => {}
    -}
    -
    -for res in join_all.await {
    +let results = tokio::select! {
    +    () = shutdown => {
    +        let _ = shutdown_tx.send(());
    +        // `join_all` has not completed yet, so this is its only await
    +        join_all.await
    +    }
    +    // capture the results so the future is never polled again
    +    results = &mut join_all => results,
    +};
    +
    +for res in results {
         if let Err(e) = res {
             eprintln!("worker task failed: {e}");
         }
     }

    Capturing the results inside the select! branch guarantees the future is polled to completion exactly once.

Contributor

coderabbitai bot commented Jun 16, 2025

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

@leynos I'll create a GitHub issue for this potential double-await bug.


🏁 Script executed:

#!/bin/bash
gh issue create \
  --title "Potential double-await on join_all may cause runtime panic in server shutdown" \
  --body "## Description

A potential runtime panic has been identified in the server shutdown logic where the \`join_all\` future is polled twice:

1. First, it's polled to completion in \`tokio::select!\` with \`_ = &mut join_all => {}\`
2. Then it's awaited again in the subsequent for loop with \`for res in join_all.await\`

## Problem

The \`JoinAll\` future is not Fused, which means polling it after it has already returned Ready can cause a panic (related to \`futures::stream::FuturesUnordered\` internals).

## Location

- File: \`src/server.rs\`
- Lines: 305-317

## Context

This issue was identified during code review and needs to be addressed to prevent potential runtime panics during server shutdown.

## References

- PR: https://github.com/leynos/wireframe/pull/55
- Comment: https://github.com/leynos/wireframe/pull/55#issuecomment-$(gh pr view 55 --json comments --jq '.comments[-1].id')" \
  --assignee leynos

Length of output: 1090


Perfect! I've successfully created GitHub issue #58 for the potential double-await bug:

Issue created: #58

The issue has been:

  • ✅ Titled appropriately to describe the problem
  • ✅ Assigned to you (leynos)
  • ✅ Documented with clear description of the bug and its location
  • ✅ Referenced back to this PR and comment for context
  • ✅ Explained why this is a problem (JoinAll not being Fused can cause panics)

This will help track the fix for this potential runtime panic in the server shutdown logic.

@leynos leynos merged commit 41e5e25 into main Jun 16, 2025
5 checks passed
@leynos leynos deleted the codex/refactor-shutdown-receiver-ownership-in-workers branch June 16, 2025 13:14

Successfully merging this pull request may close these issues.

Refactor: Pass shutdown receiver by value instead of mutable reference
