Make the blob storage in pkg/registry pluggable. #1158

Closed · wants to merge 2 commits

207 changes: 188 additions & 19 deletions pkg/registry/blobs.go
@@ -18,13 +18,18 @@ import (
"bytes"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"path"
"strings"
"sync"

"github.com/google/go-containerregistry/pkg/name"
Collaborator:

I've been trying to keep this from having any dependencies outside the stdlib.

Collaborator (Author):

Too late:

v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"

Collaborator (Author):

Generally, I agree with this goal for implementations of the interfaces, but it seems wasteful to reimplement validation that exists in the name and similar packages.

Collaborator:

Jon beat me to it 🙈

Collaborator (Author):

My comment stands. It's foolish to reimplement the validation we have in name and other places that let us avoid dealing with stringly typed data, and I don't see the risk of a dependency cycle for these superficial things because they should absolutely NOT need a full-blown registry implementation to test interfaces.

I could absolutely get behind a depcheck style test that this never depend on pkg/v1/remote in particular.
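The depcheck-style test suggested above is straightforward to sketch. This is a hypothetical illustration (not part of the PR): parse a package's source with the stdlib `go/parser` and assert the banned import never appears. Here the source is inlined as a string so the sketch is self-contained; a real test would walk the package's files.

```go
package main

import (
	"fmt"
	"go/parser"
	"go/token"
	"strconv"
)

// banned is the import we never want pkg/registry to pick up.
const banned = "github.com/google/go-containerregistry/pkg/v1/remote"

// importsOf parses Go source and returns its import paths.
func importsOf(src string) ([]string, error) {
	f, err := parser.ParseFile(token.NewFileSet(), "blobs.go", src, parser.ImportsOnly)
	if err != nil {
		return nil, err
	}
	var out []string
	for _, imp := range f.Imports {
		p, _ := strconv.Unquote(imp.Path.Value)
		out = append(out, p)
	}
	return out, nil
}

func main() {
	src := `package registry

import (
	"io"

	"github.com/google/go-containerregistry/pkg/name"
)
`
	imps, err := importsOf(src)
	if err != nil {
		panic(err)
	}
	for _, p := range imps {
		if p == banned {
			fmt.Println("FAIL: depends on", banned)
			return
		}
	}
	fmt.Println("ok: no dependency on", banned)
}
```

A real guard would live in a `_test.go` file and fail the build instead of printing.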

v1 "github.com/google/go-containerregistry/pkg/v1"
)

// Returns whether this url should be handled by the blob handler
@@ -46,11 +51,26 @@ func isBlob(req *http.Request) bool {

// blobs
type blobs struct {
// Blobs are content addresses. We store them globally underneath their sha and make no distinctions per image.
contents map[string][]byte
// Each upload gets a unique id that writes occur to until finalized.
uploads map[string][]byte
lock sync.Mutex

bh BlobHandler
}

// BlobHandler is the interface for the storage layer underneath this registry.
type BlobHandler interface {
// Stat returns the size of the blob whose hash is specified,
// if it exists. If not, it returns (0, error).
Stat(repo name.Repository, h v1.Hash) (int64, error)

// Get returns a reader for consuming the blob specified by the hash,
// if it exists. If not, it returns (nil, error).
Get(repo name.Repository, h v1.Hash) (io.ReadCloser, error)
Collaborator:

Something occurred to me about this API, thinking through how I could use it with kontain.me -- this API would require kontain.me to proxy and serve blobs from GCS, instead of HTTP redirecting like I do today, which is a lot faster and cheaper than proxying through Cloud Run.

This might just mean that pkg/registry isn't going to be a good fit for kontain.me, and that's fine, but I wanted to surface that here since I think it's a pretty nice use case that isn't supported by this proposal.
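One hypothetical way to keep redirect-style backends like kontain.me viable (names here are illustrative, not part of this PR) is an optional interface the handler type-asserts against: a backend that can answer with a URL gets an HTTP 302 instead of having its bytes proxied. `Hash` is a stand-in for `v1.Hash` so the sketch is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Hash is a stand-in for v1.Hash, just for this sketch.
type Hash struct{ Algorithm, Hex string }

// blobRedirecter is a hypothetical optional interface: a BlobHandler that
// also implements it can answer with a URL instead of streaming bytes,
// letting the registry reply with a redirect rather than proxying.
type blobRedirecter interface {
	RedirectURL(h Hash) (string, error)
}

// gcsBackend is an illustrative backend that knows where its blobs live.
type gcsBackend struct{ bucket string }

func (g *gcsBackend) RedirectURL(h Hash) (string, error) {
	if h.Hex == "" {
		return "", errors.New("empty hash")
	}
	return fmt.Sprintf("https://storage.googleapis.com/%s/%s:%s", g.bucket, h.Algorithm, h.Hex), nil
}

// serveBlob shows how a handler might prefer the redirect path when available.
func serveBlob(bh interface{}, h Hash) (status int, location string) {
	if r, ok := bh.(blobRedirecter); ok {
		if u, err := r.RedirectURL(h); err == nil {
			return 302, u
		}
	}
	return 200, "" // fall back to proxying the bytes through Get
}

func main() {
	status, loc := serveBlob(&gcsBackend{bucket: "kontainme"}, Hash{"sha256", "abc"})
	fmt.Println(status, loc)
}
```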


// Store stores the stream of content with the given hash, or returns the error
// encountered doing so.
Store(repo name.Repository, h v1.Hash, content io.ReadCloser) error
}

func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
@@ -74,38 +94,96 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {

switch req.Method {
case http.MethodHead:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
if !ok {
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
Collaborator:

(Resurrecting this because I'm sad we couldn't make it happen)

Aside from the question of whether we need to depend on pkg/name here, I wonder if req.URL.Host is adding anything besides letting the string pass through NewRepository. Really all blob storers want is the subrepo path.

I hear the concern about duplicating validation logic though. Would it be reasonable to export a method from pkg/name to validate a subrepo path just so pkg/registry could use it?

Collaborator (Author):

> I'm sad we couldn't make it happen

Me too, but alas I've unblocked myself (and it is glorious 😉 ).

> I hear the concern about duplicating validation logic though

We have a bigger issue here. This repo is now apparently governed by unwritten rules in one person's head, which also have had no rationale articulated anywhere that was arrived at by consensus of the stakeholders in this library.

I've clearly demonstrated a willingness to send PRs here (and elsewhere) to fix things, move things forward, and I'm happy to engage in productive discussions about them, but waiting a week to get a Nack based on unwritten rules (that are already violated with what is checked in), is frankly both aggravating and unsustainable.

Collaborator:

// Package registry implements a docker V2 registry and the OCI distribution specification.
//
// It is designed to be used anywhere a low dependency container registry is needed, with an
// initial focus on tests.
//
// Its goal is to be standards compliant and its strictness will increase over time.
//
// This is currently a low flightmiles system. It's likely quite safe to use in tests; If you're using it
// in production, please let us know how and send us CL's for integration tests.

Collaborator (Author):


In order for this to disqualify this change, "low dependency" must be interpreted to exclude what it currently imports. 🙃

Collaborator:

> We have a bigger issue here. This repo is now apparently governed by unwritten rules in one person's head, which also have had no rationale articulated anywhere that was arrived at by consensus of the stakeholders in this library.

Hoping to address this in #1166

Taking the issue of dependency philosophy aside, I think I'm still convinced that the dependency on pkg/name is unnecessary, or at least having a full name.Repository in the interface is. If not for validation of the repository path, you don't actually need a full name.Repository, and could just get by with Get(path string, h v1.Hash), and not add any new dependencies. Path validation itself is fairly simple, and could be moved to internal/something and shared by pkg/name and pkg/registry.

Collaborator (Author):

You are assuming details of the blob layer by stating that it can only care about path. Every single cloud vendor registry uses host to route storage requests, Amazon even gives each customer their own hostname per region. I doubt this is how you wrote kontain.me, but….

Hopefully you get my point.
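The host-based routing argument can be made concrete with a small sketch. This is purely illustrative (the bucket names and suffix are invented): a blob backend inspects the registry host portion of a repository string and routes storage accordingly, the way per-customer cloud registry hostnames do. The real code would get the host from name.Repository rather than splitting strings.

```go
package main

import (
	"fmt"
	"strings"
)

// bucketFor picks a storage bucket from the registry host portion of a
// "host/path" repository string, illustrating host-based routing.
func bucketFor(repo string) string {
	host := strings.SplitN(repo, "/", 2)[0]
	switch {
	case strings.HasSuffix(host, ".example.dev"):
		// e.g. per-customer hostnames route to per-customer buckets.
		return "blobs-" + strings.TrimSuffix(host, ".example.dev")
	default:
		return "blobs-default"
	}
}

func main() {
	fmt.Println(bucketFor("foo.example.dev/app/image"))
	fmt.Println(bucketFor("localhost:5000/app/image"))
}
```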

Collaborator:


Ah I see what you're saying. The same code depending on pkg/registry with some blob storage impl, running at foo.registry.example and bar.registry.example, might decide to store those blobs differently based on the host.

Can we document this expectation in the doc comments for BlobHandler? It wasn't immediately clear to me (because I'm a big dumb dummy) why the host was included, except to make path validation easier.

Collaborator (Author):

> Can we document this expectation in the doc comments for BlobHandler

Absolutely.

if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

sz, err := b.bh.Stat(repo, h)
if errors.Is(err, ErrBlobNotFound) {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
Message: err.Error(),
}
} else if err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UNKNOWN",
Message: err.Error(),
}
}

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
resp.Header().Set("Content-Length", fmt.Sprint(sz))
resp.Header().Set("Docker-Content-Digest", target)
resp.WriteHeader(http.StatusOK)
return nil

case http.MethodGet:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
if !ok {
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

sz, err := b.bh.Stat(repo, h)
if errors.Is(err, ErrBlobNotFound) {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
Message: err.Error(),
}
} else if err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UNKNOWN",
Message: err.Error(),
}
}

b, err := b.bh.Get(repo, h)
if errors.Is(err, ErrBlobNotFound) {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: err.Error(),
}
} else if err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UNKNOWN",
Message: err.Error(),
}
}
defer b.Close()

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
resp.Header().Set("Content-Length", fmt.Sprint(sz))
resp.Header().Set("Docker-Content-Digest", target)
resp.WriteHeader(http.StatusOK)
io.Copy(resp, bytes.NewReader(b))
io.Copy(resp, b)
return nil

case http.MethodPost:
@@ -131,10 +209,31 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
Message: "digest does not match contents",
}
}
h, err := v1.NewHash(d)
if err != nil {
// This is not reachable
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

b.lock.Lock()
defer b.lock.Unlock()
b.contents[d] = l.Bytes()
if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UPLOAD_INVALID",
Message: err.Error(),
}
}
resp.Header().Set("Docker-Content-Digest", d)
resp.WriteHeader(http.StatusCreated)
return nil
@@ -231,8 +330,32 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
Message: "digest does not match contents",
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-3]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
h, err := v1.NewHash(digest)
if err != nil {
// This is not reachable
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UPLOAD_INVALID",
Message: err.Error(),
}
}

b.contents[d] = l.Bytes()
delete(b.uploads, target)
resp.Header().Set("Docker-Content-Digest", d)
resp.WriteHeader(http.StatusCreated)
Expand All @@ -246,3 +369,49 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}
}
}

type defaultBlobStore struct {
m sync.Mutex
contents map[v1.Hash][]byte
}

var _ BlobHandler = (*defaultBlobStore)(nil)

// ErrBlobNotFound is the error returned (possibly wrapped)
// when a given blob is not found.
var ErrBlobNotFound = errors.New("blob not found")

// Stat implements BlobHandler
func (dbs *defaultBlobStore) Stat(repo name.Repository, h v1.Hash) (int64, error) {
dbs.m.Lock()
defer dbs.m.Unlock()
b, ok := dbs.contents[h]
if !ok {
return 0, fmt.Errorf("%w: %s", ErrBlobNotFound, h)
}
return int64(len(b)), nil
}

// Get implements BlobHandler
func (dbs *defaultBlobStore) Get(repo name.Repository, h v1.Hash) (io.ReadCloser, error) {
dbs.m.Lock()
defer dbs.m.Unlock()
b, ok := dbs.contents[h]
if !ok {
return nil, fmt.Errorf("%w: %s", ErrBlobNotFound, h)
}
return ioutil.NopCloser(bytes.NewBuffer(b)), nil
}

// Store implements BlobHandler
func (dbs *defaultBlobStore) Store(repo name.Repository, h v1.Hash, rc io.ReadCloser) error {
dbs.m.Lock()
defer dbs.m.Unlock()
defer rc.Close()
b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
dbs.contents[h] = b
return nil
}
2 changes: 1 addition & 1 deletion pkg/registry/example_test.go
@@ -24,7 +24,7 @@ import (
func Example() {
s := httptest.NewServer(registry.New())
defer s.Close()
resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:...")
resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")
fmt.Println(resp.StatusCode)
// Output: 404
}
15 changes: 13 additions & 2 deletions pkg/registry/registry.go
@@ -27,6 +27,8 @@ import (
"log"
"net/http"
"os"

v1 "github.com/google/go-containerregistry/pkg/v1"
)

type registry struct {
@@ -77,8 +79,10 @@ func New(opts ...Option) http.Handler {
r := &registry{
log: log.New(os.Stderr, "", log.LstdFlags),
blobs: blobs{
contents: map[string][]byte{},
uploads: map[string][]byte{},
uploads: map[string][]byte{},
bh: &defaultBlobStore{
contents: map[v1.Hash][]byte{},
},
},
manifests: manifests{
manifests: map[string]map[string]manifest{},
@@ -102,3 +106,10 @@ func Logger(l *log.Logger) Option {
r.manifests.log = l
}
}

// WithBlobHandler overrides the default BlobHandler.
func WithBlobHandler(bh BlobHandler) Option {
return func(r *registry) {
r.blobs.bh = bh
}
}
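WithBlobHandler follows the same functional-options pattern as the existing Logger option. A self-contained sketch of the wiring, with BlobHandler and registry stubbed down to toy stand-ins (the Name method and disk/memory handlers are invented for illustration):

```go
package main

import "fmt"

// BlobHandler and registry are stubbed stand-ins for the real types,
// just to illustrate the functional-options wiring.
type BlobHandler interface{ Name() string }

type registry struct{ bh BlobHandler }

type Option func(*registry)

// WithBlobHandler overrides the default BlobHandler, mirroring the PR.
func WithBlobHandler(bh BlobHandler) Option {
	return func(r *registry) { r.bh = bh }
}

type memHandler struct{}

func (memHandler) Name() string { return "in-memory" }

type diskHandler struct{ root string }

func (d diskHandler) Name() string { return "disk:" + d.root }

// New mirrors registry.New: install defaults, then apply caller options.
func New(opts ...Option) *registry {
	r := &registry{bh: memHandler{}}
	for _, o := range opts {
		o(r)
	}
	return r
}

func main() {
	fmt.Println(New().bh.Name())                                         // default store
	fmt.Println(New(WithBlobHandler(diskHandler{root: "/tmp"})).bh.Name()) // plugged-in store
}
```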