Skip to content

Commit

Permalink
Merge pull request #780 from NobodyXu/optimize
Browse files Browse the repository at this point in the history
Optimize `Build::compile_objects`: Only spawns one thread
  • Loading branch information
joshtriplett authored Jul 10, 2023
2 parents 57853c4 + ff45d42 commit 17c8858
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ edition = "2018"

[dependencies]
jobserver = { version = "0.1.16", optional = true }
os_pipe = "1"

[features]
parallel = ["jobserver"]
Expand Down
252 changes: 134 additions & 118 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,8 +1218,7 @@ impl Build {
}

#[cfg(feature = "parallel")]
fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
use std::sync::Once;

// Limit our parallelism globally with a jobserver. Start off by
Expand All @@ -1242,56 +1241,28 @@ impl Build {
// Note that this jobserver is cached globally so we only used one per
// process and only worry about creating it once.
//
// * Next we use a raw `thread::spawn` per thread to actually compile
// objects in parallel. We only actually spawn a thread after we've
// acquired a token to perform some work
//
// * Finally though we want to keep the dependencies of this crate
// pretty light, so we avoid using a safe abstraction like `rayon` and
// instead rely on some bits of `unsafe` code. We know that this stack
// frame persists while everything is compiling so we use all the
// stack-allocated objects without cloning/reallocating. We use a
// transmute to `State` with a `'static` lifetime to persist
// everything we need across the boundary, and the join-on-drop
// semantics of `JoinOnDrop` should ensure that our stack frame is
// alive while threads are alive.
// * Next we use spawn the process to actually compile objects in
// parallel after we've acquired a token to perform some work
//
// With all that in mind we compile all objects in a loop here, after we
// acquire the appropriate tokens, Once all objects have been compiled
// we join on all the threads and propagate the results of compilation.
//
// Note that as a slight optimization we try to break out as soon as
// possible as soon as any compilation fails to ensure that errors get
// out to the user as fast as possible.
let error = AtomicBool::new(false);
let mut threads = Vec::new();
for obj in objs {
if error.load(SeqCst) {
break;
}
let token = server.acquire()?;
let state = State {
build: self,
obj,
error: &error,
};
let state = unsafe { std::mem::transmute::<State, State<'static>>(state) };
let thread = thread::spawn(|| {
let state: State<'me> = state; // erase the `'static` lifetime
let result = state.build.compile_object(state.obj);
if result.is_err() {
state.error.store(true, SeqCst);
}
drop(token); // make sure our jobserver token is released after the compile
return result;
});
threads.push(JoinOnDrop(Some(thread)));
}
// we wait on all the processes and propagate the results of compilation.
let print = PrintThread::new()?;

for mut thread in threads {
if let Some(thread) = thread.0.take() {
thread.join().expect("thread should not panic")?;
}
let children = objs
.iter()
.map(|obj| {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = server.acquire()?;

let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

Ok((cmd, program, KillOnDrop(child), token))
})
.collect::<Result<Vec<_>, Error>>()?;

for (cmd, program, mut child, _token) in children {
wait_on_child(&cmd, &program, &mut child.0)?;
}

// Reacquire our process's token before we proceed, which we released
Expand All @@ -1302,16 +1273,6 @@ impl Build {

return Ok(());

/// Shared state from the parent thread to the child thread. This
/// package of pointers is temporarily transmuted to a `'static`
/// lifetime to cross the thread boundary and then once the thread is
/// running we erase the `'static` to go back to an anonymous lifetime.
struct State<'a> {
build: &'a Build,
obj: &'a Object,
error: &'a AtomicBool,
}

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> &'static jobserver::Client {
Expand Down Expand Up @@ -1357,26 +1318,30 @@ impl Build {
return client;
}

struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
struct KillOnDrop(Child);

impl Drop for JoinOnDrop {
impl Drop for KillOnDrop {
fn drop(&mut self) {
if let Some(thread) = self.0.take() {
drop(thread.join());
}
let child = &mut self.0;

child.kill().ok();
}
}
}

#[cfg(not(feature = "parallel"))]
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
let print = PrintThread::new()?;

for obj in objs {
self.compile_object(obj)?;
let (mut cmd, name) = self.create_compile_object_cmd(obj)?;
run_inner(&mut cmd, &name, print.pipe_writer_cloned()?.unwrap())?;
}

Ok(())
}

fn compile_object(&self, obj: &Object) -> Result<(), Error> {
fn create_compile_object_cmd(&self, obj: &Object) -> Result<(Command, String), Error> {
let asm_ext = AsmFileExt::from_path(&obj.src);
let is_asm = asm_ext.is_some();
let target = self.get_target()?;
Expand Down Expand Up @@ -1425,8 +1390,7 @@ impl Build {
self.fix_env_for_apple_os(&mut cmd)?;
}

run(&mut cmd, &name)?;
Ok(())
Ok((cmd, name))
}

/// This will return a result instead of panicing; see expand() for the complete description.
Expand Down Expand Up @@ -3463,21 +3427,19 @@ impl Tool {
}
}

fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
let (mut child, print) = spawn(cmd, program)?;
fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
let status = match child.wait() {
Ok(s) => s,
Err(_) => {
Err(e) => {
return Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Failed to wait on spawned child process, command {:?} with args {:?}.",
cmd, program
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
));
}
};
print.join().unwrap();
println!("{}", status);

if status.success() {
Expand All @@ -3493,63 +3455,62 @@ fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
}
}

fn run_inner(
cmd: &mut Command,
program: &str,
pipe_writer: os_pipe::PipeWriter,
) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
}

fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
let mut print = PrintThread::new()?;
run_inner(cmd, program, print.pipe_writer().take().unwrap())?;

Ok(())
}

fn run_output(cmd: &mut Command, program: &str) -> Result<Vec<u8>, Error> {
cmd.stdout(Stdio::piped());
let (mut child, print) = spawn(cmd, program)?;

let mut print = PrintThread::new()?;
let mut child = spawn(cmd, program, print.pipe_writer().take().unwrap())?;

let mut stdout = vec![];
child
.stdout
.take()
.unwrap()
.read_to_end(&mut stdout)
.unwrap();
let status = match child.wait() {
Ok(s) => s,
Err(_) => {
return Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Failed to wait on spawned child process, command {:?} with args {:?}.",
cmd, program
),
));
}
};
print.join().unwrap();
println!("{}", status);

if status.success() {
Ok(stdout)
} else {
Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Command {:?} with args {:?} did not execute successfully (status code {}).",
cmd, program, status
),
))
}
wait_on_child(cmd, program, &mut child)?;

Ok(stdout)
}

fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Error> {
println!("running: {:?}", cmd);
fn spawn(
cmd: &mut Command,
program: &str,
pipe_writer: os_pipe::PipeWriter,
) -> Result<Child, Error> {
struct ResetStderr<'cmd>(&'cmd mut Command);

// Capture the standard error coming from these programs, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
match cmd.stderr(Stdio::piped()).spawn() {
Ok(mut child) => {
let stderr = BufReader::new(child.stderr.take().unwrap());
let print = thread::spawn(move || {
for line in stderr.split(b'\n').filter_map(|l| l.ok()) {
print!("cargo:warning=");
std::io::stdout().write_all(&line).unwrap();
println!("");
}
});
Ok((child, print))
impl Drop for ResetStderr<'_> {
fn drop(&mut self) {
// Reset stderr to default to release pipe_writer so that print thread will
// not block forever.
self.0.stderr(Stdio::inherit());
}
}

println!("running: {:?}", cmd);

let cmd = ResetStderr(cmd);

match cmd.0.stderr(pipe_writer).spawn() {
Ok(child) => Ok(child),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
let extra = if cfg!(windows) {
" (see https://github.com/rust-lang/cc-rs#compile-time-requirements \
Expand All @@ -3562,11 +3523,11 @@ fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Er
&format!("Failed to find tool. Is `{}` installed?{}", program, extra),
))
}
Err(ref e) => Err(Error::new(
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Command {:?} with args {:?} failed to start: {:?}",
cmd, program, e
cmd.0, program, e
),
)),
}
Expand Down Expand Up @@ -3769,3 +3730,58 @@ impl AsmFileExt {
None
}
}

struct PrintThread {
handle: Option<JoinHandle<()>>,
pipe_writer: Option<os_pipe::PipeWriter>,
}

impl PrintThread {
fn new() -> Result<Self, Error> {
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;

// Capture the standard error coming from compilation, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
let print = thread::spawn(move || {
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
let mut line = String::with_capacity(20);
let mut stdout = io::stdout();

// read_line returns 0 on Eof
while stderr.read_line(&mut line).unwrap() != 0 {
writeln!(&mut stdout, "cargo:warning={}", line).ok();

// read_line does not clear the buffer
line.clear();
}
});

Ok(Self {
handle: Some(print),
pipe_writer: Some(pipe_writer),
})
}

fn pipe_writer(&mut self) -> &mut Option<os_pipe::PipeWriter> {
&mut self.pipe_writer
}

fn pipe_writer_cloned(&self) -> Result<Option<os_pipe::PipeWriter>, Error> {
self.pipe_writer
.as_ref()
.map(os_pipe::PipeWriter::try_clone)
.transpose()
.map_err(From::from)
}
}

impl Drop for PrintThread {
fn drop(&mut self) {
// Drop pipe_writer first to avoid deadlock
self.pipe_writer.take();

self.handle.take().unwrap().join().unwrap();
}
}

0 comments on commit 17c8858

Please sign in to comment.