Skip to content

Commit

Permalink
fix(zb): fixed remote repositories cleanup
Browse files Browse the repository at this point in the history
fix(storage): also put deduped blobs in cache, not just origin blobs

this caused an error when trying to delete deduped blobs
from multiple repositories
  • Loading branch information
eusebiu-constantin-petu-dbk committed May 19, 2023
1 parent 9ca85e0 commit 016fcdc
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 29 deletions.
85 changes: 74 additions & 11 deletions cmd/zb/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand All @@ -18,21 +19,83 @@ import (
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"

"zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api"
"zotregistry.io/zot/pkg/test"
)

func makeHTTPGetRequest(url string, resultPtr interface{}, client *resty.Client) error {
resp, err := client.R().Get(url)
if err != nil {
return err
}

if resp.StatusCode() != http.StatusOK {
log.Printf("unable to make GET request on %s, response status code: %d", url, resp.StatusCode())

return errors.New(string(resp.Body())) //nolint: goerr113
}

err = json.Unmarshal(resp.Body(), resultPtr)
if err != nil {
return err
}

return nil
}

func makeHTTPDeleteRequest(url string, client *resty.Client) error {
resp, err := client.R().Delete(url)
if err != nil {
return err
}

if resp.StatusCode() != http.StatusAccepted {
log.Printf("unable to make DELETE request on %s, response status code: %d", url, resp.StatusCode())

return errors.New(string(resp.Body())) //nolint: goerr113
}

return nil
}

func deleteTestRepo(repos []string, url string, client *resty.Client) error {
for _, repo := range repos {
resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo)))
var tags api.ImageTags

// get tags
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/tags/list", url, repo), &tags, client)
if err != nil {
return err
}

// request specific check
statusCode := resp.StatusCode()
if statusCode != http.StatusAccepted {
return errors.ErrUnknownCode
for _, tag := range tags.Tags {
var manifest ispec.Manifest

// first get tag manifest to get containing blobs
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), &manifest, client)
if err != nil {
return err
}

// delete blobs
for _, blob := range manifest.Layers {
err := makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blob.Digest.String()), client)
if err != nil {
return err
}
}

// delete config blob
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, manifest.Config.Digest.String()), client)
if err != nil {
return err
}

// delete manifest
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), client)
if err != nil {
return err
}
}
}

Expand Down Expand Up @@ -273,7 +336,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

loc := test.Location(url, resp)
Expand Down Expand Up @@ -311,7 +374,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

// upload image config blob
Expand All @@ -325,7 +388,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusAccepted {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

loc = test.Location(url, resp)
Expand All @@ -345,7 +408,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

// create a manifest
Expand Down Expand Up @@ -388,7 +451,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
// request specific check
statusCode = resp.StatusCode()
if statusCode != http.StatusCreated {
return nil, repos, errors.ErrUnknownCode
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
}

manifestHash[repo] = manifestTag
Expand Down
6 changes: 5 additions & 1 deletion cmd/zb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func NewPerfRootCmd() *cobra.Command {

var concurrency, requests int

var skipCleanup bool

rootCmd := &cobra.Command{
Use: "zb <url>",
Short: "`zb`",
Expand Down Expand Up @@ -46,7 +48,7 @@ func NewPerfRootCmd() *cobra.Command {

requests = concurrency * (requests / concurrency)

Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR)
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR, skipCleanup)
},
}

Expand All @@ -66,6 +68,8 @@ func NewPerfRootCmd() *cobra.Command {
"Number of requests to perform")
rootCmd.Flags().StringVarP(&outFmt, "output-format", "o", "",
"Output format of test results: stdout (default), json, ci-cd")
rootCmd.Flags().BoolVar(&skipCleanup, "skip-cleanup", false,
"Clean up pushed repos from remote registry after running benchmark (default true)")

// "version"
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit")
Expand Down
53 changes: 36 additions & 17 deletions cmd/zb/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type testFunc func(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error

//nolint:gosec
Expand All @@ -279,6 +280,7 @@ func GetCatalog(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -336,9 +338,11 @@ func GetCatalog(
}

// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -350,6 +354,7 @@ func PushMonolithStreamed(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand All @@ -363,9 +368,11 @@ func PushMonolithStreamed(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -377,6 +384,7 @@ func PushChunkStreamed(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand All @@ -390,9 +398,11 @@ func PushChunkStreamed(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -404,6 +414,7 @@ func Pull(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -472,9 +483,11 @@ func Pull(
}

// clean up
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand All @@ -486,6 +499,7 @@ func MixedPullAndPush(
config testConfig,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string

Expand Down Expand Up @@ -519,9 +533,11 @@ func MixedPullAndPush(
}

// clean up
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -633,7 +649,7 @@ var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this tes
func Perf(
workdir, url, auth, repo string,
concurrency int, requests int,
outFmt string, srcIPs string, srcCIDR string,
outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
// logging
Expand Down Expand Up @@ -702,7 +718,10 @@ func Perf(
log.Fatal(err)
}

_ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient)
err = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient, skipCleanup)
if err != nil {
log.Fatal(err)
}
}()
}
wg.Wait()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,13 @@ retry:
}
}

// also put dedupe blob in cache
if err := is.cache.PutBlob(dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")

return err
}

if err := os.Remove(src); err != nil {
is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob")

Expand Down

0 comments on commit 016fcdc

Please sign in to comment.