diff --git a/Cargo.toml b/Cargo.toml index 542e908..1e5d36e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ license = "MIT OR Apache-2.0" name = "containers-image-proxy" readme = "README.md" repository = "https://github.com/cgwalters/containers-image-proxy" -version = "0.1.1" +version = "0.2.0" [dependencies] anyhow = "1.0" diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 07260a3..1a99856 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -5,7 +5,7 @@ //! More information: use anyhow::{anyhow, Context, Result}; -use futures_util::{Future, FutureExt, TryFutureExt}; +use futures_util::Future; use nix::sys::socket::{self as nixsocket, ControlMessageOwned}; use nix::sys::uio::IoVec; use serde::{Deserialize, Serialize}; @@ -13,7 +13,7 @@ use std::fs::File; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::{FromRawFd, RawFd}; use std::pin::Pin; -use std::process::{ExitStatus, Stdio}; +use std::process::Stdio; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufRead, AsyncReadExt}; @@ -65,13 +65,12 @@ struct Reply { value: serde_json::Value, } -type JoinFuture = Pin>>>>; +type ChildFuture = Pin>>>; /// Manage a child process proxy to fetch container images. pub struct ImageProxy { sockfd: Arc>, - stderr: JoinFuture, - procwait: Pin>>>, + childwait: ChildFuture, } impl std::fmt::Debug for ImageProxy { @@ -117,35 +116,15 @@ impl ImageProxy { c.stdin(Stdio::from(theirsock)); let mut c = tokio::process::Command::from(c); c.kill_on_drop(true); - let mut proc = c.spawn().context("Failed to spawn skopeo")?; - - // Safety: We passed `Stdio::piped()` above - let mut child_stderr = proc.stderr.take().unwrap(); - - let stderr = tokio::spawn(async move { - let mut buf = String::new(); - child_stderr.read_to_string(&mut buf).await?; - Ok(buf) - }) - .map_err(anyhow::Error::msg) - .boxed(); - - let mut procwait = Box::pin(async move { proc.wait().map_err(anyhow::Error::msg).await }); + let child = c.spawn().context("Failed to spawn skopeo")?; + let childwait = Box::pin(child.wait_with_output()); let sockfd = Arc::new(Mutex::new(mysock)); + let mut r = Self { sockfd, childwait }; + // Verify semantic version - let protoreq = - Self::impl_request_raw::(Arc::clone(&sockfd), Request::new_bare("Initialize")); - let protover = tokio::select! { - r = protoreq => { - r?.0 - } - r = &mut procwait => { - let errmsg = stderr.await??; - return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r?, errmsg)); - } - }; + let protover = r.impl_request::("Initialize", []).await?.0; let protover = semver::Version::parse(protover.as_str())?; let supported = &*SUPPORTED_PROTO_VERSION; if !supported.matches(&protover) { @@ -156,11 +135,6 @@ impl ImageProxy { )); } - let r = Self { - stderr, - sockfd, - procwait, - }; Ok(r) } @@ -217,7 +191,7 @@ impl ImageProxy { } async fn impl_request( - &self, + &mut self, method: &str, args: T, ) -> Result<(R, Option<(File, u32)>)> @@ -225,11 +199,20 @@ impl ImageProxy { T: IntoIterator, I: Into, { - let req = Request::new(method, args); - Self::impl_request_raw(Arc::clone(&self.sockfd), req).await + let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); + tokio::select! { + r = req => { + Ok(r?) + } + r = &mut self.childwait => { + let r = r?; + let stderr = String::from_utf8_lossy(&r.stderr); + return Err(anyhow::anyhow!("proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr)) + } + } } - async fn finish_pipe(&self, pipeid: u32) -> Result<()> { + async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> { let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?; if fd.is_some() { return Err(anyhow!("Unexpected fd in finish_pipe reply")); @@ -237,21 +220,21 @@ impl ImageProxy { Ok(r) } - pub async fn open_image(&self, imgref: &str) -> Result { + pub async fn open_image(&mut self, imgref: &str) -> Result { let (imgid, _) = self .impl_request::("OpenImage", [imgref]) .await?; Ok(OpenedImage(imgid)) } - pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { + pub async fn close_image(&mut self, img: &OpenedImage) -> Result<()> { let (r, _) = self.impl_request("CloseImage", [img.0]).await?; Ok(r) } /// Fetch the manifest. /// https://github.com/opencontainers/image-spec/blob/main/manifest.md - pub async fn fetch_manifest(&self, img: &OpenedImage) -> Result<(String, Vec)> { + pub async fn fetch_manifest(&mut self, img: &OpenedImage) -> Result<(String, Vec)> { let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?; let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?; let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd)); @@ -268,7 +251,7 @@ impl ImageProxy { /// Note that right now the proxy does verification of the digest: /// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009 pub async fn get_blob( - &self, + &mut self, img: &OpenedImage, digest: &str, size: u64, @@ -293,13 +276,10 @@ impl ImageProxy { let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap(); nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?; drop(sendbuf); - let status = self.procwait.await?; - if !status.success() { - if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() { - anyhow::bail!("proxy failed: {}\n{}", status, stderr) - } else { - anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status) - } + let output = self.childwait.await?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("proxy failed: {}\n{}", output.status, stderr) } Ok(()) }