diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 96028300..e50fb8c9 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -40,6 +40,7 @@ tokio = { features = ["full"], version = "1" } tokio-stream = "0.1.5" tokio-util = { features = ["io"], version = "0.6" } tracing = "0.1" +scopeguard = "1.1.0" [dev-dependencies] clap = "2.33.3" diff --git a/lib/src/cli.rs b/lib/src/cli.rs index 2b15ea4e..d22ad0c8 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -12,6 +12,7 @@ use std::convert::TryInto; use std::ffi::OsString; use structopt::StructOpt; +use crate::container::store::{LayeredImageImporter, PrepareResult}; use crate::container::{Config, ImportOptions}; #[derive(Debug, StructOpt)] @@ -105,6 +106,16 @@ enum ContainerOpts { #[structopt(long)] cmd: Option<Vec<String>>, }, + + /// Store a (possibly) layered container image with an OSTree base + StoreLayered { + /// Path to the repository + #[structopt(long)] + repo: String, + + /// Image reference, e.g. registry:quay.io/exampleos/exampleos:latest + imgref: String, + }, } /// Options for the Integrity Measurement Architecture (IMA). @@ -249,6 +260,23 @@ async fn container_info(imgref: &str) -> Result<()> { Ok(()) } +/// Write a layered container image into an OSTree commit. +async fn container_store(repo: &str, imgref: &str) -> Result<()> { + let repo = &ostree::Repo::open_at(libc::AT_FDCWD, repo, gio::NONE_CANCELLABLE)?; + let imgref = imgref.try_into()?; + let mut imp = LayeredImageImporter::new(&repo, &imgref).await?; + let prep = match imp.prepare().await? { + PrepareResult::AlreadyPresent(c) => { + println!("No changes in {} => {}", imgref, c); + return Ok(()); + } + PrepareResult::Ready(r) => r, + }; + let import = imp.import(prep).await?; + println!("Wrote: {} => {}", imgref, import); + Ok(()) +} + /// Add IMA signatures to an ostree commit, generating a new commit.
fn ima_sign(cmdopts: &ImaSignOpts) -> Result<()> { let repo = @@ -306,6 +334,9 @@ where .collect(); container_export(&repo, &rev, &imgref, labels?, cmd).await } + Opt::Container(ContainerOpts::StoreLayered { repo, imgref }) => { + container_store(&repo, &imgref).await + } Opt::ImaSign(ref opts) => ima_sign(opts), } } diff --git a/lib/src/container/imageproxy.rs b/lib/src/container/imageproxy.rs index c43e7407..53f5587b 100644 --- a/lib/src/container/imageproxy.rs +++ b/lib/src/container/imageproxy.rs @@ -11,7 +11,7 @@ use hyper::{Body, Request, StatusCode}; use std::os::unix::prelude::AsRawFd; use std::pin::Pin; use std::process::Stdio; -use tokio::io::{AsyncBufRead, AsyncReadExt}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt}; // What we get from boxing a fallible tokio::spawn() closure. Note the nested Result. type JoinFuture<T> = Pin<Box<dyn Future<Output = Result<Result<T>>>>>; @@ -120,6 +120,17 @@ impl ImageProxy { Ok(body) } + // Download and decompress a blob in a streaming fashion. + pub(crate) async fn fetch_decompress_blob( + &mut self, + digest: &str, + ) -> Result<impl AsyncRead + Send + Unpin> { + let blob = self.fetch_blob(digest).await?; + // TODO - detect e.g. zstd compression too; but that's going to be a while + // to ship. + Ok(async_compression::tokio::bufread::GzipDecoder::new(blob)) + } + pub(crate) async fn finalize(mut self) -> Result<()> { // For now discard any errors from the connection drop(self.request_sender); diff --git a/lib/src/container/mod.rs b/lib/src/container/mod.rs index 1628f405..e798c260 100644 --- a/lib/src/container/mod.rs +++ b/lib/src/container/mod.rs @@ -230,6 +230,7 @@ pub use import::*; mod imageproxy; mod oci; mod skopeo; +pub mod store; #[cfg(test)] mod tests { diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs new file mode 100644 index 00000000..548ce48e --- /dev/null +++ b/lib/src/container/store.rs @@ -0,0 +1,233 @@ +//! APIs for generating OSTree commits from layered container images +//! +//! # Extension of import support +//! +//!
This code supports ingesting arbitrary layered container images from an ostree-exported +//! base. See [`super::import`] for more information on encapsulation of images. + +use super::imageproxy::ImageProxy; +use super::*; +use anyhow::{anyhow, Context}; +use fn_error_context::context; +use ostree::gio; +use ostree::glib::GString; +use ostree::prelude::Cast; + +const LAYER_PREFIX: &str = "ostree/container/blob/"; +const IMAGE_PREFIX: &str = "ostree/container/image"; + +/// Convert e.g. sha256:12345... into `/ostree/container/blob/sha256_2B12345...`. +fn ref_for_blob_digest(d: &str) -> Result<String> { + let escaped = crate::util::escape_for_ref(d)?; + Ok(format!("{}{}", LAYER_PREFIX, escaped)) +} + +/// Context for importing a container image. +pub struct LayeredImageImporter { + repo: ostree::Repo, + proxy: ImageProxy, + imgref: OstreeImageReference, + ostree_ref: String, +} + +/// Result of invoking [`LayeredImageImporter::prepare`]. +pub enum PrepareResult { + /// The image reference is already present; the contained string is the OSTree commit. + AlreadyPresent(String), + /// The image needs to be downloaded + Ready(PreparedImport), +} + +/// Information about which layers need to be downloaded. +pub struct PreparedImport { + /// The manifest digest that was found + pub manifest_digest: String, + manifest: oci::Manifest, + /// In-order mapping from layerid -> ostree commit + layers: Vec<(String, Option<GString>)>, +} + +impl PreparedImport { + /// Find all blobs without currently present ostree refs + fn blobs_to_fetch(&self) -> Vec<&str> { + self.layers + .iter() + .filter_map(|(blobid, commit)| { + if commit.is_none() { + Some(blobid.as_str()) + } else { + None + } + }) + .collect() + } + + /// Returns `true` if there are no layers to fetch for this import. + pub fn is_complete(&self) -> bool { + self.blobs_to_fetch().is_empty() + } +} + +impl LayeredImageImporter { + /// Create a new importer.
+ pub async fn new(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Self> { + let proxy = ImageProxy::new(&imgref.imgref).await?; + let repo = repo.clone(); + let ostree_ref = crate::util::escape_for_ref(&imgref.imgref.to_string())?; + Ok(LayeredImageImporter { + repo, + proxy, + ostree_ref, + imgref: imgref.clone(), + }) + } + + /// Determine if there is a new manifest, and if so return its digest. + #[context("Fetching manifest")] + pub async fn prepare(&mut self) -> Result<PrepareResult> { + match &self.imgref.sigverify { + SignatureSource::ContainerPolicy if skopeo::container_policy_is_default_insecure()? => { + return Err(anyhow!("containers-policy.json specifies a default of `insecureAcceptAnything`; refusing usage")); + } + SignatureSource::OstreeRemote(_) => { + return Err(anyhow!( + "Cannot currently verify layered containers via ostree remote" + )); + } + _ => {} + } + + // Do we already have this image? If so, we're done. + if let Some(merge_commit) = self.repo.resolve_rev(&self.ostree_ref, true)?
{ + return Ok(PrepareResult::AlreadyPresent(merge_commit.to_string())); + } + + let (manifest_digest, manifest_bytes) = self.proxy.fetch_manifest().await?; + let manifest: oci::Manifest = serde_json::from_slice(&manifest_bytes)?; + let layers: Result<Vec<_>> = manifest + .layers + .iter() + .map(|layer| -> Result<_> { + let blobid = layer.digest.as_str(); + let blobref = ref_for_blob_digest(blobid)?; + let blobcommit = self.repo.resolve_rev(&blobref, true)?; + Ok((blobid.to_string(), blobcommit)) + }) + .collect(); + let layers = layers?; + + let imp = PreparedImport { + manifest, + manifest_digest, + layers, + }; + Ok(PrepareResult::Ready(imp)) + } + + /// Import a layered container image + pub async fn import(mut self, import: PreparedImport) -> Result<String> { + // TODO hook up gcancellable + async https://github.com/gtk-rs/gtk-rs-core/issues/240 + let cancellable = gio::NONE_CANCELLABLE; + let manifest = import.manifest; + + // First download the base image - we need the SELinux policy + // there to label all following layers. + // Presence of at least one layer is validated by find_layer_blobids + let &base_layer = manifest.find_layer_blobids()?.iter().next().unwrap(); + let base_ref = ref_for_blob_digest(base_layer)?; + let base_commit = if let Some(base_commit) = self.repo.resolve_rev(&base_ref, true)?
{ + base_commit.to_string() + } else { + let blob = self.proxy.fetch_decompress_blob(base_layer).await?; + let commit = crate::tar::import_tar(&self.repo, blob, None) + .await + .with_context(|| format!("Parsing blob {}", base_layer))?; + // TODO support ref writing in tar import + self.repo + .set_ref_immediate(None, &base_ref, Some(commit.as_str()), cancellable)?; + commit + }; + + let mut blob_commits = Vec::new(); + for (blobid, commit) in import.layers.iter() { + if blobid == base_layer { + continue; + } + if let Some(c) = commit { + blob_commits.push(c.to_string()); + } else { + let blob = self.proxy.fetch_decompress_blob(blobid).await?; + let blobref = ref_for_blob_digest(blobid)?; + let opts = crate::tar::WriteTarOptions { + base: Some(base_commit.as_str()), + selinux: true, + }; + let commit = crate::tar::write_tar(&self.repo, blob, &blobref, Some(opts)) + .await + .with_context(|| format!("Parsing blob {}", blobid))?; + blob_commits.push(commit); + } + } + + // We're done with the proxy, make sure it didn't have any errors. + self.proxy.finalize().await?; + + // Destructure to transfer ownership to thread + let repo = self.repo; + let target_ref = self.ostree_ref; + tokio::task::spawn_blocking(move || -> Result<String> { + let repo = &repo; + scopeguard::defer!
{ + let _ = repo.abort_transaction(cancellable); + } + let (base_commit_tree, _) = repo.read_commit(&base_commit, gio::NONE_CANCELLABLE)?; + let base_commit_tree = base_commit_tree.downcast::<ostree::RepoFile>().unwrap(); + let base_contents_obj = base_commit_tree.tree_get_contents_checksum().unwrap(); + let base_metadata_obj = base_commit_tree.tree_get_metadata_checksum().unwrap(); + let mt = + ostree::MutableTree::from_checksum(&repo, &base_contents_obj, &base_metadata_obj); + repo.prepare_transaction(cancellable)?; + // Layer all subsequent commits + for commit in blob_commits { + let (layer_tree, _) = repo.read_commit(&commit, gio::NONE_CANCELLABLE)?; + repo.write_directory_to_mtree(&layer_tree, &mt, None, gio::NONE_CANCELLABLE)?; + } + + let merged_root = repo.write_mtree(&mt, gio::NONE_CANCELLABLE)?; + let merged_root = merged_root.downcast::<ostree::RepoFile>().unwrap(); + let merged_commit = + repo.write_commit(None, None, None, None, &merged_root, gio::NONE_CANCELLABLE)?; + repo.transaction_set_ref(None, &target_ref, Some(merged_commit.as_str())); + repo.commit_transaction(cancellable)?; + Ok(merged_commit.to_string()) + }) + .await? + } +} + +/// List all images stored +pub fn list_images(repo: &ostree::Repo) -> Result<Vec<String>> { + let cancellable = gio::NONE_CANCELLABLE; + let refs = repo.list_refs_ext( + Some(IMAGE_PREFIX), + ostree::RepoListRefsExtFlags::empty(), + cancellable, + )?; + let r: Result<Vec<String>> = refs + .keys() + .map(|imgname| { + let img = imgname.strip_prefix(IMAGE_PREFIX).unwrap(); + crate::util::unescape_for_ref(img) + }) + .collect(); + Ok(r?) +} + +/// Remove the specified images and their corresponding blobs. +pub fn prune_images(_repo: &ostree::Repo, _imgs: &[&str]) -> Result<()> { + // Most robust approach is to iterate over all known images, load the + // manifest and build the set of reachable blobs, then compute the set + // Set(unreachable) = Set(all) - Set(reachable) + // And remove the unreachable ones.
+ unimplemented!() +} diff --git a/lib/src/util.rs b/lib/src/util.rs index a80bbb37..59822a31 100644 --- a/lib/src/util.rs +++ b/lib/src/util.rs @@ -13,7 +13,7 @@ pub(crate) fn escape_for_ref(s: &str) -> Result { c if c.is_ascii_alphanumeric() => { r.push(c); } - '.' | '-' | '/' => r.push(c), + '-' | '/' => r.push(c), '_' => r.push_str("__"), o => write!(r, "_{:02X}", o as u8).unwrap(), } @@ -33,7 +33,7 @@ pub(crate) fn unescape_for_ref(s: &str) -> Result { c if c.is_ascii_alphanumeric() => { r.push(c); } - '.' | '-' | '/' => r.push(c), + '-' | '/' => r.push(c), '_' => { let next = it.next(); if let Some('_') = next { @@ -48,7 +48,7 @@ pub(crate) fn unescape_for_ref(s: &str) -> Result { let v: char = v.try_into()?; r.push(v); } - o => { + _ => { anyhow::bail!("Invalid escaped ref string: {}", s) } } @@ -65,7 +65,11 @@ mod test { use super::*; const UNCHANGED: &[&str] = &["", "foo", "foo/bar/baz-blah/foo"]; - const ROUNDTRIP: &[&str] = &["localhost:5000/foo:latest", "fedora/x86_64/coreos"]; + const ROUNDTRIP: &[&str] = &[ + "localhost:5000/foo:latest", + "fedora/x86_64/coreos", + "/foo/bar/foo.oci-archive", + ]; #[test] fn escape() { diff --git a/lib/tests/it/main.rs b/lib/tests/it/main.rs index 0b28580f..f76a199d 100644 --- a/lib/tests/it/main.rs +++ b/lib/tests/it/main.rs @@ -2,6 +2,7 @@ use anyhow::{Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; use fn_error_context::context; use indoc::indoc; +use ostree_ext::container::store::PrepareResult; use ostree_ext::container::{ Config, ImageReference, OstreeImageReference, SignatureSource, Transport, }; @@ -367,7 +368,7 @@ async fn test_container_import_export() -> Result<()> { Ok(()) } -/// We should currently reject an image with multiple layers. +/// We should reject an image with multiple layers when doing an "import" - i.e. a direct un-encapsulation. 
#[tokio::test] async fn test_container_import_derive() -> Result<()> { let fixture = Fixture::new()?; @@ -385,6 +386,45 @@ async fn test_container_import_derive() -> Result<()> { Ok(()) } +/// But layers work via the container::write module. +#[tokio::test] +async fn test_container_write_derive() -> Result<()> { + let fixture = Fixture::new()?; + let exampleos_path = &fixture.path.join("exampleos-derive.ociarchive"); + std::fs::write(exampleos_path, EXAMPLEOS_DERIVED_OCI)?; + let exampleos_ref = OstreeImageReference { + sigverify: SignatureSource::ContainerPolicyAllowInsecure, + imgref: ImageReference { + transport: Transport::OciArchive, + name: exampleos_path.to_string(), + }, + }; + let mut imp = + ostree_ext::container::store::LayeredImageImporter::new(&fixture.destrepo, &exampleos_ref) + .await?; + let prep = match imp.prepare().await? { + PrepareResult::AlreadyPresent(_) => panic!("should not be already imported"), + PrepareResult::Ready(r) => r, + }; + let commit = imp.import(prep).await?; + bash!( + "ostree --repo={repo} ls {commit} /usr/share/anewfile", + repo = fixture.destrepo_path.as_str(), + commit = commit + )?; + let mut imp = + ostree_ext::container::store::LayeredImageImporter::new(&fixture.destrepo, &exampleos_ref) + .await?; + let already_present = match imp.prepare().await? { + PrepareResult::AlreadyPresent(c) => c, + PrepareResult::Ready(_) => { + panic!("Should have already imported commit {}", commit) + } + }; + assert_eq!(commit, already_present); + Ok(()) +} + #[ignore] #[tokio::test] // Verify that we can push and pull to a registry, not just oci-archive:.