Skip to content

Moved bucket client pkg #3555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ lint:
github.com/cortexproject/cortex/pkg/frontend/v2" \
./pkg/querier/...
faillint -paths "github.com/cortexproject/cortex/pkg/querier/..." ./pkg/scheduler/...
faillint -paths "github.com/cortexproject/cortex/pkg/storage/tsdb/..." ./pkg/storage/bucket/...

# Validate Kubernetes spec files. Requires:
# https://kubeval.instrumenta.dev
Expand Down
2 changes: 1 addition & 1 deletion integration/e2ecortex/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/pkg/storage/backend/s3"
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

Expand Down
5 changes: 3 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -128,7 +129,7 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context) error {

func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
userLogger := util.WithUserID(userID, c.logger)
userBucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient)

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, c.cfg.DeletionDelay, c.cfg.MetaSyncConcurrency)

Expand Down Expand Up @@ -178,7 +179,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error {
return nil
}

func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *cortex_tsdb.UserBucketClient, userLogger log.Logger) {
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
for blockID, blockErr := range partials {
// We can safely delete only blocks which are partial because the meta.json is missing.
if blockErr != block.ErrorSyncMetaNotFound {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down
5 changes: 3 additions & 2 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -156,7 +157,7 @@ type Compactor struct {
// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
createDependencies := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to create the bucket client")
}
Expand Down Expand Up @@ -459,7 +460,7 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
}

func (c *Compactor) compactUser(ctx context.Context, userID string) error {
bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)
bucket := bucket.NewUserBucketClient(userID, c.bucketClient)

reg := prometheus.NewRegistry()
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)
Expand Down
13 changes: 7 additions & 6 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
t.Parallel()

// No user blocks stored in the bucket.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{}, nil)

c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
t.Parallel()

// Fail to iterate over the bucket while discovering users.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket"))

c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
t.Parallel()

// Mock the bucket to contain two users, each one with one block.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
Expand Down Expand Up @@ -521,7 +522,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
cfg.DeletionDelay = 10 * time.Minute // Delete block after 10 minutes

// Mock the bucket to contain two users, each one with one block.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)

Expand Down Expand Up @@ -624,7 +625,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
t.Parallel()

// Mock the bucket to contain two users, each one with one block.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
}

// Mock the bucket to contain all users, each one with one block.
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", userIDs, nil)
for _, userID := range userIDs {
bucketClient.MockIter(userID+"/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
Expand Down
11 changes: 6 additions & 5 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storage/backend/s3"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
Expand All @@ -28,8 +29,8 @@ func TestCortex(t *testing.T) {
},
Ingester: ingester.Config{
BlocksStorageConfig: tsdb.BlocksStorageConfig{
Bucket: tsdb.BucketConfig{
Backend: tsdb.BackendS3,
Bucket: bucket.Config{
Backend: bucket.S3,
S3: s3.Config{
Endpoint: "localhost",
},
Expand All @@ -46,8 +47,8 @@ func TestCortex(t *testing.T) {
},
},
BlocksStorage: tsdb.BlocksStorageConfig{
Bucket: tsdb.BucketConfig{
Backend: tsdb.BackendS3,
Bucket: bucket.Config{
Backend: bucket.S3,
S3: s3.Config{
Endpoint: "localhost",
},
Expand Down
7 changes: 4 additions & 3 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -373,7 +374,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer

// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create the bucket client")
}
Expand Down Expand Up @@ -426,7 +427,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
// Special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react
// on Flush method and flush all openened TSDBs when called.
func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create the bucket client")
}
Expand Down Expand Up @@ -1259,7 +1260,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
userLogger,
tsdbPromReg,
udir,
cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket),
bucket.NewUserBucketClient(userID, i.TSDBState.bucket),
func() labels.Labels { return l },
metadata.ReceiveSource,
false, // No need to upload compacted blocks. Cortex compactor takes care of that.
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/blocks_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storegateway"
Expand Down Expand Up @@ -361,7 +362,7 @@ func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFet

func (d *BlocksScanner) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) {
userLogger := util.WithUserID(userID, d.logger)
userBucket := cortex_tsdb.NewUserBucketClient(userID, d.bucketClient)
userBucket := bucket.NewUserBucketClient(userID, d.bucketClient)
userReg := prometheus.NewRegistry()

// The following filters have been intentionally omitted:
Expand Down
10 changes: 5 additions & 5 deletions pkg/querier/blocks_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/backend/filesystem"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) {
defer os.RemoveAll(cacheDir) //nolint: errcheck

ctx := context.Background()
bucket := &cortex_tsdb.BucketClientMock{}
bucket := &bucket.ClientMock{}
reg := prometheus.NewPedanticRegistry()

cfg := prepareBlocksScannerConfig()
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T)
tenantIDs := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}

// Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket.
bucket := &cortex_tsdb.BucketClientMock{}
bucket := &bucket.ClientMock{}
bucket.MockIter("", tenantIDs, nil)
for _, tenantID := range tenantIDs {
bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() {
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T)
}

// Mock the bucket to introduce a 1s sleep while syncing each block in the bucket.
bucket := &cortex_tsdb.BucketClientMock{}
bucket := &bucket.ClientMock{}
bucket.MockIter("", []string{"user-1"}, nil)
bucket.MockIter("user-1/", blockPaths, nil)
bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storegateway"
Expand Down Expand Up @@ -153,7 +154,7 @@ func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consist
func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
var stores BlocksStoreSet

bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg.Bucket, "querier", logger, reg)
bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create bucket client")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
}

// RegisterFlags registers the flags for TSDB Azure storage
// RegisterFlags registers the flags for Azure storage
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("blocks-storage.", f)
cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix registers the flags for TSDB Azure storage
// RegisterFlagsWithPrefix registers the flags for Azure storage
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
Expand Down
Loading