Skip to content

Commit

Permalink
Make the blob storage in pkg/registry pluggable.
Browse files Browse the repository at this point in the history
With this change folks can use `registry.WithBlobHandler(...)` to substitute a custom blob handling layer.
  • Loading branch information
mattmoor committed Oct 26, 2021
1 parent e7cd6af commit 130a80c
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 22 deletions.
173 changes: 158 additions & 15 deletions pkg/registry/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"path"
"strings"
"sync"

"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
)

// Returns whether this url should be handled by the blob handler
Expand All @@ -46,11 +50,26 @@ func isBlob(req *http.Request) bool {

// blobs
type blobs struct {
// Blobs are content addresses. we store them globally underneath their sha and make no distinctions per image.
contents map[string][]byte
// Each upload gets a unique id that writes occur to until finalized.
uploads map[string][]byte
lock sync.Mutex

bh BlobHandler
}

// BlobHandler is the interface for the storage layer underneath this registry.
type BlobHandler interface {
// Stat returns the size of the blob whose hash is specified, and true,
// if it exists. If not, it returns (0, false).
Stat(repo name.Repository, h v1.Hash) (int, bool)

// Get returns true and a reader for consuming the blob specified with the hash,
// if it exists. It now, it returns (nil, false).
Get(repo name.Repository, h v1.Hash) (io.ReadCloser, bool)

// Store stores the stream of content with the given hash, or returns the error
// encountered doing so.
Store(repo name.Repository, h v1.Hash, content io.ReadCloser) error
}

func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
Expand All @@ -74,9 +93,24 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {

switch req.Method {
case http.MethodHead:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

sz, ok := b.bh.Stat(repo, h)
if !ok {
return &regError{
Status: http.StatusNotFound,
Expand All @@ -85,15 +119,30 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}
}

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
resp.Header().Set("Content-Length", fmt.Sprint(sz))
resp.Header().Set("Docker-Content-Digest", target)
resp.WriteHeader(http.StatusOK)
return nil

case http.MethodGet:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

sz, ok := b.bh.Stat(repo, h)
if !ok {
return &regError{
Status: http.StatusNotFound,
Expand All @@ -102,10 +151,20 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}
}

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
b, ok := b.bh.Get(repo, h)
if !ok {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
}
}
defer b.Close()

resp.Header().Set("Content-Length", fmt.Sprint(sz))
resp.Header().Set("Docker-Content-Digest", target)
resp.WriteHeader(http.StatusOK)
io.Copy(resp, bytes.NewReader(b))
io.Copy(resp, b)
return nil

case http.MethodPost:
Expand All @@ -131,10 +190,31 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
Message: "digest does not match contents",
}
}
h, err := v1.NewHash(d)
if err != nil {
// This is not reachable
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-2]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

b.lock.Lock()
defer b.lock.Unlock()
b.contents[d] = l.Bytes()
if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UPLOAD_INVALID",
Message: err.Error(),
}
}
resp.Header().Set("Docker-Content-Digest", d)
resp.WriteHeader(http.StatusCreated)
return nil
Expand Down Expand Up @@ -231,8 +311,32 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
Message: "digest does not match contents",
}
}
repo, err := name.NewRepository(req.URL.Host + path.Join(elem[1:len(elem)-3]...))
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}
h, err := v1.NewHash(digest)
if err != nil {
// This is not reachable
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: err.Error(),
}
}

if err := b.bh.Store(repo, h, ioutil.NopCloser(l)); err != nil {
return &regError{
Status: http.StatusInternalServerError,
Code: "BLOB_UPLOAD_INVALID",
Message: err.Error(),
}
}

b.contents[d] = l.Bytes()
delete(b.uploads, target)
resp.Header().Set("Docker-Content-Digest", d)
resp.WriteHeader(http.StatusCreated)
Expand All @@ -246,3 +350,42 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}
}
}

type defaultBlobStore struct {
m sync.Mutex
contents map[v1.Hash][]byte
}

var _ BlobHandler = (*defaultBlobStore)(nil)

// Stat implements BlobHandler
func (dbs *defaultBlobStore) Stat(repo name.Repository, h v1.Hash) (int, bool) {
dbs.m.Lock()
defer dbs.m.Unlock()
b, ok := dbs.contents[h]
if !ok {
return 0, false
}
return len(b), true
}

// Get implements BlobHandler
func (dbs *defaultBlobStore) Get(repo name.Repository, h v1.Hash) (io.ReadCloser, bool) {
dbs.m.Lock()
defer dbs.m.Unlock()
b, ok := dbs.contents[h]
return ioutil.NopCloser(bytes.NewBuffer(b)), ok
}

// Store implements BlobHandler
func (dbs *defaultBlobStore) Store(repo name.Repository, h v1.Hash, rc io.ReadCloser) error {
dbs.m.Lock()
defer dbs.m.Unlock()
defer rc.Close()
b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
dbs.contents[h] = b
return nil
}
2 changes: 1 addition & 1 deletion pkg/registry/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func Example() {
s := httptest.NewServer(registry.New())
defer s.Close()
resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:...")
resp, _ := s.Client().Get(s.URL + "/v2/bar/blobs/sha256:2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")
fmt.Println(resp.StatusCode)
// Output: 404
}
15 changes: 13 additions & 2 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"log"
"net/http"
"os"

v1 "github.com/google/go-containerregistry/pkg/v1"
)

type registry struct {
Expand Down Expand Up @@ -77,8 +79,10 @@ func New(opts ...Option) http.Handler {
r := &registry{
log: log.New(os.Stderr, "", log.LstdFlags),
blobs: blobs{
contents: map[string][]byte{},
uploads: map[string][]byte{},
uploads: map[string][]byte{},
bh: &defaultBlobStore{
contents: map[v1.Hash][]byte{},
},
},
manifests: manifests{
manifests: map[string]map[string]manifest{},
Expand All @@ -102,3 +106,10 @@ func Logger(l *log.Logger) Option {
r.manifests.log = l
}
}

// WithBlobHandler overrides the default BlobHandler.
func WithBlobHandler(bh BlobHandler) Option {
return func(r *registry) {
r.blobs.bh = bh
}
}
Loading

0 comments on commit 130a80c

Please sign in to comment.