Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants #3627

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@
* [ENHANCEMENT] Blocks storage: added block index attributes caching support to metadata cache. The TTL can be configured via `-blocks-storage.bucket-store.metadata-cache.block-index-attributes-ttl`. #3629
* [ENHANCEMENT] Alertmanager: Add support for Azure blob storage. #3634
* [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613
* [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603
* [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630
3 changes: 1 addition & 2 deletions docs/blocks-storage/compactor.md
@@ -116,8 +116,7 @@ compactor:
# CLI flag: -compactor.compaction-interval
[compaction_interval: <duration> | default = 1h]

# How many times to retry a failed compaction during a single compaction
# interval
# How many times to retry a failed compaction within a single compaction run.
# CLI flag: -compactor.compaction-retries
[compaction_retries: <int> | default = 3]
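As a rough illustration of what this knob bounds, the sketch below sums the worst-case wait added by retrying with an exponential backoff that doubles up to a cap. The 10s/1m bounds are hypothetical placeholders, not the compactor's actual defaults, and the exact semantics of the compactor's backoff helper may differ slightly; this is only meant to show the order of magnitude that `compaction_retries` controls.

```go
package main

import (
	"fmt"
	"time"
)

// worstCaseRetryDelay sums the sleeps of an exponential backoff that doubles
// from min up to a cap of max, for the given number of retries.
func worstCaseRetryDelay(min, max time.Duration, retries int) time.Duration {
	var total time.Duration
	wait := min
	for i := 0; i < retries; i++ {
		total += wait
		wait *= 2
		if wait > max {
			wait = max
		}
	}
	return total
}

func main() {
	// Hypothetical bounds, chosen only for the example.
	fmt.Println(worstCaseRetryDelay(10*time.Second, time.Minute, 3)) // 1m10s
}
```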

3 changes: 1 addition & 2 deletions docs/configuration/config-file-reference.md
@@ -4086,8 +4086,7 @@ The `compactor_config` configures the compactor for the blocks storage.
# CLI flag: -compactor.compaction-interval
[compaction_interval: <duration> | default = 1h]

# How many times to retry a failed compaction during a single compaction
# interval
# How many times to retry a failed compaction within a single compaction run.
# CLI flag: -compactor.compaction-retries
[compaction_retries: <int> | default = 3]

91 changes: 59 additions & 32 deletions pkg/compactor/compactor.go
@@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -81,7 +80,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MetaSyncConcurrency, "compactor.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction within a single compaction run.")
f.IntVar(&cfg.CompactionConcurrency, "compactor.compaction-concurrency", 1, "Max number of concurrent compactions running.")
f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.")
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
@@ -367,15 +366,15 @@ func (c *Compactor) stopping(_ error) error {

func (c *Compactor) running(ctx context.Context) error {
// Run an initial compaction before starting the interval.
c.compactUsersWithRetries(ctx)
c.compactUsers(ctx)

ticker := time.NewTicker(util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.05))
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.compactUsersWithRetries(ctx)
c.compactUsers(ctx)
case <-ctx.Done():
return nil
case err := <-c.ringSubservicesWatcher.Chan():
@@ -384,44 +383,31 @@ func (c *Compactor) running(ctx context.Context) error {
}
}

func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
retries := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
})
func (c *Compactor) compactUsers(ctx context.Context) {
succeeded := false

c.compactionRunsStarted.Inc()

for retries.Ongoing() {
if err := c.compactUsers(ctx); err == nil {
defer func() {
if succeeded {
c.compactionRunsCompleted.Inc()
c.compactionRunsLastSuccess.SetToCurrentTime()
return
} else if errors.Is(err, context.Canceled) {
return
} else {
c.compactionRunsFailed.Inc()
}

retries.Wait()
}

c.compactionRunsFailed.Inc()
}

func (c *Compactor) compactUsers(ctx context.Context) error {
// Reset progress metrics once done.
defer func() {
// Reset progress metrics once done.
c.compactionRunDiscoveredTenants.Set(0)
c.compactionRunSkippedTenants.Set(0)
c.compactionRunSucceededTenants.Set(0)
c.compactionRunFailedTenants.Set(0)
}()

level.Info(c.logger).Log("msg", "discovering users from bucket")
users, err := c.discoverUsers(ctx)
users, err := c.discoverUsersWithRetries(ctx)
if err != nil {
level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
return errors.Wrap(err, "failed to discover users from bucket")
return
}

level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))
@@ -434,13 +420,11 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
users[i], users[j] = users[j], users[i]
})

errs := tsdb_errors.NewMulti()

for _, userID := range users {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if ctx.Err() != nil {
level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err)
return ctx.Err()
return
}

// Ensure the user ID belongs to our shard.
@@ -466,18 +450,38 @@

level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)

if err = c.compactUser(ctx, userID); err != nil {
if err = c.compactUserWithRetries(ctx, userID); err != nil {
c.compactionRunFailedTenants.Inc()
level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
errs.Add(errors.Wrapf(err, "failed to compact user blocks (user: %s)", userID))
continue
}

c.compactionRunSucceededTenants.Inc()
level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
}

return errs.Err()
succeeded = true
}

func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
var lastErr error

retries := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
})

for retries.Ongoing() {
lastErr = c.compactUser(ctx, userID)
if lastErr == nil {
return nil
}

retries.Wait()
}

return lastErr
}

func (c *Compactor) compactUser(ctx context.Context, userID string) error {
@@ -570,6 +574,29 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
return nil
}

func (c *Compactor) discoverUsersWithRetries(ctx context.Context) ([]string, error) {
var lastErr error

retries := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
})

for retries.Ongoing() {
var users []string

users, lastErr = c.discoverUsers(ctx)
if lastErr == nil {
return users, nil
}

retries.Wait()
}

return nil, lastErr
}

func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
var users []string

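To see the new control flow in isolation, here is a small, self-contained Go sketch of the per-tenant retry pattern this diff introduces. It is not the Cortex code above: `retryPerTenant`, `compactTenant`, the fixed backoff, and the tenant names are hypothetical stand-ins, while the real implementation wraps `compactUser` with `util.NewBackoff` and the compactor's configured retry bounds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// compactTenant stands in for Compactor.compactUser: it fails twice for
// "tenant-b" and then succeeds, so the retry loop has something to do.
func compactTenant(attempts map[string]int, tenant string) error {
	attempts[tenant]++
	if tenant == "tenant-b" && attempts[tenant] <= 2 {
		return errors.New("simulated compaction failure")
	}
	return nil
}

// retryPerTenant retries a single tenant with a capped number of attempts,
// mirroring compactUserWithRetries: other tenants are never re-run because
// of this tenant's failures.
func retryPerTenant(ctx context.Context, tenant string, maxRetries int, attempts map[string]int) error {
	var lastErr error
	backoff := 100 * time.Millisecond // hypothetical fixed backoff; the real code grows it exponentially

	for i := 0; i < maxRetries; i++ {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if lastErr = compactTenant(attempts, tenant); lastErr == nil {
			return nil
		}
		time.Sleep(backoff)
	}
	return lastErr
}

func main() {
	ctx := context.Background()
	attempts := map[string]int{}

	for _, tenant := range []string{"tenant-a", "tenant-b", "tenant-c"} {
		if err := retryPerTenant(ctx, tenant, 3, attempts); err != nil {
			fmt.Printf("tenant %s failed after retries: %v\n", tenant, err)
			continue
		}
		fmt.Printf("tenant %s compacted after %d attempt(s)\n", tenant, attempts[tenant])
	}
}
```

Under the previous behaviour the failure of `tenant-b` would have failed the whole run and every tenant would have been compacted again on the next retry; with this change only the failing tenant is attempted again, and user discovery gets the same per-operation retry via `discoverUsersWithRetries`.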
4 changes: 0 additions & 4 deletions pkg/compactor/compactor_test.go
@@ -290,10 +290,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
`level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
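For reference, the `prom_testutil.GatherAndCompare` assertion used above compares a registry's current metrics against an expected text-format exposition and returns an error on any mismatch. A minimal, self-contained sketch of that pattern follows; the metric name and value are hypothetical, not the compactor's actual metrics.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Register a counter on a fresh registry and bump it, as the compactor
	// tests do with their per-run counters.
	registry := prometheus.NewPedanticRegistry()
	runs := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_runs_total", // hypothetical metric name
		Help: "Total number of runs.",
	})
	registry.MustRegister(runs)
	runs.Inc()

	// GatherAndCompare gathers the registry and diffs it against the expected
	// text-format exposition, limited to the listed metric names.
	expected := `
# HELP example_runs_total Total number of runs.
# TYPE example_runs_total counter
example_runs_total 1
`
	if err := testutil.GatherAndCompare(registry, strings.NewReader(expected), "example_runs_total"); err != nil {
		fmt.Println("metrics mismatch:", err)
		return
	}
	fmt.Println("metrics match")
}
```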