Split compactor cleaner metrics #6827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 6 commits into master
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
* [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
* [ENHANCEMENT] Compactor: Emit partition metrics separately from the cleaner job. #6827
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
@@ -1355,6 +1355,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# How long to cache the list of partitioned groups for a user. 0 disables
# caching.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
[partitioned_groups_list_ttl: <duration> | default = 0s]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -1476,6 +1476,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# How long to cache the list of partitioned groups for a user. 0 disables
# caching.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
[partitioned_groups_list_ttl: <duration> | default = 0s]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1930,6 +1930,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.metadata-cache.bucket-index-max-size-bytes
[bucket_index_max_size_bytes: <int> | default = 1048576]

# How long to cache the list of partitioned groups for a user. 0 disables
# caching.
# CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl
[partitioned_groups_list_ttl: <duration> | default = 0s]

# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]
147 changes: 99 additions & 48 deletions pkg/compactor/blocks_cleaner.go
@@ -243,6 +243,15 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
go func() {
c.runDeleteUserCleanup(ctx, deleteChan)
}()
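// Partition metrics only apply when shuffle sharding is combined with the partitioning
// compaction strategy; a dedicated worker emits them separately from the cleaner job.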
var metricsChan chan *cleanerJob
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle &&
c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
metricsChan = make(chan *cleanerJob)
defer close(metricsChan)
go func() {
c.runEmitPartitionMetricsWorker(ctx, metricsChan)
}()
}

for {
select {
@@ -276,6 +285,17 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
c.enqueueJobFailed.WithLabelValues(deletedStatus).Inc()
}

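// Hand the same job to the metrics worker without blocking: if the worker is still
// processing the previous job, skip this round and log a warning.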
if metricsChan != nil {
select {
case metricsChan <- &cleanerJob{
users: activeUsers,
timestamp: cleanJobTimestamp,
}:
default:
level.Warn(c.logger).Log("msg", "unable to push metrics job to metricsChan")
}
}

case <-ctx.Done():
return nil
}
@@ -295,10 +315,25 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
}
}

func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan chan *cleanerJob) {
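// runEmitPartitionMetricsWorker consumes cleaner jobs and refreshes per-user partition
// metrics for every active user, independently of the cleanup workers.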
func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
for job := range jobChan {
err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
userLogger := util_log.WithUserID(userID, c.logger)
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
c.emitUserPartitionMetrics(ctx, userLogger, userBucket, userID)
return nil
})

if err != nil {
level.Error(c.logger).Log("msg", "emit metrics failed", "err", err.Error())
}
}
}

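// runActiveUserCleanup consumes cleaner jobs and cleans up active users, skipping jobs
// older than one cleanup interval.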
func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan <-chan *cleanerJob) {
for job := range jobChan {
if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() {
level.Warn(c.logger).Log("Active user cleaner job too old. Ignoring to get recent data")
level.Warn(c.logger).Log("msg", "Active user cleaner job too old. Ignoring to get recent data")
continue
}
err := c.cleanUpActiveUsers(ctx, job.users, false)
@@ -746,59 +781,14 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
}

func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
path string
status PartitionedGroupStatus
})
err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
if strings.Contains(file, PartitionVisitMarkerDirectory) {
return nil
}
partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
if err != nil {
level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
return nil
}

status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
path string
status PartitionedGroupStatus
}{
path: file,
status: status,
}
return nil
})

existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
if err != nil {
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
}

remainingCompactions := 0
inProgressCompactions := 0
var oldestPartitionGroup *PartitionedGroupInfo
defer func() {
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
if c.oldestPartitionGroupOffset != nil {
if oldestPartitionGroup != nil {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
} else {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
}
}
}()
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
partitionedGroupInfoFile := extraInfo.path

remainingCompactions += extraInfo.status.PendingPartitions
inProgressCompactions += extraInfo.status.InProgressPartitions
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
oldestPartitionGroup = partitionedGroupInfo
}
if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
@@ -829,6 +819,67 @@ }
}
}

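// emitUserPartitionMetrics recomputes the remaining planned, in-progress and
// oldest-partition-offset gauges for a single user from its partitioned group files.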
func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
if err != nil {
level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err)
return
}

remainingCompactions := 0
inProgressCompactions := 0
var oldestPartitionGroup *PartitionedGroupInfo
defer func() {
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
if oldestPartitionGroup != nil {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
} else {
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
}
}()
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
remainingCompactions += extraInfo.status.PendingPartitions
inProgressCompactions += extraInfo.status.InProgressPartitions
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
oldestPartitionGroup = partitionedGroupInfo
}
}
}

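// iterPartitionGroups walks the user's partitioned group directory and returns each
// partitioned group info together with its file path and current status, skipping
// visit marker files.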
func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger) (map[*PartitionedGroupInfo]struct {
path string
status PartitionedGroupStatus
}, error) {
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
path string
status PartitionedGroupStatus
})
err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
if strings.Contains(file, PartitionVisitMarkerDirectory) {
return nil
}
partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
if err != nil {
level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
return nil
}

status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
path string
status PartitionedGroupStatus
}{
path: file,
status: status,
}
return nil
})
return existentPartitionedGroupInfo, err
}

// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
// and index are updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
125 changes: 124 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
@@ -969,7 +969,6 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
require.False(t, block2DeletionMarkerExists)

}

func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
@@ -1127,6 +1126,130 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
`)))
}

func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
}

ctx := context.Background()
logger := log.NewNopLogger()
registry := prometheus.NewPedanticRegistry()
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, registry)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
dummyCounterVec := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"test"})
remainingPlannedCompactions := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
}, commonLabels)

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 15*time.Minute, cfgProvider, logger, "test-cleaner", registry, time.Minute, 30*time.Second, dummyCounterVec, remainingPlannedCompactions)

ts := func(hours int) int64 {
return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
}

userID := "user-1"
partitionedGroupID := uint32(123)
partitionCount := 5
startTime := ts(-10)
endTime := ts(-8)
userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: partitionCount,
Partitions: []Partition{
{
PartitionID: 0,
},
{
PartitionID: 1,
},
{
PartitionID: 2,
},
{
PartitionID: 3,
},
{
PartitionID: 4,
},
},
RangeStart: startTime,
RangeEnd: endTime,
CreationTime: time.Now().Add(-1 * time.Hour).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)

// InProgress with valid VisitTime
v0 := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupID,
PartitionID: 0,
Status: InProgress,
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
}
v0Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v0)
err = v0Manager.updateVisitMarker(ctx)
require.NoError(t, err)

// InProgress with expired VisitTime
v1 := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupID,
PartitionID: 1,
Status: InProgress,
VisitTime: time.Now().Add(-30 * time.Minute).Unix(),
}
v1Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v1)
err = v1Manager.updateVisitMarker(ctx)
require.NoError(t, err)

// Partitions 2 and 3 have no visit markers, so they are pending
// Partition 4 is completed
v4 := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupID,
PartitionID: 4,
Status: Completed,
VisitTime: time.Now().Add(-20 * time.Minute).Unix(),
}
v4Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v4)
err = v4Manager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.emitUserPartitionMetrics(ctx, logger, userBucket, userID)

metricNames := []string{
"cortex_compactor_remaining_planned_compactions",
"cortex_compactor_in_progress_compactions",
"cortex_compactor_oldest_partition_offset",
}

// Check tracked Prometheus metrics
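// Partition 0 is in progress, partition 1's visit marker has expired so it is counted as
// pending again, partitions 2 and 3 are pending, and partition 4 is completed: 1 in-progress
// and 3 remaining compactions, with the oldest partition group created one hour ago.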
expectedMetrics := `
# HELP cortex_compactor_in_progress_compactions Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy
# TYPE cortex_compactor_in_progress_compactions gauge
cortex_compactor_in_progress_compactions{user="user-1"} 1
# HELP cortex_compactor_oldest_partition_offset Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy
# TYPE cortex_compactor_oldest_partition_offset gauge
cortex_compactor_oldest_partition_offset{user="user-1"} 3600
# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy
# TYPE cortex_compactor_remaining_planned_compactions gauge
cortex_compactor_remaining_planned_compactions{user="user-1"} 3
`

assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
parquetConverterEnabled map[string]bool