Skip to content

Commit af9e20c

Browse files
authored
Moved bucket client pkg (#3555)
* Moved bucket client pkg Signed-off-by: Marco Pracucci <marco@pracucci.com> * Removed references to blocks storage from pkg/storage/backend Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed blocksconvert Signed-off-by: Marco Pracucci <marco@pracucci.com> * Renamed pkg/storage/backend to pkg/storage/bucket Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed Makefile Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed integration tests Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 5fa5cb3 commit af9e20c

35 files changed

+237
-214
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ lint:
177177
github.com/cortexproject/cortex/pkg/frontend/v2" \
178178
./pkg/querier/...
179179
faillint -paths "github.com/cortexproject/cortex/pkg/querier/..." ./pkg/scheduler/...
180+
faillint -paths "github.com/cortexproject/cortex/pkg/storage/tsdb/..." ./pkg/storage/bucket/...
180181

181182
# Validate Kubernetes spec files. Requires:
182183
# https://kubeval.instrumenta.dev

integration/e2ecortex/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"github.com/cortexproject/cortex/integration/e2e"
1313
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
14-
"github.com/cortexproject/cortex/pkg/storage/backend/s3"
14+
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
1515
"github.com/cortexproject/cortex/pkg/util/flagext"
1616
)
1717

pkg/compactor/blocks_cleaner.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/thanos-io/thanos/pkg/compact"
1717
"github.com/thanos-io/thanos/pkg/objstore"
1818

19+
"github.com/cortexproject/cortex/pkg/storage/bucket"
1920
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2021
"github.com/cortexproject/cortex/pkg/util"
2122
"github.com/cortexproject/cortex/pkg/util/concurrency"
@@ -128,7 +129,7 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context) error {
128129

129130
func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
130131
userLogger := util.WithUserID(userID, c.logger)
131-
userBucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)
132+
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient)
132133

133134
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, c.cfg.DeletionDelay, c.cfg.MetaSyncConcurrency)
134135

@@ -178,7 +179,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
178179
return nil
179180
}
180181

181-
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *cortex_tsdb.UserBucketClient, userLogger log.Logger) {
182+
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
182183
for blockID, blockErr := range partials {
183184
// We can safely delete only blocks which are partial because the meta.json is missing.
184185
if blockErr != block.ErrorSyncMetaNotFound {

pkg/compactor/blocks_cleaner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"github.com/stretchr/testify/require"
1919
"github.com/thanos-io/thanos/pkg/block/metadata"
2020

21-
"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
21+
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
2222
"github.com/cortexproject/cortex/pkg/storage/tsdb"
2323
"github.com/cortexproject/cortex/pkg/util/services"
2424
)

pkg/compactor/compactor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/thanos-io/thanos/pkg/objstore"
2424

2525
"github.com/cortexproject/cortex/pkg/ring"
26+
"github.com/cortexproject/cortex/pkg/storage/bucket"
2627
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2728
"github.com/cortexproject/cortex/pkg/util"
2829
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -156,7 +157,7 @@ type Compactor struct {
156157
// NewCompactor makes a new Compactor.
157158
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
158159
createDependencies := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
159-
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
160+
bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
160161
if err != nil {
161162
return nil, nil, nil, errors.Wrap(err, "failed to create the bucket client")
162163
}
@@ -459,7 +460,7 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
459460
}
460461

461462
func (c *Compactor) compactUser(ctx context.Context, userID string) error {
462-
bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)
463+
bucket := bucket.NewUserBucketClient(userID, c.bucketClient)
463464

464465
reg := prometheus.NewRegistry()
465466
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)

pkg/compactor/compactor_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"github.com/cortexproject/cortex/pkg/ring"
3333
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
34+
"github.com/cortexproject/cortex/pkg/storage/bucket"
3435
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
3536
"github.com/cortexproject/cortex/pkg/util/concurrency"
3637
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -122,7 +123,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
122123
t.Parallel()
123124

124125
// No user blocks stored in the bucket.
125-
bucketClient := &cortex_tsdb.BucketClientMock{}
126+
bucketClient := &bucket.ClientMock{}
126127
bucketClient.MockIter("", []string{}, nil)
127128

128129
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
@@ -265,7 +266,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
265266
t.Parallel()
266267

267268
// Fail to iterate over the bucket while discovering users.
268-
bucketClient := &cortex_tsdb.BucketClientMock{}
269+
bucketClient := &bucket.ClientMock{}
269270
bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket"))
270271

271272
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
@@ -415,7 +416,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
415416
t.Parallel()
416417

417418
// Mock the bucket to contain two users, each one with one block.
418-
bucketClient := &cortex_tsdb.BucketClientMock{}
419+
bucketClient := &bucket.ClientMock{}
419420
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
420421
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
421422
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
@@ -521,7 +522,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
521522
cfg.DeletionDelay = 10 * time.Minute // Delete block after 10 minutes
522523

523524
// Mock the bucket to contain two users, each one with one block.
524-
bucketClient := &cortex_tsdb.BucketClientMock{}
525+
bucketClient := &bucket.ClientMock{}
525526
bucketClient.MockIter("", []string{"user-1"}, nil)
526527
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
527528

@@ -624,7 +625,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
624625
t.Parallel()
625626

626627
// Mock the bucket to contain two users, each one with one block.
627-
bucketClient := &cortex_tsdb.BucketClientMock{}
628+
bucketClient := &bucket.ClientMock{}
628629
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
629630
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
630631
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
@@ -698,7 +699,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
698699
}
699700

700701
// Mock the bucket to contain all users, each one with one block.
701-
bucketClient := &cortex_tsdb.BucketClientMock{}
702+
bucketClient := &bucket.ClientMock{}
702703
bucketClient.MockIter("", userIDs, nil)
703704
for _, userID := range userIDs {
704705
bucketClient.MockIter(userID+"/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)

pkg/cortex/cortex_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
"github.com/cortexproject/cortex/pkg/ring"
1313
"github.com/cortexproject/cortex/pkg/ring/kv"
1414
"github.com/cortexproject/cortex/pkg/ruler"
15-
"github.com/cortexproject/cortex/pkg/storage/backend/s3"
15+
"github.com/cortexproject/cortex/pkg/storage/bucket"
16+
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
1617
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1718
"github.com/cortexproject/cortex/pkg/util/flagext"
1819
"github.com/cortexproject/cortex/pkg/util/services"
@@ -28,8 +29,8 @@ func TestCortex(t *testing.T) {
2829
},
2930
Ingester: ingester.Config{
3031
BlocksStorageConfig: tsdb.BlocksStorageConfig{
31-
Bucket: tsdb.BucketConfig{
32-
Backend: tsdb.BackendS3,
32+
Bucket: bucket.Config{
33+
Backend: bucket.S3,
3334
S3: s3.Config{
3435
Endpoint: "localhost",
3536
},
@@ -46,8 +47,8 @@ func TestCortex(t *testing.T) {
4647
},
4748
},
4849
BlocksStorage: tsdb.BlocksStorageConfig{
49-
Bucket: tsdb.BucketConfig{
50-
Backend: tsdb.BackendS3,
50+
Bucket: bucket.Config{
51+
Backend: bucket.S3,
5152
S3: s3.Config{
5253
Endpoint: "localhost",
5354
},

pkg/ingester/ingester_v2.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/cortexproject/cortex/pkg/ingester/client"
3131
"github.com/cortexproject/cortex/pkg/ring"
32+
"github.com/cortexproject/cortex/pkg/storage/bucket"
3233
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
3334
"github.com/cortexproject/cortex/pkg/tenant"
3435
"github.com/cortexproject/cortex/pkg/util"
@@ -373,7 +374,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
373374

374375
// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
375376
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
376-
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
377+
bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
377378
if err != nil {
378379
return nil, errors.Wrap(err, "failed to create the bucket client")
379380
}
@@ -426,7 +427,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
426427
// Special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react
427428
// on Flush method and flush all openened TSDBs when called.
428429
func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, error) {
429-
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
430+
bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
430431
if err != nil {
431432
return nil, errors.Wrap(err, "failed to create the bucket client")
432433
}
@@ -1259,7 +1260,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
12591260
userLogger,
12601261
tsdbPromReg,
12611262
udir,
1262-
cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket),
1263+
bucket.NewUserBucketClient(userID, i.TSDBState.bucket),
12631264
func() labels.Labels { return l },
12641265
metadata.ReceiveSource,
12651266
false, // No need to upload compacted blocks. Cortex compactor takes care of that.

pkg/querier/blocks_scanner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/thanos-io/thanos/pkg/block/metadata"
2121
"github.com/thanos-io/thanos/pkg/objstore"
2222

23+
"github.com/cortexproject/cortex/pkg/storage/bucket"
2324
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2425
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2526
"github.com/cortexproject/cortex/pkg/storegateway"
@@ -361,7 +362,7 @@ func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFet
361362

362363
func (d *BlocksScanner) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
363364
userLogger := util.WithUserID(userID, d.logger)
364-
userBucket := cortex_tsdb.NewUserBucketClient(userID, d.bucketClient)
365+
userBucket := bucket.NewUserBucketClient(userID, d.bucketClient)
365366
userReg := prometheus.NewRegistry()
366367

367368
// The following filters have been intentionally omitted:

pkg/querier/blocks_scanner_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
"github.com/thanos-io/thanos/pkg/block/metadata"
2424
"github.com/thanos-io/thanos/pkg/objstore"
2525

26-
"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
27-
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
26+
"github.com/cortexproject/cortex/pkg/storage/bucket"
27+
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
2828
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2929
"github.com/cortexproject/cortex/pkg/util/services"
3030
)
@@ -86,7 +86,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) {
8686
defer os.RemoveAll(cacheDir) //nolint: errcheck
8787

8888
ctx := context.Background()
89-
bucket := &cortex_tsdb.BucketClientMock{}
89+
bucket := &bucket.ClientMock{}
9090
reg := prometheus.NewPedanticRegistry()
9191

9292
cfg := prepareBlocksScannerConfig()
@@ -139,7 +139,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T)
139139
tenantIDs := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
140140

141141
// Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket.
142-
bucket := &cortex_tsdb.BucketClientMock{}
142+
bucket := &bucket.ClientMock{}
143143
bucket.MockIter("", tenantIDs, nil)
144144
for _, tenantID := range tenantIDs {
145145
bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() {
@@ -177,7 +177,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T)
177177
}
178178

179179
// Mock the bucket to introduce a 1s sleep while syncing each block in the bucket.
180-
bucket := &cortex_tsdb.BucketClientMock{}
180+
bucket := &bucket.ClientMock{}
181181
bucket.MockIter("", []string{"user-1"}, nil)
182182
bucket.MockIter("user-1/", blockPaths, nil)
183183
bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) {

0 commit comments

Comments
 (0)