Skip to content

Commit

Permalink
avoid allocation and dynamic dispatch
Browse files Browse the repository at this point in the history
Small performance improvement. Don't allocate a dynamic blocking
operation handler when it isn't necessary.
  • Loading branch information
HippoBaro committed Jan 14, 2022
1 parent 8e815c3 commit 0d9a4aa
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
35 changes: 23 additions & 12 deletions glommio/src/sys/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use crate::{executor::bind_to_cpu_set, sys::SleepNotifier, PoolPlacement};
use crate::{
executor::bind_to_cpu_set,
sys::{InnerSource, SleepNotifier},
PoolPlacement,
};
use ahash::AHashMap;
use alloc::rc::Rc;
use crossbeam::channel::{Receiver, Sender};
use std::{
cell::{Cell, RefCell},
convert::TryFrom,
convert::{TryFrom, TryInto},
ffi::CString,
io,
io::ErrorKind,
os::unix::{ffi::OsStrExt, prelude::*},
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
thread::JoinHandle,
};
Expand Down Expand Up @@ -110,8 +116,6 @@ pub(super) struct BlockingThreadResp {
res: BlockingThreadResult,
}

pub(super) type BlockingThreadHandler = Box<dyn Fn(BlockingThreadResult)>;

struct BlockingThread(JoinHandle<()>);

impl BlockingThread {
Expand Down Expand Up @@ -142,7 +146,7 @@ impl BlockingThread {
pub(crate) struct BlockingThreadPool {
tx: Sender<BlockingThreadReq>,
rx: Receiver<BlockingThreadResp>,
waiters: RefCell<AHashMap<u64, BlockingThreadHandler>>,
sources: RefCell<AHashMap<u64, Pin<Rc<RefCell<InnerSource>>>>>,
requests: Cell<u64>,
_threads: Vec<BlockingThread>,
}
Expand Down Expand Up @@ -174,15 +178,15 @@ impl BlockingThreadPool {
_threads: threads,
tx: in_tx,
rx: out_rx,
waiters: RefCell::new(Default::default()),
sources: RefCell::new(Default::default()),
requests: Cell::new(0),
})
}

pub(super) fn push(
&self,
op: BlockingThreadOp,
action: BlockingThreadHandler,
source: Pin<Rc<RefCell<InnerSource>>>,
) -> io::Result<()> {
let id = self.requests.get();
self.requests.set(id.overflowing_add(1).0);
Expand All @@ -196,19 +200,26 @@ impl BlockingThreadPool {
)
})?;

let mut waiters = self.waiters.borrow_mut();
waiters.insert(id, action);
let mut waiters = self.sources.borrow_mut();
assert!(waiters.insert(id, source).is_none());
Ok(())
}

pub(super) fn flush(&self) -> usize {
let mut woke = 0;
let mut waiters = self.waiters.borrow_mut();
let mut waiters = self.sources.borrow_mut();
for x in self.rx.try_iter() {
let id = x.id;
let res = x.res;
let func = waiters.remove(&id).unwrap();
func(res);

let src = waiters.remove(&id).unwrap();
let mut inner_source = src.borrow_mut();
inner_source.wakers.result.replace(
res.try_into()
.expect("not a valid blocking operation's result"),
);
inner_source.wakers.wake_waiters();

woke += 1;
}
woke
Expand Down
14 changes: 1 addition & 13 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use rlimit::Resource;
use std::{
cell::{Cell, Ref, RefCell, RefMut},
collections::VecDeque,
convert::TryInto,
ffi::CStr,
fmt,
io,
Expand Down Expand Up @@ -1399,19 +1398,8 @@ impl Reactor {
}

fn enqueue_blocking_request(&self, source: &Source, op: BlockingThreadOp) {
let src = source.inner.clone();

self.blocking_thread
.push(
op,
Box::new(move |res| {
let mut inner_source = src.borrow_mut();
let res: io::Result<usize> = res.try_into().expect("not a syscall result!");

inner_source.wakers.result.replace(res);
inner_source.wakers.wake_waiters();
}),
)
.push(op, source.inner.clone())
.expect("failed to spawn blocking request");
}

Expand Down

0 comments on commit 0d9a4aa

Please sign in to comment.