Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Dec 6, 2021
1 parent 290bcf6 commit 9a1b9fb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
15 changes: 7 additions & 8 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,9 @@ impl LayeredImageImporter {
)
.await?;
let importer = crate::tar::import_tar(&self.repo, blob, None);
let (commit, driver) = tokio::join!(importer, driver);
driver?;
let commit =
commit.with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
let commit = super::unencapsulate::join_fetch(importer, driver)
.await
.with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
// TODO support ref writing in tar import
self.repo.set_ref_immediate(
None,
Expand Down Expand Up @@ -313,11 +312,11 @@ impl LayeredImageImporter {
base: Some(base_commit.clone()),
selinux: true,
};
let w =
let r =
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
let (r, driver) = tokio::join!(w, driver);
let r = r.with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
driver?;
let r = super::unencapsulate::join_fetch(r, driver)
.await
.with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
layer_commits.push(r.commit);
if !r.filtered.is_empty() {
let filtered = HashMap::from_iter(r.filtered.into_iter());
Expand Down
45 changes: 38 additions & 7 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,37 @@ pub(crate) async fn fetch_layer_decompress<'a>(
Ok((blob, driver))
}

/// Use this to process potential errors from a worker and a driver.
/// This is really a brutal hack around the fact that an error can occur
/// on either our side or in the proxy. But if an error occurs on our
/// side, then we will close the pipe, which will *also* cause the proxy
/// to error out.
///
/// What we really want is for the proxy to tell us when it got an
/// error from us closing the pipe. Or, we could store that state
/// on our side. Both are slightly tricky, so we have this (again)
/// hacky thing where we just search for `broken pipe` in the error text.
pub(crate) async fn join_fetch<T: std::fmt::Debug>(
worker: impl Future<Output = Result<T>>,
driver: impl Future<Output = Result<()>>,
) -> Result<T> {
let (worker, driver) = tokio::join!(worker, driver);
dbg!(&worker, &driver);
match (worker, driver) {
(Ok(t), Ok(())) => Ok(t),
(Err(worker), Err(driver)) => {
let text = driver.root_cause().to_string();
if text.ends_with("broken pipe") {
Err(worker)
} else {
Err(worker.context(format!("proxy failure: {} and client error", text)))
}
}
(Ok(_), Err(driver)) => Err(driver),
(Err(worker), Ok(())) => Err(worker),
}
}

/// Fetch a container image using an in-memory manifest and import its embedded OSTree commit.
#[context("Importing {}", imgref)]
#[instrument(skip(repo, options, manifest))]
Expand Down Expand Up @@ -238,18 +269,18 @@ pub async fn unencapsulate_from_manifest(
reader: blob,
progress: progress.as_ref().map(|v| Arc::clone(v)),
};
if tx.send(blob).await.is_err() {
let (txsend, driver) = tokio::join!(tx.send(blob), driver);
let err = txsend
.err()
.map(|_| anyhow::anyhow!("Failed to send"))
.or(driver.err());
if let Some(err) = err {
drop(tx);
return match import.await? {
Ok(_) => {
return Err(anyhow::anyhow!(
"internal error: import worker thread did not set error"
))
}
Ok(_) => return Err(err),
Err(e) => Err(e),
};
}
driver.await?;
}
drop(tx);

Expand Down
1 change: 1 addition & 0 deletions lib/src/tar/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct WriteTarOptions {
///
/// This includes some basic data on the number of files that were filtered
/// out because they were not in `/usr`.
#[derive(Debug)]
pub struct WriteTarResult {
/// The resulting OSTree commit SHA-256.
pub commit: String,
Expand Down

0 comments on commit 9a1b9fb

Please sign in to comment.