From ec2187aca10c748bce9ab400e30bb85bbcbb5f46 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Thu, 22 Oct 2020 14:57:32 +0300
Subject: [PATCH] cmd: compact: clean partial / marked blocks concurrently
 (#3115)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* cmd: compact: clean partial / marked blocks concurrently

Clean partially uploaded blocks and blocks marked for deletion concurrently
with the whole compaction/downsampling process. One iteration could
potentially take a few days, so it is useful to periodically clean up
unneeded blocks in the background. Without this, there are huge spikes in
block storage usage; the spike's size depends on how long it takes to
complete one iteration.

The implementation is simple: factor out the deletion part into a separate
function. It is called at the end of an iteration and, additionally,
concurrently if `--wait` has been specified. Add a mutex to protect against
concurrent runs. Delete blocks from the deletion mark map so that we do not
try to delete the same blocks twice or more.

Signed-off-by: Giedrius Statkevičius

* *: update changelog, e2e tests

Signed-off-by: Giedrius Statkevičius

* cmd: compact: fix according to comments

Remove "error" from the error message and just directly call the function.

Signed-off-by: Giedrius Statkevičius

* CHANGELOG: cleanups

Forgot to remove this part while resolving conflicts.

Signed-off-by: Giedrius Statkevičius

* CHANGELOG: update

Signed-off-by: Giedrius Statkevičius

* CHANGELOG: clean whitespace

Signed-off-by: Giedrius Statkevičius

Signed-off-by: Chans321
---
 CHANGELOG.md               |  4 +++
 cmd/thanos/compact.go      | 55 +++++++++++++++++++++++++++++++++-----
 docs/components/compact.md |  6 +++++
 test/e2e/compact_test.go   | 18 ++++++++-----
 4 files changed, 71 insertions(+), 12 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e7cc762cd..19b095f344 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,11 +10,15 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
 We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)
 
 ## Unreleased
+
+### Added
+
 - [#3259](https://github.com/thanos-io/thanos/pull/3259) Thanos BlockViewer: Added a button in the blockviewer that allows users to download the metadata of a block.
 - [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.
 - [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for label names, label values and series requests.
 - [#3315](https://github.com/thanos-io/thanos/pull/3315) Query Frontend: Support results caching for label names, label values and series requests.
 - [#3346](https://github.com/thanos-io/thanos/pull/3346) Ruler UI: Fix a bug preventing the /rules endpoint from loading.
+- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded blocks and blocks with deletion marks concurrently. It does that at the beginning and then every `--compact.cleanup-interval` time period. By default it is 5 minutes.
 
 ### Fixed
 - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
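The mutex-plus-shared-function pattern described in the commit message can be illustrated in isolation. Below is a minimal, self-contained Go sketch (not Thanos code; only the name cleanPartialMarked mirrors the patch): the same cleanup function is invoked both from the main compaction iteration and from a background loop, and the mutex guarantees the two callers never clean blocks at the same time.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var cleanMtx sync.Mutex

	// Stand-in for the patch's cleanPartialMarked: the real function deletes
	// partially uploaded blocks and blocks marked for deletion, then resyncs
	// metas. Here it only logs who triggered it.
	cleanPartialMarked := func(caller string) error {
		cleanMtx.Lock()
		defer cleanMtx.Unlock()
		fmt.Println("cleaning partial / marked blocks, triggered by:", caller)
		return nil
	}

	done := make(chan struct{})

	// Background cleaner, analogous to the --wait path in the patch.
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			_ = cleanPartialMarked("background loop")
			time.Sleep(50 * time.Millisecond)
		}
	}()

	// End-of-iteration call from the main compaction loop; the mutex makes
	// it wait if the background cleaner is mid-run.
	_ = cleanPartialMarked("compaction iteration")
	<-done
}

Because both call sites funnel through one function, deleting a block also only has to be recorded once (the patch removes cleaned blocks from the deletion mark map for the same reason).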
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 9a6a79487c..4dcc41db92 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -10,6 +10,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -301,6 +302,35 @@ func runCompact(
 		level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
 	}
 
+	var cleanMtx sync.Mutex
+	// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
+	cleanPartialMarked := func() error {
+		cleanMtx.Lock()
+		defer cleanMtx.Unlock()
+
+		// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
+		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
+		if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+			return errors.Wrap(err, "cleaning marked blocks")
+		}
+
+		if err := sy.SyncMetas(ctx); err != nil {
+			level.Error(logger).Log("msg", "failed to sync metas", "err", err)
+		}
+		return nil
+	}
+
+	// Do it once at the beginning to ensure that it runs at least once before
+	// the main loop.
+	if err := sy.SyncMetas(ctx); err != nil {
+		cancel()
+		return errors.Wrap(err, "syncing metas")
+	}
+	if err := cleanPartialMarked(); err != nil {
+		cancel()
+		return errors.Wrap(err, "cleaning partial and marked blocks")
+	}
+
 	compactMainFn := func() error {
 		if err := compactor.Compact(ctx); err != nil {
 			return errors.Wrap(err, "compaction")
@@ -345,12 +375,7 @@ func runCompact(
 			return errors.Wrap(err, "retention failed")
 		}
 
-		// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
-		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
-		if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
-			return errors.Wrap(err, "error cleaning blocks")
-		}
-		return nil
+		return cleanPartialMarked()
 	}
 
 	g.Add(func() error {
@@ -421,6 +446,21 @@ func runCompact(
 
 	srv.Handle("/", r)
 
+	// Periodically remove partial blocks and blocks marked for deletion
+	// since one iteration could potentially take a long time.
+	if conf.cleanupBlocksInterval > 0 {
+		g.Add(func() error {
+			// Wait the whole period at the beginning because we've executed this on boot.
+			select {
+			case <-time.After(conf.cleanupBlocksInterval):
+			case <-ctx.Done():
+			}
+
+			return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), cleanPartialMarked)
+		}, func(error) {
+			cancel()
+		})
+	}
+
 	g.Add(func() error {
 		iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
 		_, _, _ = f.Fetch(iterCtx)
@@ -463,6 +503,7 @@ type compactConfig struct {
 	disableDownsampling          bool
 	blockSyncConcurrency         int
 	blockViewerSyncBlockInterval time.Duration
+	cleanupBlocksInterval        time.Duration
 	compactionConcurrency        int
 	deleteDelay                  model.Duration
 	dedupReplicaLabels           []string
@@ -512,6 +553,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("20").IntVar(&cc.blockSyncConcurrency)
 	cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
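The scheduling added above deliberately waits one full interval before the first background run, since cleanPartialMarked has already been executed once on boot, and then loops via Thanos's runutil.Repeat helper. As a rough, stdlib-only approximation of that behavior (repeatEvery is a hypothetical stand-in, not the Thanos API):

package main

import (
	"context"
	"fmt"
	"time"
)

// repeatEvery is a hypothetical, stdlib-only stand-in for the patch's use of
// runutil.Repeat: it skips the first tick (cleanup already ran once on boot)
// and then invokes f every interval until the context is cancelled or f errors.
func repeatEvery(ctx context.Context, interval time.Duration, f func() error) error {
	// Wait the whole period at the beginning because f was executed on boot.
	select {
	case <-time.After(interval):
	case <-ctx.Done():
		return ctx.Err()
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := f(); err != nil {
			return err
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	// With a 100ms interval and a 350ms deadline this prints roughly three ticks.
	_ = repeatEvery(ctx, 100*time.Millisecond, func() error {
		fmt.Println("background cleanup tick")
		return nil
	})
}

Setting the interval to zero skips the whole branch, which is exactly how the `if conf.cleanupBlocksInterval > 0` guard lets "0s" disable the background loop.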
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval) + cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration."). + Default("5m").DurationVar(&cc.cleanupBlocksInterval) cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").IntVar(&cc.compactionConcurrency) diff --git a/docs/components/compact.md b/docs/components/compact.md index c4c935e8f7..062ec2ab34 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -147,6 +147,12 @@ Flags: Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI. + --compact.cleanup-interval=5m + How often we should clean up partially uploaded + blocks and blocks with deletion mark in the + background when --wait has been enabled. Setting + it to "0s" disables it - the cleaning will only + happen at the end of an iteration. --compact.concurrency=1 Number of goroutines to use when compacting groups. --delete-delay=48h Time before a block marked for deletion is diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 560020030b..0a1954348f 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -480,7 +480,10 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}}, } - t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) { + // No replica label with overlaps should halt compactor. This test is sequential + // because we do not want two Thanos Compact instances deleting the same partially + // uploaded blocks and blocks with deletion marks. + { c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(c)) @@ -499,22 +502,25 @@ func TestCompactWithStoreGateway(t *testing.T) { // We expect no ops. testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total")) + // However, the blocks have been cleaned because that happens concurrently. + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) + // Ensure bucket UI. 
 		ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
 		ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))
 
 		testutil.Ok(t, s.Stop(c))
-	})
+	}
+
 	t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
 		// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
 		c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
@@ -524,10 +530,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
 		// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
 		// compaction groups. Wait for at least first compaction iteration (next is in 5m).
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
-		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
+		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
 		testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
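The e2e expectations above follow directly from the decoupling: in the expect-to-halt case the compactor halts on overlapping blocks, yet thanos_compactor_blocks_cleaned_total still reaches 2 because cleanup runs as its own actor in the same run group. A simplified sketch of that interplay using github.com/oklog/run, the actor library whose g.Add pattern appears in the diff (the actor bodies below are stubs, not the real compactor or test harness):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oklog/run"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var g run.Group

	// "Compaction" actor: halts immediately, mimicking the pre-existing
	// overlap in the expect-to-halt case. Halting blocks this actor but
	// does not stop the whole process.
	g.Add(func() error {
		fmt.Println("compaction halted: blocks with overlaps found")
		<-ctx.Done()
		return nil
	}, func(error) { cancel() })

	// "Cleanup" actor: keeps deleting partial / marked blocks regardless,
	// which is why blocks_cleaned_total still grows while compaction halts.
	g.Add(func() error {
		for i := 0; i < 2; i++ {
			time.Sleep(50 * time.Millisecond)
			fmt.Println("cleaned a block")
		}
		return nil
	}, func(error) { cancel() })

	// Run returns once the first actor exits; here that is the cleanup
	// actor, whose interrupt then unblocks the halted compaction actor.
	_ = g.Run()
}

The same decoupling explains the flipped assertions in the dedup test: cleanup no longer counts toward the first compaction iteration, so both cleanup counters start at 0 there.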