Skip to content

Commit

Permalink
Merge pull request #72 from vinted/disable_check_with_vcompaction
Browse files Browse the repository at this point in the history
compact: disable sources check with vertical compaction
  • Loading branch information
GiedriusS authored Dec 15, 2023
2 parents ba0096f + 4b52132 commit bf3d201
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func runCompact(
insBkt,
conf.compactionConcurrency,
conf.skipBlockWithOutOfOrderChunks,
enableVerticalCompaction,
)
if err != nil {
return errors.Wrap(err, "create bucket compactor")
Expand Down
10 changes: 9 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,9 +758,14 @@ type CompactionLifecycleCallback interface {
}

type DefaultCompactionLifecycleCallback struct {
VerticalCompactionEnabled bool
}

func (c DefaultCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *Group, toCompactBlocks []*metadata.Meta) error {
// Duplicated sources is handled by vertical compaction through merging series together.
if c.VerticalCompactionEnabled {
return nil
}
// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
uniqueSources := map[ulid.ULID]struct{}{}
Expand Down Expand Up @@ -1287,6 +1292,7 @@ func NewBucketCompactor(
bkt objstore.Bucket,
concurrency int,
skipBlocksWithOutOfOrderChunks bool,
enableVerticalCompaction bool,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
Expand All @@ -1298,7 +1304,9 @@ func NewBucketCompactor(
planner,
comp,
DefaultBlockDeletableChecker{},
DefaultCompactionLifecycleCallback{},
DefaultCompactionLifecycleCallback{
VerticalCompactionEnabled: enableVerticalCompaction,
},
compactDir,
bkt,
concurrency,
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, false)
testutil.Ok(t, err)

// Compaction on empty should not fail.
Expand Down

0 comments on commit bf3d201

Please sign in to comment.