Skip to content

StoreGateway: Allow filtering out recent blocks during sync #5166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [FEATURE] Added zstd as an option for grpc compression #5092
* [FEATURE] Ring: Add new kv store option `dynamodb`. #5026
* [FEATURE] Cache: Support redis as backend for caching bucket and index cache. #5057
* [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within`, which allows recently created blocks to be filtered out so that they are not synced by queriers and store-gateways. #5166
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055
Expand Down
9 changes: 9 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,15 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

# The blocks created since `now() - ignore_blocks_within` will not be
# synced. This should be used together with `-querier.query-store-after` to
# filter out the blocks that are too new to be queried. A reasonable value
# for this flag would be `-querier.query-store-after -
# blocks-storage.bucket-store.bucket-index.max-stale-period` to give some
# buffer. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the
# storage via bucket index instead of bucket scanning.
Expand Down
9 changes: 9 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,15 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

# The blocks created since `now() - ignore_blocks_within` will not be
# synced. This should be used together with `-querier.query-store-after` to
# filter out the blocks that are too new to be queried. A reasonable value
# for this flag would be `-querier.query-store-after -
# blocks-storage.bucket-store.bucket-index.max-stale-period` to give some
# buffer. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the
# storage via bucket index instead of bucket scanning.
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4024,6 +4024,15 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.ignore-deletion-marks-delay
[ignore_deletion_mark_delay: <duration> | default = 6h]

# The blocks created since `now() - ignore_blocks_within` will not be synced.
# This should be used together with `-querier.query-store-after` to filter out
# the blocks that are too new to be queried. A reasonable value for this flag
# would be `-querier.query-store-after -
# blocks-storage.bucket-store.bucket-index.max-stale-period` to give some
# buffer. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

bucket_index:
# True to enable querier and store-gateway to discover blocks in the storage
# via bucket index instead of bucket scanning.
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/blocks_finder_bucket_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type BucketIndexBlocksFinderConfig struct {
IndexLoader bucketindex.LoaderConfig
MaxStalePeriod time.Duration
IgnoreDeletionMarksDelay time.Duration
IgnoreBlocksWithin time.Duration
}

// BucketIndexBlocksFinder implements BlocksFinder interface and find blocks in the bucket
Expand Down
15 changes: 13 additions & 2 deletions pkg/querier/blocks_finder_bucket_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) {
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

// Mock a bucket index.
now := time.Now()
block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 15}
block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil), MinTime: 12, MaxTime: 20}
block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil), MinTime: 20, MaxTime: 30}
block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: 30, MaxTime: 40}
block5 := &bucketindex.Block{ID: ulid.MustNew(5, nil), MinTime: 30, MaxTime: 40} // Time range overlaps with block4, but this block deletion mark is above the threshold.
block5 := &bucketindex.Block{ID: ulid.MustNew(5, nil), MinTime: 30, MaxTime: 40} // Time range overlaps with block4, but this block deletion mark is above the threshold.
block6 := &bucketindex.Block{ID: ulid.MustNew(6, nil), MinTime: now.Add(-2 * time.Hour).UnixMilli(), MaxTime: now.UnixMilli()} // This block is within ignoreBlocksWithin and shouldn't be loaded.
mark3 := &bucketindex.BlockDeletionMark{ID: block3.ID, DeletionTime: time.Now().Unix()}
mark5 := &bucketindex.BlockDeletionMark{ID: block5.ID, DeletionTime: time.Now().Add(-2 * time.Hour).Unix()}

require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{
Version: bucketindex.IndexVersion1,
Blocks: bucketindex.Blocks{block1, block2, block3, block4, block5},
Blocks: bucketindex.Blocks{block1, block2, block3, block4, block5, block6},
BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark3, mark5},
UpdatedAt: time.Now().Unix(),
}))
Expand Down Expand Up @@ -102,6 +104,14 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) {
block3.ID: mark3,
},
},
"query range matching all blocks but should ignore non-queryable block": {
minT: 0,
maxT: block5.MaxTime,
expectedBlocks: bucketindex.Blocks{block4, block3, block2, block1},
expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
block3.ID: mark3,
},
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -209,6 +219,7 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn
},
MaxStalePeriod: time.Hour,
IgnoreDeletionMarksDelay: time.Hour,
IgnoreBlocksWithin: 10 * time.Hour,
}

finder := NewBucketIndexBlocksFinder(cfg, bkt, nil, log.NewNopLogger(), nil)
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type BucketScanBlocksFinderConfig struct {
CacheDir string
ConsistencyDelay time.Duration
IgnoreDeletionMarksDelay time.Duration
IgnoreBlocksWithin time.Duration
}

// BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks.
Expand Down Expand Up @@ -378,6 +379,11 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
deletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, d.cfg.IgnoreDeletionMarksDelay, d.cfg.MetasConcurrency)
filters := []block.MetadataFilter{deletionMarkFilter}

// Here we filter out the blocks that are too new to query.
if d.cfg.IgnoreBlocksWithin > 0 {
filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin))
}

f, err := block.NewMetaFetcher(
userLogger,
d.cfg.MetasConcurrency,
Expand Down
11 changes: 11 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,12 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
ctx := context.Background()
s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())

now := time.Now()
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 15)
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 12, 20)
block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
block4 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40)
block5 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", now.Add(-2*time.Hour).UnixMilli(), now.UnixMilli()) // This block is within ignoreBlocksWithin
mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block3))

require.NoError(t, services.StartAndAwaitRunning(ctx, s))
Expand Down Expand Up @@ -451,6 +453,14 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
block3.ULID: mark3,
},
},
"query range matching all blocks but should ignore non-queryable block": {
minT: 0,
maxT: block5.MaxTime,
expectedMetas: []tsdb.BlockMeta{block4, block3, block2, block1},
expectedMarks: map[ulid.ULID]*bucketindex.BlockDeletionMark{
block3.ULID: mark3,
},
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -488,5 +498,6 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig {
TenantsConcurrency: 10,
MetasConcurrency: 10,
IgnoreDeletionMarksDelay: time.Hour,
IgnoreBlocksWithin: 10 * time.Hour, // All blocks created in the last 10 hours shouldn't be scanned.
}
}
2 changes: 2 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
},
MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod,
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
}, bucketClient, limits, logger, reg)
} else {
finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{
Expand All @@ -206,6 +207,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency,
CacheDir: storageCfg.BucketStore.SyncDir,
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
}, bucketClient, limits, logger, reg)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ type BucketStoreConfig struct {
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`

// Chunk pool.
Expand Down Expand Up @@ -282,6 +283,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.IgnoreDeletionMarksDelay, "blocks-storage.bucket-store.ignore-deletion-marks-delay", time.Hour*6, "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "The blocks created since `now() - ignore_blocks_within` will not be synced. This should be used together with `-querier.query-store-after` to filter out the blocks that are too new to be queried. A reasonable value for this flag would be `-querier.query-store-after - blocks-storage.bucket-store.bucket-index.max-stale-period` to give some buffer. 0 to disable.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazily memory-map an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will release memory-mapped index-headers after 'idle timeout' inactivity.")
Expand Down
5 changes: 5 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
// Remove Cortex external labels so that they're not injected when querying blocks.
}...)

if u.cfg.BucketStore.IgnoreBlocksWithin > 0 {
// Filter out blocks that are too new to be queried.
filters = append(filters, NewIgnoreNonQueryableBlocksFilter(userLogger, u.cfg.BucketStore.IgnoreBlocksWithin))
}

// Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not.
var fetcher block.MetadataFetcher
if u.cfg.BucketStore.BucketIndex.Enabled {
Expand Down
30 changes: 30 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -75,3 +76,32 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta

return nil
}

// NewIgnoreNonQueryableBlocksFilter returns an IgnoreNonQueryableBlocksFilter
// configured with the given logger and ignoreWithin window.
func NewIgnoreNonQueryableBlocksFilter(logger log.Logger, ignoreWithin time.Duration) *IgnoreNonQueryableBlocksFilter {
	f := new(IgnoreNonQueryableBlocksFilter)
	f.ignoreWithin = ignoreWithin
	f.logger = logger
	return f
}

// IgnoreNonQueryableBlocksFilter ignores blocks that are too new to be queried.
// It has to be used in conjunction with `-querier.query-store-after`, leaving
// some buffer between the two settings.
type IgnoreNonQueryableBlocksFilter struct {
	logger log.Logger

	// ignoreWithin is the look-back window: blocks that were created since
	// `now() - ignoreWithin` will not be synced.
	ignoreWithin time.Duration
}

// Filter implements block.MetadataFilter. It removes from metas, in place,
// every block whose MinTime is later than `now() - ignoreWithin`, and always
// returns nil. The ctx, synced and modified arguments are not used.
func (f *IgnoreNonQueryableBlocksFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
	// Blocks starting after this instant (Unix milliseconds) are dropped.
	threshold := time.Now().Add(-f.ignoreWithin).UnixMilli()

	for blockID, meta := range metas {
		if meta.MinTime <= threshold {
			continue
		}

		level.Debug(f.logger).Log("msg", "ignoring block because it won't be queried", "id", blockID)
		delete(metas, blockID)
	}

	return nil
}
64 changes: 64 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/prometheus/prometheus/tsdb"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
Expand Down Expand Up @@ -106,3 +108,65 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks())
}

// TestIgnoreNonQueryableBlocksFilter checks that, with a 3h window, the filter
// drops the block whose MinTime is 2h ago and keeps the blocks whose MinTime
// is 4h, 6h and 8h ago.
func TestIgnoreNonQueryableBlocksFilter(t *testing.T) {
	var (
		now    = time.Now()
		ctx    = context.Background()
		logger = log.NewNopLogger()
	)

	// metaBetween builds a block meta spanning [now-startAgo, now-endAgo].
	metaBetween := func(startAgo, endAgo time.Duration) *metadata.Meta {
		return &metadata.Meta{
			BlockMeta: tsdb.BlockMeta{
				MinTime: now.Add(-startAgo).UnixMilli(),
				MaxTime: now.Add(-endAgo).UnixMilli(),
			},
		}
	}

	inputMetas := map[ulid.ULID]*metadata.Meta{
		ulid.MustNew(1, nil): metaBetween(2*time.Hour, 0*time.Hour),
		ulid.MustNew(2, nil): metaBetween(4*time.Hour, 2*time.Hour),
		ulid.MustNew(3, nil): metaBetween(6*time.Hour, 4*time.Hour),
		ulid.MustNew(4, nil): metaBetween(8*time.Hour, 6*time.Hour),
	}

	// Block 1 starts within the last 3h, so it is the only one expected to be removed.
	expectedMetas := map[ulid.ULID]*metadata.Meta{
		ulid.MustNew(2, nil): metaBetween(4*time.Hour, 2*time.Hour),
		ulid.MustNew(3, nil): metaBetween(6*time.Hour, 4*time.Hour),
		ulid.MustNew(4, nil): metaBetween(8*time.Hour, 6*time.Hour),
	}

	synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"})
	modified := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "modified"}, []string{"state"})

	f := NewIgnoreNonQueryableBlocksFilter(logger, 3*time.Hour)

	err := f.Filter(ctx, inputMetas, synced, modified)
	require.NoError(t, err)
	assert.Equal(t, expectedMetas, inputMetas)
}