Skip to content

Commit

Permalink
Unify url and oci buildcache push code paths (spack#45776)
Browse files Browse the repository at this point in the history
  • Loading branch information
haampie authored Aug 20, 2024
1 parent 9d0b9f0 commit 2ae5596
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 142 deletions.
180 changes: 138 additions & 42 deletions lib/spack/spack/binary_distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import codecs
import collections
import concurrent.futures
import contextlib
import copy
import hashlib
import io
Expand All @@ -25,7 +24,7 @@
import urllib.request
import warnings
from contextlib import closing
from typing import Dict, Generator, Iterable, List, NamedTuple, Optional, Set, Tuple, Union
from typing import Dict, Iterable, List, NamedTuple, Optional, Set, Tuple, Union

import llnl.util.filesystem as fsys
import llnl.util.lang
Expand Down Expand Up @@ -958,7 +957,7 @@ def _spec_files_from_cache(url: str):
raise ListMirrorSpecsError("Failed to get list of specs from {0}".format(url))


def generate_package_index(url: str, tmpdir: str, concurrency: int = 32):
def _url_generate_package_index(url: str, tmpdir: str, concurrency: int = 32):
"""Create or replace the build cache index on the given mirror. The
buildcache index contains an entry for each binary package under the
cache_prefix.
Expand Down Expand Up @@ -1119,7 +1118,7 @@ def _exists_in_buildcache(spec: Spec, tmpdir: str, out_url: str) -> ExistsInBuil
return ExistsInBuildcache(signed, unsigned, tarball)


def _upload_tarball_and_specfile(
def _url_upload_tarball_and_specfile(
spec: Spec, tmpdir: str, out_url: str, exists: ExistsInBuildcache, signing_key: Optional[str]
):
files = BuildcacheFiles(spec, tmpdir, out_url)
Expand Down Expand Up @@ -1154,47 +1153,144 @@ def _upload_tarball_and_specfile(
)


def _format_spec(spec: Spec) -> str:
return spec.cformat("{name}{@version}{/hash:7}")
class Uploader:
def __init__(self, mirror: spack.mirror.Mirror, force: bool, update_index: bool):
self.mirror = mirror
self.force = force
self.update_index = update_index

self.tmpdir: str
self.executor: concurrent.futures.Executor

@contextlib.contextmanager
def default_push_context() -> Generator[Tuple[str, concurrent.futures.Executor], None, None]:
with tempfile.TemporaryDirectory(
dir=spack.stage.get_stage_root()
) as tmpdir, spack.util.parallel.make_concurrent_executor() as executor:
yield tmpdir, executor
def __enter__(self):
self._tmpdir = tempfile.TemporaryDirectory(dir=spack.stage.get_stage_root())
self._executor = spack.util.parallel.make_concurrent_executor()

self.tmpdir = self._tmpdir.__enter__()
self.executor = self.executor = self._executor.__enter__()

def push_or_raise(
specs: List[Spec],
out_url: str,
signing_key: Optional[str],
force: bool = False,
update_index: bool = False,
) -> List[Spec]:
"""Same as push, but raises an exception on error. Returns a list of skipped specs already
present in the build cache when force=False."""
skipped, errors = push(specs, out_url, signing_key, force, update_index)
if errors:
raise PushToBuildCacheError(
f"Failed to push {len(errors)} specs to {out_url}:\n"
+ "\n".join(f"Failed to push {_format_spec(spec)}: {error}" for spec, error in errors)
return self

def __exit__(self, *args):
self._executor.__exit__(*args)
self._tmpdir.__exit__(*args)

def push_or_raise(self, specs: List[spack.spec.Spec]) -> List[spack.spec.Spec]:
skipped, errors = self.push(specs)
if errors:
raise PushToBuildCacheError(
f"Failed to push {len(errors)} specs to {self.mirror.push_url}:\n"
+ "\n".join(
f"Failed to push {_format_spec(spec)}: {error}" for spec, error in errors
)
)
return skipped

def push(
self, specs: List[spack.spec.Spec]
) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
raise NotImplementedError

def tag(self, tag: str, roots: List[spack.spec.Spec]):
"""Make a list of selected specs together available under the given tag"""
pass


class OCIUploader(Uploader):
def __init__(
self,
mirror: spack.mirror.Mirror,
force: bool,
update_index: bool,
base_image: Optional[str],
) -> None:
super().__init__(mirror, force, update_index)
self.target_image = spack.oci.oci.image_from_mirror(mirror)
self.base_image = ImageReference.from_string(base_image) if base_image else None

def push(
self, specs: List[spack.spec.Spec]
) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
skipped, base_images, checksums, upload_errors = _oci_push(
target_image=self.target_image,
base_image=self.base_image,
installed_specs_with_deps=specs,
force=self.force,
tmpdir=self.tmpdir,
executor=self.executor,
)
return skipped

self._base_images = base_images
self._checksums = checksums

def push(
specs: List[Spec],
out_url: str,
signing_key: Optional[str],
# only update index if any binaries were uploaded
if self.update_index and len(skipped) + len(upload_errors) < len(specs):
_oci_update_index(self.target_image, self.tmpdir, self.executor)

return skipped, upload_errors

def tag(self, tag: str, roots: List[spack.spec.Spec]):
tagged_image = self.target_image.with_tag(tag)

# _push_oci may not populate self._base_images if binaries were already in the registry
for spec in roots:
_oci_update_base_images(
base_image=self.base_image,
target_image=self.target_image,
spec=spec,
base_image_cache=self._base_images,
)
_oci_put_manifest(
self._base_images, self._checksums, tagged_image, self.tmpdir, None, None, *roots
)


class URLUploader(Uploader):
def __init__(
self,
mirror: spack.mirror.Mirror,
force: bool,
update_index: bool,
signing_key: Optional[str],
) -> None:
super().__init__(mirror, force, update_index)
self.url = mirror.push_url
self.signing_key = signing_key

def push(
self, specs: List[spack.spec.Spec]
) -> Tuple[List[spack.spec.Spec], List[Tuple[spack.spec.Spec, BaseException]]]:
return _url_push(
specs,
out_url=self.url,
force=self.force,
update_index=self.update_index,
signing_key=self.signing_key,
tmpdir=self.tmpdir,
executor=self.executor,
)


def make_uploader(
mirror: spack.mirror.Mirror,
force: bool = False,
update_index: bool = False,
) -> Tuple[List[Spec], List[Tuple[Spec, BaseException]]]:
"""Pushes to the provided build cache, and returns a list of skipped specs that were already
present (when force=False). Does not raise on error."""
with default_push_context() as (tmpdir, executor):
return _push(specs, out_url, signing_key, force, update_index, tmpdir, executor)
signing_key: Optional[str] = None,
base_image: Optional[str] = None,
) -> Uploader:
"""Builder for the appropriate uploader based on the mirror type"""
if mirror.push_url.startswith("oci://"):
return OCIUploader(
mirror=mirror, force=force, update_index=update_index, base_image=base_image
)
else:
return URLUploader(
mirror=mirror, force=force, update_index=update_index, signing_key=signing_key
)


def _format_spec(spec: Spec) -> str:
return spec.cformat("{name}{@version}{/hash:7}")


class FancyProgress:
Expand Down Expand Up @@ -1234,7 +1330,7 @@ def fail(self) -> None:
tty.info(f"{self.pre}Failed to push {self.pretty_spec}")


def _push(
def _url_push(
specs: List[Spec],
out_url: str,
signing_key: Optional[str],
Expand Down Expand Up @@ -1279,7 +1375,7 @@ def _push(

upload_futures = [
executor.submit(
_upload_tarball_and_specfile,
_url_upload_tarball_and_specfile,
spec,
tmpdir,
out_url,
Expand Down Expand Up @@ -1309,12 +1405,12 @@ def _push(
if signing_key:
keys_tmpdir = os.path.join(tmpdir, "keys")
os.mkdir(keys_tmpdir)
push_keys(out_url, keys=[signing_key], update_index=update_index, tmpdir=keys_tmpdir)
_url_push_keys(out_url, keys=[signing_key], update_index=update_index, tmpdir=keys_tmpdir)

if update_index:
index_tmpdir = os.path.join(tmpdir, "index")
os.mkdir(index_tmpdir)
generate_package_index(out_url, index_tmpdir)
_url_generate_package_index(out_url, index_tmpdir)

return skipped, errors

Expand Down Expand Up @@ -1517,7 +1613,7 @@ def _oci_update_base_images(
)


def _push_oci(
def _oci_push(
*,
target_image: ImageReference,
base_image: Optional[ImageReference],
Expand Down Expand Up @@ -2643,7 +2739,7 @@ def get_keys(install=False, trust=False, force=False, mirrors=None):
)


def push_keys(
def _url_push_keys(
*mirrors: Union[spack.mirror.Mirror, str],
keys: List[str],
tmpdir: str,
Expand Down
4 changes: 3 additions & 1 deletion lib/spack/spack/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,8 +1382,10 @@ def push_to_build_cache(spec: spack.spec.Spec, mirror_url: str, sign_binaries: b
"""
tty.debug(f"Pushing to build cache ({'signed' if sign_binaries else 'unsigned'})")
signing_key = bindist.select_signing_key() if sign_binaries else None
mirror = spack.mirror.Mirror.from_url(mirror_url)
try:
bindist.push_or_raise([spec], out_url=mirror_url, signing_key=signing_key)
with bindist.make_uploader(mirror, signing_key=signing_key) as uploader:
uploader.push_or_raise([spec])
return True
except bindist.PushToBuildCacheError as e:
tty.error(f"Problem writing to {mirror_url}: {e}")
Expand Down
Loading

0 comments on commit 2ae5596

Please sign in to comment.