Skip to content

Commit

Permalink
use a thread pool for blocking operations
Browse files Browse the repository at this point in the history
So far, we have had a single thread per executor used for blocking
operations. This has been fine to issue the odd syscall that couldn't go
into the ring, but now that we have `spawn_blocking,` we've opened
pandora's box. Users will submit all sorts of things in there, including
long-running jobs that could stall the entire system.

This commit introduces a static thread pool per executor instead. The
pool is static for now, meaning the threads are created once and live
for as long as the host executor does. Later on, we may want to be
smarter and automatically up/downscale the pool depending on usage.
  • Loading branch information
HippoBaro committed Jan 14, 2022
1 parent 2c81b94 commit 3187c1a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 71 deletions.
47 changes: 46 additions & 1 deletion glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl TaskQueue {
}
}

fn bind_to_cpu_set(cpus: impl IntoIterator<Item = usize>) -> Result<()> {
pub(crate) fn bind_to_cpu_set(cpus: impl IntoIterator<Item = usize>) -> Result<()> {
let mut cpuset = nix::sched::CpuSet::new();
for cpu in cpus {
cpuset.set(cpu).map_err(|e| to_io_error!(e))?;
Expand Down Expand Up @@ -3638,4 +3638,49 @@ mod test {
assert!(coop.as_millis() >= 100 && coop.as_millis() < 150);
});
}

#[test]
fn blocking_function_parallelism() {
LocalExecutorBuilder::new(Placement::Unbound)
.blocking_thread_pool_placement(PoolPlacement::Unbound(4))
.spawn(|| async {
let started = Instant::now();
let mut blocking = vec![];

for _ in 0..5 {
blocking.push(executor().spawn_blocking(enclose!((started) move || {
let now = Instant::now();
while now.elapsed() < Duration::from_millis(100) {}
started.elapsed()
})));
}

// we created 5 blocking jobs each taking 100ms but our thread pool only has 4
// threads. SWe expect one of those jobs to take twice as long as the others.

let mut ts = JoinAll::from_iter(blocking.into_iter()).await;
assert_eq!(ts.len(), 5);

ts.sort_unstable();
for ts in ts.iter().take(4) {
assert!(ts.as_millis() >= 100 && ts.as_millis() < 150);
}
assert!(ts[4].as_millis() >= 200 && ts[4].as_millis() < 250);
})
.unwrap()
.join()
.unwrap()
.unwrap();
}

#[test]
fn blocking_pool_invalid_placement() {
let ret = LocalExecutorBuilder::new(Placement::Unbound)
.blocking_thread_pool_placement(PoolPlacement::Unbound(0))
.spawn(|| async {})
.unwrap()
.join()
.unwrap();
assert!(ret.is_err());
}
}
134 changes: 83 additions & 51 deletions glommio/src/sys/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::sys::{create_eventfd, read_eventfd, write_eventfd, SleepNotifier};
use crossbeam::queue::ArrayQueue;
use crate::{executor::bind_to_cpu_set, sys::SleepNotifier, PoolPlacement};
use ahash::AHashMap;
use crossbeam::channel::{Receiver, Sender};
use std::{
cell::{Cell, RefCell},
collections::BTreeMap,
convert::TryFrom,
ffi::CString,
io,
io::ErrorKind,
os::unix::{ffi::OsStrExt, prelude::*},
path::{Path, PathBuf},
sync::Arc,
thread::JoinHandle,
};

// So hard to copy/clone io::Error, plus need to send between threads. Best to
Expand Down Expand Up @@ -110,35 +112,99 @@ pub(super) struct BlockingThreadResp {

pub(super) type BlockingThreadHandler = Box<dyn Fn(BlockingThreadResult)>;

pub(super) struct BlockingThread {
eventfd: RawFd,
queue: Arc<ArrayQueue<BlockingThreadReq>>,
responses: Arc<ArrayQueue<BlockingThreadResp>>,
waiters: RefCell<BTreeMap<u64, BlockingThreadHandler>>,
struct BlockingThread(JoinHandle<()>);

impl BlockingThread {
pub(super) fn new(
reactor_sleep_notifier: Arc<SleepNotifier>,
rx: Arc<Receiver<BlockingThreadReq>>,
tx: Arc<Sender<BlockingThreadResp>>,
bindings: Option<impl IntoIterator<Item = usize> + Send + 'static>,
) -> Self {
Self(std::thread::spawn(move || {
if let Some(bindings) = bindings {
bind_to_cpu_set(bindings).expect("failed to bind blocking thread");
}
while let Ok(el) = rx.recv() {
let res = el.op.execute();
let id = el.id;
let resp = BlockingThreadResp { id, res };

if tx.send(resp).is_err() {
panic!("failed to send response");
}
reactor_sleep_notifier.notify(false);
}
}))
}
}

pub(crate) struct BlockingThreadPool {
tx: Sender<BlockingThreadReq>,
rx: Receiver<BlockingThreadResp>,
waiters: RefCell<AHashMap<u64, BlockingThreadHandler>>,
requests: Cell<u64>,
_threads: Vec<BlockingThread>,
}

impl BlockingThread {
pub(super) fn push(&self, op: BlockingThreadOp, action: Box<dyn Fn(BlockingThreadResult)>) {
impl BlockingThreadPool {
pub(crate) fn new(
placement: PoolPlacement,
sleep_notifier: Arc<SleepNotifier>,
) -> io::Result<Self> {
let (in_tx, in_rx) = crossbeam::channel::unbounded();
let (out_tx, out_rx) = crossbeam::channel::unbounded();
let in_rx = Arc::new(in_rx);
let out_tx = Arc::new(out_tx);

let thread_count = placement.executor_count();
let mut placements = placement.generate_cpu_set()?;
let mut threads = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
let bindings = placements.next().cpu_binding();
threads.push(BlockingThread::new(
sleep_notifier.clone(),
in_rx.clone(),
out_tx.clone(),
bindings,
));
}

Ok(Self {
_threads: threads,
tx: in_tx,
rx: out_rx,
waiters: RefCell::new(Default::default()),
requests: Cell::new(0),
})
}

pub(super) fn push(
&self,
op: BlockingThreadOp,
action: BlockingThreadHandler,
) -> io::Result<()> {
let id = self.requests.get();
self.requests.set(id + 1);
self.requests.set(id.overflowing_add(1).0);

let req = BlockingThreadReq { op, id };

if self.queue.push(req).is_err() {
panic!("syscall queue full!");
}
self.tx.send(req).map_err(|_| {
io::Error::new(
ErrorKind::WouldBlock,
"failed to enqueue blocking operation",
)
})?;

let mut waiters = self.waiters.borrow_mut();
waiters.insert(id, action);

write_eventfd(self.eventfd);
Ok(())
}

pub(super) fn flush(&self) -> usize {
let mut woke = 0;
let mut waiters = self.waiters.borrow_mut();
while let Some(x) = self.responses.pop() {
for x in self.rx.try_iter() {
let id = x.id;
let res = x.res;
let func = waiters.remove(&id).unwrap();
Expand All @@ -147,38 +213,4 @@ impl BlockingThread {
}
woke
}

pub(super) fn new(reactor_sleep_notifier: Arc<SleepNotifier>) -> Self {
let eventfd = create_eventfd().unwrap();
let queue = Arc::new(ArrayQueue::<BlockingThreadReq>::new(1024));
let responses = Arc::new(ArrayQueue::<BlockingThreadResp>::new(1024));
let waiters = RefCell::new(BTreeMap::new());
let requests = Cell::new(0);

let tq = queue.clone();
let rsp = responses.clone();

std::thread::spawn(move || {
loop {
read_eventfd(eventfd);
while let Some(el) = tq.pop() {
let res = el.op.execute();
let id = el.id;
let resp = BlockingThreadResp { id, res };

if rsp.push(resp).is_err() {
panic!("Could not add response to syscall response queue");
}
}
reactor_sleep_notifier.notify(false);
}
});
BlockingThread {
eventfd,
queue,
responses,
waiters,
requests,
}
}
}
1 change: 1 addition & 0 deletions glommio/src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub(crate) fn write_eventfd(eventfd: RawFd) {
assert_eq!(ret, 8);
}

#[allow(dead_code)]
pub(crate) fn read_eventfd(eventfd: RawFd) {
let mut buf = [1u64; 1];
let ret = syscall!(read(eventfd, buf.as_mut_ptr() as _, 8)).unwrap();
Expand Down
41 changes: 22 additions & 19 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
iou::sqe::{FsyncFlags, SockAddrStorage, StatxFlags, StatxMode, SubmissionFlags, TimeoutFlags},
sys::{
self,
blocking::{BlockingThread, BlockingThreadOp},
blocking::{BlockingThreadOp, BlockingThreadPool},
dma_buffer::{BufferStorage, DmaBuffer},
DirectIo,
EnqueuedSource,
Expand All @@ -50,6 +50,7 @@ use crate::{
IoRequirements,
IoStats,
Latency,
PoolPlacement,
RingIoStats,
TaskQueueHandle,
};
Expand Down Expand Up @@ -1132,7 +1133,7 @@ pub(crate) struct Reactor {
eventfd_src: Source,
source_map: Rc<RefCell<SourceMap>>,

syscall_thread: BlockingThread,
blocking_thread: BlockingThreadPool,

rings_depth: usize,
}
Expand Down Expand Up @@ -1258,7 +1259,7 @@ impl Reactor {
latency_ring: RefCell::new(latency_ring),
poll_ring: RefCell::new(poll_ring),
timeout_src: Cell::new(None),
syscall_thread: BlockingThread::new(notifier.clone()),
blocking_thread: BlockingThreadPool::new(PoolPlacement::Unbound(1), notifier.clone())?,
link_fd,
notifier,
eventfd_src,
Expand Down Expand Up @@ -1396,24 +1397,26 @@ impl Reactor {
self.queue_standard_request(source, op);
}

fn blocking_syscall(&self, source: &Source, op: BlockingThreadOp) {
fn enqueue_blocking_request(&self, source: &Source, op: BlockingThreadOp) {
let src = source.inner.clone();

self.syscall_thread.push(
op,
Box::new(move |res| {
let mut inner_source = src.borrow_mut();
let res: io::Result<usize> = res.try_into().expect("not a syscall result!");
self.blocking_thread
.push(
op,
Box::new(move |res| {
let mut inner_source = src.borrow_mut();
let res: io::Result<usize> = res.try_into().expect("not a syscall result!");

inner_source.wakers.result.replace(res);
inner_source.wakers.wake_waiters();
}),
);
inner_source.wakers.result.replace(res);
inner_source.wakers.wake_waiters();
}),
)
.expect("failed to spawn blocking request");
}

pub(crate) fn truncate(&self, source: &Source, size: u64) {
let op = BlockingThreadOp::Truncate(source.raw(), size as _);
self.blocking_syscall(source, op);
self.enqueue_blocking_request(source, op);
}

pub(crate) fn rename(&self, source: &Source) {
Expand All @@ -1423,7 +1426,7 @@ impl Reactor {
};

let op = BlockingThreadOp::Rename(old_path, new_path);
self.blocking_syscall(source, op);
self.enqueue_blocking_request(source, op);
}

pub(crate) fn create_dir(&self, source: &Source, mode: libc::c_int) {
Expand All @@ -1433,7 +1436,7 @@ impl Reactor {
};

let op = BlockingThreadOp::CreateDir(path, mode);
self.blocking_syscall(source, op);
self.enqueue_blocking_request(source, op);
}

pub(crate) fn remove_file(&self, source: &Source) {
Expand All @@ -1443,14 +1446,14 @@ impl Reactor {
};

let op = BlockingThreadOp::Remove(path);
self.blocking_syscall(source, op);
self.enqueue_blocking_request(source, op);
}

pub(crate) fn run_blocking(&self, source: &Source, f: Box<dyn FnOnce() + Send + 'static>) {
assert!(matches!(&*source.source_type(), SourceType::BlockingFn));

let op = BlockingThreadOp::Fn(f);
self.blocking_syscall(source, op);
self.enqueue_blocking_request(source, op);
}

pub(crate) fn close(&self, source: &Source) {
Expand Down Expand Up @@ -1669,7 +1672,7 @@ impl Reactor {
}

pub(crate) fn flush_syscall_thread(&self) -> usize {
self.syscall_thread.flush()
self.blocking_thread.flush()
}

pub(crate) fn preempt_pointers(&self) -> (*const u32, *const u32) {
Expand Down

0 comments on commit 3187c1a

Please sign in to comment.