Add bucket tool retention command #4406

Merged 3 commits on Jul 6, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying.
- [#4392](https://github.com/thanos-io/thanos/pull/4392) Tools: Added `--delete-blocks` to bucket rewrite tool to mark the original blocks for deletion after rewriting is done.
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline- and reader-specific configuration. Implements HTTP transport configuration options. These options allow for more fine-grained control over timeouts and retries. Implements MSI authentication as a second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add `retention` command for applying retention policies to the bucket.

### Fixed

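The `--retention.resolution-*` flags registered in `cmd/thanos/tools_bucket.go` below accept Prometheus-style duration strings (for example `90d`) via `prommodel.Duration`; the default `0d` disables retention for that resolution. As a minimal standalone illustration (not part of this diff) of how such a value maps to a plain `time.Duration`:

```go
package main

import (
	"fmt"
	"time"

	prommodel "github.com/prometheus/common/model"
)

func main() {
	// Prometheus-style durations allow day (and week/year) units that
	// time.ParseDuration would reject.
	d, err := prommodel.ParseDuration("90d")
	if err != nil {
		panic(err)
	}
	// The retention command converts the parsed flag value to a plain
	// time.Duration when building its retention-by-resolution map.
	fmt.Println(time.Duration(d)) // 2160h0m0s
}
```
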
116 changes: 116 additions & 0 deletions cmd/thanos/tools_bucket.go
@@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
prommodel "github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
@@ -85,6 +86,7 @@ func registerBucket(app extkingpin.AppClause) {
	registerBucketCleanup(cmd, objStoreConfig)
	registerBucketMarkBlock(cmd, objStoreConfig)
	registerBucketRewrite(cmd, objStoreConfig)
	registerBucketRetention(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -988,3 +990,117 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
		return nil
	})
}

func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
	var (
		retentionRaw, retentionFiveMin, retentionOneHr prommodel.Duration
	)

	cmd := app.Command("retention", "Retention applies retention policies on the given bucket. Please make sure no compactor is running on the same bucket at the same time.")
	deleteDelay := cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket.").Default("48h").Duration()
	consistencyDelay := cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
		Default("30m").Duration()
	blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
		Default("20").Int()
	selectorRelabelConf := extkingpin.RegisterSelectorRelabelFlags(cmd)
	cmd.Flag("retention.resolution-raw",
		"How long to retain raw samples in bucket. Setting this to 0d will retain samples of this resolution forever").
		Default("0d").SetValue(&retentionRaw)
	cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. Setting this to 0d will retain samples of this resolution forever").
		Default("0d").SetValue(&retentionFiveMin)
	cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
		Default("0d").SetValue(&retentionOneHr)
	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
		retentionByResolution := map[compact.ResolutionLevel]time.Duration{
			compact.ResolutionLevelRaw: time.Duration(retentionRaw),
			compact.ResolutionLevel5m:  time.Duration(retentionFiveMin),
			compact.ResolutionLevel1h:  time.Duration(retentionOneHr),
		}

		if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 {
			level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw])
		}
		if retentionByResolution[compact.ResolutionLevel5m].Seconds() != 0 {
			level.Info(logger).Log("msg", "retention policy of 5 min aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel5m])
		}
		if retentionByResolution[compact.ResolutionLevel1h].Seconds() != 0 {
			level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
		}

		confContentYaml, err := objStoreConfig.Content()
		if err != nil {
			return err
		}

		relabelContentYaml, err := selectorRelabelConf.Content()
		if err != nil {
			return errors.Wrap(err, "get content of relabel configuration")
		}

		relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
		if err != nil {
			return err
		}

		bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
		if err != nil {
			return err
		}

		// Dummy actor to immediately kill the group after the run function returns.
		g.Add(func() error { return nil }, func(error) {})

		defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

		// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
		// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
		// This is to make sure compactor will not accidentally perform compactions with gap instead.
		ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency)
		duplicateBlocksFilter := block.NewDeduplicateFilter()
		stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

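		// The compact.Syncer below is only used to discover block metadata;
		// no compaction planning or grouping happens in this command.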
		var sy *compact.Syncer
		{
			baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
			if err != nil {
				return errors.Wrap(err, "create meta fetcher")
			}
			cf := baseMetaFetcher.NewMetaFetcher(
				extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{
					block.NewLabelShardedMetaFilter(relabelConfig),
					block.NewConsistencyDelayMetaFilter(logger, *consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
					duplicateBlocksFilter,
					ignoreDeletionMarkFilter,
				}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))},
			)
			sy, err = compact.NewMetaSyncer(
				logger,
				reg,
				bkt,
				cf,
				duplicateBlocksFilter,
				ignoreDeletionMarkFilter,
				stubCounter,
				stubCounter,
				*blockSyncConcurrency)
			if err != nil {
				return errors.Wrap(err, "create syncer")
			}
		}

		ctx := context.Background()
		level.Info(logger).Log("msg", "syncing blocks metadata")
		if err := sy.SyncMetas(ctx); err != nil {
			return errors.Wrap(err, "sync blocks")
		}

		level.Info(logger).Log("msg", "synced blocks done")

		level.Warn(logger).Log("msg", "GLOBAL COMPACTOR SHOULD __NOT__ BE RUNNING ON THE SAME BUCKET")

		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, stubCounter); err != nil {
			return errors.Wrap(err, "retention failed")
		}
		return nil
	})
}
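The final step above is delegated to `compact.ApplyRetentionPolicyByResolution`, which marks blocks for deletion once their newest sample is older than the retention configured for their resolution. The following is a rough, hypothetical sketch of that cutoff decision, not part of this PR; the `blockMeta` type, its fields, and `retentionCandidates` are illustrative stand-ins for the real block metadata and helper:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the fields the retention check needs; the real
// code reads these from Thanos block metadata (meta.json).
type blockMeta struct {
	ULID       string
	MaxTime    int64 // newest sample in the block, milliseconds since epoch
	Resolution int64 // 0 = raw, 300000 = 5m, 3600000 = 1h
}

// retentionCandidates returns blocks whose newest sample is older than the
// retention configured for their resolution. A retention of 0 means "keep
// forever". The real helper marks such blocks for deletion in the bucket
// instead of just listing them.
func retentionCandidates(metas []blockMeta, retention map[int64]time.Duration, now time.Time) []string {
	var out []string
	for _, m := range metas {
		r := retention[m.Resolution]
		if r == 0 {
			continue // retention disabled for this resolution
		}
		maxTime := time.Unix(0, m.MaxTime*int64(time.Millisecond))
		if now.After(maxTime.Add(r)) {
			out = append(out, m.ULID)
		}
	}
	return out
}

func main() {
	now := time.Now()
	ms := func(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond) }
	metas := []blockMeta{
		{ULID: "01OLD", MaxTime: ms(now.AddDate(0, 0, -100)), Resolution: 0},
		{ULID: "01NEW", MaxTime: ms(now.AddDate(0, 0, -1)), Resolution: 0},
	}
	retention := map[int64]time.Duration{0: 90 * 24 * time.Hour} // keep raw blocks for 90 days
	fmt.Println(retentionCandidates(metas, retention, now))      // [01OLD]
}
```
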
8 changes: 8 additions & 0 deletions docs/components/tools.md
@@ -80,6 +80,10 @@ Subcommands:
    *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
    first.

  tools bucket retention [<flags>]
    Retention applies retention policies on the given bucket. Please make sure
    no compactor is running on the same bucket at the same time.

  tools rules-check --rules=RULES
    Check if the rule files are valid or not.

@@ -184,6 +188,10 @@ Subcommands:
    *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
    first.

  tools bucket retention [<flags>]
    Retention applies retention policies on the given bucket. Please make sure
    no compactor is running on the same bucket at the same time.


```
