Skip to content

Add non-panicking poll option for Task #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ flaky_test = "0.1"
flume = { version = "0.10", default-features = false }
once_cell = "1"
smol = "1"

# rewrite dependencies to use the this version of async-task when running tests
[patch.crates-io]
async-task = { path = "." }
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ mod task;
mod utils;

pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
pub use crate::task::Task;
pub use crate::task::{FallibleTask, Task};

#[cfg(feature = "std")]
pub use crate::runnable::spawn_local;
164 changes: 149 additions & 15 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,56 @@ impl<T> Task<T> {
pub async fn cancel(self) -> Option<T> {
let mut this = self;
this.set_canceled();
this.fallible().await
}

struct Fut<T>(Task<T>);

impl<T> Future for Fut<T> {
type Output = Option<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_task(cx)
}
}

Fut(this).await
/// Converts this task into a [`FallibleTask`].
///
/// Like [`Task`], a fallible task will poll the task's output until it is
/// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
/// dropped without being run. Resolves to the task's output when completed,
/// or [`None`] if it didn't complete.
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// })
/// .fallible();
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), Some(3));
/// ```
///
/// ```
/// use smol::future;
///
/// // Schedule function which drops the runnable without running it.
/// let schedule = move |runnable| drop(runnable);
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// }, schedule);
/// runnable.schedule();
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task.fallible()), None);
/// ```
pub fn fallible(self) -> FallibleTask<T> {
FallibleTask { task: self }
}

/// Puts the task in canceled state.
Expand Down Expand Up @@ -351,6 +389,12 @@ impl<T> Task<T> {
}
}
}

fn header(&self) -> &Header {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
unsafe { &*header }
}
}

impl<T> Drop for Task<T> {
Expand All @@ -373,11 +417,101 @@ impl<T> Future for Task<T> {

impl<T> fmt::Debug for Task<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;

f.debug_struct("Task")
.field("header", unsafe { &(*header) })
.field("header", self.header())
.finish()
}
}

/// A spawned task with a fallible response.
///
/// This type behaves like [`Task`], however it produces an `Option<T>` when
/// polled and will return `None` if the executor dropped its
/// [`Runnable`][`super::Runnable`] without being run.
///
/// This can be useful to avoid the panic produced when polling the `Task`
/// future if the executor dropped its `Runnable`.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct FallibleTask<T> {
task: Task<T>,
}

impl<T> FallibleTask<T> {
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```
/// use smol::{Executor, Timer};
/// use std::time::Duration;
///
/// let ex = Executor::new();
///
/// // Spawn a deamon future.
/// ex.spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// })
/// .fallible()
/// .detach();
/// ```
pub fn detach(self) {
self.task.detach()
}

/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use smol::{future, Executor, Timer};
/// use std::thread;
/// use std::time::Duration;
///
/// let ex = Executor::new();
///
/// // Spawn a deamon future.
/// let task = ex.spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// })
/// .fallible();
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// future::block_on(async {
/// Timer::after(Duration::from_secs(3)).await;
/// task.cancel().await;
/// });
/// ```
pub async fn cancel(self) -> Option<T> {
self.task.cancel().await
}
}

impl<T> Future for FallibleTask<T> {
type Output = Option<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.task.poll_task(cx)
}
}

impl<T> fmt::Debug for FallibleTask<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FallibleTask")
.field("header", self.task.header())
.finish()
}
}