From 98a784da64fee9928e19337a60beb27c94934141 Mon Sep 17 00:00:00 2001 From: Aymeric Date: Fri, 8 Oct 2021 01:18:59 +0200 Subject: [PATCH] store: valide block sync concurrency parameter Must be equal or greater than 1 to avoid blocked program. --- CHANGELOG.md | 1 + cmd/thanos/store.go | 2 +- pkg/store/bucket.go | 17 +++++++++++++++++ pkg/store/bucket_test.go | 26 ++++++++++++++++++++++++++ 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d67abc400..a2695830e66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races +- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: valide block sync concurrency parameter ### Added diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 19b6dbf8370..25310f9ef3b 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). Default("3m").DurationVar(&sc.syncInterval) - cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d181f047315..d22c612bb1b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -92,6 +92,12 @@ const ( // Labels for metrics. labelEncode = "encode" labelDecode = "decode" + + minBlockSyncConcurrency = 1 +) + +var ( + errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") ) type bucketStoreMetrics struct { @@ -298,6 +304,13 @@ type BucketStore struct { enableSeriesResponseHints bool } +func (b *BucketStore) validate() error { + if b.blockSyncConcurrency < minBlockSyncConcurrency { + return errBlockSyncConcurrencyNotValid + } + return nil +} + type noopCache struct{} func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {} @@ -407,6 +420,10 @@ func NewBucketStore( s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too + if err := s.validate(); err != nil { + return nil, errors.Wrap(err, "validate config") + } + if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e635ab22c1e..24e499287e2 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { } } +func TestBucketStoreConfig_validate(t *testing.T) { + tests := map[string]struct { + config BucketStore + expected error + }{ + "should pass on valid config": { + config: BucketStore{ + blockSyncConcurrency: 1, + }, + expected: nil, + }, + "should fail on blockSyncConcurrency < 1": { + config: BucketStore{ + blockSyncConcurrency: 0, + }, + expected: errBlockSyncConcurrencyNotValid, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + testutil.Equals(t, testData.expected, testData.config.validate()) + }) + } +} + func TestBucketStore_Info(t *testing.T) { defer testutil.TolerantVerifyLeak(t)