Skip to content

Commit

Permalink
Fetcher: Add a BlockIDsFetcher Interface to BaseFetcher (#6902)
Browse files Browse the repository at this point in the history
* add BlockIDsFetcher to BaseFetcher

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* fix lint

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* use chan in the interface method to accept active block ids

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* fix comments

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* fix lint

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* add description of active and parital blocks and modify changelog

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* fix interface description

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

* remove entry in changelog

Signed-off-by: Wen Xu <wenxuamz@amazon.com>

---------

Signed-off-by: Wen Xu <wenxuamz@amazon.com>
  • Loading branch information
wenxu1024 authored Nov 27, 2023
1 parent 5f9f166 commit 8ffb9da
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 69 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
})
Expand Down
6 changes: 4 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
Expand Down Expand Up @@ -196,7 +197,8 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ func runStore(
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
18 changes: 12 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
Expand Down Expand Up @@ -407,7 +408,8 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
Expand Down Expand Up @@ -508,7 +510,8 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -651,7 +654,8 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down Expand Up @@ -829,7 +833,8 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -1371,7 +1376,8 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
112 changes: 70 additions & 42 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,52 @@ func DefaultModifiedLabelValues() [][]string {
}
}

// Fetcher interface to retieve blockId information from a bucket.
type BlockIDsFetcher interface {
// GetActiveBlocksIDs returning it via channel (streaming) and response.
// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
}

type BaseBlockIDsFetcher struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
}

func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
return &BaseBlockIDsFetcher{
logger: logger,
bkt: bkt,
}
}

func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
partialBlocks = make(map[ulid.ULID]bool)
err = f.bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
dir, file := parts[0], parts[len(parts)-1]
id, ok := IsBlockDir(dir)
if !ok {
return nil
}
if _, ok := partialBlocks[id]; !ok {
partialBlocks[id] = true
}
if !IsBlockMetaFile(file) {
return nil
}
partialBlocks[id] = false

select {
case <-ctx.Done():
return ctx.Err()
case ch <- id:
}
return nil
}, objstore.WithRecursiveIter)
return partialBlocks, err
}

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
Expand All @@ -188,9 +234,10 @@ type MetadataFilter interface {
// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Go-routine safe.
type BaseFetcher struct {
logger log.Logger
concurrency int
bkt objstore.InstrumentedBucketReader
logger log.Logger
concurrency int
bkt objstore.InstrumentedBucketReader
blockIDsFetcher BlockIDsFetcher

// Optional local directory to cache meta.json files.
cacheDir string
Expand All @@ -202,12 +249,12 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
return NewBaseFetcherWithMetrics(logger, concurrency, bkt, dir, NewBaseFetcherMetrics(reg))
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg))
}

// NewBaseFetcherWithMetrics constructs BaseFetcher.
func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -221,33 +268,34 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
}

return &BaseFetcher{
logger: log.With(logger, "component", "block.BaseFetcher"),
concurrency: concurrency,
bkt: bkt,
cacheDir: cacheDir,
cached: map[ulid.ULID]*metadata.Meta{},
syncs: metrics.Syncs,
logger: log.With(logger, "component", "block.BaseFetcher"),
concurrency: concurrency,
bkt: bkt,
blockIDsFetcher: blockIDsFetcher,
cacheDir: cacheDir,
cached: map[ulid.ULID]*metadata.Meta{},
syncs: metrics.Syncs,
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) (*MetaFetcher, error) {
return NewMetaFetcher(logger, 1, bkt, "", nil, nil)
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) {
return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil)
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg)
if err != nil {
return nil, err
}
return b.NewMetaFetcher(reg, filters), nil
}

// NewMetaFetcherWithMetrics returns meta fetcher.
func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, dir, baseFetcherMetrics)
func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -392,33 +440,13 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
})
}

partialBlocks := make(map[ulid.ULID]bool)
var partialBlocks map[ulid.ULID]bool
var err error
// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
return f.bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
dir, file := parts[0], parts[len(parts)-1]
id, ok := IsBlockDir(dir)
if !ok {
return nil
}
if _, ok := partialBlocks[id]; !ok {
partialBlocks[id] = true
}
if !IsBlockMetaFile(file) {
return nil
}
partialBlocks[id] = false

select {
case <-ctx.Done():
return ctx.Err()
case ch <- id:
}

return nil
}, objstore.WithRecursiveIter)
partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
return err
})

if err := eg.Wait(); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.WithNoopInstr(bkt), dir, r)
noopLogger := log.NewNopLogger()
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt)
baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
logger := log.NewNopLogger()

metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

// 1. No meta, old block, should be removed.
Expand Down
12 changes: 9 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
})
testutil.Ok(t, err)
Expand Down Expand Up @@ -194,7 +196,9 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
Expand Down Expand Up @@ -504,7 +508,9 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
Expand Down
2 changes: 2 additions & 0 deletions pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,12 @@ func newMetaFetcher(
if ignoreMarkedForDeletion {
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
}
baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
return thanosblock.NewMetaFetcher(
logger,
concurrency,
fromBkt,
baseBlockIDsFetcher,
"",
reg,
filters,
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,9 @@ func TestBucketStore_Acceptance(t *testing.T) {
chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
maxTime: maxTime,
}

metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(s.logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(s.logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
})
Expand Down
Loading

0 comments on commit 8ffb9da

Please sign in to comment.