Skip to content

Commit

Permalink
Remove dependency on pipe, unless parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
dpaoliello committed Feb 4, 2024
1 parent 2b52daf commit 68ed37a
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 321 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ rust-version = "1.53"
[target.'cfg(unix)'.dependencies]
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
# which is still an issue with `resolver = "1"`.
libc = { version = "0.2.62", default-features = false }
libc = { version = "0.2.62", default-features = false, optional = true }

[features]
parallel = []
parallel = ["libc"]

[dev-dependencies]
tempfile = "3"
Expand Down
3 changes: 1 addition & 2 deletions dev-tools/gen-windows-sys-binding/windows_sys.list
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
Windows.Win32.Foundation.FILETIME
Windows.Win32.Foundation.INVALID_HANDLE_VALUE
Windows.Win32.Foundation.ERROR_NO_MORE_ITEMS
Windows.Win32.Foundation.ERROR_SUCCESS
Windows.Win32.Foundation.SysFreeString
Expand All @@ -20,7 +19,7 @@ Windows.Win32.System.Com.COINIT_MULTITHREADED
Windows.Win32.System.Com.CoCreateInstance
Windows.Win32.System.Com.CoInitializeEx

Windows.Win32.System.Pipes.CreatePipe
Windows.Win32.System.Pipes.PeekNamedPipe

Windows.Win32.System.Registry.RegCloseKey
Windows.Win32.System.Registry.RegEnumKeyExW
Expand Down
246 changes: 156 additions & 90 deletions src/command_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::{
collections::hash_map,
ffi::OsString,
fmt::Display,
fs::{self, File},
fs,
hash::Hasher,
io::{self, BufRead, BufReader, Read, Write},
io::{self, Read, Write},
path::Path,
process::{Child, Command, Stdio},
process::{Child, ChildStderr, Command, Stdio},
sync::Arc,
thread::{self, JoinHandle},
};

use crate::{Error, ErrorKind, Object};
Expand Down Expand Up @@ -41,83 +40,157 @@ impl CargoOutput {
}
}

pub(crate) fn print_thread(&self) -> Result<Option<PrintThread>, Error> {
self.warnings.then(PrintThread::new).transpose()
fn stdio_for_warnings(&self) -> Stdio {
if self.warnings {
Stdio::piped()
} else {
Stdio::null()
}
}
}

pub(crate) struct PrintThread {
handle: Option<JoinHandle<()>>,
pipe_writer: Option<File>,
pub(crate) struct StderrForwarder {
inner: Option<(ChildStderr, Vec<u8>)>,
#[cfg(feature = "parallel")]
is_non_blocking: bool,
#[cfg(feature = "parallel")]
bytes_available_failed: bool,
}

impl PrintThread {
pub(crate) fn new() -> Result<Self, Error> {
let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?;

// Capture the standard error coming from compilation, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
let print = thread::spawn(move || {
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
let mut line = Vec::with_capacity(20);
let stdout = io::stdout();

// read_until returns 0 on Eof
while stderr.read_until(b'\n', &mut line).unwrap() != 0 {
{
let mut stdout = stdout.lock();

stdout.write_all(b"cargo:warning=").unwrap();
stdout.write_all(&line).unwrap();
stdout.write_all(b"\n").unwrap();
}
const MIN_BUFFER_CAPACITY: usize = 100;

// read_until does not clear the buffer
line.clear();
}
});
impl StderrForwarder {
pub(crate) fn new(child: &mut Child) -> Self {
Self {
inner: child
.stderr
.take()
.map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))),
#[cfg(feature = "parallel")]
is_non_blocking: false,
#[cfg(feature = "parallel")]
bytes_available_failed: false,
}
}

Ok(Self {
handle: Some(print),
pipe_writer: Some(pipe_writer),
})
#[allow(clippy::uninit_vec)]
fn forward_available(&mut self) -> bool {
if let Some((stderr, buffer)) = self.inner.as_mut() {
loop {
let old_data_end = buffer.len();

// For non-blocking we check to see if there is data available, so we should try to
// read at least that much. For blocking, always read at least the minimum amount.
#[cfg(not(feature = "parallel"))]
let to_reserve = MIN_BUFFER_CAPACITY;
#[cfg(feature = "parallel")]
let to_reserve = if self.is_non_blocking && !self.bytes_available_failed {
match crate::parallel::stderr::bytes_available(stderr) {
#[cfg(windows)]
Ok(0) => return false,
#[cfg(windows)]
Err(_) => {
// On Windows, if we get an error then the pipe is broken, so flush
// the buffer and bail.
if !buffer.is_empty() {
write_warning(&buffer[..]);
}
self.inner = None;
return true;
}
#[cfg(unix)]
Err(_) | Ok(0) => {
// On Unix, depending on the implementation, we may get spurious
// errors so make a note not to use bytes_available again and try
// the non-blocking read anyway.
self.bytes_available_failed = true;
MIN_BUFFER_CAPACITY
}
Ok(bytes_available) => MIN_BUFFER_CAPACITY.max(bytes_available),
}
} else {
MIN_BUFFER_CAPACITY
};
buffer.reserve(to_reserve);

// SAFETY: 1) the length is set to the capacity, so we are never using memory beyond
// the underlying buffer and 2) we always call `truncate` below to set the len back
// to the intitialized data.
unsafe {
buffer.set_len(buffer.capacity());
}
match stderr.read(&mut buffer[old_data_end..]) {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No data currently, yield back.
buffer.truncate(old_data_end);
return false;
}
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
// Interrupted, try again.
buffer.truncate(old_data_end);
}
Ok(0) | Err(_) => {
// End of stream: flush remaining data and bail.
if old_data_end > 0 {
write_warning(&buffer[..old_data_end]);
}
self.inner = None;
return true;
}
Ok(bytes_read) => {
buffer.truncate(old_data_end + bytes_read);
let mut consumed = 0;
for line in buffer.split_inclusive(|&b| b == b'\n') {
// Only forward complete lines, leave the rest in the buffer.
if let Some((b'\n', line)) = line.split_last() {
consumed += line.len() + 1;
write_warning(line);
}
}
buffer.drain(..consumed);
}
}
}
} else {
true
}
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn take_pipe_writer(&mut self) -> File {
self.pipe_writer.take().unwrap()
#[cfg(feature = "parallel")]
pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> {
assert!(!self.is_non_blocking);

if let Some((stderr, _)) = self.inner.as_mut() {
crate::parallel::stderr::set_non_blocking(stderr)?;
}

self.is_non_blocking = true;
Ok(())
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn clone_pipe_writer(&self) -> Result<File, Error> {
self.try_clone_pipe_writer().map(Option::unwrap)
#[cfg(feature = "parallel")]
fn forward_all(&mut self) {
while !self.forward_available() {}
}

pub(crate) fn try_clone_pipe_writer(&self) -> Result<Option<File>, Error> {
self.pipe_writer
.as_ref()
.map(File::try_clone)
.transpose()
.map_err(From::from)
#[cfg(not(feature = "parallel"))]
fn forward_all(&mut self) {
let forward_result = self.forward_available();
assert!(forward_result, "Should have consumed all data");
}
}

impl Drop for PrintThread {
fn drop(&mut self) {
// Drop pipe_writer first to avoid deadlock
self.pipe_writer.take();

self.handle.take().unwrap().join().unwrap();
}
fn write_warning(line: &[u8]) {
let stdout = io::stdout();
let mut stdout = stdout.lock();
stdout.write_all(b"cargo:warning=").unwrap();
stdout.write_all(line).unwrap();
stdout.write_all(b"\n").unwrap();
}

fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
StderrForwarder::new(child).forward_all();

let status = match child.wait() {
Ok(s) => s,
Err(e) => {
Expand Down Expand Up @@ -193,20 +266,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
Ok(objects)
}

fn run_inner(cmd: &mut Command, program: &str, pipe_writer: Option<File>) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run(
cmd: &mut Command,
program: &str,
print: Option<&PrintThread>,
cargo_output: &CargoOutput,
) -> Result<(), Error> {
let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
run_inner(cmd, program, pipe_writer)?;

Ok(())
let mut child = spawn(cmd, program, cargo_output)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run_output(
Expand All @@ -216,12 +282,7 @@ pub(crate) fn run_output(
) -> Result<Vec<u8>, Error> {
cmd.stdout(Stdio::piped());

let mut print = cargo_output.print_thread()?;
let mut child = spawn(
cmd,
program,
print.as_mut().map(PrintThread::take_pipe_writer),
)?;
let mut child = spawn(cmd, program, cargo_output)?;

let mut stdout = vec![];
child
Expand All @@ -239,7 +300,7 @@ pub(crate) fn run_output(
pub(crate) fn spawn(
cmd: &mut Command,
program: &str,
pipe_writer: Option<File>,
cargo_output: &CargoOutput,
) -> Result<Child, Error> {
struct ResetStderr<'cmd>(&'cmd mut Command);

Expand All @@ -254,10 +315,7 @@ pub(crate) fn spawn(
println!("running: {:?}", cmd);

let cmd = ResetStderr(cmd);
let child = cmd
.0
.stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from))
.spawn();
let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn();
match child {
Ok(child) => Ok(child),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
Expand Down Expand Up @@ -307,9 +365,14 @@ pub(crate) fn try_wait_on_child(
program: &str,
child: &mut Child,
stdout: &mut dyn io::Write,
stderr_forwarder: &mut StderrForwarder,
) -> Result<Option<()>, Error> {
stderr_forwarder.forward_available();

match child.try_wait() {
Ok(Some(status)) => {
stderr_forwarder.forward_all();

let _ = writeln!(stdout, "{}", status);

if status.success() {
Expand All @@ -325,12 +388,15 @@ pub(crate) fn try_wait_on_child(
}
}
Ok(None) => Ok(None),
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
)),
Err(e) => {
stderr_forwarder.forward_all();
Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
))
}
}
}
Loading

0 comments on commit 68ed37a

Please sign in to comment.