Skip to content

Commit

Permalink
provide a way to configure the blocking thread pool placement strategies
Browse files Browse the repository at this point in the history
Add knobs in the local executor builders to configure how many blocking
threads to create in the pool and where to place them on the system. By
default, an executor spawns a single blocking thread in its pool and
collocates it with its thread.

It could be a good idea to loosen that default placement strategy to
bind the blocking thread to any CPUs on the same package as the executor
thread. This would maximize resource utilization and minimize memory
latency. However, there is no easy way to do that using our existing
Placement APIs.
  • Loading branch information
HippoBaro committed Jan 14, 2022
1 parent 3187c1a commit 8e815c3
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 32 deletions.
107 changes: 79 additions & 28 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ pub struct LocalExecutorBuilder {
preempt_timer_duration: Duration,
/// Whether to record the latencies of individual IO requests
record_io_latencies: bool,
/// The placement policy of the blocking thread pool
/// Defaults to one thread using the same placement strategy as the host
/// executor
blocking_thread_pool_placement: PoolPlacement,
}

impl LocalExecutorBuilder {
Expand All @@ -450,13 +454,14 @@ impl LocalExecutorBuilder {
/// how many and which CPUs to use.
pub fn new(placement: Placement) -> LocalExecutorBuilder {
LocalExecutorBuilder {
placement,
placement: placement.clone(),
spin_before_park: None,
name: String::from(DEFAULT_EXECUTOR_NAME),
io_memory: DEFAULT_IO_MEMORY,
ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
record_io_latencies: false,
blocking_thread_pool_placement: PoolPlacement::from(placement),
}
}

Expand Down Expand Up @@ -525,6 +530,18 @@ impl LocalExecutorBuilder {
self
}

/// The placement policy of the blocking thread pool.
/// Defaults to one thread using the same placement strategy as the host
/// executor.
#[must_use = "The builder must be built to be useful"]
pub fn blocking_thread_pool_placement(
mut self,
placement: PoolPlacement,
) -> LocalExecutorBuilder {
self.blocking_thread_pool_placement = placement;
self
}

/// Make a new [`LocalExecutor`] by taking ownership of the Builder, and
/// returns a [`Result`](crate::Result) to the executor.
/// # Examples
Expand All @@ -539,12 +556,15 @@ impl LocalExecutorBuilder {
let mut cpu_set_gen = placement::CpuSetGenerator::one(self.placement)?;
let mut le = LocalExecutor::new(
notifier,
self.io_memory,
self.ring_depth,
self.preempt_timer_duration,
self.record_io_latencies,
cpu_set_gen.next().cpu_binding(),
self.spin_before_park,
LocalExecutorConfig {
io_memory: self.io_memory,
ring_depth: self.ring_depth,
preempt_timer: self.preempt_timer_duration,
record_io_latencies: self.record_io_latencies,
spin_before_park: self.spin_before_park,
thread_pool_placement: self.blocking_thread_pool_placement,
},
)?;
le.init();
Ok(le)
Expand Down Expand Up @@ -608,18 +628,22 @@ impl LocalExecutorBuilder {
let preempt_timer_duration = self.preempt_timer_duration;
let spin_before_park = self.spin_before_park;
let record_io_latencies = self.record_io_latencies;
let blocking_thread_pool_placement = self.blocking_thread_pool_placement;

Builder::new()
.name(name)
.spawn(move || {
let mut le = LocalExecutor::new(
notifier,
io_memory,
ring_depth,
preempt_timer_duration,
record_io_latencies,
cpu_set_gen.next().cpu_binding(),
spin_before_park,
LocalExecutorConfig {
io_memory,
ring_depth,
preempt_timer: preempt_timer_duration,
record_io_latencies,
spin_before_park,
thread_pool_placement: blocking_thread_pool_placement,
},
)?;
le.init();
le.run(async move { Ok(fut_gen().await) })
Expand Down Expand Up @@ -679,6 +703,10 @@ pub struct LocalExecutorPoolBuilder {
placement: PoolPlacement,
/// Whether to record the latencies of individual IO requests
record_io_latencies: bool,
/// The placement policy of the blocking thread pools. Each executor has
/// its own pool. Defaults to 1 thread per pool, bound using the same
/// placement strategy as its host executor
blocking_thread_pool_placement: PoolPlacement,
}

impl LocalExecutorPoolBuilder {
Expand All @@ -694,8 +722,9 @@ impl LocalExecutorPoolBuilder {
io_memory: DEFAULT_IO_MEMORY,
ring_depth: DEFAULT_RING_SUBMISSION_DEPTH,
preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
placement,
placement: placement.clone(),
record_io_latencies: false,
blocking_thread_pool_placement: placement.shrink_to(1),
}
}

Expand Down Expand Up @@ -752,6 +781,15 @@ impl LocalExecutorPoolBuilder {
self
}

/// The placement policy of the blocking thread pool.
/// Defaults to one thread using the same placement strategy as the host
/// executor.
#[must_use = "The builder must be built to be useful"]
pub fn blocking_thread_pool_placement(mut self, placement: PoolPlacement) -> Self {
self.blocking_thread_pool_placement = placement;
self
}

/// Spawn a pool of [`LocalExecutor`]s in a new thread according to the
/// [`PoolPlacement`] policy, which is `Unbound` by default.
///
Expand Down Expand Up @@ -814,6 +852,7 @@ impl LocalExecutorPoolBuilder {
let preempt_timer_duration = self.preempt_timer_duration;
let spin_before_park = self.spin_before_park;
let record_io_latencies = self.record_io_latencies;
let blocking_thread_pool_placement = self.blocking_thread_pool_placement.clone();
let latch = Latch::clone(latch);

move || {
Expand All @@ -822,12 +861,15 @@ impl LocalExecutorPoolBuilder {
if latch.arrive_and_wait() == LatchState::Ready {
let mut le = LocalExecutor::new(
notifier,
io_memory,
ring_depth,
preempt_timer_duration,
record_io_latencies,
cpu_binding,
spin_before_park,
LocalExecutorConfig {
io_memory,
ring_depth,
preempt_timer: preempt_timer_duration,
record_io_latencies,
spin_before_park,
thread_pool_placement: blocking_thread_pool_placement,
},
)?;
le.init();
le.run(async move { Ok(fut_gen().await) })
Expand Down Expand Up @@ -903,6 +945,15 @@ pub(crate) fn maybe_activate(tq: Rc<RefCell<TaskQueue>>) {
})
}

pub struct LocalExecutorConfig {
pub io_memory: usize,
pub ring_depth: usize,
pub preempt_timer: Duration,
pub record_io_latencies: bool,
pub spin_before_park: Option<Duration>,
pub thread_pool_placement: PoolPlacement,
}

/// Single-threaded executor.
///
/// The executor can only be run on the thread that created it.
Expand Down Expand Up @@ -955,12 +1006,8 @@ impl LocalExecutor {

fn new(
notifier: Arc<sys::SleepNotifier>,
io_memory: usize,
ring_depth: usize,
preempt_timer: Duration,
record_io_latencies: bool,
cpu_binding: Option<impl IntoIterator<Item = usize>>,
mut spin_before_park: Option<Duration>,
mut config: LocalExecutorConfig,
) -> Result<LocalExecutor> {
// Linux's default memory policy is "local allocation" which allocates memory
// on the NUMA node containing the CPU where the allocation takes place.
Expand All @@ -972,20 +1019,21 @@ impl LocalExecutor {
// https://www.kernel.org/doc/html/latest/admin-guide/mm/numa_memory_policy.html
match cpu_binding {
Some(cpu_set) => bind_to_cpu_set(cpu_set)?,
None => spin_before_park = None,
None => config.spin_before_park = None,
}
let p = parking::Parker::new();
let queues = ExecutorQueues::new(preempt_timer, spin_before_park);
let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park);
trace!(id = notifier.id(), "Creating executor");
Ok(LocalExecutor {
queues: Rc::new(RefCell::new(queues)),
parker: p,
id: notifier.id(),
reactor: Rc::new(reactor::Reactor::new(
notifier,
io_memory,
ring_depth,
record_io_latencies,
config.io_memory,
config.ring_depth,
config.record_io_latencies,
config.thread_pool_placement,
)?),
})
}
Expand Down Expand Up @@ -2392,12 +2440,13 @@ mod test {
};
use core::mem::MaybeUninit;
use futures::{
future::{join, join_all, poll_fn},
future::{join, join_all, poll_fn, JoinAll},
join,
};
use std::{
cell::Cell,
collections::HashMap,
iter::FromIterator,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -3455,6 +3504,7 @@ mod test {
Pending(Option<Waker>),
Ready,
}

// following four tests are regression ones for https://github.com/DataDog/glommio/issues/379.
// here we test against task reference count underflow
// test includes two scenarios, with join handles and with sleep, for each case
Expand Down Expand Up @@ -3538,6 +3588,7 @@ mod test {
timer::sleep(Duration::from_millis(1)).await;
});
}

#[test]
fn wake_refcount_underflow_with_join_handle() {
LocalExecutor::default().run(async {
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/executor/placement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl PoolPlacement {
/// If `len` is greater than the number of placements in the pool, this has
/// no effect.
pub fn shrink_to(self, count: usize) -> Self {
if count <= self.executor_count() {
if count > self.executor_count() {
return self;
}
match self {
Expand Down
4 changes: 3 additions & 1 deletion glommio/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
IoRequirements,
IoStats,
Latency,
PoolPlacement,
TaskQueueHandle,
};
use nix::poll::PollFlags;
Expand Down Expand Up @@ -198,8 +199,9 @@ impl Reactor {
io_memory: usize,
ring_depth: usize,
record_io_latencies: bool,
thread_pool_placement: PoolPlacement,
) -> io::Result<Reactor> {
let sys = sys::Reactor::new(notifier, io_memory, ring_depth)?;
let sys = sys::Reactor::new(notifier, io_memory, ring_depth, thread_pool_placement)?;
let (preempt_ptr_head, preempt_ptr_tail) = sys.preempt_pointers();
Ok(Reactor {
sys,
Expand Down
5 changes: 3 additions & 2 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,7 @@ impl Reactor {
notifier: Arc<sys::SleepNotifier>,
mut io_memory: usize,
ring_depth: usize,
thread_pool_placement: PoolPlacement,
) -> io::Result<Reactor> {
const MIN_MEMLOCK_LIMIT: u64 = 512 * 1024;
let (memlock_limit, _) = Resource::MEMLOCK.get()?;
Expand Down Expand Up @@ -1259,7 +1260,7 @@ impl Reactor {
latency_ring: RefCell::new(latency_ring),
poll_ring: RefCell::new(poll_ring),
timeout_src: Cell::new(None),
blocking_thread: BlockingThreadPool::new(PoolPlacement::Unbound(1), notifier.clone())?,
blocking_thread: BlockingThreadPool::new(thread_pool_placement, notifier.clone())?,
link_fd,
notifier,
eventfd_src,
Expand Down Expand Up @@ -1850,7 +1851,7 @@ mod tests {
#[test]
fn timeout_smoke_test() {
let notifier = sys::new_sleep_notifier().unwrap();
let reactor = Reactor::new(notifier, 0, 128).unwrap();
let reactor = Reactor::new(notifier, 0, 128, PoolPlacement::Unbound(1)).unwrap();

fn timeout_source(millis: u64) -> (Source, UringOpDescriptor) {
let source = Source::new(
Expand Down

0 comments on commit 8e815c3

Please sign in to comment.