Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a stall detector that logs stacktraces of unyielding tasks, redux #499

Merged
merged 26 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9605100
measure and log unyielding task queues
Apr 19, 2021
f60f577
use the preempt timer duration as the stall detector threshold
HippoBaro Sep 19, 2021
849ab2f
gate the stall detector behind a feature flag
HippoBaro Sep 19, 2021
75681e7
asynchronously record stack traces when a task queue goes over budget
HippoBaro Sep 22, 2021
77662c4
Remove ordering requirement in drop, per PR feedback.
davidblewett Jan 11, 2022
9ae41f7
Make stall detection configurable per executor.
davidblewett Jan 12, 2022
77e005c
Add trait-based handler, to allow for customizing signal
davidblewett Jan 13, 2022
186a4d1
Move logging to default handler.
davidblewett Jan 13, 2022
647ce55
Update docs.
davidblewett Jan 13, 2022
aee9682
PR feedback
davidblewett Jan 14, 2022
c24ad19
Merge branch 'master' into stall_detector
davidblewett Jan 14, 2022
99237c1
PR feedback
davidblewett Jan 14, 2022
af66c8d
Refactor stall detector tests and add coverage
davidblewett Jan 19, 2022
8a1ca54
Explicitly check that we're on the same thread we started on.
davidblewett Jan 20, 2022
5c7a31b
Add test for multiple signal numbers on different executors.
davidblewett Jan 20, 2022
b479ab5
Merge branch 'master' into stall_detector
davidblewett Jan 20, 2022
0721f20
PR feedback fixes.
davidblewett Jan 20, 2022
0477a0b
Make `StallDetector` completely self-contained
davidblewett Jan 20, 2022
bdaf8ee
Doctest fixes.
davidblewett Jan 20, 2022
04fddd6
Rename `DefaultStallDetectionHandler` -> `LoggingStallDetectionHandler`
davidblewett Jan 20, 2022
b06a369
PR feedback
davidblewett Jan 21, 2022
79bfca1
Simplify Debug output.
davidblewett Jan 21, 2022
dec6adb
Switch back to `DefaultStallDetectionHandler`, with no tunables.
davidblewett Jan 24, 2022
515781b
Update `OpenOptions::append` to return `&mut Self`
r3v2d0g Jan 21, 2022
65f305d
Merge remote-tracking branch 'upstream' into stall_detector
davidblewett Jan 25, 2022
9be026d
Don't export `LocalExecutor::detect_stalls` for now;
davidblewett Jan 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ readme = "../README.md"

[dependencies]
ahash = "0.7"
backtrace = { version = "~0.3.58" }
bitflags = "1.3"
bitmaps = "3.1"
buddy-alloc = "0.4"
Expand All @@ -35,6 +36,7 @@ pin-project-lite = "0.2"
rlimit = "0.6"
scoped-tls = "1.0"
scopeguard = "1.1"
signal-hook = { version = "0.3" }
sketches-ddsketch = "0.1"
smallvec = { version = "1.7", features = ["union"] }
socket2 = { version = "0.3", features = ["unix", "reuseport"] }
Expand Down
181 changes: 160 additions & 21 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
mod latch;
mod multitask;
mod placement;
pub mod stall;

use latch::{Latch, LatchState};
pub use placement::{CpuSet, Placement, PoolPlacement};
Expand All @@ -43,11 +44,12 @@ use tracing::trace;
use std::{
cell::RefCell,
collections::{hash_map::Entry, BinaryHeap},
fmt,
future::Future,
io,
marker::PhantomData,
mem::MaybeUninit,
ops::DerefMut,
ops::{Deref, DerefMut},
pin::Pin,
rc::Rc,
sync::{Arc, Mutex},
Expand All @@ -59,8 +61,13 @@ use std::{
use futures_lite::pin;
use scoped_tls::scoped_thread_local;

use log::warn;

#[cfg(doc)]
use crate::executor::stall::LoggingStallDetectionHandler;
use crate::{
error::BuilderErrorKind,
executor::stall::StallDetector,
io::DmaBuffer,
parking,
reactor,
Expand Down Expand Up @@ -123,7 +130,7 @@ pub(crate) struct TaskQueue {
shares: Shares,
vruntime: u64,
io_requirements: IoRequirements,
_name: String,
name: String,
last_adjustment: Instant,
// for dynamic shares classes
yielded: bool,
Expand Down Expand Up @@ -168,7 +175,7 @@ impl TaskQueue {
shares,
vruntime: 0,
io_requirements: ioreq,
_name: name.into(),
name: name.into(),
last_adjustment: Instant::now(),
yielded: false,
}))
Expand Down Expand Up @@ -468,6 +475,10 @@ pub struct LocalExecutorBuilder {
/// Defaults to one thread using the same placement strategy as the host
/// executor
blocking_thread_pool_placement: PoolPlacement,
/// Whether to detect stalls in unyielding tasks.
/// [`LoggingStallDetectionHandler`] installs a signal handler for
/// [`nix::libc::SIGUSR1`], so is disabled by default.
detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}

impl LocalExecutorBuilder {
Expand All @@ -486,6 +497,7 @@ impl LocalExecutorBuilder {
preempt_timer_duration: DEFAULT_PREEMPT_TIMER,
record_io_latencies: false,
blocking_thread_pool_placement: PoolPlacement::from(placement),
detect_stalls: None,
}
}

Expand Down Expand Up @@ -566,6 +578,28 @@ impl LocalExecutorBuilder {
self
}

/// Whether to detect stalls in unyielding tasks.
/// [`LoggingStallDetectionHandler`] installs a signal handler for
/// [`nix::libc::SIGUSR1`], so is disabled by default.
/// # Examples
///
/// ```
/// use glommio::{LocalExecutorBuilder, LoggingStallDetectionHandler};
///
/// let local_ex = LocalExecutorBuilder::default()
/// .detect_stalls(Some(Box::new(LoggingStallDetectionHandler::default())))
/// .make()
/// .unwrap();
/// ```
#[must_use = "The builder must be built to be useful"]
pub fn detect_stalls(
mut self,
handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
) -> Self {
self.detect_stalls = handler;
self
}

/// Make a new [`LocalExecutor`] by taking ownership of the Builder, and
/// returns a [`Result`](crate::Result) to the executor.
/// # Examples
Expand All @@ -588,6 +622,7 @@ impl LocalExecutorBuilder {
record_io_latencies: self.record_io_latencies,
spin_before_park: self.spin_before_park,
thread_pool_placement: self.blocking_thread_pool_placement,
detect_stalls: self.detect_stalls,
},
)?;
le.init();
Expand Down Expand Up @@ -651,6 +686,7 @@ impl LocalExecutorBuilder {
let ring_depth = self.ring_depth;
let preempt_timer_duration = self.preempt_timer_duration;
let spin_before_park = self.spin_before_park;
let detect_stalls = self.detect_stalls;
let record_io_latencies = self.record_io_latencies;
let blocking_thread_pool_placement = self.blocking_thread_pool_placement;

Expand All @@ -667,6 +703,7 @@ impl LocalExecutorBuilder {
record_io_latencies,
spin_before_park,
thread_pool_placement: blocking_thread_pool_placement,
detect_stalls,
},
)?;
le.init();
Expand Down Expand Up @@ -705,7 +742,6 @@ impl Default for LocalExecutorBuilder {
///
/// handles.join_all();
/// ```
#[derive(Debug)]
pub struct LocalExecutorPoolBuilder {
/// Spin for duration before parking a reactor
spin_before_park: Option<Duration>,
Expand All @@ -732,6 +768,32 @@ pub struct LocalExecutorPoolBuilder {
/// its own pool. Defaults to 1 thread per pool, bound using the same
/// placement strategy as its host executor
blocking_thread_pool_placement: PoolPlacement,
/// Factory function to generate the stall detection handler.
/// [`LoggingStallDetectionHandler installs`] a signal handler for
/// [`nix::libc::SIGUSR1`], so is disabled by default.
handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
}

impl fmt::Debug for LocalExecutorPoolBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let gen = self
.handler_gen
.as_ref()
.map(|_| "Some(Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>)");
write!(
f,
"spin_before_park:{:?}, name:{}, io_memory:{}, ring_depth:{}, \
preempt_timer_duration:{:?}, placement:{:?}, record_io_latencies:{}, handler_gen:{:?}",
self.spin_before_park,
self.name,
self.io_memory,
self.ring_depth,
self.preempt_timer_duration,
self.placement,
self.record_io_latencies,
gen
davidblewett marked this conversation as resolved.
Show resolved Hide resolved
)
}
}

impl LocalExecutorPoolBuilder {
Expand All @@ -750,6 +812,7 @@ impl LocalExecutorPoolBuilder {
placement: placement.clone(),
record_io_latencies: false,
blocking_thread_pool_placement: placement.shrink_to(1),
handler_gen: None,
}
}

Expand Down Expand Up @@ -815,6 +878,41 @@ impl LocalExecutorPoolBuilder {
self
}

/// Whether to detect stalls in unyielding tasks.
/// This method takes a closure of `handler_gen`, which will be called on
/// each new thread to generate the stall detection handler to be used in
/// that executor. [`LoggingStallDetectionHandler`] installs a signal
/// handler for [`nix::libc::SIGUSR1`], so is disabled by default.
/// # Examples
///
/// ```
/// use glommio::{
/// timer::Timer,
/// LocalExecutorPoolBuilder,
/// LoggingStallDetectionHandler,
/// PoolPlacement,
/// };
///
/// let local_ex = LocalExecutorPoolBuilder::new(PoolPlacement::Unbound(4))
/// .detect_stalls(Some(Box::new(|| {
/// Box::new(LoggingStallDetectionHandler::default())
/// })))
/// .on_all_shards(move || async {
/// Timer::new(std::time::Duration::from_millis(100)).await;
/// println!("Hello world!");
/// })
/// .expect("failed to spawn local executors")
/// .join_all();
/// ```
#[must_use = "The builder must be built to be useful"]
pub fn detect_stalls(
mut self,
handler_gen: Option<Box<dyn Fn() -> Box<dyn stall::StallDetectionHandler + 'static>>>,
) -> Self {
self.handler_gen = handler_gen;
self
}

/// Spawn a pool of [`LocalExecutor`]s in a new thread according to the
/// [`PoolPlacement`] policy, which is `Unbound` by default.
///
Expand Down Expand Up @@ -878,6 +976,7 @@ impl LocalExecutorPoolBuilder {
let spin_before_park = self.spin_before_park;
let record_io_latencies = self.record_io_latencies;
let blocking_thread_pool_placement = self.blocking_thread_pool_placement.clone();
let detect_stalls = self.handler_gen.as_ref().map(|x| (*x.deref())());
let latch = Latch::clone(latch);

move || {
Expand All @@ -894,6 +993,7 @@ impl LocalExecutorPoolBuilder {
record_io_latencies,
spin_before_park,
thread_pool_placement: blocking_thread_pool_placement,
detect_stalls,
},
)?;
le.init();
Expand Down Expand Up @@ -977,6 +1077,7 @@ pub struct LocalExecutorConfig {
pub record_io_latencies: bool,
pub spin_before_park: Option<Duration>,
pub thread_pool_placement: PoolPlacement,
pub detect_stalls: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
}

/// Single-threaded executor.
Expand Down Expand Up @@ -1009,6 +1110,7 @@ pub struct LocalExecutor {
parker: parking::Parker,
id: usize,
reactor: Rc<reactor::Reactor>,
stall_detector: Option<StallDetector>,
}

impl LocalExecutor {
Expand Down Expand Up @@ -1049,7 +1151,7 @@ impl LocalExecutor {
let p = parking::Parker::new();
let queues = ExecutorQueues::new(config.preempt_timer, config.spin_before_park);
trace!(id = notifier.id(), "Creating executor");
Ok(LocalExecutor {
let mut exec = LocalExecutor {
queues: Rc::new(RefCell::new(queues)),
parker: p,
id: notifier.id(),
Expand All @@ -1060,7 +1162,31 @@ impl LocalExecutor {
config.record_io_latencies,
config.thread_pool_placement,
)?),
})
stall_detector: None,
};
exec.detect_stalls(config.detect_stalls)?;

Ok(exec)
}

/// Enable or disable task stall detection at runtime
///
/// # Examples
/// ```
/// use glommio::{LocalExecutor, LoggingStallDetectionHandler};
///
/// let local_ex = LocalExecutor::default()
/// .detect_stalls(Some(Box::new(LoggingStallDetectionHandler::default())));
/// ```
pub fn detect_stalls(
&mut self,
handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
) -> Result<()> {
self.stall_detector = match handler {
Some(handler) => Some(StallDetector::new(self.id, handler)?),
None => None,
};
Ok(())
davidblewett marked this conversation as resolved.
Show resolved Hide resolved
}

/// Returns a unique identifier for this Executor.
Expand Down Expand Up @@ -1217,23 +1343,36 @@ impl LocalExecutor {
now
};

let mut tasks_executed_this_loop = 0;
loop {
let mut queue_ref = queue.borrow_mut();
if self.need_preempt() || queue_ref.yielded() {
break;
}
let (runtime, tasks_executed_this_loop) = {
let guard = self.stall_detector.as_ref().map(|x| {
let queue = queue.borrow_mut();
x.enter_task_queue(
queue.stats.index,
queue.name.clone(),
time,
self.preempt_timer_duration(),
)
});

let mut tasks_executed_this_loop = 0;
loop {
let mut queue_ref = queue.borrow_mut();
if self.need_preempt() || queue_ref.yielded() {
break;
}

if let Some(r) = queue_ref.get_task() {
drop(queue_ref);
r.run();
tasks_executed_this_loop += 1;
} else {
break;
if let Some(r) = queue_ref.get_task() {
drop(queue_ref);
r.run();
tasks_executed_this_loop += 1;
} else {
break;
}
}
}

let runtime = time.elapsed();
let elapsed = time.elapsed();
drop(guard);
(elapsed, tasks_executed_this_loop)
};

let (need_repush, vruntime) = {
let mut state = queue.borrow_mut();
Expand Down
Loading