Skip to content

Add Selector::send_or_else to get the message back if not sent#180

Open
rumpuslabs wants to merge 2 commits into
zesterer:masterfrom
rumpuslabs:selector-unsend
Open

Add Selector::send_or_else to get the message back if not sent#180
rumpuslabs wants to merge 2 commits into
zesterer:masterfrom
rumpuslabs:selector-unsend

Conversation

@rumpuslabs
Copy link
Copy Markdown
Contributor

All the send methods in flume return an error type that gives you your original message back if it was not sent, except for SelectError. This makes sense because SelectError::Timeout might be caused by multiple send operations all timing out, or none if the Selector only has receives.

I wanted to wrap my existing flume-based API with a hidden interruptor channel, to allow blocking waits to be terminated early. So I was transforming what was previously calls to Sender::send_* into Selector::new().send(...).recv(&INTERRUPT_CHANNEL).wait_*, but this meant I could no longer get my original message back in the case of timeouts or interruptions.

My first attempt to fix this involved wrapping the message in an Arc<Mutex<Option<T>>> to allow me to claw it back if it wasn't claimed by the receiving end of the channel, which ran into an annoyance where I had to clone() the Arc in the Selector::send() closure because it was FnMut instead of FnOnce. The first commit in this PR fixes this issue. Since all FnMut closures are subtypes of FnOnce this is a non-breaking change to flume's API.

I didn't like the extra cost of the Arc<Mutex<...>> so I hypothesised a way to get my message back via a different closure if the send branch wasn't taken. That is the second commit in this PR (also a non-breaking change, adding a new method to Selector).

This allowed me to write:

    fn send_blocking(
        &mut self,
        msg: S,
        timeout: Option<Duration>,
    ) -> Result<(), SendTimeoutError<S, E>> {
        enum SendResult {
            Sent,
            Interrupted,
        }

        let mut unsent_msg = None;

        let selector = flume::Selector::new()
            .send_or_else(
                &self.channel.sender,
                msg,
                |res| {
                    res.and(Ok(SendResult::Sent))
                        .or_else(|flume::SendError(msg)| Err(SendTimeoutError::Closed(msg)))
                },
                |msg| unsent_msg = Some(msg),
            )
            .recv(&INTERRUPT_CHANNEL, |res| {
                res.and(Ok(SendResult::Interrupted)).or_else(|e| match e {
                    flume::RecvError::Disconnected => {
                        panic!("static channel should not get disconnected")
                    }
                })
            });

        let res = match selector.wait_timeout(timeout.unwrap_or(Duration::MAX)) {
            Ok(res) => res?,
            Err(flume::select::SelectError::Timeout) => {
                return Err(SendTimeoutError::Timeout(unsent_msg.take().unwrap()));
            }
        };

        match res {
            SendResult::Sent => Ok(()),
            SendResult::Interrupted => {
                Err(SendTimeoutError::Interrupted(unsent_msg.take().unwrap()))
            }
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant