Skip to content

Commit

Permalink
WIP: Add a new container/store module
Browse files Browse the repository at this point in the history
The initial scope of this project was just "encapsulating" ostree
commits in containers.

However, when doing that a very, very natural question arises:
Why not support *deriving* from that base image container, and
have the tooling natively support importing it?

This initial prototype code implements that.  Here, we still use
the `tar::import` path for the base image - we expect it to have
a pre-generated ostree commit.

This new `container::store` module processes layered images and
generates (client side) ostree commits from the tar layers.

There's a whole lot of new infrastructure we need around mapping
ostree refs to blobs and images, etc.
  • Loading branch information
cgwalters committed Sep 24, 2021
1 parent 06d3740 commit a138e92
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 6 deletions.
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tokio = { features = ["full"], version = "1" }
tokio-stream = "0.1.5"
tokio-util = { features = ["io"], version = "0.6" }
tracing = "0.1"
scopeguard = "1.1.0"

[dev-dependencies]
clap = "2.33.3"
Expand Down
31 changes: 31 additions & 0 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::convert::TryInto;
use std::ffi::OsString;
use structopt::StructOpt;

use crate::container::store::{LayeredImageImporter, PrepareResult};
use crate::container::{Config, ImportOptions};

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -105,6 +106,16 @@ enum ContainerOpts {
#[structopt(long)]
cmd: Option<Vec<String>>,
},

/// Store a (possibly) layered container image with an OSTree base
StoreLayered {
/// Path to the repository
#[structopt(long)]
repo: String,

/// Image reference, e.g. registry:quay.io/exampleos/exampleos:latest
imgref: String,
},
}

/// Options for the Integrity Measurement Architecture (IMA).
Expand Down Expand Up @@ -249,6 +260,23 @@ async fn container_info(imgref: &str) -> Result<()> {
Ok(())
}

/// Write a layered container image into an OSTree commit.
///
/// Opens the repository at `repo` (relative to the current directory),
/// parses `imgref` into an [`OstreeImageReference`], and either reports
/// that the image is already imported or performs the import.
async fn container_store(repo: &str, imgref: &str) -> Result<()> {
    // Bind the repo directly; the original took an extra `&` here and then
    // passed `&repo` (a `&&Repo`), relying on deref coercion (clippy
    // `needless_borrow`).
    let repo = ostree::Repo::open_at(libc::AT_FDCWD, repo, gio::NONE_CANCELLABLE)?;
    let imgref = imgref.try_into()?;
    let mut imp = LayeredImageImporter::new(&repo, &imgref).await?;
    let prep = match imp.prepare().await? {
        // Already imported; report the existing merge commit and stop.
        PrepareResult::AlreadyPresent(c) => {
            println!("No changes in {} => {}", imgref, c);
            return Ok(());
        }
        PrepareResult::Ready(r) => r,
    };
    let import = imp.import(prep).await?;
    println!("Wrote: {} => {}", imgref, import);
    Ok(())
}

/// Add IMA signatures to an ostree commit, generating a new commit.
fn ima_sign(cmdopts: &ImaSignOpts) -> Result<()> {
let repo =
Expand Down Expand Up @@ -306,6 +334,9 @@ where
.collect();
container_export(&repo, &rev, &imgref, labels?, cmd).await
}
Opt::Container(ContainerOpts::StoreLayered { repo, imgref }) => {
container_store(&repo, &imgref).await
}
Opt::ImaSign(ref opts) => ima_sign(opts),
}
}
13 changes: 12 additions & 1 deletion lib/src/container/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use hyper::{Body, Request, StatusCode};
use std::os::unix::prelude::AsRawFd;
use std::pin::Pin;
use std::process::Stdio;
use tokio::io::{AsyncBufRead, AsyncReadExt};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt};

// What we get from boxing a fallible tokio::spawn() closure. Note the nested Result.
type JoinFuture<T> = Pin<Box<dyn Future<Output = Result<Result<T>>>>>;
Expand Down Expand Up @@ -120,6 +120,17 @@ impl ImageProxy {
Ok(body)
}

/// Download and decompress a blob in a streaming fashion.
///
/// Fetches the blob for `digest` via [`Self::fetch_blob`] and wraps it in a
/// gzip decoder, so decompression happens as the returned reader is polled.
/// Currently assumes gzip compression (see TODO below).
pub(crate) async fn fetch_decompress_blob(
    &mut self,
    digest: &str,
) -> Result<impl AsyncRead + Send + Unpin> {
    let blob = self.fetch_blob(digest).await?;
    // TODO - detect e.g. zstd compression too; but that's going to be a while
    // to ship.
    Ok(async_compression::tokio::bufread::GzipDecoder::new(blob))
}

pub(crate) async fn finalize(mut self) -> Result<()> {
// For now discard any errors from the connection
drop(self.request_sender);
Expand Down
1 change: 1 addition & 0 deletions lib/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ pub use import::*;
mod imageproxy;
mod oci;
mod skopeo;
pub mod store;

#[cfg(test)]
mod tests {
Expand Down
233 changes: 233 additions & 0 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//! APIs for generating OSTree commits from layered container images
//!
//! # Extension of import support
//!
//! This code supports ingesting arbitrary layered container images from an ostree-exported
//! base. See [`super::import`] for more information on encapsulation of images.

use super::imageproxy::ImageProxy;
use super::*;
use anyhow::{anyhow, Context};
use fn_error_context::context;
use ostree::gio;
use ostree::glib::GString;
use ostree::prelude::Cast;

const LAYER_PREFIX: &str = "ostree/container/blob/";
const IMAGE_PREFIX: &str = "ostree/container/image";

/// Convert e.g. `sha256:12345...` into `ostree/container/blob/sha256_3A12345...`.
///
/// `:` is not valid in an ostree ref, so `escape_for_ref` encodes it as
/// `_3A` (its hex code).  Note the result has no leading `/`; the original
/// comment showed both a leading slash and the wrong escape (`_2B` is `+`).
fn ref_for_blob_digest(d: &str) -> Result<String> {
    let escaped = crate::util::escape_for_ref(d)?;
    Ok(format!("{}{}", LAYER_PREFIX, escaped))
}

/// Context for importing a container image.
pub struct LayeredImageImporter {
    // Clone of the destination repository handle.
    repo: ostree::Repo,
    // Proxy used to fetch the manifest and layer blobs from the registry.
    proxy: ImageProxy,
    // The source image reference, including its signature-verification policy.
    imgref: OstreeImageReference,
    // Escaped form of the image reference; used as the merge-commit ref name.
    ostree_ref: String,
}

/// Result of invoking [`LayeredImageImporter::prepare`].
pub enum PrepareResult {
    /// The image reference is already present; the contained string is the OSTree commit.
    AlreadyPresent(String),
    /// The image needs to be downloaded; pass this to [`LayeredImageImporter::import`].
    Ready(PreparedImport),
}

/// Information about which layers need to be downloaded.
pub struct PreparedImport {
    /// The manifest digest that was found
    pub manifest_digest: String,
    // The parsed image manifest corresponding to `manifest_digest`.
    manifest: oci::Manifest,
    /// In-order mapping from layerid -> ostree commit
    // `None` means no local commit exists for that layer blob yet, i.e. it
    // still needs to be fetched.
    layers: Vec<(String, Option<GString>)>,
}

impl PreparedImport {
    /// Return the blob IDs of layers that have no local ostree commit yet
    /// (i.e. the ones that still must be downloaded).
    fn blobs_to_fetch(&self) -> Vec<&str> {
        let mut missing = Vec::new();
        for (blobid, commit) in &self.layers {
            if commit.is_none() {
                missing.push(blobid.as_str());
            }
        }
        missing
    }

    /// Returns `true` if there are no layers to fetch for this import.
    pub fn is_complete(&self) -> bool {
        // Complete when every layer already has a corresponding commit.
        self.layers.iter().all(|(_, commit)| commit.is_some())
    }
}

impl LayeredImageImporter {
    /// Create a new importer.
    pub async fn new(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Self> {
        let proxy = ImageProxy::new(&imgref.imgref).await?;
        let repo = repo.clone();
        // The final merge commit lives under a ref derived from the escaped
        // image reference string.
        let ostree_ref = crate::util::escape_for_ref(&imgref.imgref.to_string())?;
        Ok(LayeredImageImporter {
            repo,
            proxy,
            ostree_ref,
            imgref: imgref.clone(),
        })
    }

    /// Determine if there is a new manifest, and if so return its digest.
    ///
    /// Rejects signature policies we cannot honor for layered imports, then
    /// checks for an existing merge commit, and finally resolves which layer
    /// blobs already have local commits.
    #[context("Fetching manifest")]
    pub async fn prepare(&mut self) -> Result<PrepareResult> {
        match &self.imgref.sigverify {
            SignatureSource::ContainerPolicy if skopeo::container_policy_is_default_insecure()? => {
                return Err(anyhow!("containers-policy.json specifies a default of `insecureAcceptAnything`; refusing usage"));
            }
            SignatureSource::OstreeRemote(_) => {
                return Err(anyhow!(
                    "Cannot currently verify layered containers via ostree remote"
                ));
            }
            _ => {}
        }

        // Do we already have this image? If so, we're done.
        if let Some(merge_commit) = self.repo.resolve_rev(&self.ostree_ref, true)? {
            return Ok(PrepareResult::AlreadyPresent(merge_commit.to_string()));
        }

        let (manifest_digest, manifest_bytes) = self.proxy.fetch_manifest().await?;
        let manifest: oci::Manifest = serde_json::from_slice(&manifest_bytes)?;
        // For each layer, record whether we already have an ostree commit for
        // its blob; `None` entries still need to be fetched at import time.
        let layers: Result<Vec<_>> = manifest
            .layers
            .iter()
            .map(|layer| -> Result<_> {
                let blobid = layer.digest.as_str();
                let blobref = ref_for_blob_digest(blobid)?;
                let blobcommit = self.repo.resolve_rev(&blobref, true)?;
                Ok((blobid.to_string(), blobcommit))
            })
            .collect();
        let layers = layers?;

        let imp = PreparedImport {
            manifest,
            manifest_digest,
            layers,
        };
        Ok(PrepareResult::Ready(imp))
    }

    /// Import a layered container image.
    ///
    /// Downloads any missing layer blobs, imports each as an ostree commit
    /// (the base via `tar::import_tar`, derived layers via `tar::write_tar`),
    /// then squashes all layers into one merge commit written to the image
    /// ref.  Returns the merge commit checksum.
    pub async fn import(mut self, import: PreparedImport) -> Result<String> {
        // TODO hook up gcancellable + async https://github.com/gtk-rs/gtk-rs-core/issues/240
        let cancellable = gio::NONE_CANCELLABLE;
        let manifest = import.manifest;

        // First download the base image - we need the SELinux policy
        // there to label all following layers.
        // Presence of at least one layer is validated by find_layer_blobids
        let &base_layer = manifest.find_layer_blobids()?.iter().next().unwrap();
        let base_ref = ref_for_blob_digest(base_layer)?;
        let base_commit = if let Some(base_commit) = self.repo.resolve_rev(&base_ref, true)? {
            base_commit.to_string()
        } else {
            let blob = self.proxy.fetch_decompress_blob(base_layer).await?;
            let commit = crate::tar::import_tar(&self.repo, blob, None)
                .await
                .with_context(|| format!("Parsing blob {}", base_layer))?;
            // TODO support ref writing in tar import
            self.repo
                .set_ref_immediate(None, &base_ref, Some(commit.as_str()), cancellable)?;
            commit
        };

        // Import each non-base layer, reusing commits found during prepare().
        let mut blob_commits = Vec::new();
        for (blobid, commit) in import.layers.iter() {
            if blobid == base_layer {
                continue;
            }
            if let Some(c) = commit {
                blob_commits.push(c.to_string());
            } else {
                let blob = self.proxy.fetch_decompress_blob(blobid).await?;
                let blobref = ref_for_blob_digest(blobid)?;
                let opts = crate::tar::WriteTarOptions {
                    base: Some(base_commit.as_str()),
                    selinux: true,
                };
                let commit = crate::tar::write_tar(&self.repo, blob, &blobref, Some(opts))
                    .await
                    .with_context(|| format!("Parsing blob {}", blobid))?;
                blob_commits.push(commit);
            }
        }

        // We're done with the proxy, make sure it didn't have any errors.
        self.proxy.finalize().await?;

        // Destructure to transfer ownership to thread
        let repo = self.repo;
        let target_ref = self.ostree_ref;
        tokio::task::spawn_blocking(move || -> Result<String> {
            let repo = &repo;
            // Abort any in-flight transaction on every exit path; harmless if
            // the transaction was already committed.
            scopeguard::defer! {
                let _ = repo.abort_transaction(cancellable);
            }
            let (base_commit_tree, _) = repo.read_commit(&base_commit, gio::NONE_CANCELLABLE)?;
            let base_commit_tree = base_commit_tree.downcast::<ostree::RepoFile>().unwrap();
            let base_contents_obj = base_commit_tree.tree_get_contents_checksum().unwrap();
            // BUGFIX: the original called tree_get_contents_checksum() a second
            // time here (copy-paste error); the mutable tree needs the
            // *metadata* checksum for its second argument.
            let base_metadata_obj = base_commit_tree.tree_get_metadata_checksum().unwrap();
            let mt =
                ostree::MutableTree::from_checksum(repo, &base_contents_obj, &base_metadata_obj);
            repo.prepare_transaction(cancellable)?;
            // Layer all subsequent commits
            for commit in blob_commits {
                let (layer_tree, _) = repo.read_commit(&commit, gio::NONE_CANCELLABLE)?;
                repo.write_directory_to_mtree(&layer_tree, &mt, None, gio::NONE_CANCELLABLE)?;
            }

            let merged_root = repo.write_mtree(&mt, gio::NONE_CANCELLABLE)?;
            let merged_root = merged_root.downcast::<ostree::RepoFile>().unwrap();
            let merged_commit =
                repo.write_commit(None, None, None, None, &merged_root, gio::NONE_CANCELLABLE)?;
            repo.transaction_set_ref(None, &target_ref, Some(merged_commit.as_str()));
            repo.commit_transaction(cancellable)?;
            Ok(merged_commit.to_string())
        })
        .await?
    }
}

/// List all images stored
pub fn list_images(repo: &ostree::Repo) -> Result<Vec<String>> {
let cancellable = gio::NONE_CANCELLABLE;
let refs = repo.list_refs_ext(
Some(IMAGE_PREFIX),
ostree::RepoListRefsExtFlags::empty(),
cancellable,
)?;
let r: Result<Vec<_>> = refs
.keys()
.map(|imgname| {
let img = imgname.strip_prefix(IMAGE_PREFIX).unwrap();
crate::util::unescape_for_ref(img)
})
.collect();
Ok(r?)
}

/// Remove the specified images and their corresponding blobs.
///
/// Not yet implemented; calling this currently panics via `unimplemented!()`.
pub fn prune_images(_repo: &ostree::Repo, _imgs: &[&str]) -> Result<()> {
    // Most robust approach is to iterate over all known images, load the
    // manifest and build the set of reachable blobs, then compute the set
    // Set(unreachable) = Set(all) - Set(reachable)
    // And remove the unreachable ones.
    unimplemented!()
}
12 changes: 8 additions & 4 deletions lib/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) fn escape_for_ref(s: &str) -> Result<String> {
c if c.is_ascii_alphanumeric() => {
r.push(c);
}
'.' | '-' | '/' => r.push(c),
'-' | '/' => r.push(c),
'_' => r.push_str("__"),
o => write!(r, "_{:02X}", o as u8).unwrap(),
}
Expand All @@ -33,7 +33,7 @@ pub(crate) fn unescape_for_ref(s: &str) -> Result<String> {
c if c.is_ascii_alphanumeric() => {
r.push(c);
}
'.' | '-' | '/' => r.push(c),
'-' | '/' => r.push(c),
'_' => {
let next = it.next();
if let Some('_') = next {
Expand All @@ -48,7 +48,7 @@ pub(crate) fn unescape_for_ref(s: &str) -> Result<String> {
let v: char = v.try_into()?;
r.push(v);
}
o => {
_ => {
anyhow::bail!("Invalid escaped ref string: {}", s)
}
}
Expand All @@ -65,7 +65,11 @@ mod test {
use super::*;

const UNCHANGED: &[&str] = &["", "foo", "foo/bar/baz-blah/foo"];
const ROUNDTRIP: &[&str] = &["localhost:5000/foo:latest", "fedora/x86_64/coreos"];
const ROUNDTRIP: &[&str] = &[
"localhost:5000/foo:latest",
"fedora/x86_64/coreos",
"/foo/bar/foo.oci-archive",
];

#[test]
fn escape() {
Expand Down
Loading

0 comments on commit a138e92

Please sign in to comment.