Skip to content

Commit

Permalink
Merge pull request #3 from cgwalters/request-timeout
Browse files Browse the repository at this point in the history
Fixes for child process monitoring
  • Loading branch information
cgwalters authored Nov 5, 2021
2 parents 70b6f59 + 8593d1a commit 5b74e02
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ license = "MIT OR Apache-2.0"
name = "containers-image-proxy"
readme = "README.md"
repository = "https://github.com/cgwalters/containers-image-proxy"
version = "0.1.1"
version = "0.2.0"

[dependencies]
anyhow = "1.0"
Expand Down
80 changes: 30 additions & 50 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
//! More information: <https://github.com/containers/skopeo/pull/1476>

use anyhow::{anyhow, Context, Result};
use futures_util::{Future, FutureExt, TryFutureExt};
use futures_util::Future;
use nix::sys::socket::{self as nixsocket, ControlMessageOwned};
use nix::sys::uio::IoVec;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::{FromRawFd, RawFd};
use std::pin::Pin;
use std::process::{ExitStatus, Stdio};
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufRead, AsyncReadExt};

Expand Down Expand Up @@ -65,13 +65,12 @@ struct Reply {
value: serde_json::Value,
}

type JoinFuture<T> = Pin<Box<dyn Future<Output = Result<Result<T>>>>>;
type ChildFuture = Pin<Box<dyn Future<Output = std::io::Result<std::process::Output>>>>;

/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
sockfd: Arc<Mutex<File>>,
stderr: JoinFuture<String>,
procwait: Pin<Box<dyn Future<Output = Result<ExitStatus>>>>,
childwait: ChildFuture,
}

impl std::fmt::Debug for ImageProxy {
Expand Down Expand Up @@ -117,35 +116,15 @@ impl ImageProxy {
c.stdin(Stdio::from(theirsock));
let mut c = tokio::process::Command::from(c);
c.kill_on_drop(true);
let mut proc = c.spawn().context("Failed to spawn skopeo")?;

// Safety: We passed `Stdio::piped()` above
let mut child_stderr = proc.stderr.take().unwrap();

let stderr = tokio::spawn(async move {
let mut buf = String::new();
child_stderr.read_to_string(&mut buf).await?;
Ok(buf)
})
.map_err(anyhow::Error::msg)
.boxed();

let mut procwait = Box::pin(async move { proc.wait().map_err(anyhow::Error::msg).await });
let child = c.spawn().context("Failed to spawn skopeo")?;
let childwait = Box::pin(child.wait_with_output());

let sockfd = Arc::new(Mutex::new(mysock));

let mut r = Self { sockfd, childwait };

// Verify semantic version
let protoreq =
Self::impl_request_raw::<String>(Arc::clone(&sockfd), Request::new_bare("Initialize"));
let protover = tokio::select! {
r = protoreq => {
r?.0
}
r = &mut procwait => {
let errmsg = stderr.await??;
return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r?, errmsg));
}
};
let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
let protover = semver::Version::parse(protover.as_str())?;
let supported = &*SUPPORTED_PROTO_VERSION;
if !supported.matches(&protover) {
Expand All @@ -156,11 +135,6 @@ impl ImageProxy {
));
}

let r = Self {
stderr,
sockfd,
procwait,
};
Ok(r)
}

Expand Down Expand Up @@ -217,41 +191,50 @@ impl ImageProxy {
}

async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
&self,
&mut self,
method: &str,
args: T,
) -> Result<(R, Option<(File, u32)>)>
where
T: IntoIterator<Item = I>,
I: Into<serde_json::Value>,
{
let req = Request::new(method, args);
Self::impl_request_raw(Arc::clone(&self.sockfd), req).await
let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
tokio::select! {
r = req => {
Ok(r?)
}
r = &mut self.childwait => {
let r = r?;
let stderr = String::from_utf8_lossy(&r.stderr);
return Err(anyhow::anyhow!("proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr))
}
}
}

async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> {
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
if fd.is_some() {
return Err(anyhow!("Unexpected fd in finish_pipe reply"));
}
Ok(r)
}

pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
pub async fn open_image(&mut self, imgref: &str) -> Result<OpenedImage> {
let (imgid, _) = self
.impl_request::<u32, _, _>("OpenImage", [imgref])
.await?;
Ok(OpenedImage(imgid))
}

pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
pub async fn close_image(&mut self, img: &OpenedImage) -> Result<()> {
let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
Ok(r)
}

/// Fetch the manifest.
/// https://github.com/opencontainers/image-spec/blob/main/manifest.md
pub async fn fetch_manifest(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
pub async fn fetch_manifest(&mut self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd));
Expand All @@ -268,7 +251,7 @@ impl ImageProxy {
/// Note that right now the proxy does verification of the digest:
/// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
pub async fn get_blob(
&self,
&mut self,
img: &OpenedImage,
digest: &str,
size: u64,
Expand All @@ -293,13 +276,10 @@ impl ImageProxy {
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
drop(sendbuf);
let status = self.procwait.await?;
if !status.success() {
if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() {
anyhow::bail!("proxy failed: {}\n{}", status, stderr)
} else {
anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status)
}
let output = self.childwait.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("proxy failed: {}\n{}", output.status, stderr)
}
Ok(())
}
Expand Down

0 comments on commit 5b74e02

Please sign in to comment.