From 94255ac58ccfb68a82963d45ba912e17f3c784d0 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 13 Nov 2024 00:06:20 +0100 Subject: [PATCH] compactor: concurrently get deletion marks for blocks (#9881) * compactor: concurrently get deletion marks for blocks With a lot of blocks churn (for example a lot of OOO blocks) getting the deletion marks for newly discovered blocks can take a long time (13 minutes). This PR introduces changes the bucket index updater to request 16 deletions marks concurrently (same as the number of block deleted in parallel). Signed-off-by: Dimitar Dimitrov * Add CHANGELOG.md entry Signed-off-by: Dimitar Dimitrov * Restore error handling Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov Co-authored-by: Ganesh Vernekar --- CHANGELOG.md | 1 + pkg/compactor/blocks_cleaner.go | 20 +++-- pkg/compactor/blocks_cleaner_test.go | 89 ++++++++++--------- pkg/compactor/compactor.go | 15 ++-- pkg/storage/tsdb/bucketindex/storage_test.go | 4 +- pkg/storage/tsdb/bucketindex/updater.go | 28 ++++-- pkg/storage/tsdb/bucketindex/updater_test.go | 12 +-- pkg/storegateway/gateway_test.go | 2 +- .../metadata_fetcher_filters_test.go | 2 +- 9 files changed, 99 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8bb85614d0..377af9dc66a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [ENHANCEMENT] Ingester: improved lock contention affecting read and write latencies during TSDB head compaction. #9822 * [ENHANCEMENT] Distributor: when a label value fails validation due to invalid UTF-8 characters, don't include the invalid characters in the returned error. #9828 * [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838 #9850 +* [ENHANCEMENT] Compactor: refresh deletion marks when updating the bucket index concurrently. This speeds up updating the bucket index by up to 16 times when there is a lot of blocks churn (thousands of blocks churning every cleanup cycle). #9881 * [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index eaca0a40c2f..55bfeffcdf1 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -33,17 +33,19 @@ import ( ) const ( - defaultDeleteBlocksConcurrency = 16 + defaultDeleteBlocksConcurrency = 16 + defaultGetDeletionMarkersConcurrency = 16 ) type BlocksCleanerConfig struct { - DeletionDelay time.Duration - CleanupInterval time.Duration - CleanupConcurrency int - TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". - DeleteBlocksConcurrency int - NoBlocksFileCleanupEnabled bool - CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs. + DeletionDelay time.Duration + CleanupInterval time.Duration + CleanupConcurrency int + TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". + DeleteBlocksConcurrency int + GetDeletionMarkersConcurrency int + NoBlocksFileCleanupEnabled bool + CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs. } type BlocksCleaner struct { @@ -432,7 +434,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger } // Generate an updated in-memory version of the bucket index. - w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, userLogger) + w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.cfg.GetDeletionMarkersConcurrency, userLogger) idx, partials, err := w.UpdateIndex(ctx, idx) if err != nil { return err diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index cd7dee90368..22305cdb8fb 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -102,11 +102,12 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here"))) cfg := BlocksCleanerConfig{ - DeletionDelay: deletionDelay, - CleanupInterval: time.Minute, - CleanupConcurrency: options.concurrency, - TenantCleanupDelay: options.tenantDeletionDelay, - DeleteBlocksConcurrency: 1, + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: options.concurrency, + TenantCleanupDelay: options.tenantDeletionDelay, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } reg := prometheus.NewPedanticRegistry() @@ -245,10 +246,11 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { } cfg := BlocksCleanerConfig{ - DeletionDelay: deletionDelay, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } logger := log.NewNopLogger() @@ -305,10 +307,11 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}"))) cfg := BlocksCleanerConfig{ - DeletionDelay: deletionDelay, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } logger := log.NewNopLogger() @@ -354,10 +357,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar createTSDBBlock(t, bucketClient, "user-2", 30, 40, 2, nil) cfg := BlocksCleanerConfig{ - DeletionDelay: time.Hour, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } ctx := context.Background() @@ -486,7 +490,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) { id2 := createTSDBBlock(t, bucketClient, "user-1", 6000, 7000, 2, nil) id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, 2, nil) - w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger) + w := bucketindex.NewUpdater(bucketClient, "user-1", nil, 16, logger) idx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) @@ -791,10 +795,11 @@ func TestBlocksCleaner_ShouldRemovePartialBlocksOutsideDelayPeriod(t *testing.T) block2 := createTSDBBlock(t, bucketClient, "user-1", ts(-8), ts(-6), 2, nil) cfg := BlocksCleanerConfig{ - DeletionDelay: time.Hour, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } ctx := context.Background() @@ -861,10 +866,11 @@ func TestBlocksCleaner_ShouldNotRemovePartialBlocksInsideDelayPeriod(t *testing. block2 := createTSDBBlock(t, bucketClient, "user-2", ts(-8), ts(-6), 2, nil) cfg := BlocksCleanerConfig{ - DeletionDelay: time.Hour, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } ctx := context.Background() @@ -949,10 +955,11 @@ func TestBlocksCleaner_ShouldNotRemovePartialBlocksIfConfiguredDelayIsInvalid(t require.NoError(t, err) cfg := BlocksCleanerConfig{ - DeletionDelay: time.Hour, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, } // Configure an invalid delay. @@ -1038,11 +1045,12 @@ func TestComputeCompactionJobs(t *testing.T) { bucketClient = block.BucketWithGlobalMarkers(bucketClient) cfg := BlocksCleanerConfig{ - DeletionDelay: time.Hour, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, - CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour}, + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, + CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour}, } const user = "test" @@ -1257,11 +1265,12 @@ func TestBlocksCleaner_RaceCondition_CleanerUpdatesBucketIndexWhileAnotherCleane logger = log.NewNopLogger() cfgProvider = newMockConfigProvider() cfg = BlocksCleanerConfig{ - DeletionDelay: deletionDelay, - CleanupInterval: time.Minute, - CleanupConcurrency: 1, - DeleteBlocksConcurrency: 1, - NoBlocksFileCleanupEnabled: true, + DeletionDelay: deletionDelay, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + GetDeletionMarkersConcurrency: 1, + NoBlocksFileCleanupEnabled: true, } ) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 66838c0e9f3..3c583baf39f 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -521,13 +521,14 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error { // Create the blocks cleaner (service). c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ - DeletionDelay: c.compactorCfg.DeletionDelay, - CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), - CleanupConcurrency: c.compactorCfg.CleanupConcurrency, - TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency, - NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled, - CompactionBlockRanges: c.compactorCfg.BlockRanges, + DeletionDelay: c.compactorCfg.DeletionDelay, + CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1), + CleanupConcurrency: c.compactorCfg.CleanupConcurrency, + TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, + DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency, + GetDeletionMarkersConcurrency: defaultGetDeletionMarkersConcurrency, + NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled, + CompactionBlockRanges: c.compactorCfg.BlockRanges, }, c.bucketClient, c.shardingStrategy.blocksCleanerOwnsUser, c.cfgProvider, c.parentLogger, c.registerer) // Start blocks cleaner asynchronously, don't wait until initial cleanup is finished. diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index a40399941d0..0baa7314af1 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -56,7 +56,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { block.MockStorageDeletionMark(t, bkt, userID, block.MockStorageBlock(t, bkt, userID, 30, 40)) // Write the index. - u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, 16, logger) expectedIdx, _, err := u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, WriteIndex(ctx, bkt, userID, nil, expectedIdx)) @@ -93,7 +93,7 @@ func BenchmarkReadIndex(b *testing.B) { } // Write the index. - u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, 16, logger) idx, _, err := u.UpdateIndex(ctx, nil) require.NoError(b, err) require.NoError(b, WriteIndex(ctx, bkt, userID, nil, idx)) diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 79158f98d8e..6aaed644be3 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -32,14 +33,16 @@ var ( // Updater is responsible to generate an update in-memory bucket index. type Updater struct { - bkt objstore.InstrumentedBucket - logger log.Logger + bkt objstore.InstrumentedBucket + logger log.Logger + getDeletionMarkersConcurrency int } -func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater { +func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, getDeletionMarkersConcurrency int, logger log.Logger) *Updater { return &Updater{ - bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), - logger: logger, + bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), + getDeletionMarkersConcurrency: getDeletionMarkersConcurrency, + logger: logger, } } @@ -209,23 +212,32 @@ func (w *Updater) updateBlockDeletionMarks(ctx context.Context, old []*BlockDele } // Remaining markers are new ones and we have to fetch them. + discoveredSlice := make([]ulid.ULID, 0, len(discovered)) for id := range discovered { + discoveredSlice = append(discoveredSlice, id) + } + + updatedMarks, err := concurrency.ForEachJobMergeResults(ctx, discoveredSlice, w.getDeletionMarkersConcurrency, func(ctx context.Context, id ulid.ULID) ([]*BlockDeletionMark, error) { m, err := w.updateBlockDeletionMarkIndexEntry(ctx, id) if errors.Is(err, ErrBlockDeletionMarkNotFound) { // This could happen if the block is permanently deleted between the "list objects" and now. level.Warn(w.logger).Log("msg", "skipped missing block deletion mark when updating bucket index", "block", id.String()) - continue + return nil, nil } if errors.Is(err, ErrBlockDeletionMarkCorrupted) { level.Error(w.logger).Log("msg", "skipped corrupted block deletion mark when updating bucket index", "block", id.String(), "err", err) - continue + return nil, nil } if err != nil { return nil, err } - out = append(out, m) + return BlockDeletionMarks{m}, nil + }) + if err != nil { + return nil, err } + out = append(out, updatedMarks...) level.Info(w.logger).Log("msg", "updated deletion markers for recently marked blocks", "count", len(discovered), "total_deletion_markers", len(out)) diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index b1bfa39b5cb..1a3b2d0bbaf 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -41,7 +41,7 @@ func TestUpdater_UpdateIndex(t *testing.T) { block2 := block.MockStorageBlockWithExtLabels(t, bkt, userID, 20, 30, map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "1_of_5"}) block2Mark := block.MockStorageDeletionMark(t, bkt, userID, block2.BlockMeta) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, 16, logger) returnedIdx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, @@ -90,7 +90,7 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { // Delete a block's meta.json to simulate a partial block. require.NoError(t, bkt.Delete(ctx, path.Join(userID, block3.ULID.String(), block.MetaFilename))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, 16, logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, @@ -119,7 +119,7 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { // Overwrite a block's meta.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block3.ULID.String(), block.MetaFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, 16, logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, @@ -148,7 +148,7 @@ func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { // Overwrite a block's deletion-mark.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block2Mark.ID.String(), block.DeletionMarkFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, 16, logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, @@ -164,7 +164,7 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { bkt, _ := testutil.PrepareFilesystemBucket(t) for _, oldIdx := range []*Index{nil, {}} { - w := NewUpdater(bkt, userID, nil, log.NewNopLogger()) + w := NewUpdater(bkt, userID, nil, 16, log.NewNopLogger()) idx, partials, err := w.UpdateIndex(ctx, oldIdx) require.NoError(t, err) @@ -203,7 +203,7 @@ func TestUpdater_UpdateIndexFromVersion1ToVersion2(t *testing.T) { require.Equal(t, "3_of_4", block2.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel]) // Generate index (this produces V2 index, with compactor shard IDs). - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, 16, logger) returnedIdx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 170ac1849b7..c91693f5685 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -1710,7 +1710,7 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, } func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index { - updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()) + updater := bucketindex.NewUpdater(bkt, userID, nil, 16, log.NewNopLogger()) idx, _, err := updater.UpdateIndex(context.Background(), nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx)) diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 557bfa3f752..32c7de429a8 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -74,7 +74,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { if bucketIndexEnabled { var err error - u := bucketindex.NewUpdater(bkt, userID, nil, logger) + u := bucketindex.NewUpdater(bkt, userID, nil, 16, logger) idx, _, err = u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, idx))