Add cleaner logic to clean partition compaction blocks and related files #6507


Merged
133 changes: 133 additions & 0 deletions pkg/compactor/blocks_cleaner.go
@@ -3,6 +3,7 @@ package compactor
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -39,6 +40,8 @@ type BlocksCleanerConfig struct {
CleanupConcurrency int
BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
ShardingStrategy string
CompactionStrategy string
}

type BlocksCleaner struct {
@@ -57,6 +60,7 @@ type BlocksCleaner struct {

cleanerVisitMarkerTimeout time.Duration
cleanerVisitMarkerFileUpdateInterval time.Duration
compactionVisitMarkerTimeout time.Duration

// Metrics.
runsStarted *prometheus.CounterVec
@@ -73,24 +77,44 @@ type BlocksCleaner struct {
tenantBucketIndexLastUpdate *prometheus.GaugeVec
tenantBlocksCleanedTotal *prometheus.CounterVec
tenantCleanDuration *prometheus.GaugeVec
remainingPlannedCompactions *prometheus.GaugeVec
inProgressCompactions *prometheus.GaugeVec
oldestPartitionGroupOffset *prometheus.GaugeVec
}

func NewBlocksCleaner(
cfg BlocksCleanerConfig,
bucketClient objstore.InstrumentedBucket,
usersScanner *cortex_tsdb.UsersScanner,
compactionVisitMarkerTimeout time.Duration,
cfgProvider ConfigProvider,
logger log.Logger,
ringLifecyclerID string,
reg prometheus.Registerer,
cleanerVisitMarkerTimeout time.Duration,
cleanerVisitMarkerFileUpdateInterval time.Duration,
blocksMarkedForDeletion *prometheus.CounterVec,
remainingPlannedCompactions *prometheus.GaugeVec,
) *BlocksCleaner {

var inProgressCompactions *prometheus.GaugeVec
var oldestPartitionGroupOffset *prometheus.GaugeVec
if cfg.ShardingStrategy == util.ShardingStrategyShuffle && cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
inProgressCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_compactor_in_progress_compactions",
Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy",
}, commonLabels)
oldestPartitionGroupOffset = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_compactor_oldest_partition_offset",
Help: "Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy",
}, commonLabels)
}

c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
usersScanner: usersScanner,
compactionVisitMarkerTimeout: compactionVisitMarkerTimeout,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
ringLifecyclerID: ringLifecyclerID,
@@ -153,6 +177,9 @@ func NewBlocksCleaner(
Name: "cortex_bucket_clean_duration_seconds",
Help: "Duration of cleaner runtime for a tenant in seconds",
}, commonLabels),
remainingPlannedCompactions: remainingPlannedCompactions,
inProgressCompactions: inProgressCompactions,
oldestPartitionGroupOffset: oldestPartitionGroupOffset,
}

c.Service = services.NewBasicService(c.starting, c.loop, nil)
@@ -327,6 +354,13 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
c.remainingPlannedCompactions.DeleteLabelValues(userID)
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
c.inProgressCompactions.DeleteLabelValues(userID)
c.oldestPartitionGroupOffset.DeleteLabelValues(userID)
}
}
}
}
c.lastOwnedUsers = allUsers
@@ -447,6 +481,15 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted)
}

if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
// Clean up partitioned group info files
if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil {
return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory)
} else if deleted > 0 {
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
}
}

if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
return errors.Wrap(err, "failed to delete marker files")
} else if deleted > 0 {
@@ -592,6 +635,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
}
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}

c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
@@ -600,6 +649,90 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
return nil
}

func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
path string
status PartitionedGroupStatus
})
err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
if strings.Contains(file, PartitionVisitMarkerDirectory) {
return nil
}
partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
if err != nil {
level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
return nil
}

status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
path string
status PartitionedGroupStatus
}{
path: file,
status: status,
}
return nil
})

if err != nil {
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
Contributor: I don't see us returning any error from the iter function above. Is this error mainly a failure to list files in the bucket?

Contributor Author: Right. In this code we try to read as many partitioned group info files as possible. If anything fails inside the iter function, we just stop and process what we have collected so far. Any retriable error would be covered by the next cleaner cycle.
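
A minimal sketch of that best-effort pattern (hypothetical helper and package names, not code from this PR):

package example

import (
	"context"
	"log"

	"github.com/thanos-io/objstore"
)

// collectPartitionedGroupFiles lists files best-effort. Per-file problems are
// swallowed inside the callback, so an error returned by Iter can only come
// from the bucket listing itself (e.g. a transient object-store failure).
func collectPartitionedGroupFiles(ctx context.Context, bkt objstore.Bucket, dir string) []string {
	var files []string
	err := bkt.Iter(ctx, dir, func(file string) error {
		// A failure to read an individual file would be logged and
		// skipped here, never returned, so it cannot abort the iteration.
		files = append(files, file)
		return nil
	})
	if err != nil {
		// Best effort: keep whatever was collected; the next cleaner
		// cycle will retry the listing.
		log.Printf("listing %s failed, continuing with %d files: %v", dir, len(files), err)
	}
	return files
}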

}

remainingCompactions := 0
inProgressCompactions := 0
var oldestPartitionGroup *PartitionedGroupInfo
defer func() {
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
if c.oldestPartitionGroupOffset != nil {
if oldestPartitionGroup != nil {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
} else {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
}
}
}()
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
partitionedGroupInfoFile := extraInfo.path

remainingCompactions += extraInfo.status.PendingPartitions
inProgressCompactions += extraInfo.status.InProgressPartitions
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
oldestPartitionGroup = partitionedGroupInfo
}
if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil {
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
// If one block cannot be marked for deletion, skip deleting
// this partitioned group; the next iteration will try it again.
continue
}
}

if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
}
}

if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
// Remove partition visit markers
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)
}
}
}
}

// cleanUserPartialBlocks deletes partial blocks which are safe to be deleted. The provided partials map
// and index are updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
96 changes: 90 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
)
@@ -86,8 +87,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

// Clean User with no error
cleaner.bucketClient = bkt
@@ -193,8 +195,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -354,8 +357,9 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -418,8 +422,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -476,8 +481,9 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
@@ -617,8 +623,9 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
@@ -811,6 +818,83 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
}
}

func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

ts := func(hours int) int64 {
return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
}

userID := "user-1"
partitionedGroupID := uint32(123)
partitionCount := 1
startTime := ts(-10)
endTime := ts(-8)
block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
}

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)

partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: partitionCount,
Partitions: []Partition{
{
PartitionID: 0,
Blocks: []ulid.ULID{block1},
},
},
RangeStart: startTime,
RangeEnd: endTime,
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)

visitMarker := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupID,
PartitionID: 0,
Status: Completed,
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
}
visitMarkerManager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", visitMarker)
err = visitMarkerManager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID)

partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)

block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
require.True(t, block1DeletionMarkerExists)

}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}
6 changes: 4 additions & 2 deletions pkg/compactor/compactor.go
@@ -657,8 +657,10 @@ func (c *Compactor) starting(ctx context.Context) error {
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion)
ShardingStrategy: c.compactorCfg.ShardingStrategy,
CompactionStrategy: c.compactorCfg.CompactionStrategy,
}, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)

// Ensure an initial cleanup occurred before starting the compactor.
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {