Skip to content

Commit

Permalink
threadpool: update to std::future (tokio-rs#1219)
Browse files Browse the repository at this point in the history
An initial pass at updating `tokio-threadpool` to `std::future`. The
codebase and tests both now run using `std::future` but the wake
mechanism is not ideal. Follow up work will be required to improve on
this.

Refs: tokio-rs#1200
  • Loading branch information
carllerche authored Jun 28, 2019
1 parent e4415d9 commit e7488d9
Show file tree
Hide file tree
Showing 17 changed files with 435 additions and 484 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
# "tokio-signal",
"tokio-sync",
"tokio-test",
# "tokio-threadpool",
"tokio-threadpool",
"tokio-timer",
"tokio-tcp",
# "tokio-tls",
Expand Down
11 changes: 6 additions & 5 deletions tokio-threadpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ publish = false

[dependencies]
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }
futures = "0.1.19"
tokio-sync = { version = "0.2.0", path = "../tokio-sync" }

arc-waker = { git = "https://github.com/tokio-rs/async" }
crossbeam-deque = "0.7.0"
crossbeam-queue = "0.1.0"
crossbeam-utils = "0.6.4"
Expand All @@ -35,7 +37,6 @@ log = "0.4"

[dev-dependencies]
env_logger = "0.5"

# For comparison benchmarks
futures-cpupool = "0.1.7"
threadpool = "1.7.1"
async-util = { git = "https://github.com/tokio-rs/async" }
tokio = { version = "0.2.0", path = "../tokio" }
tokio-test = { version = "0.2.0", path = "../tokio-test" }
37 changes: 0 additions & 37 deletions tokio-threadpool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,6 @@ threads.

[Documentation](https://docs.rs/tokio-threadpool/0.1.14/tokio_threadpool)

### Why not Rayon?

Rayon is designed to handle parallelizing single computations by breaking them
into smaller chunks. The scheduling for each individual chunk doesn't matter as
long as the root computation completes in a timely fashion. In other words,
Rayon does not provide any guarantees of fairness with regards to how each task
gets scheduled.

On the other hand, `tokio-threadpool` is a general purpose scheduler and
attempts to schedule each task fairly. This is the ideal behavior when
scheduling a set of unrelated tasks.

### Why not futures-cpupool?

It's 10x slower.

## Examples

```rust
use tokio_threadpool::ThreadPool;
use futures::{Future, lazy};
use futures::sync::oneshot;

pub fn main() {
let pool = ThreadPool::new();
let (tx, rx) = oneshot::channel();

pool.spawn(lazy(|| {
println!("Running on the pool");
tx.send("complete").map_err(|e| println!("send error, {}", e))
}));

println!("Result: {:?}", rx.wait());
pool.shutdown().wait().unwrap();
}
```

## License

This project is licensed under the [MIT license](LICENSE).
Expand Down
11 changes: 6 additions & 5 deletions tokio-threadpool/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::worker::Worker;
use futures::{try_ready, Poll};

use std::error::Error;
use std::fmt;
use std::task::Poll;

/// Error raised by `blocking`.
pub struct BlockingError {
Expand Down Expand Up @@ -116,15 +117,15 @@ pub struct BlockingError {
/// pool.shutdown_on_idle().wait().unwrap();
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
pub fn blocking<F, T>(f: F) -> Poll<Result<T, BlockingError>>
where
F: FnOnce() -> T,
{
let res = Worker::with_current(|worker| {
let worker = match worker {
Some(worker) => worker,
None => {
return Err(BlockingError { _p: () });
return Poll::Ready(Err(BlockingError { _p: () }));
}
};

Expand All @@ -135,7 +136,7 @@ where
});

// If the transition cannot happen, exit early
try_ready!(res);
ready!(res)?;

// Currently in blocking mode, so call the inner closure
let ret = f();
Expand All @@ -148,7 +149,7 @@ where
});

// Return the result
Ok(ret.into())
Poll::Ready(Ok(ret))
}

impl fmt::Display for BlockingError {
Expand Down
13 changes: 11 additions & 2 deletions tokio-threadpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,30 @@

pub mod park;

macro_rules! ready {
($e:expr) => {
match $e {
::std::task::Poll::Ready(t) => t,
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
}
};
}

mod blocking;
mod builder;
mod callback;
mod config;
mod notifier;
mod pool;
mod sender;
mod shutdown;
mod task;
mod thread_pool;
mod waker;
mod worker;

pub use crate::blocking::{blocking, BlockingError};
pub use crate::builder::Builder;
pub use crate::sender::Sender;
pub use crate::shutdown::Shutdown;
pub use crate::thread_pool::{SpawnHandle, ThreadPool};
pub use crate::thread_pool::ThreadPool;
pub use crate::worker::{Worker, WorkerId};
91 changes: 0 additions & 91 deletions tokio-threadpool/src/notifier.rs

This file was deleted.

8 changes: 6 additions & 2 deletions tokio-threadpool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ use crate::task::{Blocking, Task};
use crate::worker::{self, Worker, WorkerId};
use crossbeam_deque::Injector;
use crossbeam_utils::CachePadded;
use futures::Poll;

use log::{debug, error, trace};
use rand;
use std::cell::Cell;
use std::num::Wrapping;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::{Arc, Weak};
use std::task::Poll;
use std::thread;

#[derive(Debug)]
Expand Down Expand Up @@ -219,7 +220,10 @@ impl Pool {
}
}

pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), crate::BlockingError> {
pub fn poll_blocking_capacity(
&self,
task: &Arc<Task>,
) -> Poll<Result<(), crate::BlockingError>> {
self.blocking.poll_blocking_capacity(task)
}

Expand Down
39 changes: 11 additions & 28 deletions tokio-threadpool/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::pool::{self, Lifecycle, Pool, MAX_FUTURES};
use crate::task::Task;
use futures::{future, Future};

use tokio_executor::{self, SpawnError};

use log::trace;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::Arc;
use tokio_executor::{self, SpawnError};

/// Submit futures to the associated thread pool for execution.
///
Expand Down Expand Up @@ -75,10 +78,10 @@ impl Sender {
/// ```
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
where
F: Future<Item = (), Error = ()> + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
let mut s = self;
tokio_executor::Executor::spawn(&mut s, Box::new(future))
tokio_executor::Executor::spawn(&mut s, Box::pin(future))
}

/// Logic to prepare for spawning
Expand Down Expand Up @@ -128,7 +131,7 @@ impl tokio_executor::Executor for Sender {

fn spawn(
&mut self,
future: Box<dyn Future<Item = (), Error = ()> + Send>,
future: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), SpawnError> {
let mut s = &*self;
tokio_executor::Executor::spawn(&mut s, future)
Expand All @@ -154,7 +157,7 @@ impl<'a> tokio_executor::Executor for &'a Sender {

fn spawn(
&mut self,
future: Box<dyn Future<Item = (), Error = ()> + Send>,
future: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), SpawnError> {
self.prepare_for_spawn()?;

Expand All @@ -175,34 +178,14 @@ impl<'a> tokio_executor::Executor for &'a Sender {

impl<T> tokio_executor::TypedExecutor<T> for Sender
where
T: Future<Item = (), Error = ()> + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
fn status(&self) -> Result<(), tokio_executor::SpawnError> {
tokio_executor::Executor::status(self)
}

fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
tokio_executor::Executor::spawn(self, Box::new(future))
}
}

impl<T> future::Executor<T> for Sender
where
T: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
if let Err(e) = tokio_executor::Executor::status(self) {
let kind = if e.is_at_capacity() {
future::ExecuteErrorKind::NoCapacity
} else {
future::ExecuteErrorKind::Shutdown
};

return Err(future::ExecuteError::new(kind, future));
}

let _ = self.spawn(future);
Ok(())
tokio_executor::Executor::spawn(self, Box::pin(future))
}
}

Expand Down
Loading

0 comments on commit e7488d9

Please sign in to comment.