-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update route to add proxy related middleware Add proxy controller Signed-off-by: stonezdj <stonezdj@gmail.com>
- Loading branch information
Showing
9 changed files
with
1,085 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
// Copyright Project Harbor Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package proxy | ||
|
||
import ( | ||
"context" | ||
"github.com/opencontainers/go-digest" | ||
"io" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/docker/distribution" | ||
"github.com/docker/distribution/manifest/manifestlist" | ||
"github.com/goharbor/harbor/src/common/models" | ||
"github.com/goharbor/harbor/src/controller/artifact" | ||
"github.com/goharbor/harbor/src/controller/blob" | ||
"github.com/goharbor/harbor/src/lib" | ||
"github.com/goharbor/harbor/src/lib/errors" | ||
"github.com/goharbor/harbor/src/lib/log" | ||
"github.com/goharbor/harbor/src/replication/registry" | ||
v1 "github.com/opencontainers/image-spec/specs-go/v1" | ||
) | ||
|
||
const ( | ||
// wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready | ||
maxManifestListWait = 20 | ||
maxManifestWait = 10 | ||
sleepIntervalSec = 20 | ||
) | ||
|
||
var ( | ||
// Ctl is a global proxy controller instance | ||
ctl Controller | ||
once sync.Once | ||
) | ||
|
||
// Controller defines the operations related with pull through proxy | ||
type Controller interface { | ||
// UseLocalBlob check if the blob should use local copy | ||
UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool | ||
// UseLocalManifest check manifest should use local copy | ||
UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool | ||
// ProxyBlob proxy the blob request to the remote server, p is the proxy project | ||
// art is the ArtifactInfo which includes the digest of the blob | ||
ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) | ||
// ProxyManifest proxy the manifest request to the remote server, p is the proxy project, | ||
// art is the ArtifactInfo which includes the tag or digest of the manifest | ||
ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) | ||
} | ||
type controller struct { | ||
blobCtl blob.Controller | ||
registryMgr registry.Manager | ||
artifactCtl artifact.Controller | ||
local localInterface | ||
} | ||
|
||
// ControllerInstance -- Get the proxy controller instance | ||
func ControllerInstance() Controller { | ||
// Lazy load the controller | ||
// Because LocalHelper is not ready unless core startup completely | ||
once.Do(func() { | ||
ctl = &controller{ | ||
blobCtl: blob.Ctl, | ||
registryMgr: registry.NewDefaultManager(), | ||
artifactCtl: artifact.Ctl, | ||
local: newLocalHelper(), | ||
} | ||
}) | ||
|
||
return ctl | ||
} | ||
|
||
func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool { | ||
if len(art.Digest) == 0 { | ||
return false | ||
} | ||
exist, err := c.local.BlobExist(ctx, art) | ||
if err != nil { | ||
return false | ||
} | ||
return exist | ||
} | ||
|
||
func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool { | ||
if len(art.Digest) == 0 { | ||
return false | ||
} | ||
return c.local.ManifestExist(ctx, art) | ||
} | ||
|
||
func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) { | ||
var man distribution.Manifest | ||
remoteRepo := getRemoteRepo(art) | ||
r, err := newRemoteHelper(p.RegistryID) | ||
if err != nil { | ||
return man, err | ||
} | ||
ref := getReference(art) | ||
man, err = r.Manifest(remoteRepo, ref) | ||
if err != nil { | ||
if errors.IsNotFoundErr(err) { | ||
go func() { | ||
c.local.DeleteManifest(remoteRepo, art.Tag) | ||
}() | ||
} | ||
return man, err | ||
} | ||
ct, _, err := man.Payload() | ||
if err != nil { | ||
return man, err | ||
} | ||
// Push manifest in background | ||
go func() { | ||
c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r) | ||
}() | ||
|
||
return man, nil | ||
} | ||
|
||
func (c *controller) ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) { | ||
remoteRepo := getRemoteRepo(art) | ||
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo) | ||
rHelper, err := newRemoteHelper(p.RegistryID) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
|
||
size, bReader, err := rHelper.BlobReader(remoteRepo, art.Digest) | ||
if err != nil { | ||
log.Errorf("failed to pull blob, error %v", err) | ||
return 0, nil, err | ||
} | ||
desc := distribution.Descriptor{Size: size, Digest: digest.Digest(art.Digest)} | ||
go func() { | ||
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, rHelper) | ||
if err != nil { | ||
log.Errorf("error while putting blob to local repo, %v", err) | ||
} | ||
}() | ||
return size, bReader, nil | ||
} | ||
|
||
func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r remoteInterface) error { | ||
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest) | ||
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest)) | ||
if err != nil { | ||
log.Errorf("failed to create blob reader, error %v", err) | ||
return err | ||
} | ||
defer bReader.Close() | ||
err = c.local.PushBlob(localRepo, desc, bReader) | ||
return err | ||
} | ||
|
||
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r remoteInterface) { | ||
if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex { | ||
err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man) | ||
if err != nil { | ||
log.Errorf("error when push manifest list to local :%v", err) | ||
} | ||
return | ||
} | ||
var waitBlobs []distribution.Descriptor | ||
for n := 0; n < maxManifestWait; n++ { | ||
time.Sleep(sleepIntervalSec * time.Second) | ||
waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man) | ||
if len(waitBlobs) == 0 { | ||
break | ||
} | ||
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag) | ||
} | ||
if len(waitBlobs) > 0 { | ||
// docker client will skip to pull layers exist in local | ||
// these blobs is not exist in the proxy server | ||
// it will cause the manifest dependency check always fail | ||
// need to push these blobs before push manifest to avoid failure | ||
log.Debug("Waiting blobs not empty, push it to local repo directly") | ||
for _, desc := range waitBlobs { | ||
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r) | ||
if err != nil { | ||
log.Errorf("Failed to push blob to local repo, error: %v", err) | ||
return | ||
} | ||
} | ||
} | ||
err := c.local.PushManifest(art.Repository, getReference(art), man) | ||
if err != nil { | ||
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err) | ||
} | ||
} | ||
|
||
// getRemoteRepo get the remote repository name, used in proxy cache | ||
func getRemoteRepo(art lib.ArtifactInfo) string { | ||
return strings.TrimPrefix(art.Repository, art.ProjectName+"/") | ||
} | ||
|
||
func getReference(art lib.ArtifactInfo) string { | ||
if len(art.Tag) > 0 { | ||
return art.Tag | ||
} | ||
return art.Digest | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// Copyright Project Harbor Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package proxy | ||
|
||
import ( | ||
"context" | ||
"github.com/docker/distribution" | ||
"github.com/goharbor/harbor/src/controller/artifact" | ||
"github.com/goharbor/harbor/src/controller/blob" | ||
"github.com/goharbor/harbor/src/lib" | ||
"github.com/goharbor/harbor/src/replication/registry" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/suite" | ||
"io" | ||
"testing" | ||
) | ||
|
||
type localInterfaceMock struct { | ||
mock.Mock | ||
} | ||
|
||
func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) { | ||
args := l.Called(ctx, art) | ||
return args.Bool(0), args.Error(1) | ||
} | ||
|
||
func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool { | ||
args := l.Called(ctx, art) | ||
return args.Bool(0) | ||
} | ||
|
||
func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error { | ||
panic("implement me") | ||
} | ||
|
||
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error { | ||
panic("implement me") | ||
} | ||
|
||
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error { | ||
panic("implement me") | ||
} | ||
|
||
func (l *localInterfaceMock) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor { | ||
panic("implement me") | ||
} | ||
|
||
func (l *localInterfaceMock) DeleteManifest(repo, ref string) { | ||
panic("implement me") | ||
} | ||
|
||
type proxyControllerTestSuite struct { | ||
suite.Suite | ||
local *localInterfaceMock | ||
ctr Controller | ||
} | ||
|
||
func (p *proxyControllerTestSuite) SetupTest() { | ||
p.local = &localInterfaceMock{} | ||
p.ctr = &controller{ | ||
blobCtl: blob.Ctl, | ||
registryMgr: registry.NewDefaultManager(), | ||
artifactCtl: artifact.Ctl, | ||
local: p.local, | ||
} | ||
} | ||
|
||
func (p *proxyControllerTestSuite) TestUseLocalManifest_True() { | ||
ctx := context.Background() | ||
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" | ||
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} | ||
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil) | ||
result := p.ctr.UseLocalManifest(ctx, art) | ||
p.Assert().True(result) | ||
} | ||
|
||
func (p *proxyControllerTestSuite) TestUseLocalManifest_False() { | ||
ctx := context.Background() | ||
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" | ||
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} | ||
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil) | ||
result := p.ctr.UseLocalManifest(ctx, art) | ||
p.Assert().False(result) | ||
} | ||
|
||
func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() { | ||
ctx := context.Background() | ||
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"} | ||
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil) | ||
result := p.ctr.UseLocalManifest(ctx, art) | ||
p.Assert().False(result) | ||
} | ||
|
||
func (p *proxyControllerTestSuite) TestUseLocalBlob_True() { | ||
ctx := context.Background() | ||
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" | ||
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} | ||
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil) | ||
result := p.ctr.UseLocalBlob(ctx, art) | ||
p.Assert().True(result) | ||
} | ||
|
||
func (p *proxyControllerTestSuite) TestUseLocalBlob_False() { | ||
ctx := context.Background() | ||
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" | ||
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} | ||
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil) | ||
result := p.ctr.UseLocalBlob(ctx, art) | ||
p.Assert().False(result) | ||
} | ||
|
||
func TestProxyControllerTestSuite(t *testing.T) { | ||
suite.Run(t, &proxyControllerTestSuite{}) | ||
} | ||
|
||
func TestProxyCacheRemoteRepo(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
in lib.ArtifactInfo | ||
want string | ||
}{ | ||
{ | ||
name: `normal test`, | ||
in: lib.ArtifactInfo{ProjectName: "dockerhub_proxy", Repository: "dockerhub_proxy/firstfloor/hello-world"}, | ||
want: "firstfloor/hello-world", | ||
}, | ||
} | ||
for _, tt := range cases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got := getRemoteRepo(tt.in) | ||
if got != tt.want { | ||
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want) | ||
} | ||
}) | ||
} | ||
} | ||
func TestGetRef(t *testing.T) { | ||
cases := []struct { | ||
name string | ||
in lib.ArtifactInfo | ||
want string | ||
}{ | ||
{ | ||
name: `normal`, | ||
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "latest", Digest: "sha256:aabbcc"}, | ||
want: "latest", | ||
}, | ||
{ | ||
name: `digest_only`, | ||
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "", Digest: "sha256:aabbcc"}, | ||
want: "sha256:aabbcc", | ||
}, | ||
} | ||
for _, tt := range cases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got := getReference(tt.in) | ||
if got != tt.want { | ||
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.