Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Nov 23, 2021
1 parent 8ce5b41 commit 290bcf6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
33 changes: 12 additions & 21 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::Future;
use oci_spec::image as oci_image;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufRead, AsyncRead};
use tracing::{event, instrument, Level};

Expand All @@ -55,7 +56,7 @@ struct ProgressReader<T> {
#[pin]
reader: T,
#[pin]
progress: Option<Progress>,
progress: Option<Arc<Mutex<Progress>>>,
}

impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
Expand All @@ -69,6 +70,7 @@ impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
match this.reader.poll_read(cx, buf) {
v @ std::task::Poll::Ready(Ok(_)) => {
if let Some(progress) = this.progress.as_ref().get_ref() {
let progress = progress.lock().unwrap();
let state = {
let mut state = *progress.borrow();
let newlen = buf.filled().len();
Expand Down Expand Up @@ -108,20 +110,6 @@ pub struct Import {
pub image_digest: String,
}

fn require_one_layer_blob(manifest: &oci_image::ImageManifest) -> Result<&oci_image::Descriptor> {
let n = manifest.layers().len();
if let Some(layer) = manifest.layers().get(0) {
if n > 1 {
Err(anyhow!("Expected 1 layer, found {}", n))
} else {
Ok(layer)
}
} else {
// Validated by find_layer_blobids()
unreachable!()
}
}

/// Configuration for container fetches.
#[derive(Debug, Default)]
pub struct UnencapsulateOptions {
Expand Down Expand Up @@ -224,10 +212,12 @@ pub async fn unencapsulate_from_manifest(
let txn = repo.auto_transaction(Some(cancellable))?;

// First, import the commit
let commit_blob = rx.blocking_recv().unwrap();
let commit_blob = tokio_util::io::SyncIoBridge::new(commit_blob);
let mut archive = tar::Archive::new(commit_blob);
importer.import_commit(&mut archive, Some(cancellable))?;
{
let commit_blob = rx.blocking_recv().unwrap();
let commit_blob = tokio_util::io::SyncIoBridge::new(commit_blob);
let mut archive = tar::Archive::new(commit_blob);
importer.import_commit(&mut archive, Some(cancellable))?;
}

// Then, all component/split blobs
while let Some(blob) = rx.blocking_recv() {
Expand All @@ -241,11 +231,12 @@ pub async fn unencapsulate_from_manifest(
repo.mark_commit_partial(&checksum, false)?;
Ok::<_, anyhow::Error>(checksum)
});
let progress = options.progress.map(|v| Arc::new(Mutex::new(v)));
for &layer in std::iter::once(&commit_layer).chain(component_layers.iter()) {
let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, commit_layer).await?;
let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, layer).await?;
let blob = ProgressReader {
reader: blob,
progress: options.progress,
progress: progress.as_ref().map(|v| Arc::clone(v)),
};
if tx.send(blob).await.is_err() {
drop(tx);
Expand Down
22 changes: 15 additions & 7 deletions lib/src/tar/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,15 +454,11 @@ impl Importer {
Ok(())
}

pub(crate) fn import_objects(
fn import_objects_impl<'a>(
&mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
ents: impl Iterator<Item = Result<(tar::Entry<'a, impl Read + Send + Unpin + 'a>, Utf8PathBuf)>>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let ents = archive.entries()?.filter_map(|e| match e {
Ok(e) => Self::filter_entry(e).transpose(),
Err(e) => Some(Err(anyhow::Error::msg(e))),
});
for entry in ents {
let (entry, path) = entry?;

Expand All @@ -475,6 +471,18 @@ impl Importer {
Ok(())
}

pub(crate) fn import_objects(
&mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
let ents = archive.entries()?.filter_map(|e| match e {
Ok(e) => Self::filter_entry(e).transpose(),
Err(e) => Some(Err(anyhow::Error::msg(e))),
});
self.import_objects_impl(ents, cancellable)
}

pub(crate) fn import_commit(
&mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
Expand Down Expand Up @@ -581,7 +589,7 @@ impl Importer {
}
self.commit_checksum = Some(checksum);

self.import_objects(archive, cancellable)?;
self.import_objects_impl(ents, cancellable)?;

Ok(())
}
Expand Down

0 comments on commit 290bcf6

Please sign in to comment.