Skip to content

Commit

Permalink
feat: emit module artifacts as blobs (#3184)
Browse files Browse the repository at this point in the history
fixes #3076

Module binaries are now stored as blobs (instead of images). Enumeration
operations were dropped because they are inefficient, and deployment
workflows do not need them anyway.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jonathanj-square and github-actions[bot] authored Oct 28, 2024
1 parent 4435fcc commit c74b989
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 133 deletions.
159 changes: 42 additions & 117 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"strings"

"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
Expand All @@ -24,7 +23,7 @@ import (
)

const (
ModuleArtifactPrefix = "ftl/modules/"
ModuleBlobsPrefix = "ftl/modules/"
)

type ContainerConfig struct {
Expand All @@ -35,8 +34,8 @@ type ContainerConfig struct {
}

type ContainerService struct {
host string
repoConnectionBuilder func(container string) (*remote.Repository, error)
host string
repoFactory func() (*remote.Repository, error)

// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table
Handle *libdal.Handle[ContainerService]
Expand All @@ -51,10 +50,16 @@ type ArtefactRepository struct {
Size int64
}

// ArtefactBlobs holds descriptive metadata (digest, media type, size) for a blob.
// NOTE(review): nothing in the visible code references this type — confirm its intended use.
type ArtefactBlobs struct {
// Digest is the SHA256 digest value; presumably the digest of the blob content — confirm.
Digest sha256.SHA256
// MediaType is a media type string; presumably that of the blob content — confirm.
MediaType string
// Size is a 64-bit size; presumably the blob length in bytes — confirm.
Size int64
}

func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService {
// Connect the registry targeting the specified container
repoConnectionBuilder := func(path string) (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, path)
repoFactory := func() (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, ModuleBlobsPrefix)
reg, err := remote.NewRepository(ref)
if err != nil {
return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err)
Expand All @@ -74,34 +79,40 @@ func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerSe
}

return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
host: c.Registry,
repoFactory: repoFactory,
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService {
return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
Handle: h,
db: sql.New(h.Connection),
host: c.Registry,
repoFactory: repoFactory,
Handle: h,
db: sql.New(h.Connection),
}
}),
}
}

func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
repo, err := s.repoFactory()
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to container registry '%s': %w", s.host, err)
}
set := make(map[sha256.SHA256]bool)
for _, d := range digests {
set[d] = true
}
modules, err := s.DiscoverModuleArtefacts(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err)
}
keys = make([]ArtefactKey, 0)
for _, m := range modules {
if set[m.ModuleDigest] {
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest})
delete(set, m.ModuleDigest)
blobs := repo.Blobs()
for _, d := range digests {
_, err := blobs.Resolve(ctx, fmt.Sprintf("sha256:%s", d))
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
continue
}
return nil, nil, fmt.Errorf("unable to resolve digest '%s': %w", d, err)
}
keys = append(keys, ArtefactKey{Digest: d})
delete(set, d)
}
missing = make([]sha256.SHA256, 0)
for d := range set {
Expand All @@ -110,43 +121,22 @@ func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.
return keys, missing, nil
}

// Upload uploads the specific artifact as a raw blob and links it to a manifest to prevent GC
func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest)
ms := memory.New()
mediaDescriptor := v1.Descriptor{
MediaType: "application/ftl.module.v1",
Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]),
Size: int64(len(artefact.Content)),
}
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content))
repo, err := s.repoFactory()
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err)
}
artifactType := "application/ftl.module.artifact"
opts := oras.PackManifestOptions{
Layers: []v1.Descriptor{mediaDescriptor},
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s': %w", s.host, err)
}
tag := "latest"
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err)
}
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err)
}
repo, err := s.repoConnectionBuilder(ref)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err)
}
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err)
desc := content.NewDescriptorFromBytes("application/x-octet-stream", artefact.Content)
if err = repo.Push(ctx, desc, bytes.NewReader(artefact.Content)); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to upload module blob to repository: %w", err)
}
return artefact.Digest, nil
}

func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
ref := createModuleRepositoryPathFromDigest(digest)
registry, err := s.repoConnectionBuilder(ref)
registry, err := s.repoFactory()
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err)
}
Expand All @@ -157,58 +147,6 @@ func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (
return stream, nil
}

// DiscoverModuleArtefacts returns the artefact repositories found under ModuleArtifactPrefix.
// It delegates to DiscoverArtefacts and passes its result and error through unchanged.
func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) {
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix)
}

// DiscoverArtefacts enumerates the repositories of the registry at s.host and returns one
// ArtefactRepository entry for every repository whose path starts with prefix.
//
// For each matching repository the module digest is parsed from the repository path, the
// "latest" tag is resolved, and the referenced manifest is fetched and decoded. The first
// failure aborts the enumeration and is returned wrapped; no partial result is returned.
// A manifest that declares no layers is reported as an error instead of being indexed,
// which would otherwise panic.
func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) {
	registry, err := remote.NewRegistry(s.host)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err)
	}
	// NOTE(review): plain HTTP is forced for every registry here — confirm this is intended
	// outside of local development.
	registry.PlainHTTP = true
	// Kept as an empty, non-nil slice so callers observe the same value as before when
	// nothing matches.
	result := make([]ArtefactRepository, 0)
	err = registry.Repositories(ctx, "", func(repos []string) error {
		for _, path := range repos {
			if !strings.HasPrefix(path, prefix) {
				continue
			}
			d, err := getDigestFromModuleRepositoryPath(path)
			if err != nil {
				return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err)
			}
			repo, err := registry.Repository(ctx, path)
			if err != nil {
				return fmt.Errorf("unable to connect to repository '%s': %w", path, err)
			}
			desc, err := repo.Resolve(ctx, "latest")
			if err != nil {
				return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err)
			}
			_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions)
			if err != nil {
				return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err)
			}
			var manifest v1.Manifest
			if err := json.Unmarshal(data, &manifest); err != nil {
				return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err)
			}
			// The media type is read from the first layer; guard against manifests that
			// carry none so a malformed repository cannot crash the enumeration.
			if len(manifest.Layers) == 0 {
				return fmt.Errorf("module metadata '%s' contains no layers", path)
			}
			result = append(result, ArtefactRepository{
				ModuleDigest:     d,
				MediaType:        manifest.Layers[0].MediaType,
				ArtefactType:     manifest.ArtifactType,
				RepositoryDigest: desc.Digest,
				Size:             desc.Size,
			})
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("unable to discover artefacts: %w", err)
	}
	return result, nil
}

func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}
Expand All @@ -219,23 +157,10 @@ func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.Dep

// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registry's root
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string {
return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:]))
return fmt.Sprintf("%s/%s:latest", ModuleBlobsPrefix, hex.EncodeToString(digest[:]))
}

// createModuleRepositoryReferenceFromDigest builds the reference used to connect to a module
// repository: the host, a "/" separator, and the repository path derived from the digest.
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string {
	repositoryPath := createModuleRepositoryPathFromDigest(digest)
	reference := fmt.Sprintf("%s/%s", host, repositoryPath)
	return reference
}

// getDigestFromModuleRepositoryPath parses the final segment of a module repository path
// (everything after the last "/") as a SHA256 digest.
//
// It returns an error when the path contains no "/" at all, or when the final segment is
// rejected by sha256.ParseSHA256; in both cases the returned digest is the zero value.
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) {
	idx := strings.LastIndex(repository, "/")
	if idx < 0 {
		return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository)
	}
	segment := repository[idx+1:]
	parsed, err := sha256.ParseSHA256(segment)
	if err == nil {
		return parsed, nil
	}
	return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err)
}
32 changes: 17 additions & 15 deletions frontend/cli/cmd_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/TBD54566975/ftl/backend/controller/artefacts"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
sh "github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
)

type releaseCmd struct {
Expand All @@ -18,7 +20,7 @@ type releaseCmd struct {
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`

Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."`
List releaseListCmd `cmd:"" help:"Lists all published releases."`
Exists releaseExistsCmd `cmd:"" help:"Indicates whether modules, with the specified digests, have been published."`
}

type releasePublishCmd struct {
Expand All @@ -45,30 +47,30 @@ func (d *releasePublishCmd) Run(release *releaseCmd) error {
return nil
}

type releaseListCmd struct {
type releaseExistsCmd struct {
Digests []string `help:"Digest sha256:hex" default:""`
}

func (d *releaseListCmd) Run(release *releaseCmd) error {
func (d *releaseExistsCmd) Run(release *releaseCmd) error {
svc, err := createContainerService(release)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
modules, err := svc.DiscoverModuleArtefacts(context.Background())
digests := slices.Map(slices.Unique(d.Digests), sh.MustParseSHA256)
keys, missing, err := svc.GetDigestsKeys(context.Background(), digests)
if err != nil {
return fmt.Errorf("failed to discover module artefacts: %w", err)
return fmt.Errorf("failed to get keys: %w", err)
}
if len(modules) == 0 {
fmt.Println("No module artefacts found.")
return nil
fmt.Printf("\033[31m%d\033[0m FTL module blobs located\n", len(keys))
for i, key := range keys {
fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[32m%s\u001B[0m\n", i+1, key.Digest)
}

format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n"
fmt.Printf("Found %d module artefacts:\n", len(modules))
for i, m := range modules {
fmt.Printf("\033[31m Artefact %d\033[0m\n", i)
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType)
if len(missing) > 0 {
fmt.Printf("\033[31m%d\033[0m FTL module blobs keys \033[31mnot found\033[0m\n", len(missing))
for i, key := range missing {
fmt.Printf(" \u001B[34m%02d\u001B[0m - sha256:\u001B[31m%s\u001B[0m\n", i+1, key)
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ require (
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-base36 v0.2.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
github.com/otiai10/copy v1.14.0
github.com/posener/complete v1.2.3
github.com/radovskyb/watcher v1.0.7
Expand Down Expand Up @@ -134,6 +133,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/onsi/gomega v1.33.1 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
Expand Down

0 comments on commit c74b989

Please sign in to comment.