-
Notifications
You must be signed in to change notification settings - Fork 543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make the blob storage in pkg/registry pluggable. #1158
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,13 +18,18 @@ import ( | |||||||||||||||||||
"bytes" | ||||||||||||||||||||
"crypto/sha256" | ||||||||||||||||||||
"encoding/hex" | ||||||||||||||||||||
"errors" | ||||||||||||||||||||
"fmt" | ||||||||||||||||||||
"io" | ||||||||||||||||||||
"io/ioutil" | ||||||||||||||||||||
"math/rand" | ||||||||||||||||||||
"net/http" | ||||||||||||||||||||
"path" | ||||||||||||||||||||
"strings" | ||||||||||||||||||||
"sync" | ||||||||||||||||||||
|
||||||||||||||||||||
"github.com/google/go-containerregistry/pkg/name" | ||||||||||||||||||||
v1 "github.com/google/go-containerregistry/pkg/v1" | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
// Returns whether this url should be handled by the blob handler | ||||||||||||||||||||
|
@@ -46,11 +51,26 @@ func isBlob(req *http.Request) bool { | |||||||||||||||||||
|
||||||||||||||||||||
// blobs | ||||||||||||||||||||
type blobs struct { | ||||||||||||||||||||
// Blobs are content addresses. we store them globally underneath their sha and make no distinctions per image. | ||||||||||||||||||||
contents map[string][]byte | ||||||||||||||||||||
// Each upload gets a unique id that writes occur to until finalized. | ||||||||||||||||||||
uploads map[string][]byte | ||||||||||||||||||||
lock sync.Mutex | ||||||||||||||||||||
|
||||||||||||||||||||
bh BlobHandler | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// BlobHandler is the interface for the storage layer underneath this registry. | ||||||||||||||||||||
type BlobHandler interface { | ||||||||||||||||||||
mattmoor marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
// Stat returns the size of the blob whose hash is specified, | ||||||||||||||||||||
// if it exists. If not, it returns (0, error). | ||||||||||||||||||||
Stat(repo name.Repository, h v1.Hash) (int64, error) | ||||||||||||||||||||
|
||||||||||||||||||||
// Get returns true and a reader for consuming the blob specified with the hash, | ||||||||||||||||||||
// if it exists. It now, it returns (nil, error). | ||||||||||||||||||||
Get(repo name.Repository, h v1.Hash) (io.ReadCloser, error) | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something occurred to me about this API, thinking through how I could use it with kontain.me -- this API would require kontain.me to proxy and serve blobs from GCS, instead of HTTP redirecting like I do today, which is a lot faster and cheaper than proxying through Cloud Run. This might just mean that |
||||||||||||||||||||
|
||||||||||||||||||||
// Store stores the stream of content with the given hash, or returns the error | ||||||||||||||||||||
// encountered doing so. | ||||||||||||||||||||
Store(repo name.Repository, h v1.Hash, content io.ReadCloser) error | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { | ||||||||||||||||||||
|
@@ -74,38 +94,96 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { | |||||||||||||||||||
|
||||||||||||||||||||
switch req.Method { | ||||||||||||||||||||
case http.MethodHead: | ||||||||||||||||||||
b.lock.Lock() | ||||||||||||||||||||
defer b.lock.Unlock() | ||||||||||||||||||||
b, ok := b.contents[target] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
h, err := v1.NewHash(target) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Resurrecting this because I'm sad we couldn't make it happen) Aside from the question of whether we need to depend on I hear the concern about duplicating validation logic though. Would it be reasonable to export a method from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Me too, but alas I've unblocked myself (and it is glorious 😉 ).
We have a bigger issue here. This repo is now apparently governed by unwritten rules in one person's head, which also have had no rationale articulated anywhere that was arrived at by consensus of the stakeholders in this library. I've clearly demonstrated a willingness to send PRs here (and elsewhere) to fix things, move things forward, and I'm happy to engage in productive discussions about them, but waiting a week to get a Nack based on unwritten rules (that are already violated with what is checked in), is frankly both aggravating and unsustainable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. go-containerregistry/pkg/registry/registry.go Lines 15 to 23 in a0c4bd2
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order for this to disqualify this change, "low dependency" must be interpreted to exclude what it currently imports. 🙃 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I'd recommend a README: https://github.com/google/go-containerregistry#philosophy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Hoping to address this in #1166 Taking the issue of dependency philosophy aside, I think I'm still convinced that the dependency on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are assuming details of the blob layer by stating that it can only care about path. Every single cloud vendor registry uses host to route storage requests, Amazon even gives each customer their own hostname per region. I doubt this is how you wrote kontain.me, but…. Hopefully you get my point. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see what you're saying. The same code depending on pkg/registry with some blob storage impl, running at Can we document this expectation in the doc comments for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Absolutely. |
||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
sz, err := b.bh.Stat(repo, h) | ||||||||||||||||||||
if errors.Is(err, ErrBlobNotFound) { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusNotFound, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: "Unknown blob", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} else if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusInternalServerError, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
resp.Header().Set("Content-Length", fmt.Sprint(len(b))) | ||||||||||||||||||||
resp.Header().Set("Content-Length", fmt.Sprint(sz)) | ||||||||||||||||||||
resp.Header().Set("Docker-Content-Digest", target) | ||||||||||||||||||||
resp.WriteHeader(http.StatusOK) | ||||||||||||||||||||
return nil | ||||||||||||||||||||
|
||||||||||||||||||||
case http.MethodGet: | ||||||||||||||||||||
b.lock.Lock() | ||||||||||||||||||||
defer b.lock.Unlock() | ||||||||||||||||||||
b, ok := b.contents[target] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
h, err := v1.NewHash(target) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
sz, err := b.bh.Stat(repo, h) | ||||||||||||||||||||
if errors.Is(err, ErrBlobNotFound) { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusNotFound, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: "Unknown blob", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} else if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusInternalServerError, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
b, err := b.bh.Get(repo, h) | ||||||||||||||||||||
if errors.Is(err, ErrBlobNotFound) { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusNotFound, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} else if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusInternalServerError, | ||||||||||||||||||||
Code: "BLOB_UNKNOWN", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
defer b.Close() | ||||||||||||||||||||
|
||||||||||||||||||||
resp.Header().Set("Content-Length", fmt.Sprint(len(b))) | ||||||||||||||||||||
resp.Header().Set("Content-Length", fmt.Sprint(sz)) | ||||||||||||||||||||
resp.Header().Set("Docker-Content-Digest", target) | ||||||||||||||||||||
resp.WriteHeader(http.StatusOK) | ||||||||||||||||||||
io.Copy(resp, bytes.NewReader(b)) | ||||||||||||||||||||
io.Copy(resp, b) | ||||||||||||||||||||
return nil | ||||||||||||||||||||
|
||||||||||||||||||||
case http.MethodPost: | ||||||||||||||||||||
|
@@ -131,10 +209,31 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { | |||||||||||||||||||
Message: "digest does not match contents", | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
h, err := v1.NewHash(d) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
// This is not reachable | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
b.lock.Lock() | ||||||||||||||||||||
defer b.lock.Unlock() | ||||||||||||||||||||
b.contents[d] = l.Bytes() | ||||||||||||||||||||
if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusInternalServerError, | ||||||||||||||||||||
Code: "BLOB_UPLOAD_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
resp.Header().Set("Docker-Content-Digest", d) | ||||||||||||||||||||
resp.WriteHeader(http.StatusCreated) | ||||||||||||||||||||
return nil | ||||||||||||||||||||
|
@@ -231,8 +330,32 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { | |||||||||||||||||||
Message: "digest does not match contents", | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-3]...)) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
h, err := v1.NewHash(digest) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
// This is not reachable | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusBadRequest, | ||||||||||||||||||||
Code: "NAME_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil { | ||||||||||||||||||||
return ®Error{ | ||||||||||||||||||||
Status: http.StatusInternalServerError, | ||||||||||||||||||||
Code: "BLOB_UPLOAD_INVALID", | ||||||||||||||||||||
Message: err.Error(), | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
b.contents[d] = l.Bytes() | ||||||||||||||||||||
delete(b.uploads, target) | ||||||||||||||||||||
resp.Header().Set("Docker-Content-Digest", d) | ||||||||||||||||||||
resp.WriteHeader(http.StatusCreated) | ||||||||||||||||||||
|
@@ -246,3 +369,49 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { | |||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
type defaultBlobStore struct { | ||||||||||||||||||||
m sync.Mutex | ||||||||||||||||||||
contents map[v1.Hash][]byte | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
var _ BlobHandler = (*defaultBlobStore)(nil) | ||||||||||||||||||||
|
||||||||||||||||||||
// ErrBlobNotFound is the error returned (possibly wrapped) | ||||||||||||||||||||
// when a given blob is not found. | ||||||||||||||||||||
var ErrBlobNotFound = errors.New("blob not found") | ||||||||||||||||||||
|
||||||||||||||||||||
// Stat implements BlobHandler | ||||||||||||||||||||
func (dbs *defaultBlobStore) Stat(repo name.Repository, h v1.Hash) (int64, error) { | ||||||||||||||||||||
dbs.m.Lock() | ||||||||||||||||||||
defer dbs.m.Unlock() | ||||||||||||||||||||
b, ok := dbs.contents[h] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
return 0, fmt.Errorf("%w: %s", ErrBlobNotFound, h) | ||||||||||||||||||||
} | ||||||||||||||||||||
return int64(len(b)), nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Get implements BlobHandler | ||||||||||||||||||||
func (dbs *defaultBlobStore) Get(repo name.Repository, h v1.Hash) (io.ReadCloser, error) { | ||||||||||||||||||||
dbs.m.Lock() | ||||||||||||||||||||
defer dbs.m.Unlock() | ||||||||||||||||||||
b, ok := dbs.contents[h] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
return nil, fmt.Errorf("%w: %s", ErrBlobNotFound, h) | ||||||||||||||||||||
} | ||||||||||||||||||||
return ioutil.NopCloser(bytes.NewBuffer(b)), nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Store implements BlobHandler | ||||||||||||||||||||
func (dbs *defaultBlobStore) Store(repo name.Repository, h v1.Hash, rc io.ReadCloser) error { | ||||||||||||||||||||
dbs.m.Lock() | ||||||||||||||||||||
defer dbs.m.Unlock() | ||||||||||||||||||||
defer rc.Close() | ||||||||||||||||||||
b, err := ioutil.ReadAll(rc) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return err | ||||||||||||||||||||
} | ||||||||||||||||||||
dbs.contents[h] = b | ||||||||||||||||||||
return nil | ||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been trying to keep this from having any dependencies outside the stdlib.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too late:
go-containerregistry/pkg/registry/manifest.go
Lines 31 to 32 in 0dfbb56
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, I agree with this goal for implementations of the interfaces, but it seems wasteful to reimplement validation that exists in the
name
and similar packages.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#798 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jon beat me to it 🙈
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My comment stands. It's foolish to reimplement the validation we have in
name
and other places that let us avoid dealing with stringly typed data, and I don't see the risk of a dependency cycle for these superficial things because they should absolutely NOT need a full-blown registry implementation to test interfaces.I could absolutely get behind a
depcheck
style test that this never depend onpkg/v1/remote
in particular.