diff --git a/pkg/registry/blobs.go b/pkg/registry/blobs.go index 067227271..f38de088e 100644 --- a/pkg/registry/blobs.go +++ b/pkg/registry/blobs.go @@ -20,11 +20,15 @@ import ( "encoding/hex" "fmt" "io" + "io/ioutil" "math/rand" "net/http" "path" "strings" "sync" + + "github.com/google/go-containerregistry/pkg/name" + v1 "github.com/google/go-containerregistry/pkg/v1" ) // Returns whether this url should be handled by the blob handler @@ -46,11 +50,26 @@ func isBlob(req *http.Request) bool { // blobs type blobs struct { - // Blobs are content addresses. we store them globally underneath their sha and make no distinctions per image. - contents map[string][]byte // Each upload gets a unique id that writes occur to until finalized. uploads map[string][]byte lock sync.Mutex + + bh BlobHandler +} + +// BlobHandler is the interface for the storage layer underneath this registry. +type BlobHandler interface { + // Stat returns the size of the blob whose hash is specified, and true, + // if it exists. If not, it returns (0, false). + Stat(repo name.Repository, h v1.Hash) (int, bool) + + // Get returns true and a reader for consuming the blob specified with the hash, + // if it exists. It now, it returns (nil, false). + Get(repo name.Repository, h v1.Hash) (io.ReadCloser, bool) + + // Store stores the stream of content with the given hash, or returns the error + // encountered doing so. + Store(repo name.Repository, h v1.Hash, content io.ReadCloser) error } func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { @@ -74,9 +93,24 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { switch req.Method { case http.MethodHead: - b.lock.Lock() - defer b.lock.Unlock() - b, ok := b.contents[target] + h, err := v1.NewHash(target) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + + sz, ok := b.bh.Stat(repo, h) if !ok { return ®Error{ Status: http.StatusNotFound, @@ -85,15 +119,30 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { } } - resp.Header().Set("Content-Length", fmt.Sprint(len(b))) + resp.Header().Set("Content-Length", fmt.Sprint(sz)) resp.Header().Set("Docker-Content-Digest", target) resp.WriteHeader(http.StatusOK) return nil case http.MethodGet: - b.lock.Lock() - defer b.lock.Unlock() - b, ok := b.contents[target] + h, err := v1.NewHash(target) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + + sz, ok := b.bh.Stat(repo, h) if !ok { return ®Error{ Status: http.StatusNotFound, @@ -102,10 +151,20 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { } } - resp.Header().Set("Content-Length", fmt.Sprint(len(b))) + b, ok := b.bh.Get(repo, h) + if !ok { + return ®Error{ + Status: http.StatusNotFound, + Code: "BLOB_UNKNOWN", + Message: "Unknown blob", + } + } + defer b.Close() + + resp.Header().Set("Content-Length", fmt.Sprint(sz)) resp.Header().Set("Docker-Content-Digest", target) resp.WriteHeader(http.StatusOK) - io.Copy(resp, bytes.NewReader(b)) + io.Copy(resp, b) return nil case http.MethodPost: @@ -131,10 +190,31 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { Message: "digest does not match contents", } } + h, err := v1.NewHash(d) + if err != nil { + // This is not reachable + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...)) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } - b.lock.Lock() - defer b.lock.Unlock() - b.contents[d] = l.Bytes() + if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil { + return ®Error{ + Status: http.StatusInternalServerError, + Code: "BLOB_UPLOAD_INVALID", + Message: err.Error(), + } + } resp.Header().Set("Docker-Content-Digest", d) resp.WriteHeader(http.StatusCreated) return nil @@ -231,8 +311,32 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { Message: "digest does not match contents", } } + repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-3]...)) + if err != nil { + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + h, err := v1.NewHash(digest) + if err != nil { + // This is not reachable + return ®Error{ + Status: http.StatusBadRequest, + Code: "NAME_INVALID", + Message: err.Error(), + } + } + + if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil { + return ®Error{ + Status: http.StatusInternalServerError, + Code: "BLOB_UPLOAD_INVALID", + Message: err.Error(), + } + } - b.contents[d] = l.Bytes() delete(b.uploads, target) resp.Header().Set("Docker-Content-Digest", d) resp.WriteHeader(http.StatusCreated) @@ -246,3 +350,42 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { } } } + +type defaultBlobStore struct { + m sync.Mutex + contents map[v1.Hash][]byte +} + +var _ BlobHandler = (*defaultBlobStore)(nil) + +// Stat implements BlobHandler +func (dbs *defaultBlobStore) Stat(repo name.Repository, h v1.Hash) (int, bool) { + dbs.m.Lock() + defer dbs.m.Unlock() + b, ok := dbs.contents[h] + if !ok { + return 0, false + } + return len(b), true +} + +// Get implements BlobHandler +func (dbs *defaultBlobStore) Get(repo name.Repository, h v1.Hash) (io.ReadCloser, bool) { + dbs.m.Lock() + defer dbs.m.Unlock() + b, ok := dbs.contents[h] + return ioutil.NopCloser(bytes.NewBuffer(b)), ok +} + +// Store implements BlobHandler +func (dbs *defaultBlobStore) Store(repo name.Repository, h v1.Hash, rc io.ReadCloser) error { + dbs.m.Lock() + defer dbs.m.Unlock() + defer rc.Close() + b, err := ioutil.ReadAll(rc) + if err != nil { + return err + } + dbs.contents[h] = b + return nil +} diff --git a/pkg/registry/example_test.go b/pkg/registry/example_test.go index e70ce4601..fef994910 100644 --- a/pkg/registry/example_test.go +++ b/pkg/registry/example_test.go @@ -24,7 +24,7 @@ import ( func Example() { s := httptest.NewServer(registry.New()) defer s.Close() - resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:...") + resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae") fmt.Println(resp.StatusCode) // Output: 404 } diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index c56dae26d..fec09e771 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -27,6 +27,8 @@ import ( "log" "net/http" "os" + + v1 "github.com/google/go-containerregistry/pkg/v1" ) type registry struct { @@ -77,8 +79,10 @@ func New(opts ...Option) http.Handler { r := ®istry{ log: log.New(os.Stderr, "", log.LstdFlags), blobs: blobs{ - contents: map[string][]byte{}, - uploads: map[string][]byte{}, + uploads: map[string][]byte{}, + bh: &defaultBlobStore{ + contents: map[v1.Hash][]byte{}, + }, }, manifests: manifests{ manifests: map[string]map[string]manifest{}, @@ -102,3 +106,10 @@ func Logger(l *log.Logger) Option { r.manifests.log = l } } + +// WithBlobHandler overrides the default BlobHandler. +func WithBlobHandler(bh BlobHandler) Option { + return func(r *registry) { + r.blobs.bh = bh + } +} diff --git a/pkg/registry/registry_test.go b/pkg/registry/registry_test.go index af9a384db..7e19b424f 100644 --- a/pkg/registry/registry_test.go +++ b/pkg/registry/registry_test.go @@ -92,19 +92,31 @@ func TestCalls(t *testing.T) { { Description: "GET non existent blob", Method: "GET", - URL: "/v2/foo/blobs/sha256:asd", + URL: "/v2/foo/blobs/sha256:3c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", Code: http.StatusNotFound, }, { - Description: "HEAD non existent blob", + Description: "HEAD bad hash", Method: "HEAD", URL: "/v2/foo/blobs/sha256:asd", + Code: http.StatusBadRequest, + }, + { + Description: "HEAD non existent blob", + Method: "HEAD", + URL: "/v2/foo/blobs/sha256:3c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", Code: http.StatusNotFound, }, + { + Description: "HEAD bad repository", + Method: "HEAD", + URL: "/v2/foo|bar/blobs/sha256:3c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", + Code: http.StatusBadRequest, + }, { Description: "bad blob verb", Method: "FOO", - URL: "/v2/foo/blobs/sha256:asd", + URL: "/v2/foo/blobs/sha256:3c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", Code: http.StatusBadRequest, }, { @@ -115,6 +127,12 @@ func TestCalls(t *testing.T) { Code: http.StatusOK, Header: map[string]string{"Docker-Content-Digest": "sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"}, }, + { + Description: "GET bad blob digest", + Method: "GET", + URL: "/v2/foo/blobs/sha256:asd", + Code: http.StatusBadRequest, + }, { Description: "GET blob", Digests: map[string]string{"sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae": "foo"}, @@ -123,6 +141,12 @@ func TestCalls(t *testing.T) { Code: http.StatusOK, Header: map[string]string{"Docker-Content-Digest": "sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"}, }, + { + Description: "GET bad blob repo", + Method: "GET", + URL: "/v2/foo|bar/blobs/sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", + Code: http.StatusBadRequest, + }, { Description: "HEAD blob", Digests: map[string]string{"sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae": "foo"}, @@ -147,6 +171,12 @@ func TestCalls(t *testing.T) { Code: http.StatusAccepted, Header: map[string]string{"Range": "0-0"}, }, + { + Description: "uploadurl", + Method: "POST", + URL: "/v2/foo/blobs/not-uploads", + Code: http.StatusBadRequest, + }, { Description: "uploadurl", Method: "POST", @@ -160,6 +190,13 @@ func TestCalls(t *testing.T) { URL: "/v2/foo/blobs/uploads/1", Code: http.StatusBadRequest, }, + { + Description: "monolithic upload bad repo", + Method: "POST", + URL: "/v2/foo|bar/blobs/uploads?digest=sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", + Code: http.StatusBadRequest, + Body: "foo", + }, { Description: "monolithic upload good digest", Method: "POST", @@ -183,6 +220,13 @@ func TestCalls(t *testing.T) { Body: "foo", Header: map[string]string{"Docker-Content-Digest": "sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"}, }, + { + Description: "upload bad repository", + Method: "PUT", + URL: "/v2/foo|bar/blobs/uploads/1?digest=sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", + Code: http.StatusBadRequest, + Body: "foo", + }, { Description: "upload bad digest", Method: "PUT", @@ -201,6 +245,13 @@ func TestCalls(t *testing.T) { "Location": "/v2/foo/blobs/uploads/1", }, }, + { + Description: "stream upload bad URL", + Method: "PATCH", + URL: "/v2/foo/blobs/oopsloads", + Code: http.StatusBadRequest, + Body: "foo", + }, { Description: "stream duplicate upload", Method: "PATCH", @@ -487,8 +538,10 @@ func TestCalls(t *testing.T) { if err != nil { t.Fatalf("Error getting %q: %v", tc.URL, err) } + defer resp.Body.Close() + content, _ := ioutil.ReadAll(resp.Body) if resp.StatusCode != tc.Code { - t.Errorf("Incorrect status code, got %d, want %d", resp.StatusCode, tc.Code) + t.Errorf("Incorrect status code, got %d, want %d (%s)", resp.StatusCode, tc.Code, string(content)) } for k, v := range tc.Header {