Perform final tenant cleanup after last block is deleted. #3613

Merged: 9 commits, Jan 4, 2021.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
* [ENHANCEMENT] Memberlist: client can now keep a size-bounded buffer with sent and received messages and display them in the admin UI (/memberlist) for troubleshooting. #3581 #3602
* [ENHANCEMENT] Blocks storage: added block index attributes caching support to metadata cache. The TTL can be configured via `-blocks-storage.bucket-store.metadata-cache.block-index-attributes-ttl`. #3629
* [ENHANCEMENT] Alertmanager: Add support for Azure blob storage. #3634
* [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after a delay following the deletion of their last block. Cleanup includes removal of remaining marker files (including the tenant deletion mark file) and files under `debug/metas`. #3613
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603

2 changes: 2 additions & 0 deletions development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -101,6 +101,8 @@ compactor:
data_dir: /tmp/cortex-compactor
consistency_delay: 1m
sharding_enabled: true
cleanup_interval: 1m
tenant_cleanup_delay: 1m
sharding_ring:
kvstore:
store: consul
21 changes: 21 additions & 0 deletions development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -336,3 +336,24 @@ services:
- 18013:18013
volumes:
- ./config:/cortex/config

purger:
build:
context: .
dockerfile: dev.dockerfile
image: cortex
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18014 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=purger -server.http-listen-port=8014 -server.grpc-listen-port=9014"]
depends_on:
- consul
- minio
environment:
- JAEGER_AGENT_HOST=jaeger
- JAEGER_AGENT_PORT=6831
- JAEGER_TAGS=app=purger
- JAEGER_SAMPLER_TYPE=const
- JAEGER_SAMPLER_PARAM=1
ports:
- 8014:8014
- 18014:18014
volumes:
- ./config:/cortex/config
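
For reference, the purger target above serves the blocks tenant deletion API implemented in pkg/chunk/purger/blocks_purger_api.go (changed later in this diff). A rough usage sketch against this dev environment follows; the 8014 port comes from the compose entry above, while the /purger/delete_tenant path and the "user-1" tenant ID are assumptions for illustration, not taken from this diff:

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // Assumed route for BlocksPurgerAPI.DeleteTenant; verify it against the API
    // registration in your Cortex version before relying on it.
    req, err := http.NewRequest(http.MethodPost, "http://localhost:8014/purger/delete_tenant", nil)
    if err != nil {
        panic(err)
    }
    // Cortex resolves the tenant to delete from the standard tenant ID header.
    req.Header.Set("X-Scope-OrgID", "user-1")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}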
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -143,6 +143,11 @@ compactor:
# CLI flag: -compactor.deletion-delay
[deletion_delay: <duration> | default = 12h]

# For tenants marked for deletion, this is the time between the deletion of
# the last block and the final cleanup (marker files, debug files) of the tenant.
# CLI flag: -compactor.tenant-cleanup-delay
[tenant_cleanup_delay: <duration> | default = 6h]

# When enabled, at compactor startup the bucket will be scanned and all found
# deletion marks inside the block location will be copied to the markers
# global location too. This option can (and should) be safely disabled as soon
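
To make the timing of this option concrete: the compactor records a "finished" timestamp in the tenant deletion mark once the tenant's last block has been deleted, and only performs the final cleanup once tenant_cleanup_delay has elapsed since that timestamp. A minimal, self-contained sketch of that rule (function and variable names are illustrative, not from the codebase):

package main

import (
    "fmt"
    "time"
)

// finalCleanupDue reports whether the final tenant cleanup (marker files,
// debug files) may run, given when block deletion finished and the configured
// -compactor.tenant-cleanup-delay.
func finalCleanupDue(finished time.Time, tenantCleanupDelay time.Duration, now time.Time) bool {
    return now.Sub(finished) >= tenantCleanupDelay
}

func main() {
    delay := 6 * time.Hour // default of -compactor.tenant-cleanup-delay

    finished := time.Now().Add(-2 * time.Hour)
    fmt.Println(finalCleanupDue(finished, delay, time.Now())) // false: only 2h have elapsed

    finished = time.Now().Add(-7 * time.Hour)
    fmt.Println(finalCleanupDue(finished, delay, time.Now())) // true: 7h >= 6h
}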
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4113,6 +4113,11 @@ The `compactor_config` configures the compactor for the blocks storage.
# CLI flag: -compactor.deletion-delay
[deletion_delay: <duration> | default = 12h]

# For tenants marked for deletion, this is the time between the deletion of the
# last block and the final cleanup (marker files, debug files) of the tenant.
# CLI flag: -compactor.tenant-cleanup-delay
[tenant_cleanup_delay: <duration> | default = 6h]

# When enabled, at compactor startup the bucket will be scanned and all found
# deletion marks inside the block location will be copied to the markers global
# location too. This option can (and should) be safely disabled as soon as the
7 changes: 4 additions & 3 deletions pkg/chunk/purger/blocks_purger_api.go
@@ -4,6 +4,7 @@ import (
"context"
"net/http"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -44,7 +45,7 @@ func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request)
return
}

err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID)
err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -58,8 +59,8 @@ func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request)
type DeleteTenantStatusResponse struct {
TenantID string `json:"tenant_id"`
BlocksDeleted bool `json:"blocks_deleted"`
RuleGroupsDeleted bool `json:"rule_groups_deleted"`
AlertManagerConfigDeleted bool `json:"alert_manager_config_deleted"`
RuleGroupsDeleted bool `json:"rule_groups_deleted,omitempty"` // Not yet supported.
AlertManagerConfigDeleted bool `json:"alert_manager_config_deleted,omitempty"` // Not yet supported.
}

func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Request) {
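
WriteTenantDeletionMark now takes an explicit mark value instead of creating one internally. The TenantDeletionMark type itself lives in pkg/storage/tsdb and is not shown in this diff; the sketch below is inferred from how it is used here (NewTenantDeletionMark(time.Now()), mark.FinishedTime = time.Now().Unix(), and the {"deletion_time": 1} fixture in the compactor test), so the exact field set and JSON tags are assumptions:

package tsdb // sketch only; see pkg/storage/tsdb for the real definition

import "time"

type TenantDeletionMark struct {
    // Unix timestamp at which the tenant was marked for deletion.
    DeletionTime int64 `json:"deletion_time"`

    // Unix timestamp at which the compactor finished deleting the tenant's
    // blocks; the final cleanup runs tenant_cleanup_delay after this.
    // The JSON tag is an assumption.
    FinishedTime int64 `json:"finished_time,omitempty"`
}

// NewTenantDeletionMark returns a mark with DeletionTime set to the given time.
func NewTenantDeletionMark(deletionTime time.Time) *TenantDeletionMark {
    return &TenantDeletionMark{DeletionTime: deletionTime.Unix()}
}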
57 changes: 49 additions & 8 deletions pkg/compactor/blocks_cleaner.go
@@ -26,7 +26,8 @@ type BlocksCleanerConfig struct {
DeletionDelay time.Duration
CleanupInterval time.Duration
CleanupConcurrency int
BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
}

type BlocksCleaner struct {
@@ -160,18 +161,18 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error {

return concurrency.ForEachUser(ctx, allUsers, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
if isDeleted[userID] {
return errors.Wrapf(c.deleteUser(ctx, userID), "failed to delete blocks for user marked for deletion: %s", userID)
return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userID), "failed to delete user marked for deletion: %s", userID)
}
return errors.Wrapf(c.cleanUser(ctx, userID, firstRun), "failed to delete blocks for user: %s", userID)
})
}

// Remove all blocks for user marked for deletion.
func (c *BlocksCleaner) deleteUser(ctx context.Context, userID string) error {
// Remove blocks and remaining data for tenant marked for deletion.
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID string) error {
userLogger := util.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient)

level.Info(userLogger).Log("msg", "deleting blocks for user marked for deletion")
level.Info(userLogger).Log("msg", "deleting blocks for tenant marked for deletion")

// We immediately delete the bucket index, to signal to its consumers that
// the tenant has "no blocks" in the storage.
@@ -180,7 +181,7 @@ func (c *BlocksCleaner) deleteUser(ctx context.Context, userID string) error {
}
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)

var deleted, failed int
var deletedBlocks, failed int
err := userBucket.Iter(ctx, "", func(name string) error {
if err := ctx.Err(); err != nil {
return err
@@ -199,7 +200,7 @@ func (c *BlocksCleaner) deleteUser(ctx context.Context, userID string) error {
return nil // Continue with other blocks.
}

deleted++
deletedBlocks++
c.blocksCleanedTotal.Inc()
level.Info(userLogger).Log("msg", "deleted block", "block", id)
return nil
@@ -223,7 +224,47 @@ func (c *BlocksCleaner) deleteUser(ctx context.Context, userID string) error {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)

level.Info(userLogger).Log("msg", "finished deleting blocks for user marked for deletion", "deletedBlocks", deleted)
if deletedBlocks > 0 {
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks)
}

mark, err := cortex_tsdb.ReadTenantDeletionMark(ctx, c.bucketClient, userID)
if err != nil {
return errors.Wrap(err, "failed to read tenant deletion mark")
}
if mark == nil {
return errors.Wrap(err, "cannot find tenant deletion mark anymore")
}

// If we have just deleted some blocks, update "finished" time. Also update "finished" time if it wasn't set yet, but there are no blocks.
// Note: this UPDATES the tenant deletion mark. Components that use caching bucket will NOT SEE this update,
// but that is fine -- they only check whether tenant deletion marker exists or not.
if deletedBlocks > 0 || mark.FinishedTime == 0 {
level.Info(userLogger).Log("msg", "updating finished time in tenant deletion mark")
mark.FinishedTime = time.Now().Unix()
return errors.Wrap(cortex_tsdb.WriteTenantDeletionMark(ctx, c.bucketClient, userID, mark), "failed to update tenant deletion mark")
}

Review comment on the block above (Contributor):
Any chance of multiple callers updating it? I assume if they do they will write similar times and it will be ok?

Reply (pstibrany, author, Jan 4, 2021):
Within one compactor, it is only called once.

Multiple compactors may happen to call this method concurrently (e.g. one compactor is doing compaction cleanup, and another compactor starts and "takes over" the user). In that case, they may overwrite each other's timestamps if they both happen to delete some blocks or read a tenant deletion mark with no timestamp. The last write wins, which will delay the eventual final cleanup of the tenant's data.

if time.Since(time.Unix(mark.FinishedTime, 0)) < c.cfg.TenantCleanupDelay {
return nil
}

level.Info(userLogger).Log("msg", "cleaning up remaining blocks data for tenant marked for deletion")

// Let's do final cleanup of tenant.
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger); err != nil {
return errors.Wrap(err, "failed to delete "+block.DebugMetas)
} else if deleted > 0 {
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted)
}

// Tenant deletion mark file is inside Markers as well.
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
return errors.Wrap(err, "failed to delete marker files")
} else if deleted > 0 {
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
}

return nil
}

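
To illustrate the point discussed in the review thread above: if two compactors race and both rewrite FinishedTime, the last write wins, so the only effect is that the final cleanup becomes eligible tenant_cleanup_delay after the most recent write rather than the first one. A tiny sketch with made-up timestamps:

package main

import (
    "fmt"
    "time"
)

func main() {
    delay := 6 * time.Hour // tenant_cleanup_delay

    // Two compactors both update FinishedTime for the same tenant; the later
    // write is the one that ends up stored in the tenant deletion mark.
    writeA := time.Date(2021, 1, 4, 10, 0, 0, 0, time.UTC)
    writeB := writeA.Add(15 * time.Minute) // overwrites writeA

    fmt.Println("cleanup eligible if only A had written:", writeA.Add(delay))
    fmt.Println("cleanup eligible after B's overwrite:  ", writeB.Add(delay)) // 15 minutes later
}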
25 changes: 20 additions & 5 deletions pkg/compactor/blocks_cleaner_test.go
@@ -19,6 +19,7 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"

@@ -31,10 +32,11 @@

func TestBlocksCleaner(t *testing.T) {
for _, options := range []testBlocksCleanerOptions{
{concurrency: 1},
{concurrency: 1, tenantDeletionDelay: 0, user4FilesExist: false},
{concurrency: 1, tenantDeletionDelay: 2 * time.Hour, user4FilesExist: true},
{concurrency: 1, markersMigrationEnabled: true},
{concurrency: 2},
{concurrency: 10},
{concurrency: 1, markersMigrationEnabled: true},
} {
options := options

@@ -48,11 +50,13 @@ func TestBlocksCleaner(t *testing.T) {
type testBlocksCleanerOptions struct {
concurrency int
markersMigrationEnabled bool
tenantDeletionDelay time.Duration
user4FilesExist bool // User 4 has "FinishedTime" in tenant deletion marker set to "1h" ago.
}

func (o testBlocksCleanerOptions) String() string {
return fmt.Sprintf("concurrency=%d markers migration enabled=%v",
o.concurrency, o.markersMigrationEnabled)
return fmt.Sprintf("concurrency=%d, markers migration enabled=%v, tenant deletion delay=%v",
o.concurrency, o.markersMigrationEnabled, o.tenantDeletionDelay)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -85,10 +89,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.

// Blocks for user-3, marked for deletion.
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3"))
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now())))
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)

// User-4 with no more blocks, but a couple of marker and debug files. Should be fully deleted.
user4Mark := tsdb.NewTenantDeletionMark(time.Now())
user4Mark.FinishedTime = time.Now().Unix() - 60 // Set to check final user cleanup.
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-4", user4Mark))
user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json")
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

// The fixtures have been created. If the bucket client wasn't wrapped to write
// deletion marks to the global location too, then this is the right time to do it.
if options.markersMigrationEnabled {
Expand All @@ -100,6 +111,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
CleanupInterval: time.Minute,
CleanupConcurrency: options.concurrency,
BlockDeletionMarksMigrationEnabled: options.markersMigrationEnabled,
TenantCleanupDelay: options.tenantDeletionDelay,
}

reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -138,6 +150,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
{path: path.Join("user-3", block10.String(), "index"), expectedExists: false},
// Tenant deletion mark is not removed.
{path: path.Join("user-3", tsdb.TenantDeletionMarkPath), expectedExists: true},
// User-4 is removed fully.
{path: path.Join("user-4", tsdb.TenantDeletionMarkPath), expectedExists: options.user4FilesExist},
{path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist},
} {
exists, err := bucketClient.Exists(ctx, tc.path)
require.NoError(t, err)
3 changes: 3 additions & 0 deletions pkg/compactor/compactor.go
@@ -48,6 +48,7 @@ type Config struct {
CleanupInterval time.Duration `yaml:"cleanup_interval"`
CleanupConcurrency int `yaml:"cleanup_concurrency"`
DeletionDelay time.Duration `yaml:"deletion_delay"`
TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"`

// Whether the migration of block deletion marks to the global markers location is enabled.
BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
Expand Down Expand Up @@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is the time between the deletion of the last block and the final cleanup (marker files, debug files) of the tenant.")
f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")

f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
@@ -341,6 +343,7 @@ func (c *Compactor) starting(ctx context.Context) error {
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.parentLogger, c.registerer)

// Ensure an initial cleanup occurred before starting the compactor.
11 changes: 7 additions & 4 deletions pkg/compactor/compactor_test.go
@@ -651,13 +651,15 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
t.Parallel()

cfg := prepareConfig()
cfg.DeletionDelay = 10 * time.Minute // Delete block after 10 minutes
cfg.TenantCleanupDelay = 10 * time.Minute // To make sure it's not 0.

// Mock the bucket to contain two users, each one with one block.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockExists(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), true, nil)
bucketClient.MockGet(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), `{"deletion_time": 1}`, nil)
bucketClient.MockUpload(path.Join("user-1", cortex_tsdb.TenantDeletionMarkPath), nil)

bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
@@ -690,11 +692,12 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)

assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
`level=info component=cleaner org_id=user-1 msg="deleting blocks for user marked for deletion"`,
`level=info component=cleaner org_id=user-1 msg="deleting blocks for tenant marked for deletion"`,
`level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTVP434PA9VFXSW2JKB3392D/meta.json bucket=mock`,
`level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTVP434PA9VFXSW2JKB3392D/index bucket=mock`,
`level=info component=cleaner org_id=user-1 msg="deleted block" block=01DTVP434PA9VFXSW2JKB3392D`,
`level=info component=cleaner org_id=user-1 msg="finished deleting blocks for user marked for deletion" deletedBlocks=1`,
`level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`,
`level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`,
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
`level=info component=compactor msg="discovering users from bucket"`,
`level=info component=compactor msg="discovered users from bucket" users=1`,
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_v2_test.go
@@ -1822,7 +1822,7 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T)
numObjects := len(bucket.Objects())
require.NotZero(t, numObjects)

require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID))
require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, cortex_tsdb.NewTenantDeletionMark(time.Now())))
numObjects++ // For deletion marker

db := i.getTSDB(userID)
33 changes: 33 additions & 0 deletions pkg/storage/bucket/bucket_util.go
@@ -0,0 +1,33 @@
package bucket

import (
"context"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/thanos-io/thanos/pkg/objstore"
)

// DeletePrefix removes all objects with the given prefix, recursively.
// It returns the number of deleted objects.
// If deletion of any object fails, it returns the error and stops.
func DeletePrefix(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger) (int, error) {
result := 0
err := bkt.Iter(ctx, prefix, func(name string) error {
if strings.HasSuffix(name, objstore.DirDelim) {
deleted, err := DeletePrefix(ctx, bkt, name, logger)
result += deleted
return err
}

if err := bkt.Delete(ctx, name); err != nil {
return err
}
result++
level.Debug(logger).Log("msg", "deleted file", "file", name)
return nil
})

return result, err
}
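
The blocks cleaner above uses DeletePrefix to drop debug/metas and the markers directory of a deleted tenant. A self-contained usage sketch, using Thanos' in-memory bucket (assuming objstore.NewInMemBucket is available in the vendored Thanos version):

package main

import (
    "context"
    "fmt"
    "strings"

    "github.com/go-kit/kit/log"
    "github.com/thanos-io/thanos/pkg/objstore"

    "github.com/cortexproject/cortex/pkg/storage/bucket"
)

func main() {
    ctx := context.Background()
    bkt := objstore.NewInMemBucket()

    // Simulate leftover debug files for a tenant.
    _ = bkt.Upload(ctx, "debug/metas/01ABC.json", strings.NewReader("{}"))
    _ = bkt.Upload(ctx, "debug/metas/01DEF.json", strings.NewReader("{}"))

    // Recursively delete everything under the prefix and report how many
    // objects were removed.
    deleted, err := bucket.DeletePrefix(ctx, bkt, "debug/metas/", log.NewNopLogger())
    if err != nil {
        panic(err)
    }
    fmt.Println("deleted objects:", deleted) // 2
}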