Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward lines from stderr of child process to stdout on the same thread, instead of spawning a thread #940

Merged
merged 1 commit into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ rust-version = "1.53"
[target.'cfg(unix)'.dependencies]
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
# which is still an issue with `resolver = "1"`.
libc = { version = "0.2.62", default-features = false }
libc = { version = "0.2.62", default-features = false, optional = true }

[features]
parallel = []
parallel = ["libc"]

[dev-dependencies]
tempfile = "3"
Expand Down
3 changes: 1 addition & 2 deletions dev-tools/gen-windows-sys-binding/windows_sys.list
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
Windows.Win32.Foundation.FILETIME
Windows.Win32.Foundation.INVALID_HANDLE_VALUE
Windows.Win32.Foundation.ERROR_NO_MORE_ITEMS
Windows.Win32.Foundation.ERROR_SUCCESS
Windows.Win32.Foundation.SysFreeString
Expand All @@ -20,7 +19,7 @@ Windows.Win32.System.Com.COINIT_MULTITHREADED
Windows.Win32.System.Com.CoCreateInstance
Windows.Win32.System.Com.CoInitializeEx

Windows.Win32.System.Pipes.CreatePipe
Windows.Win32.System.Pipes.PeekNamedPipe

Windows.Win32.System.Registry.RegCloseKey
Windows.Win32.System.Registry.RegEnumKeyExW
Expand Down
253 changes: 163 additions & 90 deletions src/command_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::{
collections::hash_map,
ffi::OsString,
fmt::Display,
fs::{self, File},
fs,
hash::Hasher,
io::{self, BufRead, BufReader, Read, Write},
io::{self, Read, Write},
path::Path,
process::{Child, Command, Stdio},
process::{Child, ChildStderr, Command, Stdio},
sync::Arc,
thread::{self, JoinHandle},
};

use crate::{Error, ErrorKind, Object};
Expand Down Expand Up @@ -41,83 +40,164 @@ impl CargoOutput {
}
}

pub(crate) fn print_thread(&self) -> Result<Option<PrintThread>, Error> {
self.warnings.then(PrintThread::new).transpose()
fn stdio_for_warnings(&self) -> Stdio {
if self.warnings {
Stdio::piped()
} else {
Stdio::null()
}
}
}

pub(crate) struct PrintThread {
handle: Option<JoinHandle<()>>,
pipe_writer: Option<File>,
pub(crate) struct StderrForwarder {
inner: Option<(ChildStderr, Vec<u8>)>,
#[cfg(feature = "parallel")]
is_non_blocking: bool,
#[cfg(feature = "parallel")]
bytes_available_failed: bool,
}

impl PrintThread {
pub(crate) fn new() -> Result<Self, Error> {
let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?;

// Capture the standard error coming from compilation, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
let print = thread::spawn(move || {
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
let mut line = Vec::with_capacity(20);
let stdout = io::stdout();

// read_until returns 0 on Eof
while stderr.read_until(b'\n', &mut line).unwrap() != 0 {
{
let mut stdout = stdout.lock();

stdout.write_all(b"cargo:warning=").unwrap();
stdout.write_all(&line).unwrap();
stdout.write_all(b"\n").unwrap();
}
const MIN_BUFFER_CAPACITY: usize = 100;

// read_until does not clear the buffer
line.clear();
}
});
impl StderrForwarder {
pub(crate) fn new(child: &mut Child) -> Self {
dpaoliello marked this conversation as resolved.
Show resolved Hide resolved
Self {
inner: child
.stderr
.take()
.map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))),
#[cfg(feature = "parallel")]
is_non_blocking: false,
#[cfg(feature = "parallel")]
bytes_available_failed: false,
}
}

Ok(Self {
handle: Some(print),
pipe_writer: Some(pipe_writer),
})
#[allow(clippy::uninit_vec)]
fn forward_available(&mut self) -> bool {
if let Some((stderr, buffer)) = self.inner.as_mut() {
loop {
let old_data_end = buffer.len();

// For non-blocking we check to see if there is data available, so we should try to
// read at least that much. For blocking, always read at least the minimum amount.
#[cfg(not(feature = "parallel"))]
let to_reserve = MIN_BUFFER_CAPACITY;
#[cfg(feature = "parallel")]
let to_reserve = if self.is_non_blocking && !self.bytes_available_failed {
match crate::parallel::stderr::bytes_available(stderr) {
#[cfg(windows)]
Ok(0) => return false,
#[cfg(unix)]
Ok(0) => {
// On Unix, depending on the implementation, we may sometimes get 0 in a
// loop (either there is data available or the pipe is broken), so
// continue with the non-blocking read anyway.
MIN_BUFFER_CAPACITY
}
#[cfg(windows)]
Err(_) => {
// On Windows, if we get an error then the pipe is broken, so flush
// the buffer and bail.
if !buffer.is_empty() {
write_warning(&buffer[..]);
}
self.inner = None;
return true;
}
#[cfg(unix)]
Err(_) => {
// On Unix, depending on the implementation, we may get spurious
// errors so make a note not to use bytes_available again and try
// the non-blocking read anyway.
self.bytes_available_failed = true;
MIN_BUFFER_CAPACITY
}
Ok(bytes_available) => MIN_BUFFER_CAPACITY.max(bytes_available),
}
} else {
MIN_BUFFER_CAPACITY
};
buffer.reserve(to_reserve);

// SAFETY: 1) the length is set to the capacity, so we are never using memory beyond
// the underlying buffer and 2) we always call `truncate` below to set the len back
// to the intitialized data.
unsafe {
buffer.set_len(buffer.capacity());
}
match stderr.read(&mut buffer[old_data_end..]) {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No data currently, yield back.
buffer.truncate(old_data_end);
return false;
}
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
// Interrupted, try again.
buffer.truncate(old_data_end);
NobodyXu marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(0) | Err(_) => {
// End of stream: flush remaining data and bail.
if old_data_end > 0 {
write_warning(&buffer[..old_data_end]);
}
self.inner = None;
return true;
dpaoliello marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(bytes_read) => {
buffer.truncate(old_data_end + bytes_read);
let mut consumed = 0;
for line in buffer.split_inclusive(|&b| b == b'\n') {
// Only forward complete lines, leave the rest in the buffer.
if let Some((b'\n', line)) = line.split_last() {
consumed += line.len() + 1;
write_warning(line);
}
}
buffer.drain(..consumed);
}
}
}
} else {
true
}
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn take_pipe_writer(&mut self) -> File {
self.pipe_writer.take().unwrap()
#[cfg(feature = "parallel")]
pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> {
assert!(!self.is_non_blocking);

if let Some((stderr, _)) = self.inner.as_mut() {
crate::parallel::stderr::set_non_blocking(stderr)?;
}

self.is_non_blocking = true;
Ok(())
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn clone_pipe_writer(&self) -> Result<File, Error> {
self.try_clone_pipe_writer().map(Option::unwrap)
#[cfg(feature = "parallel")]
fn forward_all(&mut self) {
while !self.forward_available() {}
}

pub(crate) fn try_clone_pipe_writer(&self) -> Result<Option<File>, Error> {
self.pipe_writer
.as_ref()
.map(File::try_clone)
.transpose()
.map_err(From::from)
#[cfg(not(feature = "parallel"))]
fn forward_all(&mut self) {
let forward_result = self.forward_available();
assert!(forward_result, "Should have consumed all data");
}
}

impl Drop for PrintThread {
fn drop(&mut self) {
// Drop pipe_writer first to avoid deadlock
self.pipe_writer.take();

self.handle.take().unwrap().join().unwrap();
}
fn write_warning(line: &[u8]) {
let stdout = io::stdout();
let mut stdout = stdout.lock();
stdout.write_all(b"cargo:warning=").unwrap();
dpaoliello marked this conversation as resolved.
Show resolved Hide resolved
stdout.write_all(line).unwrap();
stdout.write_all(b"\n").unwrap();
}

fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
StderrForwarder::new(child).forward_all();

let status = match child.wait() {
Ok(s) => s,
Err(e) => {
Expand Down Expand Up @@ -193,20 +273,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
Ok(objects)
}

fn run_inner(cmd: &mut Command, program: &str, pipe_writer: Option<File>) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run(
cmd: &mut Command,
program: &str,
print: Option<&PrintThread>,
cargo_output: &CargoOutput,
) -> Result<(), Error> {
let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
run_inner(cmd, program, pipe_writer)?;

Ok(())
let mut child = spawn(cmd, program, cargo_output)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run_output(
Expand All @@ -216,12 +289,7 @@ pub(crate) fn run_output(
) -> Result<Vec<u8>, Error> {
cmd.stdout(Stdio::piped());

let mut print = cargo_output.print_thread()?;
let mut child = spawn(
cmd,
program,
print.as_mut().map(PrintThread::take_pipe_writer),
)?;
let mut child = spawn(cmd, program, cargo_output)?;

let mut stdout = vec![];
child
Expand All @@ -239,7 +307,7 @@ pub(crate) fn run_output(
pub(crate) fn spawn(
cmd: &mut Command,
program: &str,
pipe_writer: Option<File>,
cargo_output: &CargoOutput,
) -> Result<Child, Error> {
struct ResetStderr<'cmd>(&'cmd mut Command);

Expand All @@ -254,10 +322,7 @@ pub(crate) fn spawn(
println!("running: {:?}", cmd);

let cmd = ResetStderr(cmd);
let child = cmd
.0
.stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from))
.spawn();
let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn();
match child {
Ok(child) => Ok(child),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
Expand Down Expand Up @@ -307,9 +372,14 @@ pub(crate) fn try_wait_on_child(
program: &str,
child: &mut Child,
stdout: &mut dyn io::Write,
stderr_forwarder: &mut StderrForwarder,
) -> Result<Option<()>, Error> {
stderr_forwarder.forward_available();

match child.try_wait() {
Ok(Some(status)) => {
stderr_forwarder.forward_all();

let _ = writeln!(stdout, "{}", status);

if status.success() {
Expand All @@ -325,12 +395,15 @@ pub(crate) fn try_wait_on_child(
}
}
Ok(None) => Ok(None),
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
)),
Err(e) => {
stderr_forwarder.forward_all();
Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
))
}
}
}
Loading