Skip to content

Commit

Permalink
Fix Build::compile_objects perf regression and deadlock (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
NobodyXu authored Aug 7, 2023
1 parent 5710ce5 commit 6f43cf3
Showing 1 changed file with 115 additions and 9 deletions.
124 changes: 115 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::sync::{mpsc::channel, Once};
use std::sync::{mpsc, Once};

if objs.len() <= 1 {
for obj in objs {
Expand Down Expand Up @@ -1299,16 +1299,88 @@ impl Build {
// acquire the appropriate tokens, Once all objects have been compiled
// we wait on all the processes and propagate the results of compilation.

let (tx, rx) = channel::<(_, String, KillOnDrop, _)>();
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();

// Since jobserver::Client::acquire can block, waiting
// must be done in parallel so that acquire won't block forever.
let wait_thread = thread::Builder::new().spawn(move || {
for (cmd, program, mut child, _token) in rx {
wait_on_child(&cmd, &program, &mut child.0)?;
}
let mut error = None;
let mut pendings = Vec::new();
// Buffer the stdout
let mut stdout = io::BufWriter::with_capacity(128, io::stdout());
let mut backoff_cnt = 0;

loop {
let mut has_made_progress = false;

// Reading new pending tasks
loop {
match rx.try_recv() {
Ok(pending) => {
has_made_progress = true;
pendings.push(pending)
}
Err(mpsc::TryRecvError::Disconnected) if pendings.is_empty() => {
let _ = stdout.flush();
return if let Some(err) = error {
Err(err)
} else {
Ok(())
};
}
_ => break,
}
}

Ok(())
// Try waiting on them.
pendings.retain_mut(|(cmd, program, child, _)| {
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
Ok(Some(())) => {
// Task done, remove the entry
has_made_progress = true;
false
}
Ok(None) => true, // Task still not finished, keep the entry
Err(err) => {
// Task fail, remove the entry.
has_made_progress = true;

// Since we can only return one error, log the error to make
// sure users always see all the compilation failures.
let _ = writeln!(stdout, "cargo:warning={}", err);
error = Some(err);

false
}
}
});

if !has_made_progress {
if backoff_cnt > 3 {
// We have yielded at least three times without making'
// any progress, so we will sleep for a while.
let duration =
std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10));
thread::sleep(duration);
} else {
// Given that we spawned a lot of compilation tasks, it is unlikely
// that OS cannot find other ready task to execute.
//
// If all of them are done, then we will yield them and spawn more,
// or simply returns.
//
// Thus this will not be turned into a busy-wait loop and it will not
// waste CPU resource.
thread::yield_now();
}
}

backoff_cnt = if has_made_progress {
0
} else {
backoff_cnt + 1
};
}
})?;

for obj in objs {
Expand All @@ -1317,10 +1389,10 @@ impl Build {

let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

if tx.send((cmd, program, KillOnDrop(child), token)).is_err() {
break;
}
tx.send((cmd, program, KillOnDrop(child), token))
.expect("Wait thread must be alive until all compilation jobs are done, otherwise we risk deadlock");
}
// Drop tx so that the wait_thread could return
drop(tx);

return wait_thread.join().expect("wait_thread panics");
Expand Down Expand Up @@ -3545,6 +3617,40 @@ fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(),
}
}

#[cfg(feature = "parallel")]
fn try_wait_on_child(
cmd: &Command,
program: &str,
child: &mut Child,
stdout: &mut dyn io::Write,
) -> Result<Option<()>, Error> {
match child.try_wait() {
Ok(Some(status)) => {
let _ = writeln!(stdout, "{}", status);

if status.success() {
Ok(Some(()))
} else {
Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Command {:?} with args {:?} did not execute successfully (status code {}).",
cmd, program, status
),
))
}
}
Ok(None) => Ok(None),
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
)),
}
}

fn run_inner(cmd: &mut Command, program: &str, pipe_writer: File) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
Expand Down

0 comments on commit 6f43cf3

Please sign in to comment.