diff --git a/examples/echo.rs b/examples/echo.rs index b12943c3d..7827ddc29 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -52,7 +52,7 @@ async fn server(conns: usize) -> Result<()> { s.await.unwrap()?; } - client_handle.join().unwrap(); + client_handle.join().unwrap().unwrap(); Ok(()) } @@ -108,6 +108,6 @@ fn main() -> Result<()> { // // Now can you adapt it, so it uses multiple executors and all CPUs in your // system? - server_handle.join().unwrap(); + server_handle.join().unwrap().unwrap(); Ok(()) } diff --git a/glommio/src/executor/mod.rs b/glommio/src/executor/mod.rs index a1884ec3b..da0160ca7 100644 --- a/glommio/src/executor/mod.rs +++ b/glommio/src/executor/mod.rs @@ -41,6 +41,7 @@ pub use placement::{CpuSet, Placement, PoolPlacement}; use tracing::trace; use std::{ + any::Any, cell::RefCell, collections::{hash_map::Entry, BinaryHeap}, future::Future, @@ -384,6 +385,27 @@ impl ExecutorQueues { } } +/// A wrapper around a [`std::thread::JoinHandle`] +#[derive(Debug)] +pub struct ExecutorJoinHandle(JoinHandle>); + +impl ExecutorJoinHandle { + /// See [`std::thread::JoinHandle::thread()`] + #[must_use] + pub fn thread(&self) -> &std::thread::Thread { + self.0.thread() + } + + /// See [`std::thread::JoinHandle::join()`] + pub fn join(self) -> std::thread::Result { + match self.0.join() { + Err(err) => Err(err), + Ok(Err(err)) => Err(Box::new(err) as Box), + Ok(Ok(res)) => Ok(res), + } + } +} + /// A factory that can be used to configure and create a [`LocalExecutor`]. /// /// Methods can be chained on it in order to configure it. @@ -592,10 +614,11 @@ impl LocalExecutorBuilder { /// [`LocalExecutor::run`]:struct.LocalExecutor.html#method.run #[must_use = "This spawns an executor on a thread, so you may need to call \ `JoinHandle::join()` to keep the main thread alive"] - pub fn spawn(self, fut_gen: G) -> Result> + pub fn spawn(self, fut_gen: G) -> io::Result> where G: FnOnce() -> F + Send + 'static, F: Future + 'static, + T: Send + 'static, { let notifier = sys::new_sleep_notifier()?; let name = format!("{}-{}", self.name, notifier.id()); @@ -617,14 +640,12 @@ impl LocalExecutorBuilder { record_io_latencies, cpu_set_gen.next().cpu_binding(), spin_before_park, - ) - .unwrap(); + )?; le.init(); - le.run(async move { - fut_gen().await; - }) + le.run(async move { Ok(fut_gen().await) }) }) .map_err(Into::into) + .map(ExecutorJoinHandle) } } @@ -828,8 +849,7 @@ impl LocalExecutorPoolBuilder { record_io_latencies, cpu_binding, spin_before_park, - ) - .unwrap(); + )?; le.init(); le.run(async move { Ok(fut_gen().await) }) } else { @@ -987,7 +1007,7 @@ impl LocalExecutor { io_memory, ring_depth, record_io_latencies, - )), + )?), }) } diff --git a/glommio/src/lib.rs b/glommio/src/lib.rs index 4764506d0..1d7c108f7 100644 --- a/glommio/src/lib.rs +++ b/glommio/src/lib.rs @@ -483,6 +483,7 @@ pub use crate::{ spawn_scoped_local_into, yield_if_needed, CpuSet, + ExecutorJoinHandle, ExecutorProxy, ExecutorStats, LocalExecutor, diff --git a/glommio/src/net/tcp_socket.rs b/glommio/src/net/tcp_socket.rs index 4e48fc768..bf72120d6 100644 --- a/glommio/src/net/tcp_socket.rs +++ b/glommio/src/net/tcp_socket.rs @@ -857,7 +857,7 @@ mod tests { .spawn(move || async move { let receiver = addr_receiver.connect().await; let addr = receiver.recv().await.unwrap(); - TcpStream::connect(addr).await.unwrap() + TcpStream::connect(addr).await.unwrap(); }) .unwrap(); diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index 7fc5985af..ba8ab0a6b 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -198,11 +198,10 @@ impl Reactor { io_memory: usize, ring_depth: usize, record_io_latencies: bool, - ) -> Reactor { - let sys = sys::Reactor::new(notifier, io_memory, ring_depth) - .expect("cannot initialize I/O event notification"); + ) -> io::Result { + let sys = sys::Reactor::new(notifier, io_memory, ring_depth)?; let (preempt_ptr_head, preempt_ptr_tail) = sys.preempt_pointers(); - Reactor { + Ok(Reactor { sys, timers: RefCell::new(Timers::new()), shared_channels: RefCell::new(SharedChannels::new()), @@ -210,7 +209,7 @@ impl Reactor { record_io_latencies, preempt_ptr_head, preempt_ptr_tail: preempt_ptr_tail as _, - } + }) } pub(crate) fn io_stats(&self) -> IoStats {