Skip to content

Commit

Permalink
Fix Unbound background thread pool affinity (#607)
Browse files Browse the repository at this point in the history
* fix CI: Bump minimum version to 1.65

The backtrace crate picked up a new addr2line version in a patch release,
and that addr2line version requires Rust 1.65.

* Fix Cargo warning about resolver

```
warning: some crates are on edition 2021 which defaults to `resolver = "2"`, but virtual workspaces default to `resolver = "1"`
note: to keep the current resolver, specify `workspace.resolver = "1"` in the workspace root's manifest
note: to use the edition 2021 resolver, specify `workspace.resolver = "2"` in the workspace root's manifest
```

* Fix clippy warnings in newer Rust compilers.

* Upgrade to bitflags 2

Resolves clippy errors that are now caught: bitflags/bitflags#373

* Add test highlighting the problem

* Fix background thread pool unbound placement

Any threads we spawn inherit our affinity. This means that the background
thread pool of any executor that doesn't have an affinity of "Unbound"
will only ever run on that executor's CPU rather than distributing work
across many CPUs. This is undesirable given that the background thread
pool placement can be set independently of the executor it's associated
with. Additionally, the background thread pool placement by default uses
the pool placement, so this change has no impact on background threads
that didn't have a custom placement set (and if a custom placement was
set, the previous behavior was most likely not what was intended).
  • Loading branch information
vlovich authored Oct 6, 2023
1 parent d93c460 commit e3bdf40
Show file tree
Hide file tree
Showing 24 changed files with 127 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
cat << EOF > "run-gha-workflow.sh"
PATH=$PATH:/usr/share/rust/.cargo/bin
echo "`nproc` CPU(s) available"
rustup install 1.58
rustup install 1.65
rustup show
rustup default stable
cargo install cargo-sort
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ members = [
"examples",
"glommio",
]
resolver = "2"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ an [introductory article.](https://www.datadoghq.com/blog/engineering/introducin

## Supported Rust Versions

Glommio is built against the latest stable release. The minimum supported version is 1.58. The current Glommio version
Glommio is built against the latest stable release. The minimum supported version is 1.65. The current Glommio version
is not guaranteed to build on Rust versions earlier than the minimum supported version.

## Supported Linux kernels
Expand Down
21 changes: 9 additions & 12 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@ fn main() {
.detach();

// What would you write if there were no enclose! macro.
let second =
glommio::spawn_local(|_left: Rc<RefCell<bool>>, right: Rc<RefCell<bool>>| -> _ {
async move {
loop {
if !(*(right.borrow())) {
println!("right");
*(right.borrow_mut()) = true
}
glommio::yield_if_needed().await;
}
let second = glommio::spawn_local(async move {
loop {
if !(*(right.borrow())) {
println!("right");
*(right.borrow_mut()) = true
}
}(left.clone(), right.clone()))
.detach();
glommio::yield_if_needed().await;
}
})
.detach();

futures::join!(first, second);
});
Expand Down
4 changes: 2 additions & 2 deletions glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ keywords = ["linux", "rust", "async", "iouring", "thread-per-core"]
categories = ["asynchronous", "concurrency", "os", "filesystem", "network-programming"]
readme = "../README.md"
# This is also documented in the README.md under "Supported Rust Versions"
rust-version = "1.58"
rust-version = "1.65"

[dependencies]
ahash = "0.7"
backtrace = { version = "0.3" }
bitflags = "1.3"
bitflags = "2.4"
bitmaps = "3.1"
buddy-alloc = "0.4"
concurrent-queue = "1.2"
Expand Down
5 changes: 3 additions & 2 deletions glommio/src/channels/channel_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
cell::Cell,
fmt::{self, Debug, Formatter},
io::{Error, ErrorKind},
rc::Rc,
};

use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -251,7 +252,7 @@ pub type PartialMesh<T> = MeshBuilder<T, Partial>;
pub struct MeshBuilder<T: Send, A: MeshAdapter> {
nr_peers: usize,
channel_size: usize,
peers: Arc<RwLock<Vec<Peer>>>,
peers: Rc<RwLock<Vec<Peer>>>,
channels: Arc<SharedChannels<T>>,
adapter: A,
}
Expand Down Expand Up @@ -308,7 +309,7 @@ impl<T: 'static + Send, A: MeshAdapter> MeshBuilder<T, A> {
MeshBuilder {
nr_peers,
channel_size,
peers: Arc::new(RwLock::new(Vec::new())),
peers: Rc::new(RwLock::new(Vec::new())),
channels: Arc::new(Self::placeholder(nr_peers)),
adapter,
}
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/channels/shared_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl<T: Send + Sized> ConnectedSender<T> {
res
}

fn wait_for_room(&self, cx: &mut Context<'_>) -> Poll<()> {
fn wait_for_room(&self, cx: &Context<'_>) -> Poll<()> {
match self.state.buffer.free_space() > 0 || self.state.buffer.producer_disconnected() {
true => Poll::Ready(()),
false => {
Expand Down
1 change: 1 addition & 0 deletions glommio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ impl<T> From<GlommioError<T>> for io::Error {
}

#[cfg(test)]
#[allow(clippy::unnecessary_literal_unwrap)]
mod test {
use std::{io, panic::panic_any};

Expand Down
55 changes: 52 additions & 3 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use crate::{
error::BuilderErrorKind,
executor::stall::StallDetector,
io::DmaBuffer,
parking, reactor, sys,
parking, reactor,
sys::{self, blocking::BlockingThreadPool},
task::{self, waker_fn::dummy_waker},
GlommioError, IoRequirements, IoStats, Latency, Reactor, Shares,
};
Expand Down Expand Up @@ -1140,6 +1141,9 @@ impl LocalExecutor {
cpu_binding: Option<impl IntoIterator<Item = usize>>,
mut config: LocalExecutorConfig,
) -> Result<LocalExecutor> {
let blocking_thread =
BlockingThreadPool::new(config.thread_pool_placement, notifier.clone())?;

// Linux's default memory policy is "local allocation" which allocates memory
// on the NUMA node containing the CPU where the allocation takes place.
// Hence, we bind to a CPU in the provided CPU set before allocating any
Expand All @@ -1165,7 +1169,7 @@ impl LocalExecutor {
config.io_memory,
config.ring_depth,
config.record_io_latencies,
config.thread_pool_placement,
blocking_thread,
)?),
stall_detector: RefCell::new(
config
Expand Down Expand Up @@ -4132,7 +4136,7 @@ mod test {
}

// we created 5 blocking jobs each taking 100ms but our thread pool only has 4
// threads. SWe expect one of those jobs to take twice as long as the others.
// threads. We expect one of those jobs to take twice as long as the others.

let mut ts = join_all(blocking.into_iter()).await;
assert_eq!(ts.len(), 5);
Expand All @@ -4148,6 +4152,51 @@ mod test {
.unwrap();
}

#[test]
fn blocking_function_placement_independent_of_executor_placement() {
    // Baseline: count the CPUs the current (test) thread may run on before any
    // executor is created. Pid 0 asks for the calling thread's affinity mask.
    let this_thread = nix::unistd::Pid::from_raw(0);
    let baseline_mask = nix::sched::sched_getaffinity(this_thread).unwrap();
    let mut num_cpus_accessible_by_default = 0usize;
    for cpu in 0..nix::sched::CpuSet::count() {
        if baseline_mask.is_set(cpu).unwrap() {
            num_cpus_accessible_by_default += 1;
        }
    }

    // With fewer than two usable CPUs a pinned thread and an unpinned thread
    // are indistinguishable, so there is nothing to verify: skip.
    if num_cpus_accessible_by_default < 2 {
        eprintln!(
            "Insufficient CPUs available to test blocking_function_placement_independent_of_executor_placement (affinity only allows for {})",
            num_cpus_accessible_by_default,
        );
        return;
    }

    // The executor is pinned to CPU 0 while its blocking pool is requested as
    // unbound. The blocking job reports how many CPUs its own thread is
    // allowed to run on.
    let executor_handle = LocalExecutorBuilder::new(Placement::Fixed(0))
        .blocking_thread_pool_placement(PoolPlacement::Unbound(2))
        .spawn(|| async {
            executor()
                .spawn_blocking(move || {
                    let pool_thread = nix::unistd::Pid::from_raw(0);
                    let pool_mask = nix::sched::sched_getaffinity(pool_thread)
                        .expect("Failed to get affinity");
                    let mut schedulable = 0usize;
                    for cpu in 0..nix::sched::CpuSet::count() {
                        if pool_mask
                            .is_set(cpu)
                            .expect("Failed to check if cpu affinity is set")
                        {
                            schedulable += 1;
                        }
                    }
                    schedulable
                })
                .await
        })
        .unwrap();
    let num_schedulable_cpus = executor_handle.join().unwrap();

    // The pool thread must not have been narrowed to the executor's single
    // CPU: it should see at least as many CPUs as the baseline.
    assert!(
        num_schedulable_cpus >= num_cpus_accessible_by_default,
        "num schedulable {}, num cpus accessible {}",
        num_schedulable_cpus,
        num_cpus_accessible_by_default,
    );
}

#[test]
fn blocking_pool_invalid_placement() {
let ret = LocalExecutorBuilder::new(Placement::Unbound)
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/executor/placement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl PartialEq for CpuSet {

impl FromIterator<CpuLocation> for CpuSet {
fn from_iter<I: IntoIterator<Item = CpuLocation>>(cpus: I) -> Self {
Self(HashSet::<CpuLocation>::from_iter(cpus.into_iter()))
Self(HashSet::<CpuLocation>::from_iter(cpus))
}
}

Expand Down
2 changes: 1 addition & 1 deletion glommio/src/executor/stall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ mod test {

#[test]
fn stall_detector_multiple_signals() {
let signals = vec![
let signals = [
nix::libc::SIGALRM as u8,
nix::libc::SIGUSR1 as u8,
nix::libc::SIGUSR2 as u8,
Expand Down
8 changes: 4 additions & 4 deletions glommio/src/io/buffered_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl StreamWriter {
}
}

fn poll_sync(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll_sync(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
if !self.sync_on_close {
Poll::Ready(Ok(()))
} else {
Expand All @@ -426,7 +426,7 @@ impl StreamWriter {
}
}

fn poll_inner_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll_inner_close(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
match self.source.take() {
None => {
let source = self
Expand All @@ -446,7 +446,7 @@ impl StreamWriter {
}
}

fn do_poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn do_poll_flush(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
match self.source.take() {
None => match self.flush_write_buffer(cx.waker()) {
true => Poll::Pending,
Expand All @@ -456,7 +456,7 @@ impl StreamWriter {
}
}

fn do_poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn do_poll_close(&mut self, cx: &Context<'_>) -> Poll<io::Result<()>> {
while self.file.is_some() {
match self.file_status {
FileStatus::Open => {
Expand Down
6 changes: 3 additions & 3 deletions glommio/src/io/dma_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl DmaStreamReader {
/// [`get_buffer_aligned`]: Self::get_buffer_aligned
pub fn poll_get_buffer_aligned(
&mut self,
cx: &mut Context<'_>,
cx: &Context<'_>,
mut len: u64,
) -> Poll<Result<ReadResult>> {
let (start_id, buffer_len) = {
Expand All @@ -568,7 +568,7 @@ impl DmaStreamReader {

fn poll_get_buffer(
&mut self,
cx: &mut Context<'_>,
cx: &Context<'_>,
len: u64,
buffer_id: u64,
) -> Poll<Result<ReadResult>> {
Expand Down Expand Up @@ -1269,7 +1269,7 @@ impl DmaStreamWriter {
// but leaves the file open. Useful for the immutable file abstraction.
pub(super) fn poll_seal(
&mut self,
cx: &mut Context<'_>,
cx: &Context<'_>,
) -> Poll<io::Result<DmaStreamReaderBuilder>> {
let mut state = self.state.borrow_mut();
match state.file_status {
Expand Down
1 change: 1 addition & 0 deletions glommio/src/iou/cqe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ impl Iterator for CQEsBlocking<'_> {

bitflags::bitflags! {
/// Flags that can be returned from the kernel on [`CQE`]s.
#[derive(Debug, Clone, Copy)]
pub struct CompletionFlags: u32 {
const BUFFER_SHIFT = 1 << 0;
}
Expand Down
3 changes: 2 additions & 1 deletion glommio/src/iou/sqe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<'a> SQE<'a> {
/// Get this event's flags.
#[inline]
pub fn flags(&self) -> SubmissionFlags {
unsafe { SubmissionFlags::from_bits_unchecked(self.sqe.flags as _) }
SubmissionFlags::from_bits_retain(self.sqe.flags as _)
}

/// Overwrite this event's flags.
Expand Down Expand Up @@ -535,6 +535,7 @@ pub struct BufferGroupId {

bitflags::bitflags! {
/// [`SQE`](SQE) configuration flags.
#[derive(Debug, Clone, Copy)]
pub struct SubmissionFlags: u8 {
/// This event's file descriptor is an index into the preregistered set of files.
const FIXED_FILE = 1 << 0; /* use fixed fileset */
Expand Down
6 changes: 3 additions & 3 deletions glommio/src/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
pub(crate) async fn send_to(
&self,
buf: &[u8],
mut addr: nix::sys::socket::SockAddr,
addr: nix::sys::socket::SockAddr,
) -> io::Result<usize> {
match self.yolo_sendmsg(buf, &mut addr) {
match self.yolo_sendmsg(buf, &addr) {
Some(res) => res,
None => self.send_to_blocking(buf, addr).await,
}
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
fn yolo_sendmsg(
&self,
buf: &[u8],
addr: &mut nix::sys::socket::SockAddr,
addr: &nix::sys::socket::SockAddr,
) -> Option<io::Result<usize>> {
if self.tx_yolo.get() {
super::yolo_sendmsg(self.socket.as_raw_fd(), buf, addr)
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn yolo_recvmsg(
fn yolo_sendmsg(
fd: RawFd,
buf: &[u8],
addr: &mut nix::sys::socket::SockAddr,
addr: &nix::sys::socket::SockAddr,
) -> Option<io::Result<usize>> {
match sys::sendmsg_syscall(
fd,
Expand Down
Loading

0 comments on commit e3bdf40

Please sign in to comment.