compactor: concurrently get deletion marks for blocks (#9881)
* compactor: concurrently get deletion marks for blocks

With a lot of block churn (for example, many OOO blocks), getting the deletion marks for newly discovered blocks can take a long time (13 minutes).

This PR changes the bucket index updater to request 16 deletion marks concurrently (the same as the number of blocks deleted in parallel).
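For illustration only, below is a minimal, self-contained sketch of the bounded-concurrency pattern this change applies, written against golang.org/x/sync/errgroup rather than the dskit helper the PR actually uses; the DeletionMark type and fetchDeletionMark function are hypothetical stand-ins for the real bucket-index code. If each fetch is dominated by object-store round-trip latency, keeping 16 requests in flight can shrink a 13-minute sequential pass to roughly a minute.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// DeletionMark is a hypothetical stand-in for the bucket-index deletion mark type.
type DeletionMark struct {
	BlockID string
}

// fetchDeletionMark simulates reading one block's deletion mark from object storage.
func fetchDeletionMark(ctx context.Context, blockID string) (*DeletionMark, error) {
	// A real implementation would issue a GET against the bucket here.
	return &DeletionMark{BlockID: blockID}, nil
}

// fetchDeletionMarks fetches marks for all blocks, keeping at most `limit` requests in flight.
func fetchDeletionMarks(ctx context.Context, blockIDs []string, limit int) ([]*DeletionMark, error) {
	var (
		mu    sync.Mutex
		marks []*DeletionMark
	)

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(limit) // e.g. 16, matching the delete-blocks concurrency

	for _, id := range blockIDs {
		id := id
		g.Go(func() error {
			m, err := fetchDeletionMark(ctx, id)
			if err != nil {
				return err
			}
			mu.Lock()
			marks = append(marks, m)
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return marks, nil
}

func main() {
	marks, err := fetchDeletionMarks(context.Background(), []string{"block-1", "block-2", "block-3"}, 16)
	fmt.Println(len(marks), err)
}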

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Restore error handling

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
dimitarvdimitrov and codesome authored Nov 12, 2024
1 parent 7b602e5 commit 94255ac
Showing 9 changed files with 99 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -57,6 +57,7 @@
* [ENHANCEMENT] Ingester: improved lock contention affecting read and write latencies during TSDB head compaction. #9822
* [ENHANCEMENT] Distributor: when a label value fails validation due to invalid UTF-8 characters, don't include the invalid characters in the returned error. #9828
* [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838 #9850
+ * [ENHANCEMENT] Compactor: refresh deletion marks when updating the bucket index concurrently. This speeds up updating the bucket index by up to 16 times when there is a lot of blocks churn (thousands of blocks churning every cleanup cycle). #9881
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
20 changes: 11 additions & 9 deletions pkg/compactor/blocks_cleaner.go
@@ -33,17 +33,19 @@ import (
)

const (
- defaultDeleteBlocksConcurrency = 16
+ defaultDeleteBlocksConcurrency        = 16
+ defaultGetDeletionMarkersConcurrency = 16
)

type BlocksCleanerConfig struct {
- DeletionDelay time.Duration
- CleanupInterval time.Duration
- CleanupConcurrency int
- TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
- DeleteBlocksConcurrency int
- NoBlocksFileCleanupEnabled bool
- CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs.
+ DeletionDelay time.Duration
+ CleanupInterval time.Minute
+ CleanupConcurrency int
+ TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
+ DeleteBlocksConcurrency int
+ GetDeletionMarkersConcurrency int
+ NoBlocksFileCleanupEnabled bool
+ CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs.
}

type BlocksCleaner struct {
@@ -432,7 +434,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
}

// Generate an updated in-memory version of the bucket index.
- w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, userLogger)
+ w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.cfg.GetDeletionMarkersConcurrency, userLogger)
idx, partials, err := w.UpdateIndex(ctx, idx)
if err != nil {
return err
89 changes: 49 additions & 40 deletions pkg/compactor/blocks_cleaner_test.go
@@ -102,11 +102,12 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

cfg := BlocksCleanerConfig{
- DeletionDelay: deletionDelay,
- CleanupInterval: time.Minute,
- CleanupConcurrency: options.concurrency,
- TenantCleanupDelay: options.tenantDeletionDelay,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: deletionDelay,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: options.concurrency,
+ TenantCleanupDelay: options.tenantDeletionDelay,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

reg := prometheus.NewPedanticRegistry()
@@ -245,10 +246,11 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
}

cfg := BlocksCleanerConfig{
- DeletionDelay: deletionDelay,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: deletionDelay,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

logger := log.NewNopLogger()
@@ -305,10 +307,11 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}")))

cfg := BlocksCleanerConfig{
- DeletionDelay: deletionDelay,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: deletionDelay,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

logger := log.NewNopLogger()
@@ -354,10 +357,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
createTSDBBlock(t, bucketClient, "user-2", 30, 40, 2, nil)

cfg := BlocksCleanerConfig{
- DeletionDelay: time.Hour,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: time.Hour,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

ctx := context.Background()
@@ -486,7 +490,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
id2 := createTSDBBlock(t, bucketClient, "user-1", 6000, 7000, 2, nil)
id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, 2, nil)

- w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger)
+ w := bucketindex.NewUpdater(bucketClient, "user-1", nil, 16, logger)
idx, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)

@@ -791,10 +795,11 @@ func TestBlocksCleaner_ShouldRemovePartialBlocksOutsideDelayPeriod(t *testing.T)
block2 := createTSDBBlock(t, bucketClient, "user-1", ts(-8), ts(-6), 2, nil)

cfg := BlocksCleanerConfig{
- DeletionDelay: time.Hour,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: time.Hour,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

ctx := context.Background()
@@ -861,10 +866,11 @@ func TestBlocksCleaner_ShouldNotRemovePartialBlocksInsideDelayPeriod(t *testing.
block2 := createTSDBBlock(t, bucketClient, "user-2", ts(-8), ts(-6), 2, nil)

cfg := BlocksCleanerConfig{
- DeletionDelay: time.Hour,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: time.Hour,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

ctx := context.Background()
@@ -949,10 +955,11 @@ func TestBlocksCleaner_ShouldNotRemovePartialBlocksIfConfiguredDelayIsInvalid(t
require.NoError(t, err)

cfg := BlocksCleanerConfig{
- DeletionDelay: time.Hour,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
+ DeletionDelay: time.Hour,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
}

// Configure an invalid delay.
@@ -1038,11 +1045,12 @@ func TestComputeCompactionJobs(t *testing.T) {
bucketClient = block.BucketWithGlobalMarkers(bucketClient)

cfg := BlocksCleanerConfig{
- DeletionDelay: time.Hour,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
- CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour},
+ DeletionDelay: time.Hour,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
+ CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour},
}

const user = "test"
@@ -1257,11 +1265,12 @@ func TestBlocksCleaner_RaceCondition_CleanerUpdatesBucketIndexWhileAnotherCleane
logger = log.NewNopLogger()
cfgProvider = newMockConfigProvider()
cfg = BlocksCleanerConfig{
- DeletionDelay: deletionDelay,
- CleanupInterval: time.Minute,
- CleanupConcurrency: 1,
- DeleteBlocksConcurrency: 1,
- NoBlocksFileCleanupEnabled: true,
+ DeletionDelay: deletionDelay,
+ CleanupInterval: time.Minute,
+ CleanupConcurrency: 1,
+ DeleteBlocksConcurrency: 1,
+ GetDeletionMarkersConcurrency: 1,
+ NoBlocksFileCleanupEnabled: true,
}
)

15 changes: 8 additions & 7 deletions pkg/compactor/compactor.go
@@ -521,13 +521,14 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {

// Create the blocks cleaner (service).
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
- DeletionDelay: c.compactorCfg.DeletionDelay,
- CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
- CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
- TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
- DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
- NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled,
- CompactionBlockRanges: c.compactorCfg.BlockRanges,
+ DeletionDelay: c.compactorCfg.DeletionDelay,
+ CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
+ CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
+ TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
+ DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
+ GetDeletionMarkersConcurrency: defaultGetDeletionMarkersConcurrency,
+ NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled,
+ CompactionBlockRanges: c.compactorCfg.BlockRanges,
}, c.bucketClient, c.shardingStrategy.blocksCleanerOwnsUser, c.cfgProvider, c.parentLogger, c.registerer)

// Start blocks cleaner asynchronously, don't wait until initial cleanup is finished.
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/bucketindex/storage_test.go
@@ -56,7 +56,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) {
block.MockStorageDeletionMark(t, bkt, userID, block.MockStorageBlock(t, bkt, userID, 30, 40))

// Write the index.
- u := NewUpdater(bkt, userID, nil, logger)
+ u := NewUpdater(bkt, userID, nil, 16, logger)
expectedIdx, _, err := u.UpdateIndex(ctx, nil)
require.NoError(t, err)
require.NoError(t, WriteIndex(ctx, bkt, userID, nil, expectedIdx))
@@ -93,7 +93,7 @@ func BenchmarkReadIndex(b *testing.B) {
}

// Write the index.
- u := NewUpdater(bkt, userID, nil, logger)
+ u := NewUpdater(bkt, userID, nil, 16, logger)
idx, _, err := u.UpdateIndex(ctx, nil)
require.NoError(b, err)
require.NoError(b, WriteIndex(ctx, bkt, userID, nil, idx))
28 changes: 20 additions & 8 deletions pkg/storage/tsdb/bucketindex/updater.go
@@ -14,6 +14,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
+ "github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -32,14 +33,16 @@ var (

// Updater is responsible to generate an update in-memory bucket index.
type Updater struct {
- bkt objstore.InstrumentedBucket
- logger log.Logger
+ bkt objstore.InstrumentedBucket
+ logger log.Logger
+ getDeletionMarkersConcurrency int
}

- func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater {
+ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, getDeletionMarkersConcurrency int, logger log.Logger) *Updater {
return &Updater{
- bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider),
- logger: logger,
+ bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider),
+ getDeletionMarkersConcurrency: getDeletionMarkersConcurrency,
+ logger: logger,
}
}

@@ -209,23 +212,32 @@ func (w *Updater) updateBlockDeletionMarks(ctx context.Context, old []*BlockDele
}

// Remaining markers are new ones and we have to fetch them.
- for id := range discovered {
-     m, err := w.updateBlockDeletionMarkIndexEntry(ctx, id)
-     if errors.Is(err, ErrBlockDeletionMarkNotFound) {
-         // This could happen if the block is permanently deleted between the "list objects" and now.
-         level.Warn(w.logger).Log("msg", "skipped missing block deletion mark when updating bucket index", "block", id.String())
-         continue
-     }
-     if errors.Is(err, ErrBlockDeletionMarkCorrupted) {
-         level.Error(w.logger).Log("msg", "skipped corrupted block deletion mark when updating bucket index", "block", id.String(), "err", err)
-         continue
-     }
-     if err != nil {
-         return nil, err
-     }
-
-     out = append(out, m)
- }
+ discoveredSlice := make([]ulid.ULID, 0, len(discovered))
+ for id := range discovered {
+     discoveredSlice = append(discoveredSlice, id)
+ }
+
+ updatedMarks, err := concurrency.ForEachJobMergeResults(ctx, discoveredSlice, w.getDeletionMarkersConcurrency, func(ctx context.Context, id ulid.ULID) ([]*BlockDeletionMark, error) {
+     m, err := w.updateBlockDeletionMarkIndexEntry(ctx, id)
+     if errors.Is(err, ErrBlockDeletionMarkNotFound) {
+         // This could happen if the block is permanently deleted between the "list objects" and now.
+         level.Warn(w.logger).Log("msg", "skipped missing block deletion mark when updating bucket index", "block", id.String())
+         return nil, nil
+     }
+     if errors.Is(err, ErrBlockDeletionMarkCorrupted) {
+         level.Error(w.logger).Log("msg", "skipped corrupted block deletion mark when updating bucket index", "block", id.String(), "err", err)
+         return nil, nil
+     }
+     if err != nil {
+         return nil, err
+     }
+
+     return BlockDeletionMarks{m}, nil
+ })
+ if err != nil {
+     return nil, err
+ }
+ out = append(out, updatedMarks...)

level.Info(w.logger).Log("msg", "updated deletion markers for recently marked blocks", "count", len(discovered), "total_deletion_markers", len(out))

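For reference, here is a minimal sketch of how the dskit concurrency.ForEachJobMergeResults helper used in the hunk above behaves, assuming the signature visible in that call; the string job type and fake mark paths are illustrative stand-ins for the real ULIDs and deletion marks. Each job runs with at most the given concurrency and may contribute zero results (a skipped mark) or one, and the helper merges all per-job slices into a single result slice.

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	ctx := context.Background()
	blockIDs := []string{"block-1", "block-2", "block-3", "block-4"}

	// Up to 16 jobs run at once; each returns zero results (skipped) or one, and the
	// helper merges all per-job slices into a single result slice.
	marks, err := concurrency.ForEachJobMergeResults(ctx, blockIDs, 16, func(ctx context.Context, id string) ([]string, error) {
		if id == "block-3" {
			// Mimics skipping a missing or corrupted deletion mark, as the updater does above.
			return nil, nil
		}
		return []string{id + "/deletion-mark.json"}, nil
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(marks)
}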
12 changes: 6 additions & 6 deletions pkg/storage/tsdb/bucketindex/updater_test.go
@@ -41,7 +41,7 @@ func TestUpdater_UpdateIndex(t *testing.T) {
block2 := block.MockStorageBlockWithExtLabels(t, bkt, userID, 20, 30, map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "1_of_5"})
block2Mark := block.MockStorageDeletionMark(t, bkt, userID, block2.BlockMeta)

- w := NewUpdater(bkt, userID, nil, logger)
+ w := NewUpdater(bkt, userID, nil, 16, logger)
returnedIdx, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, returnedIdx, bkt, userID,
@@ -90,7 +90,7 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) {
// Delete a block's meta.json to simulate a partial block.
require.NoError(t, bkt.Delete(ctx, path.Join(userID, block3.ULID.String(), block.MetaFilename)))

- w := NewUpdater(bkt, userID, nil, logger)
+ w := NewUpdater(bkt, userID, nil, 16, logger)
idx, partials, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, idx, bkt, userID,
@@ -119,7 +119,7 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) {
// Overwrite a block's meta.json with invalid data.
require.NoError(t, bkt.Upload(ctx, path.Join(userID, block3.ULID.String(), block.MetaFilename), bytes.NewReader([]byte("invalid!}"))))

- w := NewUpdater(bkt, userID, nil, logger)
+ w := NewUpdater(bkt, userID, nil, 16, logger)
idx, partials, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, idx, bkt, userID,
@@ -148,7 +148,7 @@ func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) {
// Overwrite a block's deletion-mark.json with invalid data.
require.NoError(t, bkt.Upload(ctx, path.Join(userID, block2Mark.ID.String(), block.DeletionMarkFilename), bytes.NewReader([]byte("invalid!}"))))

- w := NewUpdater(bkt, userID, nil, logger)
+ w := NewUpdater(bkt, userID, nil, 16, logger)
idx, partials, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, idx, bkt, userID,
@@ -164,7 +164,7 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) {
bkt, _ := testutil.PrepareFilesystemBucket(t)

for _, oldIdx := range []*Index{nil, {}} {
- w := NewUpdater(bkt, userID, nil, log.NewNopLogger())
+ w := NewUpdater(bkt, userID, nil, 16, log.NewNopLogger())
idx, partials, err := w.UpdateIndex(ctx, oldIdx)

require.NoError(t, err)
@@ -203,7 +203,7 @@ func TestUpdater_UpdateIndexFromVersion1ToVersion2(t *testing.T) {
require.Equal(t, "3_of_4", block2.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel])

// Generate index (this produces V2 index, with compactor shard IDs).
- w := NewUpdater(bkt, userID, nil, logger)
+ w := NewUpdater(bkt, userID, nil, 16, logger)
returnedIdx, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, returnedIdx, bkt, userID,
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_test.go
@@ -1710,7 +1710,7 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string,
}

func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index {
- updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger())
+ updater := bucketindex.NewUpdater(bkt, userID, nil, 16, log.NewNopLogger())
idx, _, err := updater.UpdateIndex(context.Background(), nil)
require.NoError(t, err)
require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx))
2 changes: 1 addition & 1 deletion pkg/storegateway/metadata_fetcher_filters_test.go
@@ -74,7 +74,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
if bucketIndexEnabled {
var err error

- u := bucketindex.NewUpdater(bkt, userID, nil, logger)
+ u := bucketindex.NewUpdater(bkt, userID, nil, 16, logger)
idx, _, err = u.UpdateIndex(ctx, nil)
require.NoError(t, err)
require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, idx))
