Skip to content

Commit

Permalink
refactor(sync): use task scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
  • Loading branch information
eusebiu-constantin-petu-dbk committed May 31, 2023
1 parent e148343 commit e0a5c18
Show file tree
Hide file tree
Showing 40 changed files with 4,257 additions and 3,518 deletions.
18 changes: 17 additions & 1 deletion .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ permissions: read-all
# 2. run zot with s3 storage and dynamodb and dedupe enabled, push images, restart zot with dedupe false and no cache
# task scheduler will start a restore all blobs process at zot startup, after it finishes all blobs should be restored to their original state (have content)
jobs:
client-tools:
dedupe:
name: Dedupe/restore blobs
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -55,3 +55,19 @@ jobs:
env:
AWS_ACCESS_KEY_ID: fake
AWS_SECRET_ACCESS_KEY: fake
sync:
name: Sync harness
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.20.x
- name: Install dependencies
run: |
cd $GITHUB_WORKSPACE
go install github.com/swaggo/swag/cmd/swag
go mod download
- name: Run sync harness
run: |
make test-sync-harness
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@ test-push-pull-running-dedupe: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS)
test-push-pull-running-dedupe-verbose: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) $(CRICTL)
$(BATS) --trace --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/pushpull_running_dedupe.bats

.PHONY: test-sync-harness
test-sync-harness: binary binary-minimal bench check-skopeo $(BATS)
$(BATS) --trace --print-output-on-failure test/blackbox/sync_harness.bats

.PHONY: test-sync-harness-verbose
test-sync-harness-verbose: binary binary-minimal bench check-skopeo $(BATS)
$(BATS) --trace --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/sync_harness.bats

.PHONY: test-restore-s3-blobs
test-restore-s3-blobs: binary check-skopeo $(BATS) $(REGCLIENT) $(ORAS) $(HELM) $(CRICTL)
$(BATS) --trace --print-output-on-failure test/blackbox/restore_s3_blobs.bats
Expand Down Expand Up @@ -362,13 +370,13 @@ test-cloud-only-verbose: binary check-skopeo $(BATS)

.PHONY: test-bats-sync
test-bats-sync: BUILD_LABELS=sync
test-bats-sync: binary binary-minimal check-skopeo $(BATS) $(NOTATION) $(COSIGN)
test-bats-sync: binary binary-minimal bench check-skopeo $(BATS) $(NOTATION) $(COSIGN)
$(BATS) --trace --print-output-on-failure test/blackbox/sync.bats
$(BATS) --trace --print-output-on-failure test/blackbox/sync_docker.bats

.PHONY: test-bats-sync-verbose
test-bats-sync-verbose: BUILD_LABELS=sync
test-bats-sync-verbose: binary binary-minimal check-skopeo $(BATS) $(NOTATION) $(COSIGN)
test-bats-sync-verbose: binary binary-minimal bench check-skopeo $(BATS) $(NOTATION) $(COSIGN)
$(BATS) --trace -t -x -p --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/sync.bats
$(BATS) --trace -t -x -p --verbose-run --print-output-on-failure --show-output-of-passing-tests test/blackbox/sync_docker.bats

Expand Down
4 changes: 3 additions & 1 deletion errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ var (
ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config")
ErrRegistryNoContent = errors.New("sync: could not find a Content that matches localRepo")
ErrSyncReferrerNotFound = errors.New("sync: couldn't find upstream referrer")
ErrSyncReferrer = errors.New("sync: failed to get upstream referrer")
ErrImageLintAnnotations = errors.New("routes: lint checks failed")
ErrParsingAuthHeader = errors.New("auth: failed parsing authorization header")
ErrBadType = errors.New("core: invalid type")
Expand Down Expand Up @@ -92,4 +91,7 @@ var (
ErrSignConfigDirNotSet = errors.New("signatures: signature config dir not set")
ErrBadManifestDigest = errors.New("signatures: bad manifest digest")
ErrInvalidSignatureType = errors.New("signatures: invalid signature type")
ErrSyncPingRegistry = errors.New("sync: unable to ping any registry URLs")
ErrSyncImageNotSigned = errors.New("sync: image is not signed")
ErrSyncImageFilteredOut = errors.New("sync: image is filtered out by sync config")
)
46 changes: 33 additions & 13 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Controller struct {
Server *http.Server
Metrics monitoring.MetricServer
CveInfo ext.CveInfo
SyncOnDemand SyncOnDemand
// runtime params
chosenPort int // kernel-chosen port
}
Expand Down Expand Up @@ -279,16 +280,30 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, config *config.Con
// reload access control config
c.Config.HTTP.AccessControl = config.HTTP.AccessControl

// Enable extensions if extension config is provided
if config.Extensions != nil && config.Extensions.Sync != nil {
// reload sync config
// reload periodical gc interval
c.Config.Storage.GCInterval = config.Storage.GCInterval

// reload background tasks
if config.Extensions != nil {
// reload sync extension
c.Config.Extensions.Sync = config.Extensions.Sync
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
} else if c.Config.Extensions != nil {
c.Config.Extensions.Sync = nil
// reload search cve extension
if c.Config.Extensions.Search != nil {
// reload only if search is enabled and reloaded config has search extension
if *c.Config.Extensions.Search.Enable && config.Extensions.Search != nil {
c.Config.Extensions.Search.CVE = config.Extensions.Search.CVE
}
}
// reload scrub extension
c.Config.Extensions.Scrub = config.Extensions.Scrub
} else {
c.Config.Extensions = nil
}

c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).Msg("new configuration settings")
c.StartBackgroundTasks(reloadCtx)

c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
Msg("loaded new configuration settings")
}

func (c *Controller) Shutdown() {
Expand Down Expand Up @@ -334,14 +349,19 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}
}

// Enable extensions if extension config is provided for storeController
if c.Config.Extensions != nil {
if c.Config.Extensions.Sync != nil {
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)

syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.RepoDB, c.StoreController, taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
c.SyncOnDemand = syncOnDemand
}
}

type SyncOnDemand interface {
SyncImage(repo, reference string) error
SyncReference(repo string, subjectDigestStr string, referenceType string) error
}
100 changes: 55 additions & 45 deletions pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -27,14 +26,15 @@ import (
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/v2/pkg/oci/remote"

zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/constants"
zcommon "zotregistry.io/zot/pkg/common"
gqlPlayground "zotregistry.io/zot/pkg/debug/gqlplayground"
debug "zotregistry.io/zot/pkg/debug/swagger"
ext "zotregistry.io/zot/pkg/extensions"
"zotregistry.io/zot/pkg/extensions/sync"
syncConstants "zotregistry.io/zot/pkg/extensions/sync/constants"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta"
zreg "zotregistry.io/zot/pkg/regexp"
Expand Down Expand Up @@ -376,8 +376,7 @@ func (rh *RouteHandler) CheckManifest(response http.ResponseWriter, request *htt
return
}

content, digest, mediaType, err := getImageManifest(request.Context(), rh, imgStore,
name, reference) //nolint:contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
Expand Down Expand Up @@ -449,8 +448,7 @@ func (rh *RouteHandler) GetManifest(response http.ResponseWriter, request *http.
return
}

content, digest, mediaType, err := getImageManifest(request.Context(), rh,
imgStore, name, reference) //nolint: contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint: contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
Expand Down Expand Up @@ -488,31 +486,26 @@ type ImageIndex struct {
ispec.Index
}

func getReferrers(ctx context.Context, routeHandler *RouteHandler,
func getReferrers(routeHandler *RouteHandler,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactTypes []string,
) (ispec.Index, error) {
references, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(references.Manifests) == 0 {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
refs, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(refs.Manifests) == 0 {
if isSyncOnDemandEnabled(*routeHandler.c) {
routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()).
Msg("referrers not found, trying to get reference by syncing on demand")

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OCIReference, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")

return ispec.Index{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.OCI); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", digest.String()).
Msg("error encounter while syncing OCI reference for image")
}

references, err = imgStore.GetReferrers(name, digest, artifactTypes)
refs, err = imgStore.GetReferrers(name, digest, artifactTypes)
}
}

return references, err
return refs, err
}

// GetReferrers godoc
Expand Down Expand Up @@ -559,7 +552,7 @@ func (rh *RouteHandler) GetReferrers(response http.ResponseWriter, request *http

imgStore := rh.getImageStore(name)

referrers, err := getReferrers(request.Context(), rh, imgStore, name, digest, artifactTypes)
referrers, err := getReferrers(rh, imgStore, name, digest, artifactTypes)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
Expand Down Expand Up @@ -1722,15 +1715,10 @@ func (rh *RouteHandler) getImageStore(name string) storageTypes.ImageStore {
}

// will sync on demand if an image is not found, in case sync extensions is enabled.
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore,
name, reference string,
func getImageManifest(routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name,
reference string,
) ([]byte, godigest.Digest, string, error) {
syncEnabled := false
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
syncEnabled = true
}
syncEnabled := isSyncOnDemandEnabled(*routeHandler.c)

_, digestErr := godigest.Parse(reference)
if digestErr == nil {
Expand All @@ -1745,36 +1733,47 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore
routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference).
Msg("trying to get updated image by syncing on demand")

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, reference, "", routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing image")
// we use a custom method for syncing cosign signatures for the moment, even though it's also an oci image.
if isCosignTag(reference) {
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, reference, syncConstants.Cosign); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing cosign signature for image")
}
} else {
if errSync := routeHandler.c.SyncOnDemand.SyncImage(name, reference); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing image")
}
}
}

return imgStore.GetImageManifest(name, reference)
}

// this function will check if tag is a cosign tag (signature or sbom).
func isCosignTag(tag string) bool {
if strings.HasPrefix(tag, "sha256-") &&
(strings.HasSuffix(tag, remote.SignatureTagSuffix) || strings.HasSuffix(tag, remote.SBOMTagSuffix)) {
return true
}

return false
}

// will sync referrers on demand if they are not found, in case sync extensions is enabled.
func getOrasReferrers(ctx context.Context, routeHandler *RouteHandler,
func getOrasReferrers(routeHandler *RouteHandler,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactType string,
) ([]artifactspec.Descriptor, error) {
refs, err := imgStore.GetOrasReferrers(name, digest, artifactType)
if err != nil {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
if isSyncOnDemandEnabled(*routeHandler.c) {
routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()).
Msg("artifact not found, trying to get artifact by syncing on demand")

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OrasArtifact, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")

return []artifactspec.Descriptor{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.Oras); errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).
Msg("unable to get references")
}

refs, err = imgStore.GetOrasReferrers(name, digest, artifactType)
Expand Down Expand Up @@ -1838,7 +1837,7 @@ func (rh *RouteHandler) GetOrasReferrers(response http.ResponseWriter, request *

rh.c.Log.Info().Str("digest", digest.String()).Str("artifactType", artifactType).Msg("getting manifest")

refs, err := getOrasReferrers(request.Context(), rh, imgStore, name, digest, artifactType) //nolint:contextcheck
refs, err := getOrasReferrers(rh, imgStore, name, digest, artifactType) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
Expand Down Expand Up @@ -1883,3 +1882,14 @@ func getBlobUploadLocation(url *url.URL, name string, digest godigest.Digest) st

return url.String()
}

func isSyncOnDemandEnabled(ctlr Controller) bool {
if ctlr.Config.Extensions != nil &&
ctlr.Config.Extensions.Sync != nil &&
*ctlr.Config.Extensions.Sync.Enable &&
fmt.Sprintf("%v", ctlr.SyncOnDemand) != fmt.Sprintf("%v", nil) {
return true
}

return false
}
Loading

0 comments on commit e0a5c18

Please sign in to comment.