diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs
index a2e5037a..7fe39574 100644
--- a/lib/src/container/store.rs
+++ b/lib/src/container/store.rs
@@ -280,10 +280,9 @@ impl LayeredImageImporter {
             )
             .await?;
             let importer = crate::tar::import_tar(&self.repo, blob, None);
-            let (commit, driver) = tokio::join!(importer, driver);
-            driver?;
-            let commit =
-                commit.with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
+            let commit = super::unencapsulate::join_fetch(importer, driver)
+                .await
+                .with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
             // TODO support ref writing in tar import
             self.repo.set_ref_immediate(
                 None,
@@ -313,11 +312,11 @@ impl LayeredImageImporter {
                 base: Some(base_commit.clone()),
                 selinux: true,
             };
-            let w =
+            let r =
                 crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
-            let (r, driver) = tokio::join!(w, driver);
-            let r = r.with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
-            driver?;
+            let r = super::unencapsulate::join_fetch(r, driver)
+                .await
+                .with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
             layer_commits.push(r.commit);
             if !r.filtered.is_empty() {
                 let filtered = HashMap::from_iter(r.filtered.into_iter());
diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs
index 8afe290e..4b0b158f 100644
--- a/lib/src/container/unencapsulate.rs
+++ b/lib/src/container/unencapsulate.rs
@@ -165,6 +165,37 @@ pub(crate) async fn fetch_layer_decompress<'a>(
     Ok((blob, driver))
 }
 
+/// Use this to process potential errors from a worker and a driver.
+/// This is really a brutal hack around the fact that an error can occur
+/// on either our side or in the proxy. But if an error occurs on our
+/// side, then we will close the pipe, which will *also* cause the proxy
+/// to error out.
+///
+/// What we really want is for the proxy to tell us when it got an
+/// error from us closing the pipe. Or, we could store that state
+/// on our side. Both are slightly tricky, so we have this (again)
+/// hacky thing where we just search for `broken pipe` in the error text.
+pub(crate) async fn join_fetch<T: std::fmt::Debug>(
+    worker: impl Future<Output = Result<T>>,
+    driver: impl Future<Output = Result<()>>,
+) -> Result<T> {
+    let (worker, driver) = tokio::join!(worker, driver);
+    dbg!(&worker, &driver);
+    match (worker, driver) {
+        (Ok(t), Ok(())) => Ok(t),
+        (Err(worker), Err(driver)) => {
+            let text = driver.root_cause().to_string();
+            if text.ends_with("broken pipe") {
+                Err(worker)
+            } else {
+                Err(worker.context(format!("proxy failure: {} and client error", text)))
+            }
+        }
+        (Ok(_), Err(driver)) => Err(driver),
+        (Err(worker), Ok(())) => Err(worker),
+    }
+}
+
 /// Fetch a container image using an in-memory manifest and import its embedded OSTree commit.
 #[context("Importing {}", imgref)]
 #[instrument(skip(repo, options, manifest))]
@@ -238,18 +269,18 @@ pub async fn unencapsulate_from_manifest(
             reader: blob,
             progress: progress.as_ref().map(|v| Arc::clone(v)),
         };
-        if tx.send(blob).await.is_err() {
+        let (txsend, driver) = tokio::join!(tx.send(blob), driver);
+        let err = txsend
+            .err()
+            .map(|_| anyhow::anyhow!("Failed to send"))
+            .or(driver.err());
+        if let Some(err) = err {
             drop(tx);
             return match import.await? {
-                Ok(_) => {
-                    return Err(anyhow::anyhow!(
-                        "internal error: import worker thread did not set error"
-                    ))
-                }
+                Ok(_) => return Err(err),
                 Err(e) => Err(e),
             };
         }
-        driver.await?;
     }
     drop(tx);
 
diff --git a/lib/src/tar/write.rs b/lib/src/tar/write.rs
index 578dc710..bd3b0e42 100644
--- a/lib/src/tar/write.rs
+++ b/lib/src/tar/write.rs
@@ -36,6 +36,7 @@ pub struct WriteTarOptions {
 ///
 /// This includes some basic data on the number of files that were filtered
 /// out because they were not in `/usr`.
+#[derive(Debug)]
 pub struct WriteTarResult {
     /// The resulting OSTree commit SHA-256.
     pub commit: String,
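
For reference, a minimal standalone sketch of the error-pairing behavior the new `join_fetch` helper encodes, assuming `tokio` and `anyhow` as dependencies. `join_fetch_like` and the `worker`/`driver` futures in `main` are hypothetical stand-ins (the real helper is `pub(crate)` and receives the tar import future plus the proxy copy driver); only the match over the `tokio::join!` results mirrors the patch.

```rust
use anyhow::{anyhow, Result};
use std::future::Future;

// Same decision table as the join_fetch helper in the diff above: run both
// futures to completion, then decide which error (if any) to surface.
async fn join_fetch_like<T>(
    worker: impl Future<Output = Result<T>>,
    driver: impl Future<Output = Result<()>>,
) -> Result<T> {
    let (worker, driver) = tokio::join!(worker, driver);
    match (worker, driver) {
        (Ok(t), Ok(())) => Ok(t),
        // Both sides failed: if the driver only saw a broken pipe (most likely
        // caused by the worker closing its end), report the worker error alone.
        (Err(worker), Err(driver)) => {
            let text = driver.root_cause().to_string();
            if text.ends_with("broken pipe") {
                Err(worker)
            } else {
                Err(worker.context(format!("proxy failure: {} and client error", text)))
            }
        }
        (Ok(_), Err(driver)) => Err(driver),
        (Err(worker), Ok(())) => Err(worker),
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical stand-ins for the tar import task (worker) and the proxy
    // copy task (driver); both fail, but the driver failure is a broken pipe.
    let worker = async { Err::<u32, _>(anyhow!("parse failure on the client side")) };
    let driver = async { Err(anyhow!("copying blob: broken pipe")) };
    // Only the worker's error surfaces; the driver's broken pipe is dropped.
    let err = join_fetch_like(worker, driver).await.unwrap_err();
    println!("{err:#}");
}
```

The heuristic exists because a client-side failure closes the pipe and therefore makes the proxy fail as well, so a broken-pipe error from the driver is noise; any other driver error is kept as added context on the worker's error.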