From a1ceb176f48b3c1c1d8e0094a4e6f4c12dbbb138 Mon Sep 17 00:00:00 2001 From: BruceAko Date: Wed, 21 Aug 2024 16:50:15 +0800 Subject: [PATCH] feat: support local tarball for nydusify copy Signed-off-by: BruceAko --- .../nydusify/pkg/converter/provider/ported.go | 122 ++++++++++++++++++ .../pkg/converter/provider/provider.go | 34 +++++ contrib/nydusify/pkg/copier/copier.go | 76 ++++++++--- 3 files changed, 213 insertions(+), 19 deletions(-) diff --git a/contrib/nydusify/pkg/converter/provider/ported.go b/contrib/nydusify/pkg/converter/provider/ported.go index 652cf5ff838..a934c63d71e 100644 --- a/contrib/nydusify/pkg/converter/provider/ported.go +++ b/contrib/nydusify/pkg/converter/provider/ported.go @@ -6,15 +6,20 @@ package provider import ( "context" + "encoding/json" "fmt" + "io" "strings" "github.com/containerd/containerd" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/images/archive" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/errdefs" + "github.com/opencontainers/go-digest" // nolint:staticcheck "github.com/containerd/containerd/remotes/docker/schema1" @@ -22,6 +27,18 @@ import ( "golang.org/x/sync/semaphore" ) +type importOpts struct { + indexName string + imageRefT func(string) string + dgstRefT func(digest.Digest) string + skipDgstRef func(string) bool + platformMatcher platforms.MatchComparer + compress bool + discardLayers bool + skipMissing bool + imageLabels map[string]string +} + // Ported from containerd project, copyright The containerd Authors. // github.com/containerd/containerd/blob/main/pull.go func fetch(ctx context.Context, store content.Store, rCtx *containerd.RemoteContext, ref string, limit int) (images.Image, error) { @@ -177,3 +194,108 @@ func push(ctx context.Context, store content.Store, pushCtx *containerd.RemoteCo return remotes.PushContent(ctx, pusher, desc, store, limiter, pushCtx.PlatformMatcher, wrapper) } + +// Ported from containerd project, copyright The containerd Authors. +// github.com/containerd/containerd/blob/main/import.go +func load(ctx context.Context, reader io.Reader, store content.Store, iopts importOpts) ([]images.Image, error) { + var aio []archive.ImportOpt + if iopts.compress { + aio = append(aio, archive.WithImportCompression()) + } + + index, err := archive.ImportIndex(ctx, store, reader, aio...) + if err != nil { + return nil, err + } + + var imgs []images.Image + + if iopts.indexName != "" { + imgs = append(imgs, images.Image{ + Name: iopts.indexName, + Target: index, + }) + } + var platformMatcher = iopts.platformMatcher + + var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + // Only save images at top level + if desc.Digest != index.Digest { + // Don't set labels on missing content. + children, err := images.Children(ctx, store, desc) + if iopts.skipMissing && errdefs.IsNotFound(err) { + return nil, images.ErrSkipDesc + } + return children, err + } + + var idx ocispec.Index + p, err := content.ReadBlob(ctx, store, desc) + if err != nil { + return nil, err + } + if err := json.Unmarshal(p, &idx); err != nil { + return nil, err + } + + for _, m := range idx.Manifests { + name := imageName(m.Annotations, iopts.imageRefT) + if name != "" { + imgs = append(imgs, images.Image{ + Name: name, + Target: m, + }) + } + if iopts.skipDgstRef != nil { + if iopts.skipDgstRef(name) { + continue + } + } + if iopts.dgstRefT != nil { + ref := iopts.dgstRefT(m.Digest) + if ref != "" { + imgs = append(imgs, images.Image{ + Name: ref, + Target: m, + }) + } + } + } + + return idx.Manifests, nil + } + + handler = images.FilterPlatforms(handler, platformMatcher) + if iopts.discardLayers { + handler = images.SetChildrenMappedLabels(store, handler, images.ChildGCLabelsFilterLayers) + } else { + handler = images.SetChildrenLabels(store, handler) + } + if err := images.WalkNotEmpty(ctx, handler, index); err != nil { + return nil, err + } + + for i := range imgs { + fieldsPath := []string{"target"} + if iopts.imageLabels != nil { + fieldsPath = append(fieldsPath, "labels") + imgs[i].Labels = iopts.imageLabels + } + } + + return imgs, nil +} + +func imageName(annotations map[string]string, ociCleanup func(string) string) string { + name := annotations[images.AnnotationImageName] + if name != "" { + return name + } + name = annotations[ocispec.AnnotationRefName] + if name != "" { + if ociCleanup != nil { + name = ociCleanup(name) + } + } + return name +} diff --git a/contrib/nydusify/pkg/converter/provider/provider.go b/contrib/nydusify/pkg/converter/provider/provider.go index 08a7988dba1..2b199e30e20 100644 --- a/contrib/nydusify/pkg/converter/provider/provider.go +++ b/contrib/nydusify/pkg/converter/provider/provider.go @@ -7,6 +7,7 @@ package provider import ( "context" "crypto/tls" + "io" "net" "net/http" "os" @@ -17,13 +18,16 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images/archive" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/goharbor/acceleration-service/pkg/cache" accelcontent "github.com/goharbor/acceleration-service/pkg/content" "github.com/goharbor/acceleration-service/pkg/remote" + "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" ) var LayerConcurrentLimit = 5 @@ -152,6 +156,36 @@ func (pvd *Provider) Push(ctx context.Context, desc ocispec.Descriptor, ref stri return push(ctx, pvd.store, rc, desc, ref) } +func (pvd *Provider) Import(ctx context.Context, reader io.Reader) (string, error) { + iopts := importOpts{ + dgstRefT: func(dgst digest.Digest) string { + return "nydus" + "@" + dgst.String() + }, + skipDgstRef: func(name string) bool { return name != "" }, + platformMatcher: pvd.platformMC, + } + images, err := load(ctx, reader, pvd.store, iopts) + if err != nil { + return "", err + } + + if len(images) != 1 { + return "", errors.New("importing multiple images") + } + image := images[0] + + pvd.mutex.Lock() + defer pvd.mutex.Unlock() + pvd.images[image.Name] = &image.Target + + return image.Name, nil +} + +func (pvd *Provider) Export(ctx context.Context, writer io.Writer, img *ocispec.Descriptor, name string) error { + opts := []archive.ExportOpt{archive.WithManifest(*img, name), archive.WithPlatform(pvd.platformMC)} + return archive.Export(ctx, pvd.store, writer, opts...) +} + func (pvd *Provider) Image(_ context.Context, ref string) (*ocispec.Descriptor, error) { pvd.mutex.Lock() defer pvd.mutex.Unlock() diff --git a/contrib/nydusify/pkg/copier/copier.go b/contrib/nydusify/pkg/copier/copier.go index fa3d059035c..c04b9bc503b 100644 --- a/contrib/nydusify/pkg/copier/copier.go +++ b/contrib/nydusify/pkg/copier/copier.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strings" + "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" containerdErrdefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" @@ -285,41 +286,78 @@ func Copy(ctx context.Context, opt Opt) error { } defer os.RemoveAll(tmpDir) - sourceNamed, err := docker.ParseDockerRef(opt.Source) - if err != nil { - return errors.Wrap(err, "parse source reference") - } - targetNamed, err := docker.ParseDockerRef(opt.Target) - if err != nil { - return errors.Wrap(err, "parse target reference") - } - source := sourceNamed.String() - target := targetNamed.String() + var source string + if strings.HasPrefix(opt.Source, "file:///") { + inputPath := strings.TrimPrefix(opt.Source, "file:///") + + logrus.Infof("importing source image from %s", inputPath) + f, err := os.Open(inputPath) + if err != nil { + return err + } + defer f.Close() + decompressor, err := compression.DecompressStream(f) + if err != nil { + return err + } + if source, err = pvd.Import(ctx, decompressor); err != nil { + return errors.Wrap(err, "import source image") + } + logrus.Infof("imported source image %s", source) + } else { + sourceNamed, err := docker.ParseDockerRef(opt.Source) + if err != nil { + return errors.Wrap(err, "parse source reference") + } + source = sourceNamed.String() - logrus.Infof("pulling source image %s", source) - if err := pvd.Pull(ctx, source); err != nil { - if errdefs.NeedsRetryWithHTTP(err) { - pvd.UsePlainHTTP() - if err := pvd.Pull(ctx, source); err != nil { - return errors.Wrap(err, "try to pull image") + logrus.Infof("pulling source image %s", source) + if err := pvd.Pull(ctx, source); err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + if err := pvd.Pull(ctx, source); err != nil { + return errors.Wrap(err, "try to pull image") + } + } else { + return errors.Wrap(err, "pull source image") } - } else { - return errors.Wrap(err, "pull source image") } + logrus.Infof("pulled source image %s", source) } - logrus.Infof("pulled source image %s", source) sourceImage, err := pvd.Image(ctx, source) if err != nil { return errors.Wrap(err, "find image from store") } + if strings.HasPrefix(opt.Target, "file:///") { + outputPath := strings.TrimPrefix(opt.Target, "file:///") + + logrus.Infof("exporting source image to %s", outputPath) + f, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + if err := pvd.Export(ctx, f, sourceImage, source); err != nil { + return errors.Wrap(err, "export source image to target tar file") + } + logrus.Infof("exported image %s", source) + return nil + } + sourceDescs, err := utils.GetManifests(ctx, pvd.ContentStore(), *sourceImage, platformMC) if err != nil { return errors.Wrap(err, "get image manifests") } targetDescs := make([]ocispec.Descriptor, len(sourceDescs)) + targetNamed, err := docker.ParseDockerRef(opt.Target) + if err != nil { + return errors.Wrap(err, "parse target reference") + } + target := targetNamed.String() + sem := semaphore.NewWeighted(1) eg := errgroup.Group{} for idx := range sourceDescs {