Skip to content

Commit

Permalink
fix(zb): fixed remote repositories cleanup
Browse files Browse the repository at this point in the history
fix(storage/local): also put deduped blobs in cache, not just origin blobs

this caused an error when trying to delete deduped blobs
from multiple repositories

fix(storage/s3): check blob is present in cache before deleting

this is an edge case where dedupe is false but cacheDriver is not nil
(because in s3 we open the cache.db if storage find it in rootDir)
it caused an error when trying to delete blobs uploaded with dedupe false

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
  • Loading branch information
eusebiu-constantin-petu-dbk committed May 19, 2023
1 parent 9ca85e0 commit 69835a5
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 35 deletions.
85 changes: 74 additions & 11 deletions cmd/zb/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand All @@ -18,21 +19,83 @@ import (
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"

"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api"
"zotregistry.io/zot/pkg/test"
)

func makeHTTPGetRequest(url string, resultPtr interface{}, client *resty.Client) error {
resp, err := client.R().Get(url)
if err != nil {
return err
}

if resp.StatusCode() != http.StatusOK {
log.Printf("unable to make GET request on %s, response status code: %d", url, resp.StatusCode())

return errors.New(string(resp.Body())) //nolint: goerr113
}

err = json.Unmarshal(resp.Body(), resultPtr)
if err != nil {
return err
}

return nil
}

func makeHTTPDeleteRequest(url string, client *resty.Client) error {
resp, err := client.R().Delete(url)
if err != nil {
return err
}

if resp.StatusCode() != http.StatusAccepted {
log.Printf("unable to make DELETE request on %s, response status code: %d", url, resp.StatusCode())

return errors.New(string(resp.Body())) //nolint: goerr113
}

return nil
}

func deleteTestRepo(repos []string, url string, client *resty.Client) error {
for _, repo := range repos {
resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo)))
var tags api.ImageTags

// get tags
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/tags/list", url, repo), &tags, client)
if err != nil {
return err
}

// request specific check
statusCode := resp.StatusCode()
if statusCode != http.StatusAccepted {
return errors.ErrUnknownCode
for _, tag := range tags.Tags {
var manifest ispec.Manifest

// first get tag manifest to get containing blobs
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), &manifest, client)
if err != nil {
return err
}

// delete blobs
for _, blob := range manifest.Layers {
err := makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blob.Digest.String()), client)
if err != nil {
return err
}
}

// delete config blob
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, manifest.Config.Digest.String()), client)
if err != nil {
return err
}

// delete manifest
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), client)
if err != nil {
return err
}
}
}

Expand Down Expand Up @@ -273,7 +336,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

loc := test.Location(url, resp)
Expand Down Expand Up @@ -311,7 +374,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

// upload image config blob
Expand All @@ -325,7 +388,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

loc = test.Location(url, resp)
Expand All @@ -345,7 +408,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

// create a manifest
Expand Down Expand Up @@ -388,7 +451,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

manifestHash[repo] = manifestTag
Expand Down
6 changes: 5 additions & 1 deletion cmd/zb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func NewPerfRootCmd() *cobra.Command {

var concurrency, requests int

var skipCleanup bool

rootCmd := &cobra.Command{
Use: "zb <url>",
Short: "`zb`",
Expand Down Expand Up @@ -46,7 +48,7 @@ func NewPerfRootCmd() *cobra.Command {

requests = concurrency * (requests / concurrency)

Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR)
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR, skipCleanup)
},
}

Expand All @@ -66,6 +68,8 @@ func NewPerfRootCmd() *cobra.Command {
"Number of requests to perform")
rootCmd.Flags().StringVarP(&outFmt, "output-format", "o", "",
"Output format of test results: stdout (default), json, ci-cd")
rootCmd.Flags().BoolVar(&skipCleanup, "skip-cleanup", false,
"Clean up pushed repos from remote registry after running benchmark (default true)")

// "version"
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit")
Expand Down
53 changes: 36 additions & 17 deletions cmd/zb/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type testFunc func(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error

//nolint:gosec
Expand All @@ -279,6 +280,7 @@ func GetCatalog(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -336,9 +338,11 @@ func GetCatalog(
}

// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -350,6 +354,7 @@ func PushMonolithStreamed(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand All @@ -363,9 +368,11 @@ func PushMonolithStreamed(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -377,6 +384,7 @@ func PushChunkStreamed(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand All @@ -390,9 +398,11 @@ func PushChunkStreamed(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -404,6 +414,7 @@ func Pull(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -472,9 +483,11 @@ func Pull(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -486,6 +499,7 @@ func MixedPullAndPush(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -519,9 +533,11 @@ func MixedPullAndPush(
}

// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -633,7 +649,7 @@ var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this tes
func Perf(
workdir, url, auth, repo string,
concurrency int, requests int,
outFmt string, srcIPs string, srcCIDR string,
outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
// logging
Expand Down Expand Up @@ -702,7 +718,10 @@ func Perf(
log.Fatal(err)
}

_ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient)
err = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient, skipCleanup)
if err != nil {
log.Fatal(err)
}
}()
}
wg.Wait()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,13 @@ retry:
}
}

// also put dedupe blob in cache
if err := is.cache.PutBlob(dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")

return err
}

if err := os.Remove(src); err != nil {
is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob")

Expand Down
31 changes: 29 additions & 2 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,8 +1107,15 @@ func TestDedupeLinks(t *testing.T) {
UseRelPaths: true,
}, log)

imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
testCase.dedupe, true, log, metrics, nil, cacheDriver)
var imgStore storage.ImageStore

if testCase.dedupe {
imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay,
testCase.dedupe, true, log, metrics, nil, cacheDriver)
} else {
imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay,
testCase.dedupe, true, log, metrics, nil, nil)
}

Convey(fmt.Sprintf("Dedupe %t", testCase.dedupe), t, func(c C) {
// manifest1
Expand Down Expand Up @@ -1238,6 +1245,16 @@ func TestDedupeLinks(t *testing.T) {
So(os.SameFile(fi1, fi2), ShouldEqual, testCase.expected)

if !testCase.dedupe {
Convey("delete blobs from storage/cache should work when dedupe is false", func() {
So(blobDigest1, ShouldEqual, blobDigest2)

err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1))
So(err, ShouldBeNil)

err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2))
So(err, ShouldBeNil)
})

Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
for i := 0; i < 10; i++ {
taskScheduler, cancel := runAndGetScheduler()
Expand Down Expand Up @@ -1358,6 +1375,16 @@ func TestDedupeLinks(t *testing.T) {
})
}

Convey("delete blobs from storage/cache should work when dedupe is true", func() {
So(blobDigest1, ShouldEqual, blobDigest2)

err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1))
So(err, ShouldBeNil)

err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2))
So(err, ShouldBeNil)
})

Convey("storage and cache inconsistency", func() {
// delete blobs
err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,11 +1358,13 @@ func (is *ObjectStorage) DeleteBlob(repo string, digest godigest.Digest) error {
}

// remove cache entry and move blob contents to the next candidate if there is any
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath).
Msg("unable to remove blob path from cache")
if ok := is.cache.HasBlob(digest, blobPath); ok {
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath).
Msg("unable to remove blob path from cache")

return err
return err
}
}

// if the deleted blob is one with content
Expand Down
Loading

0 comments on commit 69835a5

Please sign in to comment.