unencapsulate: Wrap decompressor and implement Drop #1247

Merged (1 commit) on Apr 2, 2025
40 changes: 38 additions & 2 deletions ostree-ext/src/container/unencapsulate.rs

@@ -191,11 +191,47 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
    importer.unencapsulate().await
}

pub(crate) struct Decompressor {
    inner: Box<dyn Read + Send + 'static>,
}

impl Read for Decompressor {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.inner.read(buf)
    }
}

impl Drop for Decompressor {
    fn drop(&mut self) {
        // We need to make sure to flush out the decompressor and/or
        // tar stream here. For tar, we might not read through the
        // entire stream, because the archive has zero-block-markers
        // at the end; or possibly because the final entry is filtered
        // in filter_tar so we don't advance to read the data. For
        // the decompressor, zstd:chunked layers will have
        // metadata/skippable frames at the end of the stream. That
        // data isn't relevant to the tar stream, but if we don't read
        // it here then on the skopeo proxy we'll block trying to
        // write the end of the stream. That in turn will block our
        // client end trying to call FinishPipe, and we end up
        // deadlocking ourselves through skopeo.
        //
        // https://github.com/bootc-dev/bootc/issues/1204

        let mut sink = std::io::sink();
        match std::io::copy(&mut self.inner, &mut sink) {
            Err(e) => tracing::debug!("Ignoring error while dropping decompressor: {e}"),
            Ok(0) => { /* We already read everything and are happy */ }
            Ok(n) => tracing::debug!("Read extra {n} bytes at end of decompressor stream"),
        }
    }
}

Collaborator: How about as a followup we add a method fn finish(self) -> std::io::Result<()> so we can propagate any errors here instead of ignoring them? This is a general pattern in Rust; a similar reference example is https://doc.rust-lang.org/std/io/struct.BufWriter.html#method.into_inner. Then we update the callers of decompress to invoke .finish()?; by default.

Collaborator: See fn impl_close(&mut self) -> Result<()> for prior art.
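A minimal sketch of the followup proposed in the review comment above, assuming the `finish` name and rough signature from the comment (it returns the drained byte count here for illustration; none of this code is in the PR):

```rust
use std::io::{self, Read};

// Sketch only: a Decompressor-like wrapper with the proposed `finish`
// method, which drains trailing bytes (tar zero-blocks, zstd skippable
// frames) and propagates I/O errors instead of swallowing them in Drop.
struct Decompressor {
    inner: Box<dyn Read + Send + 'static>,
}

impl Read for Decompressor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }
}

impl Decompressor {
    // Drain whatever the consumer did not read, surfacing any I/O error.
    // (Combining this with a real Drop impl needs care, e.g. wrapping
    // `inner` in an Option, since `self` cannot be destructured when the
    // type implements Drop.)
    fn finish(mut self) -> io::Result<u64> {
        io::copy(&mut self.inner, &mut io::sink())
    }
}

fn main() -> io::Result<()> {
    // A stream whose consumer stops early, leaving 512 trailing bytes.
    let data: Vec<u8> = vec![0u8; 1024];
    let mut d = Decompressor {
        inner: Box::new(io::Cursor::new(data)),
    };
    let mut buf = [0u8; 512];
    d.read_exact(&mut buf)?; // consumer reads only part of the stream
    let trailing = d.finish()?; // drains and reports the rest
    assert_eq!(trailing, 512);
    Ok(())
}
```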

/// Create a decompressor for this MIME type, given a stream of input.
pub(crate) fn decompressor(
    media_type: &oci_image::MediaType,
    src: impl Read + Send + 'static,
-) -> Result<Box<dyn Read + Send + 'static>> {
+) -> Result<Decompressor> {
    let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
        oci_image::MediaType::ImageLayerZstd => Box::new(zstd::stream::read::Decoder::new(src)?),
        oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new(
@@ -205,7 +241,7 @@ pub(crate) fn decompressor(
        oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
        o => anyhow::bail!("Unhandled layer type: {}", o),
    };
-    Ok(r)
+    Ok(Decompressor { inner: r })
}
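The drain-on-drop behavior this diff introduces can be exercised in isolation. Below is a self-contained sketch of the same pattern; the `DrainOnDrop` and `Counting` names are illustrative stand-ins, not code from the PR:

```rust
use std::io::{self, Read};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative generic version of the PR's Decompressor: any reader,
// fully drained when the wrapper is dropped.
struct DrainOnDrop<R: Read>(R);

impl<R: Read> Read for DrainOnDrop<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<R: Read> Drop for DrainOnDrop<R> {
    fn drop(&mut self) {
        // The PR logs errors via tracing; here we simply ignore them.
        let _ = io::copy(&mut self.0, &mut io::sink());
    }
}

// A reader that records how many bytes were pulled from it in total.
struct Counting {
    remaining: usize,
    pulled: Arc<AtomicUsize>,
}

impl Read for Counting {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.remaining);
        self.remaining -= n;
        self.pulled.fetch_add(n, Ordering::SeqCst);
        Ok(n)
    }
}

fn main() {
    let pulled = Arc::new(AtomicUsize::new(0));
    {
        let mut r = DrainOnDrop(Counting { remaining: 100, pulled: pulled.clone() });
        let mut buf = [0u8; 40];
        r.read_exact(&mut buf).unwrap(); // consumer stops after 40 bytes
    } // drop drains the remaining 60 bytes
    assert_eq!(pulled.load(Ordering::SeqCst), 100);
}
```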

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
20 changes: 0 additions & 20 deletions ostree-ext/src/tar/write.rs

@@ -367,26 +367,6 @@ async fn filter_tar_async(

        let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);

-        // We need to make sure to flush out the decompressor and/or
-        // tar stream here. For tar, we might not read through the
-        // entire stream, because the archive has zero-block-markers
-        // at the end; or possibly because the final entry is filtered
-        // in filter_tar so we don't advance to read the data. For
-        // decompressor, zstd:chunked layers will have
-        // metadata/skippable frames at the end of the stream. That
-        // data isn't relevant to the tar stream, but if we don't read
-        // it here then on the skopeo proxy we'll block trying to
-        // write the end of the stream. That in turn will block our
-        // client end trying to call FinishPipe, and we end up
-        // deadlocking ourselves through skopeo.
-        //
-        // https://github.com/bootc-dev/bootc/issues/1204
-        let mut sink = std::io::sink();
-        let n = std::io::copy(&mut src, &mut sink)?;
-        if n != 0 {
-            tracing::debug!("Read extra {n} bytes at end of decompressor stream");
-        }
-
        Ok(r)
    });
    let copier = tokio::io::copy(&mut rx_buf, &mut dest);
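The deadlock that this drain (deleted here, moved into Drop) guards against can be sketched with a capacity-0 rendezvous channel standing in for the blocking skopeo pipe; all names below are illustrative only:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Capacity-0 channel: send() blocks until the other side receives,
    // much like a writer blocking on a full pipe.
    let (tx, rx) = sync_channel::<Vec<u8>>(0);

    // Stand-in for the skopeo proxy writing the layer stream.
    let producer = thread::spawn(move || {
        tx.send(vec![1u8; 512]).unwrap(); // the tar entry the client wants
        tx.send(vec![0u8; 1024]).unwrap(); // trailing zero blocks / skippable frames
    });

    let entry = rx.recv().unwrap(); // client reads the entry it cares about
    assert_eq!(entry.len(), 512);

    // Without this drain loop the producer would stay blocked in its
    // second send(), and join() below would never return -- the analogue
    // of the client blocking in FinishPipe in the real issue.
    let mut trailing = 0;
    while let Ok(chunk) = rx.recv() {
        trailing += chunk.len();
    }
    producer.join().unwrap();
    assert_eq!(trailing, 1024);
}
```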