Skip to content

Commit

Permalink
add offline dedup compactor
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed May 21, 2021
1 parent 39916a8 commit 3a1c8ef
Show file tree
Hide file tree
Showing 9 changed files with 624 additions and 7 deletions.
23 changes: 22 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -302,9 +305,20 @@ func runCompact(
cancel()
}
}()

var mergeFunc storage.VerticalChunkSeriesMergeFunc
switch conf.dedupFunc {
case compact.DedupAlgorithmPenalty:
mergeFunc = dedup.NewDedupChunkSeriesMerger()

if len(conf.dedupReplicaLabels) == 0 {
return errors.New("penalty based deduplication needs at least one replica label specified")
}
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)
if err != nil {
return errors.Wrap(err, "create compactor")
}
Expand Down Expand Up @@ -564,6 +578,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
dedupFunc string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -627,6 +642,12 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.dedup-func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")

// Update this. This flag works for both dedup version compactor and the original compactor.
cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
Expand Down
9 changes: 9 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ Flags:
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--compact.dedup-func= Experimental. Deduplication algorithm for
merging overlapping blocks. Possible values are:
"", "penalty". If no value is specified, the
default compact deduplication merger is used,
which performs 1:1 deduplication for samples.
When set to penalty, penalty based deduplication
algorithm will be used. At least one replica
label has to be set via
--deduplication.replica-label flag.
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed
blocks older than the maximum of
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/prometheus/common v0.23.0
github.com/prometheus/exporter-toolkit v0.5.1
github.com/prometheus/prometheus v1.8.2-0.20210519120135-d95b0972505f
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624
Expand Down
6 changes: 6 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

const (
// DedupAlgorithmPenalty is the penalty based compactor series merge algorithm.
// This is the same as the online deduplication of querier except counter reset handling.
DedupAlgorithmPenalty = "penalty"
)

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
Expand Down
16 changes: 14 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -167,7 +170,16 @@ func MetricCount(c prometheus.Collector) int {
return mCount
}

func TestGroup_Compact_e2e(t *testing.T) {
func TestGroupCompactE2E(t *testing.T) {
testGroupCompactE2e(t, nil)
}

// Penalty based merger should get the same result as the blocks don't have overlap.
func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger())
}

func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
Expand All @@ -194,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
Expand Down
Loading

0 comments on commit 3a1c8ef

Please sign in to comment.