compactor: Add offline deduplication (#4239)
* add offline dedup compactor

Signed-off-by: yeya24 <yb532204897@gmail.com>

* add tests for downsampled block

Signed-off-by: yeya24 <yb532204897@gmail.com>

* address review comment

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update flag description

Signed-off-by: yeya24 <yb532204897@gmail.com>

* make deduplication label unhidden

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fix doc lint

Signed-off-by: yeya24 <yb532204897@gmail.com>

* address review comments

Signed-off-by: yeya24 <yb532204897@gmail.com>

* change tmp dir

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update docs

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fix lint

Signed-off-by: yeya24 <yb532204897@gmail.com>
yeya24 authored Jun 16, 2021
1 parent 86cff01 commit f75a233
Showing 9 changed files with 825 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

- [#4299](https://github.com/thanos-io/thanos/pull/4299) Tracing: Add tracing to exemplar APIs.
- [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags.
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.

### Fixed

39 changes: 32 additions & 7 deletions cmd/thanos/compact.go
@@ -24,13 +24,16 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
@@ -302,9 +305,25 @@ func runCompact(
cancel()
}
}()

var mergeFunc storage.VerticalChunkSeriesMergeFunc
switch conf.dedupFunc {
case compact.DedupAlgorithmPenalty:
mergeFunc = dedup.NewChunkSeriesMerger()

if len(conf.dedupReplicaLabels) == 0 {
return errors.New("penalty based deduplication needs at least one replica label specified")
}
case "":
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)

default:
return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc)
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)
if err != nil {
return errors.Wrap(err, "create compactor")
}
@@ -564,6 +583,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
dedupFunc string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -622,16 +642,21 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h").SetValue(&cc.deleteDelay)

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more. "+
"Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")

cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().StringsVar(&cc.dedupReplicaLabels)
"Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+
"If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func.").
StringsVar(&cc.dedupReplicaLabels)

// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+
40 changes: 34 additions & 6 deletions docs/components/compact.md
@@ -89,8 +89,8 @@ which compacts overlapping blocks into single one. This is mainly used for **bac

In Thanos, it works similarly, but on bigger scale and using external labels for grouping as explained in [Compaction section](#compaction).

In both systems, series with the same labels are merged together. Merging samples is **naive**. It works by deduplicating samples within
exactly the same timestamps. Otherwise samples are added in sorted by time order.
In both systems, series with the same labels are merged together. In Prometheus, merging samples is **naive**: samples with exactly
the same timestamp are deduplicated, otherwise samples are added in order sorted by time. Thanos also supports a new penalty based sample merger, explained in [Vertical Compaction Use Cases](#vertical-compaction-use-cases).
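
To make the difference concrete, here is a minimal, self-contained sketch of the one-to-one (naive) merge described above. It is illustration only, not the Prometheus or Thanos implementation; the `sample` type and `naiveMerge` function are made up for this example.

```go
package main

import "fmt"

// sample is a hypothetical timestamp/value pair (timestamp in milliseconds).
type sample struct {
	t int64
	v float64
}

// naiveMerge sketches the one-to-one merge: samples sharing exactly the same
// timestamp are deduplicated, all remaining samples are interleaved in
// sorted-by-time order.
func naiveMerge(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].t < b[j].t:
			out = append(out, a[i])
			i++
		case a[i].t > b[j].t:
			out = append(out, b[j])
			j++
		default: // identical timestamp: keep a single sample
			out = append(out, a[i])
			i, j = i+1, j+1
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

func main() {
	a := []sample{{1000, 1}, {2000, 2}}
	b := []sample{{1000, 1}, {1500, 1.5}, {2000, 2}}
	fmt.Println(naiveMerge(a, b)) // [{1000 1} {1500 1.5} {2000 2}]
}
```

This behaviour is exactly what you want for `one-to-one` duplication such as Receiver replication, but for logically duplicated data from independent Prometheus scrapes it interleaves the near-duplicate samples instead of collapsing them, which is what the penalty based merger addresses.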

> **NOTE:** Both Prometheus and Thanos default behaviour is to fail compaction if any overlapping blocks are spotted. (For Thanos, within the same external labels).

@@ -101,12 +101,12 @@ There can be few valid use cases for vertical compaction:
* Races between multiple compactions, for example multiple compactors or between compactor and Prometheus compactions. While this will have extra
computation overhead for Compactor it's safe to enable vertical compaction for this case.
* Backfilling. If you want to add blocks of data to any stream where there is existing data already there for the time range, you will need enabled vertical compaction.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series duplications, `one-to-one` and `realistic`:
* `one-to-one` duplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series deduplications, `one-to-one` and `penalty`:
* `one-to-one` deduplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
This is very common while using [Receivers](../components/receive.md) with replication greater than 1 as receiver replication copies exactly the same timestamps and values to different receive instances.
* `realistic` duplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
* `penalty` deduplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
this requires more complex deduplication algorithms. For example one that is used to [deduplicate on the fly on the Querier](query.md#run-time-deduplication-of-ha-groups). This is common
case when Prometheus HA replicas are used. [Offline deduplication for this is in progress](https://github.com/thanos-io/thanos/issues/1014).
case when Prometheus HA replicas are used. You can enable this deduplication via the `--deduplication.func=penalty` flag; a simplified sketch of the idea follows this list.
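
As mentioned in the list above, the following is a deliberately simplified sketch of the idea behind penalty based deduplication, not the algorithm shipped in `pkg/dedup`: the `penaltyMerge` function, the `sample` type and the fixed `initialPenalty` value are assumptions for illustration. The merger keeps reading from one replica and only switches to the other when the active replica falls behind by more than the penalty, instead of interleaving every near-duplicate sample.

```go
package main

import "fmt"

// sample is the same hypothetical timestamp/value pair as in the sketch above.
type sample struct {
	t int64
	v float64
}

// initialPenalty is a fixed switching penalty in milliseconds, purely for
// illustration; the real algorithm adjusts it dynamically.
const initialPenalty = int64(5000)

// penaltyMerge follows one replica and only switches to the other when the
// active replica falls behind by more than the penalty, so near-duplicate
// samples from HA Prometheus pairs are not interleaved.
func penaltyMerge(a, b []sample) []sample {
	out := make([]sample, 0, len(a))
	i, j := 0, 0
	useA := true
	for i < len(a) || j < len(b) {
		switch {
		case i >= len(a):
			useA = false
		case j >= len(b):
			useA = true
		case useA && a[i].t > b[j].t+initialPenalty:
			useA = false // replica A has a gap, fall back to B
		case !useA && b[j].t > a[i].t+initialPenalty:
			useA = true // replica B has a gap, fall back to A
		}
		if useA {
			out = append(out, a[i])
			for j < len(b) && b[j].t <= a[i].t { // drop B's near-duplicates
				j++
			}
			i++
		} else {
			out = append(out, b[j])
			for i < len(a) && a[i].t <= b[j].t { // drop A's near-duplicates
				i++
			}
			j++
		}
	}
	return out
}

func main() {
	replicaA := []sample{{1000, 1}, {2000, 2}, {9000, 9}} // gap between 2s and 9s
	replicaB := []sample{{1100, 1}, {2100, 2}, {3100, 3}, {4100, 4}}
	fmt.Println(penaltyMerge(replicaA, replicaB))
	// Sticks with A, switches to B while A has the gap, then returns to A:
	// [{1000 1} {2000 2} {2100 2} {3100 3} {4100 4} {9000 9}]
}
```

The production merger is more involved: the penalty adapts to the observed sample interval and, as noted in `pkg/compact/compact.go` in this commit, counter resets need special handling, so treat this purely as intuition.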

#### Vertical Compaction Risks

@@ -115,6 +115,8 @@ The main risk is the **irreversible** implications of potential configuration er
* If you accidentally upload block with the same external labels but produced by totally different Prometheus for totally different applications, some metrics can overlap
and potentially can merge together making such series useless.
* If you merge disjoint series in multiple of blocks together, there is currently no easy way to split them back.
* The `penalty` offline deduplication algorithm has its own limitations. Even though it has been battle-tested for quite a long time, a few issues still come up from time to time,
such as https://github.com/thanos-io/thanos/issues/2890. If you'd like to enable this deduplication algorithm, please be aware of the risk and make sure you back up your data.

#### Enabling Vertical Compaction

@@ -143,6 +145,8 @@ external_labels: {cluster="us1", receive="true", environment="staging"}

On next compaction multiple streams' blocks will be compacted into one.

If you need a different deduplication algorithm, use the `--deduplication.func` flag, for example `--deduplication.func=penalty` together with at least one `--deduplication.replica-label` for Prometheus HA replicas. The default value is the original `one-to-one` deduplication.

## Enforcing Retention of Data

By default, there is NO retention set for object storage data. This means that you store data for unlimited time, which is a valid and recommended way of running Thanos.
@@ -349,6 +353,30 @@ Flags:
consistency-delay and 48h0m0s will be removed.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--deduplication.func= Experimental. Deduplication algorithm for
merging overlapping blocks. Possible values are:
"", "penalty". If no value is specified, the
default compact deduplication merger is used,
which performs 1:1 deduplication for samples.
When set to penalty, penalty based deduplication
algorithm will be used. At least one replica
label has to be set via
--deduplication.replica-label flag.
--deduplication.replica-label=DEDUPLICATION.REPLICA-LABEL ...
Label to treat as a replica indicator of blocks
that can be deduplicated (repeated flag). This
will merge multiple replica blocks into one.
This process is irreversible. Experimental. When
one or more labels are set, compactor will
ignore the given labels so that vertical
compaction can merge the blocks. Please note
that by default this uses a NAIVE algorithm for
merging which works well for deduplication of
blocks with **precisely the same samples** like
produced by Receiver replication. If you need a
different deduplication algorithm (e.g. one that
works well with Prometheus replicas), please set
it via --deduplication.func.
--delete-delay=48h Time before a block marked for deletion is
deleted from bucket. If delete-delay is non
zero, blocks will be marked for deletion and
6 changes: 6 additions & 0 deletions pkg/compact/compact.go
@@ -41,6 +41,12 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

const (
// DedupAlgorithmPenalty is the penalty based compactor series merge algorithm.
// This is the same as the online deduplication of querier except counter reset handling.
DedupAlgorithmPenalty = "penalty"
)

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
16 changes: 14 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -22,9 +22,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
@@ -167,7 +170,16 @@ func MetricCount(c prometheus.Collector) int {
return mCount
}

func TestGroup_Compact_e2e(t *testing.T) {
func TestGroupCompactE2E(t *testing.T) {
testGroupCompactE2e(t, nil)
}

// The penalty based merger should produce the same result since the blocks don't overlap.
func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
testGroupCompactE2e(t, dedup.NewChunkSeriesMerger())
}

func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
@@ -194,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
