Commit 546d044

Authored by alexqyle, mengmengy, danielblando, alanprot, and damnever

Introduced lock file to shuffle sharding grouper (#4805)
* Introduced lock file to shuffle sharding grouper
* let redis cache logs log with context (#4785)
  * fix import
* DoBatch preference to 4xx if error (#4783)
  * Fix comment
* Updated CHANGELOG and ordered imports
* Fixed lint and removed groupCallLimit
* Changed lock file to JSON format and make sure planner would not pick up a group that is locked by another compactor
* Fix updateCachedShippedBlocks - new thanos (#4806)
* Join memberlist on starting with no retry (#4804)
* Fix alertmanager log message (#4801)
* Grafana Cloud uses Mimir now, so remove Grafana Cloud as hosted service in documents (#4809)
  * Improve doc
* Created block_locker to handle all block lock file operations. Added block lock metrics.
* Moved lock file heart beat into planner and refined planner logic to make sure blocks are locked by the current compactor
* Updated documents
* Return concurrency number of group. Use ticker for lock file heart beat
* Renamed lock file to be visit marker file
* Fixed unit test
* Make sure a visited block can be picked up by the compactor that visited it

Signed-off-by: Alex Le <leqiyue@amazon.com>
Signed-off-by: Mengmeng Yang <mengmengyang616@gmail.com>
Signed-off-by: Daniel Blando <ddeluigg@amazon.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
Signed-off-by: Alex Le <emoc1989@gmail.com>
Co-authored-by: Mengmeng Yang <mengmengyang616@gmail.com>
Co-authored-by: Daniel Blando <ddeluigg@amazon.com>
Co-authored-by: Alan Protasio <approtas@amazon.com>
Co-authored-by: Xiaochao Dong <the.xcdong@gmail.com>
Co-authored-by: Alvin Lin <alvinlin@amazon.com>
1 parent a27f06b commit 546d044

11 files changed: +778 −121 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -44,6 +44,7 @@
 * [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
 * [ENHANCEMENT] Cortex now built with Go 1.18. #4829
 * [ENHANCEMENT] Ingester: Prevent ingesters to become unhealthy during wall replay. #4847
+* [ENHANCEMENT] Compactor: Introduced visit marker file for blocks so blocks are under compaction will not be picked up by another compactor. #4805
 * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
 * [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
 * [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
```

docs/blocks-storage/compactor.md

Lines changed: 10 additions & 0 deletions
````diff
@@ -254,4 +254,14 @@ compactor:
   # Timeout for waiting on compactor to become ACTIVE in the ring.
   # CLI flag: -compactor.ring.wait-active-instance-timeout
   [wait_active_instance_timeout: <duration> | default = 10m]
+
+  # How long block visit marker file should be considered as expired and able to
+  # be picked up by compactor again.
+  # CLI flag: -compactor.block-visit-marker-timeout
+  [block_visit_marker_timeout: <duration> | default = 5m]
+
+  # How frequently block visit marker file should be updated duration
+  # compaction.
+  # CLI flag: -compactor.block-visit-marker-file-update-interval
+  [block_visit_marker_file_update_interval: <duration> | default = 1m]
 ```
````
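Taken together, the two new options can be set in the `compactor` section of a Cortex config file; the values below are the documented defaults:

```yaml
compactor:
  # Expire a visit marker 5 minutes after its last update.
  block_visit_marker_timeout: 5m
  # Refresh visit markers for in-progress compactions every minute.
  block_visit_marker_file_update_interval: 1m
```

Note the timeout must stay comfortably larger than the update interval, otherwise markers for healthy in-progress compactions would expire between heart beats.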

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
````diff
@@ -3855,6 +3855,15 @@ sharding_ring:
   # Timeout for waiting on compactor to become ACTIVE in the ring.
   # CLI flag: -compactor.ring.wait-active-instance-timeout
   [wait_active_instance_timeout: <duration> | default = 10m]
+
+  # How long block visit marker file should be considered as expired and able to
+  # be picked up by compactor again.
+  # CLI flag: -compactor.block-visit-marker-timeout
+  [block_visit_marker_timeout: <duration> | default = 5m]
+
+  # How frequently block visit marker file should be updated duration compaction.
+  # CLI flag: -compactor.block-visit-marker-file-update-interval
+  [block_visit_marker_file_update_interval: <duration> | default = 1m]
 ```

 ### `store_gateway_config`
````

pkg/compactor/block_visit_marker.go

Lines changed: 116 additions & 0 deletions
New file:

```go
package compactor

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"path"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

const BlockVisitMarkerFile = "block.visit"

var (
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
)

type BlockVisitMarker struct {
	CompactorID string    `json:"compactorID"`
	VisitTime   time.Time `json:"visitTime"`
}

func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool {
	return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout))
}

func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool {
	return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID
}

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
	visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
	visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile)
	if err != nil {
		if bkt.IsObjNotFoundErr(err) {
			return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile)
		}
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(err, "get block visit marker file: %s", visitMarkerFile)
	}
	b, err := ioutil.ReadAll(visitMarkerFileReader)
	if err != nil {
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(err, "read block visit marker file: %s", visitMarkerFile)
	}
	blockVisitMarker := BlockVisitMarker{}
	err = json.Unmarshal(b, &blockVisitMarker)
	if err != nil {
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(ErrorUnmarshalBlockVisitMarker, "block visit marker file: %s, error: %v", visitMarkerFile, err.Error())
	}
	return &blockVisitMarker, nil
}

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) error {
	blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile)
	blockVisitMarker := BlockVisitMarker{
		CompactorID: compactorID,
		VisitTime:   time.Now(),
	}
	visitMarkerFileContent, err := json.Marshal(blockVisitMarker)
	if err != nil {
		blockVisitMarkerWriteFailed.Inc()
		return err
	}
	err = bkt.Upload(ctx, blockVisitMarkerFilePath, bytes.NewReader(visitMarkerFileContent))
	if err != nil {
		blockVisitMarkerWriteFailed.Inc()
		return err
	}
	return nil
}

func markBlocksVisited(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) {
	for _, block := range blocks {
		blockID := block.ULID.String()
		err := UpdateBlockVisitMarker(ctx, bkt, blockID, compactorID, blockVisitMarkerWriteFailed)
		if err != nil {
			level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err)
		}
	}
}

func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) {
	var blockIds []string
	for _, block := range blocks {
		blockIds = append(blockIds, block.ULID.String())
	}
	blocksInfo := strings.Join(blockIds, ",")
	level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo))
	ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval)
	defer ticker.Stop()
heartBeat:
	for {
		level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo))
		markBlocksVisited(ctx, bkt, logger, blocks, compactorID, blockVisitMarkerWriteFailed)

		select {
		case <-ctx.Done():
			break heartBeat
		case <-ticker.C:
			continue
		}
	}
	level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo))
}
```
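`UpdateBlockVisitMarker` and `ReadBlockVisitMarker` round-trip the marker as JSON under `<blockID>/block.visit`. The same round trip can be sketched self-contained, with an in-memory map standing in for the `objstore.Bucket` (the `store` map and helper names here are illustrative, not part of the commit):

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
	"time"
)

const BlockVisitMarkerFile = "block.visit"

type BlockVisitMarker struct {
	CompactorID string    `json:"compactorID"`
	VisitTime   time.Time `json:"visitTime"`
}

// store stands in for the object-storage bucket used in the real code.
var store = map[string][]byte{}

// writeMarker mirrors UpdateBlockVisitMarker: marshal the marker and
// upload it to <blockID>/block.visit.
func writeMarker(blockID, compactorID string) error {
	m := BlockVisitMarker{CompactorID: compactorID, VisitTime: time.Now()}
	b, err := json.Marshal(m)
	if err != nil {
		return err
	}
	store[path.Join(blockID, BlockVisitMarkerFile)] = b
	return nil
}

// readMarker mirrors ReadBlockVisitMarker, minus the metrics plumbing.
func readMarker(blockID string) (*BlockVisitMarker, error) {
	b, ok := store[path.Join(blockID, BlockVisitMarkerFile)]
	if !ok {
		return nil, fmt.Errorf("block visit marker not found")
	}
	var m BlockVisitMarker
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	if err := writeMarker("01ARZ3NDEKTSV4RRFFQ69G5FAV", "compactor-1"); err != nil {
		panic(err)
	}
	m, err := readMarker("01ARZ3NDEKTSV4RRFFQ69G5FAV")
	if err != nil {
		panic(err)
	}
	fmt.Println(m.CompactorID) // compactor-1
}
```

Storing the marker next to the block in the bucket means every compactor sees the same ownership state without any extra coordination service.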

pkg/compactor/compactor.go

Lines changed: 40 additions & 8 deletions
```diff
@@ -53,7 +53,7 @@ var (
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
 	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
 
-	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
+	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
 		return compact.NewDefaultGrouper(
 			logger,
 			bkt,
@@ -68,8 +68,9 @@ var (
 			cfg.BlocksFetchConcurrency)
 	}
 
-	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
+	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
 		return NewShuffleShardingGrouper(
+			ctx,
 			logger,
 			bkt,
 			false, // Do not accept malformed indexes
@@ -83,10 +84,15 @@ var (
 			cfg,
 			ring,
 			ringLifecycle.Addr,
+			ringLifecycle.ID,
 			limits,
 			userID,
 			cfg.BlockFilesConcurrency,
-			cfg.BlocksFetchConcurrency)
+			cfg.BlocksFetchConcurrency,
+			cfg.CompactionConcurrency,
+			cfg.BlockVisitMarkerTimeout,
+			blockVisitMarkerReadFailed,
+			blockVisitMarkerWriteFailed)
 	}
 
 	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -95,7 +101,7 @@ var (
 			return nil, nil, err
 		}
 
-		plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
+		plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
 			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
 		}
 
@@ -108,9 +114,9 @@ var (
 			return nil, nil, err
 		}
 
-		plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
+		plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
 
-			return NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks)
+			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
 		}
 		return compactor, plannerFactory, nil
 	}
@@ -127,6 +133,8 @@ type BlocksGrouperFactory func(
 	blocksMarkedForNoCompact prometheus.Counter,
 	garbageCollectedBlocks prometheus.Counter,
 	remainingPlannedCompactions prometheus.Gauge,
+	blockVisitMarkerReadFailed prometheus.Counter,
+	blockVisitMarkerWriteFailed prometheus.Counter,
 	ring *ring.Ring,
 	ringLifecycler *ring.Lifecycler,
 	limit Limits,
@@ -142,9 +150,14 @@ type BlocksCompactorFactory func(
 ) (compact.Compactor, PlannerFactory, error)
 
 type PlannerFactory func(
+	ctx context.Context,
+	bkt objstore.Bucket,
 	logger log.Logger,
 	cfg Config,
 	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
+	ringLifecycle *ring.Lifecycler,
+	blockVisitMarkerReadFailed prometheus.Counter,
+	blockVisitMarkerWriteFailed prometheus.Counter,
 ) compact.Planner
 
 // Limits defines limits used by the Compactor.
@@ -190,6 +203,10 @@ type Config struct {
 	// Allow downstream projects to customise the blocks compactor.
 	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
 	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
+
+	// Block visit marker file config
+	BlockVisitMarkerTimeout            time.Duration `yaml:"block_visit_marker_timeout"`
+	BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
 }
 
 // RegisterFlags registers the Compactor flags.
@@ -223,6 +240,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 	f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
 	f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
+
+	f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
+	f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")
 }
 
 func (cfg *Config) Validate(limits validation.Limits) error {
@@ -306,6 +326,8 @@ type Compactor struct {
 	blocksMarkedForNoCompaction prometheus.Counter
 	garbageCollectedBlocks      prometheus.Counter
 	remainingPlannedCompactions prometheus.Gauge
+	blockVisitMarkerReadFailed  prometheus.Counter
+	blockVisitMarkerWriteFailed prometheus.Counter
 
 	// TSDB syncer metrics
 	syncerMetrics *syncerMetrics
@@ -423,6 +445,14 @@ func newCompactor(
 			Name: "cortex_compactor_garbage_collected_blocks_total",
 			Help: "Total number of blocks marked for deletion by compactor.",
 		}),
+		blockVisitMarkerReadFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_compactor_block_visit_marker_read_failed",
+			Help: "Number of block visit marker file failed to be read.",
+		}),
+		blockVisitMarkerWriteFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_compactor_block_visit_marker_write_failed",
+			Help: "Number of block visit marker file failed to be written.",
+		}),
 		remainingPlannedCompactions: remainingPlannedCompactions,
 		limits:                      limits,
 	}
@@ -760,11 +790,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 		return errors.Wrap(err, "failed to create syncer")
 	}
 
+	currentCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	compactor, err := compact.NewBucketCompactor(
 		ulogger,
 		syncer,
-		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID),
-		c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter),
+		c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID),
		c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed),
 		c.blocksCompactor,
 		path.Join(c.compactorCfg.DataDir, "compact"),
 		bucket,
```
