Skip to content

Commit

Permalink
refactor(sync): use task scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Mar 28, 2023
1 parent 9171591 commit 18f3253
Show file tree
Hide file tree
Showing 29 changed files with 4,146 additions and 3,761 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/gofrs/uuid v4.4.0+incompatible
github.com/google/go-containerregistry v0.13.0
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/json-iterator/go v1.1.12
Expand Down Expand Up @@ -54,6 +53,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.2
github.com/containers/image/v5 v5.23.0
github.com/gobwas/glob v0.2.3
github.com/gorilla/handlers v1.5.1
github.com/notaryproject/notation-go v1.0.0-rc.3
github.com/opencontainers/distribution-spec/specs-go v0.0.0-20230117141039-067a0f5b0e25
github.com/sigstore/cosign v1.13.1
Expand Down Expand Up @@ -91,6 +91,7 @@ require (
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/go-gorp/gorp/v3 v3.0.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down Expand Up @@ -274,7 +275,6 @@ require (
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fullstorydev/grpcurl v1.8.7 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
Expand Down
58 changes: 42 additions & 16 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"runtime"
"strconv"
"strings"
goSync "sync"
"syscall"
"time"

Expand Down Expand Up @@ -51,7 +50,7 @@ type Controller struct {
Server *http.Server
Metrics monitoring.MetricServer
CveInfo ext.CveInfo
wgShutDown *goSync.WaitGroup // use it to gracefully shutdown goroutines
SyncOnDemand SyncOnDemand
// runtime params
chosenPort int // kernel-chosen port
}
Expand All @@ -62,7 +61,6 @@ func NewController(config *config.Config) *Controller {
logger := log.NewLogger(config.Log.Level, config.Log.Output)
controller.Config = config
controller.Log = logger
controller.wgShutDown = new(goSync.WaitGroup)

if config.Log.Audit != "" {
audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit)
Expand Down Expand Up @@ -121,7 +119,9 @@ func (c *Controller) GetPort() int {
}

func (c *Controller) Run(reloadCtx context.Context) error {
c.StartBackgroundTasks(reloadCtx)
if err := c.StartBackgroundTasks(reloadCtx); err != nil {
return err
}

// setup HTTP API router
engine := mux.NewRouter()
Expand Down Expand Up @@ -605,27 +605,41 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, config *config.Con
// reload access control config
c.Config.HTTP.AccessControl = config.HTTP.AccessControl

// Enable extensions if extension config is provided
if config.Extensions != nil && config.Extensions.Sync != nil {
// reload sync config
// reload periodical gc interval
c.Config.Storage.GCInterval = config.Storage.GCInterval

// reload background tasks
if config.Extensions != nil {
// reload sync extension
c.Config.Extensions.Sync = config.Extensions.Sync
ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.RepoDB, c.StoreController, c.Log)
} else if c.Config.Extensions != nil {
c.Config.Extensions.Sync = nil
// reload search cve extension
if c.Config.Extensions.Search != nil {
// reload only if search is enabled and reloaded config has search extension
if *c.Config.Extensions.Search.Enable && config.Extensions.Search != nil {
c.Config.Extensions.Search.CVE = config.Extensions.Search.CVE
}
}
// reload scrub extension
c.Config.Extensions.Scrub = config.Extensions.Scrub
} else {
c.Config.Extensions = nil
}

c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).Msg("new configuration settings")
if err := c.StartBackgroundTasks(reloadCtx); err != nil {
c.Log.Error().Err(err).Interface("reloaded params", c.Config.Sanitize()).
Msg("unable to load new configuration settings")
} else {
c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
Msg("loaded new configuration settings")
}
}

func (c *Controller) Shutdown() {
// wait gracefully
c.wgShutDown.Wait()

ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) error {
taskScheduler := scheduler.NewScheduler(c.Log)
taskScheduler.RunScheduler(reloadCtx)

Expand Down Expand Up @@ -657,11 +671,23 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Enable extensions if extension config is provided for storeController
if c.Config.Extensions != nil {
if c.Config.Extensions.Sync != nil {
ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.RepoDB, c.StoreController, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.RepoDB, c.StoreController, taskScheduler, c.Log)
if err != nil {
return err
}

c.SyncOnDemand = syncOnDemand
}
}

if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
}

return nil
}

type SyncOnDemand interface {
SyncImage(repo, reference string) error
SyncReference(repo string, subjectDigestStr string, referenceType string) error
}
92 changes: 57 additions & 35 deletions pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package api

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -27,13 +26,14 @@ import (
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/pkg/oci/remote"

zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/constants"
gqlPlayground "zotregistry.io/zot/pkg/debug/gqlplayground"
debug "zotregistry.io/zot/pkg/debug/swagger"
ext "zotregistry.io/zot/pkg/extensions"
"zotregistry.io/zot/pkg/extensions/sync"
"zotregistry.io/zot/pkg/extensions/sync/references"
"zotregistry.io/zot/pkg/log"
repoDBUpdate "zotregistry.io/zot/pkg/meta/repodb/update"
zreg "zotregistry.io/zot/pkg/regexp"
Expand Down Expand Up @@ -319,8 +319,7 @@ func (rh *RouteHandler) CheckManifest(response http.ResponseWriter, request *htt
return
}

content, digest, mediaType, err := getImageManifest(request.Context(), rh, imgStore,
name, reference) //nolint:contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
Expand Down Expand Up @@ -385,8 +384,7 @@ func (rh *RouteHandler) GetManifest(response http.ResponseWriter, request *http.
return
}

content, digest, mediaType, err := getImageManifest(request.Context(), rh,
imgStore, name, reference) //nolint: contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint: contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
Expand Down Expand Up @@ -427,31 +425,29 @@ type ImageIndex struct {
ispec.Index
}

func getReferrers(ctx context.Context, routeHandler *RouteHandler,
func getReferrers(routeHandler *RouteHandler,
imgStore storage.ImageStore, name string, digest godigest.Digest,
artifactTypes []string,
) (ispec.Index, error) {
references, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(references.Manifests) == 0 {
refs, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(refs.Manifests) == 0 {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
*routeHandler.c.Config.Extensions.Sync.Enable &&
routeHandler.c.SyncOnDemand != nil {
routeHandler.c.Log.Info().Msgf("referrers not found, trying to get referrers to %s:%s by syncing on demand",
name, digest)

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OCIReference, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")

return ispec.Index{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), references.OCI); errSync != nil {
routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing OCI reference for image %s:%s",
name, digest.String())
}

references, err = imgStore.GetReferrers(name, digest, artifactTypes)
refs, err = imgStore.GetReferrers(name, digest, artifactTypes)
}
}

return references, err
return refs, err
}

// GetReferrers godoc
Expand Down Expand Up @@ -491,7 +487,7 @@ func (rh *RouteHandler) GetReferrers(response http.ResponseWriter, request *http

imgStore := rh.getImageStore(name)

referrers, err := getReferrers(request.Context(), rh, imgStore, name, digest, artifactTypes)
referrers, err := getReferrers(rh, imgStore, name, digest, artifactTypes)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
Expand Down Expand Up @@ -1632,45 +1628,71 @@ func (rh *RouteHandler) getImageStore(name string) storage.ImageStore {
}

// will sync on demand if an image is not found, in case sync extensions is enabled.
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storage.ImageStore, name,
func getImageManifest(routeHandler *RouteHandler, imgStore storage.ImageStore, name,
reference string,
) ([]byte, godigest.Digest, string, error) {
syncEnabled := false
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
*routeHandler.c.Config.Extensions.Sync.Enable &&
routeHandler.c.SyncOnDemand != nil {
syncEnabled = true
}

_, digestErr := godigest.Parse(reference)
if digestErr == nil {
// if it's a digest then return local cached image, if not found and sync enabled, then try to sync
content, digest, mediaType, err := imgStore.GetImageManifest(name, reference)
if err == nil || !syncEnabled {
return content, digest, mediaType, err
}
}

if syncEnabled {
routeHandler.c.Log.Info().Msgf("trying to get updated image %s:%s by syncing on demand",
name, reference)

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, reference, "", routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s",
name, reference)
if isCosignTag(reference) {
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, reference, references.Cosign); errSync != nil {
routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s",
name, reference)
}
} else {
if errSync := routeHandler.c.SyncOnDemand.SyncImage(name, reference); errSync != nil {
routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s",
name, reference)
}
}
}

return imgStore.GetImageManifest(name, reference)
}

func isCosignTag(tag string) bool {
if strings.HasPrefix(tag, "sha256-") && strings.HasSuffix(tag, remote.SignatureTagSuffix) {
return true
}

return false
}

// will sync referrers on demand if they are not found, in case sync extensions is enabled.
func getOrasReferrers(ctx context.Context, routeHandler *RouteHandler,
func getOrasReferrers(routeHandler *RouteHandler,
imgStore storage.ImageStore, name string, digest godigest.Digest,
artifactType string,
) ([]artifactspec.Descriptor, error) {
refs, err := imgStore.GetOrasReferrers(name, digest, artifactType)
if err != nil {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
*routeHandler.c.Config.Extensions.Sync.Enable &&
routeHandler.c.SyncOnDemand != nil {
routeHandler.c.Log.Info().Msgf("artifact not found, trying to get artifact %s:%s by syncing on demand",
name, digest.String())

errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OrasArtifact, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")

return []artifactspec.Descriptor{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), references.Oras); errSync != nil {
routeHandler.c.Log.Err(errSync).Msgf("error encounter while syncing ORAS reference for image %s:%s",
name, digest.String())
}

refs, err = imgStore.GetOrasReferrers(name, digest, artifactType)
Expand Down Expand Up @@ -1734,7 +1756,7 @@ func (rh *RouteHandler) GetOrasReferrers(response http.ResponseWriter, request *

rh.c.Log.Info().Str("digest", digest.String()).Str("artifactType", artifactType).Msg("getting manifest")

refs, err := getOrasReferrers(request.Context(), rh, imgStore, name, digest, artifactType) //nolint:contextcheck
refs, err := getOrasReferrers(rh, imgStore, name, digest, artifactType) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
Expand Down
14 changes: 7 additions & 7 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func TypeOf(v interface{}) string {

func MakeHTTPGetRequest(httpClient *http.Client, username string, password string, resultPtr interface{},
blobURL string, mediaType string, log log.Logger,
) ([]byte, int, error) {
) ([]byte, string, int, error) {
req, err := http.NewRequest(http.MethodGet, blobURL, nil) //nolint
if err != nil {
return nil, 0, err
return nil, "", 0, err
}

req.Header.Set("Accept", mediaType)
Expand All @@ -158,23 +158,23 @@ func MakeHTTPGetRequest(httpClient *http.Client, username string, password strin
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get blob: %s", blobURL)

return nil, -1, err
return nil, "", -1, err
}

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get blob: %s", blobURL)

return nil, resp.StatusCode, err
return nil, "", resp.StatusCode, err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Error().Str("status code", fmt.Sprint(resp.StatusCode)).Err(err).Msgf("couldn't get blob: %s", blobURL)

return nil, resp.StatusCode, errors.New(string(body)) //nolint:goerr113
return nil, "", resp.StatusCode, errors.New(string(body)) //nolint:goerr113
}

// read blob
Expand All @@ -184,10 +184,10 @@ func MakeHTTPGetRequest(httpClient *http.Client, username string, password strin
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't unmarshal blob: %s", blobURL)

return body, resp.StatusCode, err
return body, "", resp.StatusCode, err
}

return body, resp.StatusCode, err
return body, resp.Header.Get("Content-Type"), resp.StatusCode, err
}

func DirExists(d string) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestCommon(t *testing.T) {
var resultPtr interface{}
httpClient, err := common.CreateHTTPClient(true, "localhost", tempDir)
So(err, ShouldBeNil)
_, _, err = common.MakeHTTPGetRequest(httpClient, "", "",
_, _, _, err = common.MakeHTTPGetRequest(httpClient, "", "",
resultPtr, baseURL+"/v2/", ispec.MediaTypeImageManifest, log.NewLogger("", ""))
So(err, ShouldNotBeNil)
})
Expand Down
Loading

0 comments on commit 18f3253

Please sign in to comment.