Skip to content

Commit

Permalink
do not panic when failing to create a LocalExecutor
Browse files Browse the repository at this point in the history
Errors should propagate to the end users instead. Additionally,
allow returning a struct from the boot future. It is possible to do so
when using the pool builder but not when using the simple one, so this
brings them to parity.
  • Loading branch information
HippoBaro committed Jan 14, 2022
1 parent e8b9d1d commit 2489d16
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 17 deletions.
4 changes: 2 additions & 2 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn server(conns: usize) -> Result<()> {
s.await.unwrap()?;
}

client_handle.join().unwrap();
client_handle.join().unwrap().unwrap();
Ok(())
}

Expand Down Expand Up @@ -108,6 +108,6 @@ fn main() -> Result<()> {
//
// Now can you adapt it, so it uses multiple executors and all CPUs in your
// system?
server_handle.join().unwrap();
server_handle.join().unwrap().unwrap();
Ok(())
}
38 changes: 29 additions & 9 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use placement::{CpuSet, Placement, PoolPlacement};
use tracing::trace;

use std::{
any::Any,
cell::RefCell,
collections::{hash_map::Entry, BinaryHeap},
future::Future,
Expand Down Expand Up @@ -384,6 +385,27 @@ impl ExecutorQueues {
}
}

/// A wrapper around a [`std::thread::JoinHandle`]
#[derive(Debug)]
pub struct ExecutorJoinHandle<T: Send + 'static>(JoinHandle<io::Result<T>>);

impl<T: Send + 'static> ExecutorJoinHandle<T> {
/// See [`std::thread::JoinHandle::thread()`]
#[must_use]
pub fn thread(&self) -> &std::thread::Thread {
self.0.thread()
}

/// See [`std::thread::JoinHandle::join()`]
pub fn join(self) -> std::thread::Result<T> {
match self.0.join() {
Err(err) => Err(err),
Ok(Err(err)) => Err(Box::new(err) as Box<dyn Any + Send>),
Ok(Ok(res)) => Ok(res),
}
}
}

/// A factory that can be used to configure and create a [`LocalExecutor`].
///
/// Methods can be chained on it in order to configure it.
Expand Down Expand Up @@ -592,10 +614,11 @@ impl LocalExecutorBuilder {
/// [`LocalExecutor::run`]:struct.LocalExecutor.html#method.run
#[must_use = "This spawns an executor on a thread, so you may need to call \
`JoinHandle::join()` to keep the main thread alive"]
pub fn spawn<G, F, T>(self, fut_gen: G) -> Result<JoinHandle<()>>
pub fn spawn<G, F, T>(self, fut_gen: G) -> io::Result<ExecutorJoinHandle<T>>
where
G: FnOnce() -> F + Send + 'static,
F: Future<Output = T> + 'static,
T: Send + 'static,
{
let notifier = sys::new_sleep_notifier()?;
let name = format!("{}-{}", self.name, notifier.id());
Expand All @@ -617,14 +640,12 @@ impl LocalExecutorBuilder {
record_io_latencies,
cpu_set_gen.next().cpu_binding(),
spin_before_park,
)
.unwrap();
)?;
le.init();
le.run(async move {
fut_gen().await;
})
le.run(async move { Ok(fut_gen().await) })
})
.map_err(Into::into)
.map(ExecutorJoinHandle)
}
}

Expand Down Expand Up @@ -828,8 +849,7 @@ impl LocalExecutorPoolBuilder {
record_io_latencies,
cpu_binding,
spin_before_park,
)
.unwrap();
)?;
le.init();
le.run(async move { Ok(fut_gen().await) })
} else {
Expand Down Expand Up @@ -987,7 +1007,7 @@ impl LocalExecutor {
io_memory,
ring_depth,
record_io_latencies,
)),
)?),
})
}

Expand Down
1 change: 1 addition & 0 deletions glommio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ pub use crate::{
spawn_scoped_local_into,
yield_if_needed,
CpuSet,
ExecutorJoinHandle,
ExecutorProxy,
ExecutorStats,
LocalExecutor,
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/net/tcp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ mod tests {
.spawn(move || async move {
let receiver = addr_receiver.connect().await;
let addr = receiver.recv().await.unwrap();
TcpStream::connect(addr).await.unwrap()
TcpStream::connect(addr).await.unwrap();
})
.unwrap();

Expand Down
9 changes: 4 additions & 5 deletions glommio/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,18 @@ impl Reactor {
io_memory: usize,
ring_depth: usize,
record_io_latencies: bool,
) -> Reactor {
let sys = sys::Reactor::new(notifier, io_memory, ring_depth)
.expect("cannot initialize I/O event notification");
) -> io::Result<Reactor> {
let sys = sys::Reactor::new(notifier, io_memory, ring_depth)?;
let (preempt_ptr_head, preempt_ptr_tail) = sys.preempt_pointers();
Reactor {
Ok(Reactor {
sys,
timers: RefCell::new(Timers::new()),
shared_channels: RefCell::new(SharedChannels::new()),
io_scheduler: Rc::new(IoScheduler::new()),
record_io_latencies,
preempt_ptr_head,
preempt_ptr_tail: preempt_ptr_tail as _,
}
})
}

pub(crate) fn io_stats(&self) -> IoStats {
Expand Down

0 comments on commit 2489d16

Please sign in to comment.