diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e7cc762cd..19b095f344 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,11 +10,15 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
 We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)
 
 ## Unreleased
+
+### Added
+
 - [#3259](https://github.com/thanos-io/thanos/pull/3259) Thanos BlockViewer: Added a button in the blockviewer that allows users to download the metadata of a block.
 - [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.
 - [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for label names, label values and series requests.
 - [#3315](https://github.com/thanos-io/thanos/pull/3315) Query Frontend: Support results caching for label names, label values and series requests.
 - [#3346](https://github.com/thanos-io/thanos/pull/3346) Ruler UI: Fix a bug preventing the /rules endpoint from loading.
+- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded blocks and blocks with deletion marks concurrently. It does so once at startup and then every `--compact.cleanup-interval` period (5 minutes by default).
 
 ### Fixed
 - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 9a6a79487c..4dcc41db92 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -10,6 +10,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -301,6 +302,35 @@ func runCompact(
 		level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
 	}
 
+	var cleanMtx sync.Mutex
+	// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
+	cleanPartialMarked := func() error {
+		cleanMtx.Lock()
+		defer cleanMtx.Unlock()
+
+		// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
+		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
+		if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+			return errors.Wrap(err, "cleaning marked blocks")
+		}
+
+		if err := sy.SyncMetas(ctx); err != nil {
+			level.Error(logger).Log("msg", "failed to sync metas", "err", err)
+		}
+		return nil
+	}
+
+	// Do it once at the beginning to ensure that it runs at least once before
+	// the main loop.
+	if err := sy.SyncMetas(ctx); err != nil {
+		cancel()
+		return errors.Wrap(err, "syncing metas")
+	}
+	if err := cleanPartialMarked(); err != nil {
+		cancel()
+		return errors.Wrap(err, "cleaning partial and marked blocks")
+	}
+
 	compactMainFn := func() error {
 		if err := compactor.Compact(ctx); err != nil {
 			return errors.Wrap(err, "compaction")
@@ -345,12 +375,7 @@
 			return errors.Wrap(err, "retention failed")
 		}
 
-		// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
-		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
-		if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
-			return errors.Wrap(err, "error cleaning blocks")
-		}
-		return nil
+		return cleanPartialMarked()
 	}
 
 	g.Add(func() error {
@@ -421,6 +446,21 @@ func runCompact(
 
 		srv.Handle("/", r)
 
+		// Periodically remove partial blocks and blocks marked for deletion
+		// since one iteration could potentially take a long time.
+		if conf.cleanupBlocksInterval > 0 {
+			g.Add(func() error {
+				// Wait the whole period at the beginning because we've executed this on boot.
+				select {
+				case <-time.After(conf.cleanupBlocksInterval):
+				case <-ctx.Done():
+				}
+				return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), cleanPartialMarked)
+			}, func(error) {
+				cancel()
+			})
+		}
+
 		g.Add(func() error {
 			iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
 			_, _, _ = f.Fetch(iterCtx)
@@ -463,6 +503,7 @@ type compactConfig struct {
 	disableDownsampling          bool
 	blockSyncConcurrency         int
 	blockViewerSyncBlockInterval time.Duration
+	cleanupBlocksInterval        time.Duration
 	compactionConcurrency        int
 	deleteDelay                  model.Duration
 	dedupReplicaLabels           []string
@@ -512,6 +553,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("20").IntVar(&cc.blockSyncConcurrency)
 	cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
 		Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
+	cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
+		Default("5m").DurationVar(&cc.cleanupBlocksInterval)
 	cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
 		Default("1").IntVar(&cc.compactionConcurrency)
 
diff --git a/docs/components/compact.md b/docs/components/compact.md
index c4c935e8f7..062ec2ab34 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -147,6 +147,12 @@ Flags:
                                  Repeat interval for syncing the blocks between
                                  local and remote view for /global Block Viewer
                                  UI.
+      --compact.cleanup-interval=5m
+                                 How often we should clean up partially uploaded
+                                 blocks and blocks with deletion mark in the
+                                 background when --wait has been enabled. Setting
+                                 it to "0s" disables it - the cleaning will only
+                                 happen at the end of an iteration.
       --compact.concurrency=1   Number of goroutines to use when compacting
                                  groups.
       --delete-delay=48h        Time before a block marked for deletion is
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index 560020030b..0a1954348f 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -480,7 +480,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
 		{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
 	}
 
-	t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
+	// No replica label with overlaps should halt compactor. This test is sequential
+	// because we do not want two Thanos Compact instances deleting the same partially
+	// uploaded blocks and blocks with deletion marks.
+	{
 		c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
 		testutil.Ok(t, err)
 		testutil.Ok(t, s.StartAndWaitReady(c))
@@ -499,22 +502,25 @@ func TestCompactWithStoreGateway(t *testing.T) {
 
 		// We expect no ops.
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))
 
+		// However, the blocks have been cleaned because that happens concurrently.
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
+
 		// Ensure bucket UI.
 		ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
 		ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))
 
 		testutil.Ok(t, s.Stop(c))
-	})
+	}
+
 	t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
 		// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
 		c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
@@ -524,10 +530,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
 		// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
 		// compaction groups. Wait for at least first compaction iteration (next is in 5m).
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
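
Illustrative note (not part of the patch): the core of the change is a mutex-guarded cleanPartialMarked closure that runs once on boot, again at the end of every compaction iteration, and, when --compact.cleanup-interval is greater than zero (e.g. `thanos compact --wait --compact.cleanup-interval=5m`), also from a background actor that first waits one full interval via runutil.Repeat because the boot run already happened. The sketch below reduces that pattern to the standard library as a rough approximation; identifiers such as cleaner and runPeriodicCleanup are hypothetical and do not appear in the Thanos code.

// Minimal sketch of the cleanup pattern above, assuming only the standard
// library. All identifiers here are hypothetical.
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// cleaner serializes cleanup work: the end-of-iteration call and the
// background loop must never delete blocks at the same time.
type cleaner struct {
	mtx sync.Mutex
}

func (c *cleaner) cleanPartialMarked(ctx context.Context) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Stand-in for: delete aborted partial uploads, delete blocks marked for
	// deletion, then re-sync block metadata.
	log.Println("cleaning partially uploaded and marked blocks")
	return nil
}

// runPeriodicCleanup waits one full interval before the first run (cleanup
// already ran on boot) and repeats until ctx is cancelled. A non-positive
// interval disables the loop, mirroring --compact.cleanup-interval=0s.
func runPeriodicCleanup(ctx context.Context, c *cleaner, interval time.Duration) error {
	if interval <= 0 {
		return nil
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := c.cleanPartialMarked(ctx); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	c := &cleaner{}
	if err := c.cleanPartialMarked(ctx); err != nil { // once on boot
		log.Fatal(err)
	}
	if err := runPeriodicCleanup(ctx, c, time.Second); err != nil && err != context.DeadlineExceeded {
		log.Fatal(err)
	}
}

In the actual patch the background loop is registered on the run group with g.Add, so its shutdown and cancellation follow the same lifecycle as the main compaction actor.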