From eadadb7f339fa45a619e572ffd044ce5c91f7fe6 Mon Sep 17 00:00:00 2001 From: Colin Rofls Date: Thu, 13 Jul 2023 11:45:58 -0400 Subject: [PATCH] Halt runloop when jobs panic This involved quite a bit of fiddling. I couldn't get rayon's built-in panic_handler to work, so I ended up rolling my own solution, where we manually catch unwinds that occur in spawned jobs. When a panic occurs, we will set an atomic flag and return an error. We check this flag before beginning any scheduled job, and return immediately if it is set. --- fontc/src/main.rs | 5 ++-- fontc/src/work.rs | 2 ++ fontc/src/workload.rs | 64 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/fontc/src/main.rs b/fontc/src/main.rs index 36f429a2..9f88be8b 100644 --- a/fontc/src/main.rs +++ b/fontc/src/main.rs @@ -12,8 +12,9 @@ fn main() -> Result<(), Error> { let ts = buf.timestamp_micros(); writeln!( buf, - "[{ts} {:?} {} {}] {}", - std::thread::current().id(), + "[{ts} {} {} {}] {}", + // we manually assign all threads a name + std::thread::current().name().unwrap_or("unknown"), record.target(), buf.default_level_style(record.level()) .value(record.level()), diff --git a/fontc/src/work.rs b/fontc/src/work.rs index 0b55e902..387ca0c6 100644 --- a/fontc/src/work.rs +++ b/fontc/src/work.rs @@ -18,6 +18,7 @@ use fontir::{ pub enum AnyWorkError { Fe(FeError), Be(BeError), + Panic(String), } impl From for AnyWorkError { @@ -37,6 +38,7 @@ impl Display for AnyWorkError { match self { AnyWorkError::Be(e) => e.fmt(f), AnyWorkError::Fe(e) => e.fmt(f), + AnyWorkError::Panic(e) => write!(f, "Job panicked: '{e}'"), } } } diff --git a/fontc/src/workload.rs b/fontc/src/workload.rs index e44b7cdf..f5055687 100644 --- a/fontc/src/workload.rs +++ b/fontc/src/workload.rs @@ -1,6 +1,14 @@ //! Tracking jobs to run -use std::collections::{HashMap, HashSet}; +use rayon::ThreadPoolBuilder; +use std::{ + collections::{HashMap, HashSet}, + panic::AssertUnwindSafe, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use crossbeam_channel::{Receiver, TryRecvError}; use fontbe::orchestration::{AnyWorkId, Context as BeContext}; @@ -191,7 +199,15 @@ impl<'a> Workload<'a> { // Async work will send us it's ID on completion let (send, recv) = crossbeam_channel::unbounded::<(AnyWorkId, Result<(), AnyWorkError>)>(); - rayon::in_place_scope(|scope| { + // a flag we set if we panic + let abort_queued_jobs = Arc::new(AtomicBool::new(false)); + // build a custom threadpool. we use this to ensure all threads are named + let threadpool = ThreadPoolBuilder::new() + .thread_name(|n| format!("tp{n}")) + .build() + .expect("failed to init thread pool"); + + threadpool.in_place_scope(|scope| { // Whenever a task completes see if it was the last incomplete dependency of other task(s) // and spawn them if it was // TODO timeout and die it if takes too long to make forward progress or we're spinning w/o progress @@ -228,8 +244,38 @@ impl<'a> Workload<'a> { job.write_access.clone(), ); + let abort = abort_queued_jobs.clone(); + scope.spawn(move |_| { - let result = work.exec(work_context); + if abort.load(Ordering::Relaxed) { + log::trace!("Aborting {:?}", work.id()); + return; + } + // # Unwind Safety + // + // 'unwind safety' does not impact memory safety, but + // it may impact program correctness; the thread may have + // left shared memory in an inconsistent state. + // + // I believe this is not a concern for us, as we cancel any + // pending jobs after seeing a panic and jobs that depend + // on state produced by the panicking job must be scheduled + // after it. Unless we have jobs that are mutating + // shared resources then I think this is fine. + // + // references: + // + // + let result = match std::panic::catch_unwind(AssertUnwindSafe(|| { + work.exec(work_context) + })) { + Ok(result) => result, + Err(err) => { + let msg = get_panic_message(err); + abort.store(true, Ordering::Relaxed); + Err(AnyWorkError::Panic(msg)) + } + }; if let Err(e) = send.send((id.clone(), result)) { log::error!("Unable to write {id:?} to completion channel: {e}"); } @@ -387,3 +433,15 @@ impl<'a> Workload<'a> { self.success.difference(&pre_success).cloned().collect() } } + +// taken from std: +// +fn get_panic_message(msg: Box) -> String { + match msg.downcast_ref::<&'static str>() { + Some(s) => s.to_string(), + None => match msg.downcast_ref::() { + Some(s) => s.to_owned(), + None => "Box".to_owned(), + }, + } +}