Skip to content

Commit

Permalink
Make StallDetector completely self-contained
Browse files Browse the repository at this point in the history
by moving signal_id from `LocalExecutor`.
  • Loading branch information
davidblewett committed Jan 20, 2022
1 parent effe70e commit 8be1ae0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 56 deletions.
53 changes: 10 additions & 43 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use std::{
rc::Rc,
sync::{Arc, Mutex},
task::{Context, Poll},
thread::{self, Builder, JoinHandle},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -1100,8 +1100,6 @@ pub struct LocalExecutor {
parker: parking::Parker,
id: usize,
reactor: Rc<reactor::Reactor>,

signal_id: Option<signal_hook::SigId>,
stall_detector: Option<StallDetector>,
}

Expand Down Expand Up @@ -1154,10 +1152,9 @@ impl LocalExecutor {
config.record_io_latencies,
config.thread_pool_placement,
)?),
signal_id: None,
stall_detector: None,
};
exec = exec.detect_stalls(config.detect_stalls)?;
exec.detect_stalls(config.detect_stalls)?;

Ok(exec)
}
Expand All @@ -1172,46 +1169,16 @@ impl LocalExecutor {
/// local_ex.detect_stalls(true);
/// ```
pub fn detect_stalls(
mut self,
&mut self,
handler: Option<Box<dyn stall::StallDetectionHandler + 'static>>,
) -> Result<LocalExecutor> {
match handler {
) -> Result<()> {
self.stall_detector = match handler {
Some(handler) => {
if self.signal_id.is_none() {
unsafe {
let signal_num = handler.signal();
let stall_detector = StallDetector::new(self.id, handler)?;
let tx = stall_detector.tx.clone();
let exec_thread = thread::current().id();
self.signal_id = Some(
signal_hook::low_level::register(signal_num.into(), move || {
// Bail if we can't send or if we've gotten a signal
// from an unexpected thread (i.e., a signal targeting the process)
if tx.is_full() || thread::current().id() != exec_thread {
return;
}

backtrace::trace_unsynchronized(|frame| {
tx.try_send(backtrace::BacktraceFrame::from(frame.clone()))
.is_ok()
});
})
.map_err(GlommioError::from)?,
);
self.stall_detector = Some(stall_detector);
}
}
}
None => {
if let Some(signal_id) = self.signal_id.take() {
signal_hook::low_level::unregister(signal_id);
}
if let Some(stall_detector) = self.stall_detector.take() {
stall_detector.disarm().map_err(std::io::Error::from)?;
}
}
}
Ok(self)
Some(StallDetector::new(self.id, handler)?)
},
None => None,
};
Ok(())
}

/// Returns a unique identifier for this Executor.
Expand Down
52 changes: 39 additions & 13 deletions glommio/src/executor/stall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::JoinHandle,
thread::{self, JoinHandle},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -116,10 +116,10 @@ pub(crate) struct StallDetector {
timer_handler: Option<JoinHandle<()>>,
id: usize,
terminated: Arc<AtomicBool>,
signal_id: signal_hook::SigId,
// NOTE: we don't use signal_hook::low_level::channel as backtraces
// have too many elements
pub(crate) tx: crossbeam::channel::Sender<backtrace::BacktraceFrame>,
pub(crate) rx: crossbeam::channel::Receiver<backtrace::BacktraceFrame>,
rx: crossbeam::channel::Receiver<backtrace::BacktraceFrame>,
}

impl StallDetector {
Expand All @@ -134,30 +134,55 @@ impl StallDetector {
)
.map_err(std::io::Error::from)?,
);
let tid = unsafe { nix::libc::pthread_self() };
let terminated = Arc::new(AtomicBool::new(false));
let sig = stall_handler.signal();
let timer_handler = std::thread::spawn(enclose::enclose! { (terminated, timer) move || {
while timer.wait().is_ok() {
if terminated.load(Ordering::Relaxed) {
return
}
unsafe { nix::libc::pthread_kill(tid, sig.into()) };
}
}});
let (tx, rx) = crossbeam::channel::bounded(1 << 10);
let timer_handler = StallDetector::install_trigger(terminated.clone(), timer.clone(), sig.into());
let signal_id = StallDetector::install_handler(tx, sig)?;

Ok(Self {
timer,
timer_handler: Some(timer_handler),
stall_handler,
id: executor_id,
terminated,
tx,
signal_id,
rx,
})
}

fn install_handler(tx: crossbeam::channel::Sender<backtrace::BacktraceFrame>, signal: u8) -> std::io::Result<signal_hook::SigId> {
let exec_thread = thread::current().id();
unsafe {
let signal_id = signal_hook::low_level::register(signal.into(), move || {
// Bail if we can't send or if we've gotten a signal
// from an unexpected thread (i.e., a signal targeting the process)
if tx.is_full() || thread::current().id() != exec_thread {
return;
}

backtrace::trace_unsynchronized(|frame| {
tx.try_send(backtrace::BacktraceFrame::from(frame.clone()))
.is_ok()
});
})?;
//.map_err::<io::Error, GlommioError<()>>(GlommioError::from)?;
Ok(signal_id)
}
}

fn install_trigger(terminated: Arc<AtomicBool>, timer: Arc<sys::timerfd::TimerFd>, signal: i32) -> JoinHandle<()> {
let tid = unsafe { nix::libc::pthread_self() };
std::thread::spawn(enclose::enclose! { (terminated, timer) move || {
while timer.wait().is_ok() {
if terminated.load(Ordering::Relaxed) {
return
}
unsafe { nix::libc::pthread_kill(tid, signal) };
}
}})
}

pub(crate) fn enter_task_queue(
&self,
queue_handle: TaskQueueHandle,
Expand Down Expand Up @@ -193,6 +218,7 @@ impl StallDetector {

impl Drop for StallDetector {
fn drop(&mut self) {
signal_hook::low_level::unregister(self.signal_id);
let timer_handler = self.timer_handler.take().unwrap();
self.terminated.store(true, Ordering::Relaxed);

Expand Down

0 comments on commit 8be1ae0

Please sign in to comment.