Infallible version of spawn_blocking #6796

Open
mineichen opened this issue Aug 22, 2024 · 3 comments
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-task Module: tokio/task


mineichen commented Aug 22, 2024

Is your feature request related to a problem? Please describe.
spawn_blocking requires its FnOnce to be 'static. In practice, we often end up moving parts of some &mut state into the closure and sending them back to the caller once the calculation has completed. Unfortunately, tokio doesn't provide a method that guarantees the FnOnce runs to completion. spawn_blocking almost never fails, but if it does, the state cannot be restored and the only path forward is to panic, which is bad in the infrastructure of a production app.

Describe the solution you'd like
Provide a method on the runtime Handle with the following signature:

pub fn spawn_blocking_infallible<F, R>(&self, func: F) -> impl Future<Output=R> 
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,

The only difference from spawn_blocking is that the call is infallible, i.e. the closure is always executed. Instead of impl Future, a dedicated struct should probably be introduced.

Describe alternatives you've considered
This functionality could be implemented with an external thread pool, e.g. via rayon::spawn.

Another possibility would be to add an async fn spawn_blocking_with_state<T: ..., F: FnOnce(T) -> R + ..., R: ...>(state: T, func: F) -> Result<R, T>, which returns the state whenever the closure was not executed.
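To make the "state comes back on failure" idea concrete, here is a minimal std-only sketch. It is not tokio's API: `run_with_state` and `ReturnOnDrop` are hypothetical names, the function blocks on a std channel instead of being async, and it takes `FnOnce(&mut S)` and returns `Result<(S, R), S>` rather than the exact signature above. It assumes that `std::thread::Builder::spawn` drops the closure when thread creation fails, which is what lets the guard hand the state back.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical guard: if it is dropped while still armed, it sends the
/// state back to the caller as `Err(state)`.
struct ReturnOnDrop<S, R>(Option<(S, mpsc::Sender<Result<(S, R), S>>)>);

impl<S, R> Drop for ReturnOnDrop<S, R> {
    fn drop(&mut self) {
        if let Some((state, tx)) = self.0.take() {
            let _ = tx.send(Err(state));
        }
    }
}

/// Runs `func` on a fresh thread. The state always comes back: as
/// `Ok((state, result))` on success, as `Err(state)` if the closure never
/// ran or panicked.
fn run_with_state<S, R, F>(state: S, func: F) -> Result<(S, R), S>
where
    S: Send + 'static,
    R: Send + 'static,
    F: FnOnce(&mut S) -> R + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let mut guard = ReturnOnDrop(Some((state, tx)));
    let job = move || {
        // The state stays inside the guard while `func` runs, so unwinding
        // from a panic still sends it back via `Drop`.
        let result = {
            let (state, _) = guard.0.as_mut().expect("guard is armed until disarmed below");
            func(state)
        };
        // Disarm the guard and report success.
        let (state, tx) = guard.0.take().expect("guard is still armed");
        let _ = tx.send(Ok((state, result)));
    };
    // Assumption: on spawn failure the closure (and the guard in it) is dropped.
    let _ = thread::Builder::new().spawn(job);
    rx.recv().expect("the guard sends exactly one message")
}
```

A caller could write `run_with_state(String::from("a"), |s| { s.push('b'); s.len() })` and then decide what to do with the state in the `Err` case, instead of panicking.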

Yet another possibility would be to return func: F in a Result<R, F>, so the application can choose a different strategy to execute the FnOnce. The feature request "API proposal: tokio::task::try_spawn_blocking" could be extended to return the FnOnce instead of a NoAvailableThreads error.
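The "hand the FnOnce back" variant can be sketched with std alone, because `SyncSender::try_send` already returns the rejected value inside its error. The names `JobQueue`, `job_queue`, `try_submit` and `submit_or_run_inline` are made up for this illustration and say nothing about how a tokio API would look.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical bounded queue in front of a blocking pool.
struct JobQueue {
    tx: SyncSender<Job>,
}

/// Returns the submitting side and the receiver a worker would drain.
fn job_queue(capacity: usize) -> (JobQueue, Receiver<Job>) {
    let (tx, rx) = mpsc::sync_channel(capacity);
    (JobQueue { tx }, rx)
}

impl JobQueue {
    /// Queues the job, or hands it back untouched if the queue is full
    /// or the workers are gone.
    fn try_submit(&self, job: Job) -> Result<(), Job> {
        self.tx.try_send(job).map_err(|err| match err {
            TrySendError::Full(job) | TrySendError::Disconnected(job) => job,
        })
    }
}

/// One possible fallback strategy: run the rejected job on the calling thread.
/// Returns `true` if the job was queued, `false` if it ran inline.
fn submit_or_run_inline(queue: &JobQueue, job: Job) -> bool {
    match queue.try_submit(job) {
        Ok(()) => true,
        Err(job) => {
            job();
            false
        }
    }
}
```

Because the closure is returned rather than dropped, any state it captured is still reachable and the caller is free to retry, run it inline, or send it to a different pool.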

Additional context
The following code illustrates how this allows for much more ergonomic calls in user code. It uses std::thread to implement such a wrapper in a way that only panics if the FnOnce panics. If tokio had a spawn_blocking_infallible, the helpers could be built on top of it:

struct Foo(String);

impl Foo {
    async fn user_code(&mut self) {
        spawn_blocking_non_static_default(&mut self.0, |x| print!("{x}")).await;
        spawn_blocking_non_static(&mut self.0, String::new(), |x| print!("{x}")).await;
    }
}

// Convenience wrapper: `state` holds T::default() while the closure runs.
async fn spawn_blocking_non_static_default<T: Send + Default + 'static, R: Send + 'static>(
    state: &mut T,
    x: impl FnOnce(&mut T) -> R + Send + 'static,
) -> R {
    spawn_blocking_non_static(state, Default::default(), x).await
}

// Moves the value behind `state` to the blocking thread and swaps it back afterwards.
// While the closure runs, `state` holds the placeholder `tmp`.
async fn spawn_blocking_non_static<T: Send + 'static, R: Send + 'static>(
    state: &mut T,
    mut tmp: T,
    x: impl FnOnce(&mut T) -> R + Send + 'static,
) -> R {
    std::mem::swap(&mut tmp, state);
    let (mut a, b) = spawn_blocking_infallible(move || {
        let r = x(&mut tmp);
        (tmp, r)
    })
    .await;
    std::mem::swap(&mut a, state);
    b
}

async fn spawn_blocking_infallible<R: Send + 'static>(x: impl FnOnce() -> R + Send + 'static) -> R {
    let (send, recv) = tokio::sync::oneshot::channel();
    let t = std::thread::spawn(move || {
        send.send(x()).ok();
    });
    let r = recv.await;
    t.join().unwrap();
    r.expect("send is always called")
}
@mineichen mineichen added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Aug 22, 2024

mineichen commented Aug 22, 2024

It can be implemented today, but it requires a lot of indirection:

use tokio::task::JoinError;

async fn spawn_blocking_non_static_fallible<T: Send + 'static, R: Send + 'static>(
    state: &mut T,
    mut tmp: T,
    x: impl FnOnce(&mut T) -> R + Send + 'static,
) -> Result<R, JoinError> {
    // Sends the state back as Err if the closure is dropped without ever running.
    struct ReturnOnDrop<T, S>(Option<(S, tokio::sync::oneshot::Sender<Result<(S, T), S>>)>);
    impl<T, S> Drop for ReturnOnDrop<T, S> {
        fn drop(&mut self) {
            if let Some((v, tx)) = self.0.take() {
                tx.send(Err(v)).ok();
            }
        }
    }

    let (tx, rx) = tokio::sync::oneshot::channel();
    // Leave the placeholder in `state` while the real value travels with the closure.
    std::mem::swap(&mut tmp, state);
    let mut returner = ReturnOnDrop(Some((tmp, tx)));
    let r = tokio::task::spawn_blocking(move || {
        let (mut tmp, tx) = returner.0.take().expect("Is always initialized with Some() and this is a FnOnce()");
        let r = x(&mut tmp);
        tx.send(Ok((tmp, r))).ok();
    })
    .await;
    match rx.await.expect("Value is always sent (not forgotten or kept internally)") {
        Ok((mut s, r)) => {
            std::mem::swap(state, &mut s);
            Ok(r)
        }
        Err(mut s) => {
            std::mem::swap(state, &mut s);
            Err(r.unwrap_err())
        }
    }
}

@Darksonn Darksonn added the M-task Module: tokio/task label Aug 23, 2024
Darksonn (Contributor) commented

I don't think we can guarantee that the spawn_blocking call will not fail. What do you want it to do if there's no runtime? What do you want it to do if spawning a new OS thread to run it on fails?


mineichen commented Aug 23, 2024

I thought about an unbounded channel for the blocking thread pool, so the only way to panic would be an out-of-memory error. We don't really need anything from the tokio runtime to run the FnOnce to completion.
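As a rough illustration of the unbounded-queue idea, the sketch below uses only std: a fixed set of worker threads drains an unbounded `mpsc::channel`, so submitting a job cannot be rejected for lack of threads. `UnboundedPool` and its methods are hypothetical names, and `run` blocks the calling thread where a real version would return a future.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical pool: fixed worker count, unbounded job queue.
struct UnboundedPool {
    queue: mpsc::Sender<Job>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl UnboundedPool {
    fn new(threads: usize) -> Self {
        let (queue, rx) = mpsc::channel::<Job>();
        let rx = Arc::new(Mutex::new(rx));
        let workers = (0..threads)
            .map(|_| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || loop {
                    // The lock guard is a temporary, released before the job runs.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(job) => job(),
                        Err(_) => break, // every sender is gone: shut down
                    }
                })
            })
            .collect();
        UnboundedPool { queue, workers }
    }

    /// Queues `func` and waits for its result. Queuing never fails for lack
    /// of capacity; this only panics if `func` panics or the workers are gone.
    fn run<R, F>(&self, func: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            let _ = tx.send(func());
        });
        self.queue.send(job).expect("worker threads have exited");
        rx.recv().expect("the job panicked before sending its result")
    }

    /// Closes the queue and waits for the workers to finish.
    fn shutdown(self) {
        drop(self.queue);
        for worker in self.workers {
            let _ = worker.join();
        }
    }
}
```

The trade-off is that the queue can grow without limit under load, which is why this design moves the overload decision to whoever admits work in the first place.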

In the current implementation, if a system is overloaded, many tasks (e.g. from web servers) start, use resources, and at some point run into an "OutOfThreads" error. The erroneous response will most likely be retried by the client, causing even more pressure on the server. The backpressure, IMHO, should live in the infrastructure (e.g. Tower has the poll_ready method to cancel requests as early as possible). Tokio should just publish metrics that indicate overuse, so entry points into the async world (like web requests) can implement backpressure. Of course, this backpressure can be implemented today, so OutOfThreads is most likely never seen in practice, but it still has to be handled. I see this as a duty of the infrastructure, not of the application code.
I claim that today, the duty of handling such errors is in most places just passed on to the caller. If you need the control, you can still use the regular spawn_blocking.
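A minimal sketch of what backpressure at the entry point could look like, using only std. `AdmissionGate` and `Permit` are hypothetical names and the in-flight counter stands in for whatever metric the runtime might publish; this is not Tower's or tokio's API. The request handler asks for a permit before doing any work and rejects the request early if none is available.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical gate placed at the entry point (e.g. in front of a request handler).
struct AdmissionGate {
    in_flight: AtomicUsize,
    limit: usize,
}

/// Held for the lifetime of one admitted request; releases its slot on drop.
struct Permit(Arc<AdmissionGate>);

impl AdmissionGate {
    fn new(limit: usize) -> Arc<Self> {
        Arc::new(AdmissionGate { in_flight: AtomicUsize::new(0), limit })
    }

    /// Returns `None` when the limit is reached, so the caller can reject
    /// the request before it has consumed any resources.
    fn try_admit(self: &Arc<Self>) -> Option<Permit> {
        self.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                (n < self.limit).then_some(n + 1)
            })
            .ok()
            .map(|_| Permit(Arc::clone(self)))
    }

    /// Current load, usable as a metric.
    fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.0.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

With a gate like this in the infrastructure layer, application code behind it would only ever be started when capacity is available.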

EDIT:
If there is no runtime, and therefore no queue or running thread to handle these FnOnces, we could do the same as when the future runs on an executor other than tokio: panic!. This is a programmer error, not something that might rarely happen in production.

EDIT 2:
Maybe we don't even need to guarantee that the lambda is executed after spawn_blocking has been called if the runtime was just terminated. In that case the future which awaits the spawn_blocking() will be dropped anyway, so the FnOnce doesn't have to be scheduled.
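The relaxed guarantee from the last edit could be sketched like this, again with std only and with made-up names (`Waiter`, `make_job`): the handle the caller holds marks the job as abandoned when it is dropped, and the queued job checks that flag before running the FnOnce. How tokio itself treats dropped handles is not covered here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical handle held by the caller. Dropping it tells the queued
/// job that nobody is waiting for the result any more.
struct Waiter<R> {
    result: mpsc::Receiver<R>,
    abandoned: Arc<AtomicBool>,
}

impl<R> Waiter<R> {
    /// Blocks until the job has produced its value; `None` if it never ran.
    fn wait(self) -> Option<R> {
        self.result.recv().ok()
    }
}

impl<R> Drop for Waiter<R> {
    fn drop(&mut self) {
        self.abandoned.store(true, Ordering::Release);
    }
}

/// Wraps `func` into a queueable job plus the handle to wait on it.
fn make_job<R, F>(func: F) -> (Job, Waiter<R>)
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let abandoned = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&abandoned);
    let job: Job = Box::new(move || {
        if flag.load(Ordering::Acquire) {
            return; // the waiter is gone, so skip the work entirely
        }
        let _ = tx.send(func());
    });
    (job, Waiter { result: rx, abandoned })
}
```

Skipping an abandoned job is safe for the state-swapping helpers above only if the state travels back some other way, for example through a drop guard inside the closure.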
