Skip to content

Commit

Permalink
fix(gc): align the blobs in the artifact trash and useless blobs
Browse files Browse the repository at this point in the history
During the GC mark stage, blobs corresponding to newly deleted artifacts
during the query process are removed to ensure the consistency of data
in trash and blobs and prevent dirty data from being left behind.

Signed-off-by: chlins <chlins.zhang@gmail.com>
  • Loading branch information
chlins committed Jul 17, 2024
1 parent 0da13eb commit c947027
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/jobservice/job/impl/gc/garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {

// mark
func (gc *GarbageCollector) mark(ctx job.Context) error {
// the following logic for querying the deleted artifacts and useless blobs may took long time, records the start and end time
// for querying the artifacts which be deleted in this interval.
start := time.Now()
arts, err := gc.deletedArt(ctx)
if err != nil {
gc.logger.Errorf("failed to get deleted Artifacts in gc job, with error: %v", err)
Expand All @@ -226,6 +229,8 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
gc.logger.Errorf("failed to get gc candidate: %v", err)
return err
}
end := time.Now()

if len(orphanBlobs) != 0 {
blobs = append(blobs, orphanBlobs...)
}
Expand All @@ -237,12 +242,28 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
return nil
}

trashBlobs, err := gc.listTrashBlobsByTimeRange(ctx, start, end)
if err != nil {
gc.logger.Errorf("failed to lists the blobs associated with the artifacts deleted in the time range %s-%s, error: %v", start, end, err)
return err
}

Check warning on line 249 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L247-L249

Added lines #L247 - L249 were not covered by tests
// the trash blobs should be retained and wait for next GC because their artifact trash has not been filtered in this cycle.
shouldRetainedBlobs := make(map[string]bool)
for _, blob := range trashBlobs {
shouldRetainedBlobs[blob.Digest] = true
}

Check warning on line 254 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L253-L254

Added lines #L253 - L254 were not covered by tests

// update delete status for the candidates.
blobCt := 0
mfCt := 0
makeSize := int64(0)

for _, blob := range blobs {
if shouldRetainedBlobs[blob.Digest] {
gc.logger.Debugf("skip mark the blob %s, because it should be retained in this GC cycle.", blob.Digest)
continue

Check warning on line 264 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L263-L264

Added lines #L263 - L264 were not covered by tests
}

if !gc.dryRun {
if gc.shouldStop(ctx) {
return errGcStop
Expand Down Expand Up @@ -728,6 +749,34 @@ func (gc *GarbageCollector) shouldStop(ctx job.Context) bool {
return false
}

// listTrashBlobsByTimeRange queries the blobs associated with the artifact be deleted in the fixed time range.
func (gc *GarbageCollector) listTrashBlobsByTimeRange(ctx job.Context, start, end time.Time) ([]*blobModels.Blob, error) {
query := &q.Query{
Keywords: map[string]interface{}{
"creation_time": &q.Range{
Max: end,
Min: start,
},
},
}
arts, err := gc.artrashMgr.List(ctx.SystemContext(), query)
if err != nil {
return nil, err
}

Check warning on line 765 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L764-L765

Added lines #L764 - L765 were not covered by tests

var blobs []*blobModels.Blob
for _, art := range arts {
artBlobs, err := gc.blobMgr.GetByArt(ctx.SystemContext(), art.Digest)
if err != nil {
return nil, err
}

Check warning on line 772 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L769-L772

Added lines #L769 - L772 were not covered by tests

blobs = append(blobs, artBlobs...)

Check warning on line 774 in src/jobservice/job/impl/gc/garbage_collection.go

View check run for this annotation

Codecov / codecov/patch

src/jobservice/job/impl/gc/garbage_collection.go#L774

Added line #L774 was not covered by tests
}

return blobs, nil
}

func saveGCRes(ctx job.Context, sweepSize, blobs, manifests int64) error {
gcObj := struct {
SweepSize int64 `json:"freed_space"`
Expand Down
2 changes: 2 additions & 0 deletions src/jobservice/job/impl/gc/garbage_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (suite *gcTestSuite) TestRun() {
}, nil)
suite.artifactCtl.On("Delete").Return(nil)
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{}, nil)
suite.artrashMgr.On("List").Return([]model.ArtifactTrash{}, nil)

mock.OnAnything(suite.projectCtl, "List").Return([]*proModels.Project{
{
Expand Down Expand Up @@ -309,6 +310,7 @@ func (suite *gcTestSuite) TestMark() {
ManifestMediaType: schema2.MediaTypeManifest,
},
}, nil)
suite.artrashMgr.On("List").Return([]model.ArtifactTrash{}, nil)

mock.OnAnything(suite.projectCtl, "List").Return([]*proModels.Project{
{
Expand Down
18 changes: 18 additions & 0 deletions src/pkg/artifactrash/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
)

Expand All @@ -34,6 +35,8 @@ type DAO interface {
Filter(ctx context.Context, cutOff time.Time) (arts []model.ArtifactTrash, err error)
// Flush cleans the trash table record, which creation_time must be less than or equal to the cut-off.
Flush(ctx context.Context, cutOff time.Time) (err error)
// List lists the artifact trash by query.
List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error)
}

// New returns an instance of the default DAO
Expand Down Expand Up @@ -110,3 +113,18 @@ func (d *dao) Flush(ctx context.Context, cutOff time.Time) (err error) {
}
return nil
}

// List lists the artifact trash by query.
func (d *dao) List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error) {
arts = []model.ArtifactTrash{}
qs, err := orm.QuerySetter(ctx, &model.ArtifactTrash{}, query)
if err != nil {
return nil, err
}

Check warning on line 123 in src/pkg/artifactrash/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/artifactrash/dao/dao.go#L122-L123

Added lines #L122 - L123 were not covered by tests

if _, err = qs.All(&arts); err != nil {
return nil, err
}

Check warning on line 127 in src/pkg/artifactrash/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/artifactrash/dao/dao.go#L126-L127

Added lines #L126 - L127 were not covered by tests

return arts, nil
}
35 changes: 35 additions & 0 deletions src/pkg/artifactrash/dao/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

errors "github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
artdao "github.com/goharbor/harbor/src/pkg/artifact/dao"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
htesting "github.com/goharbor/harbor/src/testing"
Expand Down Expand Up @@ -188,6 +189,40 @@ func (d *daoTestSuite) TestFlush() {
d.Require().Nil(err)
}

func (d *daoTestSuite) TestList() {
digest := d.Suite.DigestString()
aft1 := &model.ArtifactTrash{
ManifestMediaType: v1.MediaTypeImageManifest,
RepositoryName: "projectA/hello-world",
Digest: digest,
}
aft2 := &model.ArtifactTrash{
ManifestMediaType: v1.MediaTypeImageManifest,
RepositoryName: "projectB/hello-world",
Digest: digest,
}
// create aft1 and aft2
id1, err := d.dao.Create(d.ctx, aft1)
d.Require().Nil(err)
defer d.dao.Delete(d.ctx, id1)

id2, err := d.dao.Create(d.ctx, aft2)
d.Require().Nil(err)
defer d.dao.Delete(d.ctx, id2)

// lists afts by id
query := &q.Query{
Keywords: map[string]interface{}{
"id": &q.OrList{Values: []interface{}{id1, id2}},
},
}
afts, err := d.dao.List(d.ctx, query)
d.Require().Nil(err)
d.Require().Equal(2, len(afts))
d.Require().Equal("projectA/hello-world", afts[0].RepositoryName)
d.Require().Equal("projectB/hello-world", afts[1].RepositoryName)
}

func TestDaoTestSuite(t *testing.T) {
suite.Run(t, &daoTestSuite{})
}
7 changes: 7 additions & 0 deletions src/pkg/artifactrash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash/dao"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
)
Expand All @@ -39,6 +40,8 @@ type Manager interface {
// Flush cleans the trash table record, which creation_time is not in the time window.
// The unit of timeWindow is hour, the represent cut-off is time.now() - timeWindow * time.Hours
Flush(ctx context.Context, timeWindow int64) (err error)
// List lists the artifact trash by query.
List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error)
}

// NewManager returns an instance of the default manager
Expand Down Expand Up @@ -67,3 +70,7 @@ func (m *manager) Filter(ctx context.Context, timeWindow int64) (arts []model.Ar
func (m *manager) Flush(ctx context.Context, timeWindow int64) (err error) {
return m.dao.Flush(ctx, time.Now().Add(-time.Duration(timeWindow)*time.Hour))
}

func (m *manager) List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error) {
return m.dao.List(ctx, query)
}
23 changes: 23 additions & 0 deletions src/pkg/artifactrash/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package artifactrash

import (
"context"
"testing"
"time"

v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
)

Expand All @@ -31,6 +33,10 @@ func (f *fakeDao) Flush(ctx context.Context, timeWindow time.Time) (err error) {
args := f.Called()
return args.Error(0)
}
func (f *fakeDao) List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error) {
args := f.Called()
return args.Get(0).([]model.ArtifactTrash), args.Error(1)
}

type managerTestSuite struct {
suite.Suite
Expand Down Expand Up @@ -83,3 +89,20 @@ func (m *managerTestSuite) TestFlush() {
m.Require().Nil(err)
m.dao.AssertExpectations(m.T())
}

func (m *managerTestSuite) TestList() {
m.dao.On("List", mock.Anything).Return([]model.ArtifactTrash{
{
ManifestMediaType: v1.MediaTypeImageManifest,
RepositoryName: "test/hello-world",
Digest: "5678",
},
}, nil)
arts, err := m.mgr.List(nil, &q.Query{})
m.Require().Nil(err)
m.Equal(len(arts), 1)
}

func TestManagerTestSuite(t *testing.T) {
suite.Run(t, &managerTestSuite{})
}
7 changes: 7 additions & 0 deletions src/testing/pkg/artifactrash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stretchr/testify/mock"

"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
)

Expand Down Expand Up @@ -36,3 +37,9 @@ func (f *FakeManager) Flush(ctx context.Context, timeWindow int64) (err error) {
args := f.Called()
return args.Error(0)
}

// List ...
func (f *FakeManager) List(ctx context.Context, query *q.Query) (arts []model.ArtifactTrash, err error) {
args := f.Called()
return args.Get(0).([]model.ArtifactTrash), args.Error(1)
}

0 comments on commit c947027

Please sign in to comment.