diff --git a/glommio/src/executor/mod.rs b/glommio/src/executor/mod.rs index 7234881d8..31ada7e64 100644 --- a/glommio/src/executor/mod.rs +++ b/glommio/src/executor/mod.rs @@ -440,6 +440,10 @@ pub struct LocalExecutorBuilder { preempt_timer_duration: Duration, /// Whether to record the latencies of individual IO requests record_io_latencies: bool, + /// The placement policy of the blocking thread pool. + /// Defaults to one thread using the same placement strategy as the host + /// executor. + blocking_thread_pool_placement: PoolPlacement, } impl LocalExecutorBuilder { @@ -450,13 +454,14 @@ impl LocalExecutorBuilder { /// how many and which CPUs to use. pub fn new(placement: Placement) -> LocalExecutorBuilder { LocalExecutorBuilder { - placement, + placement: placement.clone(), spin_before_park: None, name: String::from(DEFAULT_EXECUTOR_NAME), io_memory: DEFAULT_IO_MEMORY, ring_depth: DEFAULT_RING_SUBMISSION_DEPTH, preempt_timer_duration: DEFAULT_PREEMPT_TIMER, record_io_latencies: false, + blocking_thread_pool_placement: PoolPlacement::from(placement), } } @@ -525,6 +530,18 @@ impl LocalExecutorBuilder { self } + /// The placement policy of the blocking thread pool. + /// Defaults to one thread using the same placement strategy as the host + /// executor. + #[must_use = "The builder must be built to be useful"] + pub fn blocking_thread_pool_placement( + mut self, + placement: PoolPlacement, + ) -> LocalExecutorBuilder { + self.blocking_thread_pool_placement = placement; + self + } + /// Make a new [`LocalExecutor`] by taking ownership of the Builder, and /// returns a [`Result`](crate::Result) to the executor. 
/// # Examples @@ -539,12 +556,15 @@ impl LocalExecutorBuilder { let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?; let mut le = LocalExecutor::new( notifier, - self.io_memory, - self.ring_depth, - self.preempt_timer_duration, - self.record_io_latencies, cpu_set_gen.next().cpu_binding(), - self.spin_before_park, + LocalExecutorConfig { + io_memory: self.io_memory, + ring_depth: self.ring_depth, + preempt_timer: self.preempt_timer_duration, + record_io_latencies: self.record_io_latencies, + spin_before_park: self.spin_before_park, + thread_pool_placement: self.blocking_thread_pool_placement, + }, )?; le.init(); Ok(le) @@ -608,18 +628,22 @@ impl LocalExecutorBuilder { let preempt_timer_duration = self.preempt_timer_duration; let spin_before_park = self.spin_before_park; let record_io_latencies = self.record_io_latencies; + let blocking_thread_pool_placement = self.blocking_thread_pool_placement; Builder::new() .name(name) .spawn(move || { let mut le = LocalExecutor::new( notifier, - io_memory, - ring_depth, - preempt_timer_duration, - record_io_latencies, cpu_set_gen.next().cpu_binding(), - spin_before_park, + LocalExecutorConfig { + io_memory, + ring_depth, + preempt_timer: preempt_timer_duration, + record_io_latencies, + spin_before_park, + thread_pool_placement: blocking_thread_pool_placement, + }, )?; le.init(); le.run(async move { Ok(fut_gen().await) }) @@ -679,6 +703,10 @@ pub struct LocalExecutorPoolBuilder { placement: PoolPlacement, /// Whether to record the latencies of individual IO requests record_io_latencies: bool, + /// The placement policy of the blocking thread pools. Each executor has + /// its own pool. 
Defaults to 1 thread per pool, bound using the same + /// placement strategy as its host executor. + blocking_thread_pool_placement: PoolPlacement, } impl LocalExecutorPoolBuilder { @@ -694,8 +722,9 @@ impl LocalExecutorPoolBuilder { io_memory: DEFAULT_IO_MEMORY, ring_depth: DEFAULT_RING_SUBMISSION_DEPTH, preempt_timer_duration: DEFAULT_PREEMPT_TIMER, - placement, + placement: placement.clone(), record_io_latencies: false, + blocking_thread_pool_placement: placement.shrink_to(1), } } @@ -752,6 +781,15 @@ impl LocalExecutorPoolBuilder { self } + /// The placement policy of the blocking thread pools. Each executor has + /// its own pool. Defaults to one thread per pool, bound using the same + /// placement strategy as its host executor. + #[must_use = "The builder must be built to be useful"] + pub fn blocking_thread_pool_placement(mut self, placement: PoolPlacement) -> Self { + self.blocking_thread_pool_placement = placement; + self + } + /// Spawn a pool of [`LocalExecutor`]s in a new thread according to the /// [`PoolPlacement`] policy, which is `Unbound` by default. 
/// @@ -814,6 +852,7 @@ impl LocalExecutorPoolBuilder { let preempt_timer_duration = self.preempt_timer_duration; let spin_before_park = self.spin_before_park; let record_io_latencies = self.record_io_latencies; + let blocking_thread_pool_placement = self.blocking_thread_pool_placement.clone(); let latch = Latch::clone(latch); move || { @@ -822,12 +861,15 @@ impl LocalExecutorPoolBuilder { if latch.arrive_and_wait() == LatchState::Ready { let mut le = LocalExecutor::new( notifier, - io_memory, - ring_depth, - preempt_timer_duration, - record_io_latencies, cpu_binding, - spin_before_park, + LocalExecutorConfig { + io_memory, + ring_depth, + preempt_timer: preempt_timer_duration, + record_io_latencies, + spin_before_park, + thread_pool_placement: blocking_thread_pool_placement, + }, )?; le.init(); le.run(async move { Ok(fut_gen().await) }) @@ -903,6 +945,15 @@ pub(crate) fn maybe_activate(tq: Rc>) { }) } +pub struct LocalExecutorConfig { + pub io_memory: usize, + pub ring_depth: usize, + pub preempt_timer: Duration, + pub record_io_latencies: bool, + pub spin_before_park: Option, + pub thread_pool_placement: PoolPlacement, +} + /// Single-threaded executor. /// /// The executor can only be run on the thread that created it. @@ -955,12 +1006,8 @@ impl LocalExecutor { fn new( notifier: Arc, - io_memory: usize, - ring_depth: usize, - preempt_timer: Duration, - record_io_latencies: bool, cpu_binding: Option>, - mut spin_before_park: Option, + mut config: LocalExecutorConfig, ) -> Result { // Linux's default memory policy is "local allocation" which allocates memory // on the NUMA node containing the CPU where the allocation takes place. 
@@ -972,10 +1019,10 @@ impl LocalExecutor { // https://www.kernel.org/doc/html/latest/admin-guide/mm/numa_memory_policy.html match cpu_binding { Some(cpu_set) => bind_to_cpu_set(cpu_set)?, - None => spin_before_park = None, + None => config.spin_before_park = None, } let p = parking::Parker::new(); - let queues = ExecutorQueues::new(preempt_timer, spin_before_park); + let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park); trace!(id = notifier.id(), "Creating executor"); Ok(LocalExecutor { queues: Rc::new(RefCell::new(queues)), @@ -983,9 +1030,10 @@ impl LocalExecutor { id: notifier.id(), reactor: Rc::new(reactor::Reactor::new( notifier, - io_memory, - ring_depth, - record_io_latencies, + config.io_memory, + config.ring_depth, + config.record_io_latencies, + config.thread_pool_placement, )?), }) } @@ -2392,12 +2440,13 @@ mod test { }; use core::mem::MaybeUninit; use futures::{ - future::{join, join_all, poll_fn}, + future::{join, join_all, poll_fn, JoinAll}, join, }; use std::{ cell::Cell, collections::HashMap, + iter::FromIterator, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -3455,6 +3504,7 @@ mod test { Pending(Option), Ready, } + // following four tests are regression ones for https://github.com/DataDog/glommio/issues/379. // here we test against task reference count underflow // test includes two scenarios, with join handles and with sleep, for each case @@ -3538,6 +3588,7 @@ mod test { timer::sleep(Duration::from_millis(1)).await; }); } + #[test] fn wake_refcount_underflow_with_join_handle() { LocalExecutor::default().run(async { diff --git a/glommio/src/executor/placement/mod.rs b/glommio/src/executor/placement/mod.rs index 864d5e68a..5420538a7 100644 --- a/glommio/src/executor/placement/mod.rs +++ b/glommio/src/executor/placement/mod.rs @@ -190,7 +190,7 @@ impl PoolPlacement { /// If `len` is greater than the number of placements in the pool, this has /// no effect. 
pub fn shrink_to(self, count: usize) -> Self { - if count <= self.executor_count() { + if count > self.executor_count() { return self; } match self { diff --git a/glommio/src/reactor.rs b/glommio/src/reactor.rs index 4cd4297d8..62792ad3b 100644 --- a/glommio/src/reactor.rs +++ b/glommio/src/reactor.rs @@ -46,6 +46,7 @@ use crate::{ IoRequirements, IoStats, Latency, + PoolPlacement, TaskQueueHandle, }; use nix::poll::PollFlags; @@ -198,8 +199,9 @@ impl Reactor { io_memory: usize, ring_depth: usize, record_io_latencies: bool, + thread_pool_placement: PoolPlacement, ) -> io::Result { - let sys = sys::Reactor::new(notifier, io_memory, ring_depth)?; + let sys = sys::Reactor::new(notifier, io_memory, ring_depth, thread_pool_placement)?; let (preempt_ptr_head, preempt_ptr_tail) = sys.preempt_pointers(); Ok(Reactor { sys, diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index 9c8f7ab72..e439ab45a 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -1183,6 +1183,7 @@ impl Reactor { notifier: Arc, mut io_memory: usize, ring_depth: usize, + thread_pool_placement: PoolPlacement, ) -> io::Result { const MIN_MEMLOCK_LIMIT: u64 = 512 * 1024; let (memlock_limit, _) = Resource::MEMLOCK.get()?; @@ -1259,7 +1260,7 @@ impl Reactor { latency_ring: RefCell::new(latency_ring), poll_ring: RefCell::new(poll_ring), timeout_src: Cell::new(None), - blocking_thread: BlockingThreadPool::new(PoolPlacement::Unbound(1), notifier.clone())?, + blocking_thread: BlockingThreadPool::new(thread_pool_placement, notifier.clone())?, link_fd, notifier, eventfd_src, @@ -1850,7 +1851,7 @@ mod tests { #[test] fn timeout_smoke_test() { let notifier = sys::new_sleep_notifier().unwrap(); - let reactor = Reactor::new(notifier, 0, 128).unwrap(); + let reactor = Reactor::new(notifier, 0, 128, PoolPlacement::Unbound(1)).unwrap(); fn timeout_source(millis: u64) -> (Source, UringOpDescriptor) { let source = Source::new(