compactor: Add offline deduplication #4239

Merged 12 commits on Jun 16, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

- [#4299](https://github.com/thanos-io/thanos/pull/4299) Tracing: Add tracing to exemplar APIs.
- [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags.
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.

### Fixed

39 changes: 32 additions & 7 deletions cmd/thanos/compact.go
@@ -24,13 +24,16 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
@@ -302,9 +305,25 @@ func runCompact(
cancel()
}
}()

var mergeFunc storage.VerticalChunkSeriesMergeFunc
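// Pick the series merger used for vertical compaction: penalty-based
// deduplication when requested, or the default 1:1 compacting merger otherwise.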
switch conf.dedupFunc {
case compact.DedupAlgorithmPenalty:
mergeFunc = dedup.NewChunkSeriesMerger()

if len(conf.dedupReplicaLabels) == 0 {
return errors.New("penalty based deduplication needs at least one replica label specified")
}
case "":
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)

default:
return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc)
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)
if err != nil {
return errors.Wrap(err, "create compactor")
}
@@ -564,6 +583,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
dedupFunc string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -622,16 +642,21 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h").SetValue(&cc.deleteDelay)

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more. "+
"Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")

cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().StringsVar(&cc.dedupReplicaLabels)
"Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+
"If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func.").
StringsVar(&cc.dedupReplicaLabels)

// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+
40 changes: 34 additions & 6 deletions docs/components/compact.md
@@ -89,8 +89,8 @@ which compacts overlapping blocks into single one. This is mainly used for **bac

In Thanos, it works similarly, but on bigger scale and using external labels for grouping as explained in [Compaction section](#compaction).

In both systems, series with the same labels are merged together. Merging samples is **naive**. It works by deduplicating samples within
exactly the same timestamps. Otherwise samples are added in sorted by time order.
In both systems, series with the same labels are merged together. In Prometheus, merging samples is **naive**: it deduplicates samples that share
exactly the same timestamp, otherwise samples are appended in time-sorted order (a minimal sketch follows below). Thanos also supports a penalty-based sample merger, explained in [Vertical Compaction Use Cases](#vertical-compaction-use-cases).
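For illustration, the snippet below is a minimal, hypothetical sketch of this naive `one-to-one` merge, written over plain sample slices (the actual Prometheus/Thanos implementations operate on chunk and series iterators, not slices):

```go
package main

import "fmt"

// sample is a simplified (timestamp, value) pair used only for this sketch.
type sample struct {
	t int64
	v float64
}

// naiveMerge merges two time-sorted series the "one-to-one" way described
// above: samples with exactly the same timestamp are deduplicated (the first
// one wins) and everything else is interleaved in timestamp order.
func naiveMerge(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].t < b[j].t:
			out = append(out, a[i])
			i++
		case a[i].t > b[j].t:
			out = append(out, b[j])
			j++
		default: // identical timestamp: keep a single copy
			out = append(out, a[i])
			i, j = i+1, j+1
		}
	}
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

func main() {
	a := []sample{{10, 1}, {20, 2}, {30, 3}}
	b := []sample{{10, 1}, {25, 2.5}, {30, 3}}
	fmt.Println(naiveMerge(a, b)) // [{10 1} {20 2} {25 2.5} {30 3}]
}
```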

> **NOTE:** Both Prometheus and Thanos default behaviour is to fail compaction if any overlapping blocks are spotted. (For Thanos, within the same external labels).

@@ -101,12 +101,12 @@ There can be few valid use cases for vertical compaction:
* Races between multiple compactions, for example multiple compactors or between compactor and Prometheus compactions. While this will have extra
computation overhead for Compactor it's safe to enable vertical compaction for this case.
* Backfilling. If you want to add blocks of data to any stream where there is existing data already there for the time range, you will need enabled vertical compaction.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series duplications, `one-to-one` and `realistic`:
* `one-to-one` duplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series deduplications, `one-to-one` and `penalty`:
* `one-to-one` deduplication is when the same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: same values and timestamps.
This is very common while using [Receivers](../components/receive.md) with replication greater than 1 as receiver replication copies exactly the same timestamps and values to different receive instances.
* `realistic` duplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
* `penalty` deduplication is when the same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
this requires more complex deduplication algorithms. For example one that is used to [deduplicate on the fly on the Querier](query.md#run-time-deduplication-of-ha-groups). This is common
case when Prometheus HA replicas are used. [Offline deduplication for this is in progress](https://github.com/thanos-io/thanos/issues/1014).
case when Prometheus HA replicas are used. You can enable this deduplication via the `--deduplication.func=penalty` flag (see the sketch below).
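To make the contrast with `one-to-one` deduplication concrete, the snippet below is a heavily simplified, hypothetical sketch of the penalty idea: keep reading from the replica currently being followed and only switch to the other replica once the followed one has no sample for longer than a penalty window. The real algorithm lives in `pkg/dedup`, operates on chunk iterators, adapts the penalty over time and handles counter resets; none of that is modelled here.

```go
package main

import "fmt"

// sample is a simplified (timestamp, value) pair used only for this sketch.
type sample struct {
	t int64 // millisecond timestamp (non-negative in this example)
	v float64
}

// penaltyMerge is a hypothetical, heavily simplified sketch of penalty-based
// deduplication of two replicas of the same series: keep reading from the
// replica we currently follow and only switch when it has no sample for
// longer than `penalty` milliseconds. The production algorithm in pkg/dedup
// additionally adapts the penalty and handles counter resets.
func penaltyMerge(a, b []sample, penalty int64) []sample {
	var out []sample
	i, j := 0, 0
	useA := true       // replica we currently follow
	lastT := int64(-1) // timestamp of the last emitted sample
	for i < len(a) || j < len(b) {
		switch {
		case useA && i < len(a):
			// Followed replica lags too far behind the other one: switch.
			if j < len(b) && a[i].t > b[j].t+penalty {
				useA = false
				continue
			}
			if a[i].t > lastT {
				out = append(out, a[i])
				lastT = a[i].t
			}
			i++
		case !useA && j < len(b):
			if i < len(a) && b[j].t > a[i].t+penalty {
				useA = true
				continue
			}
			if b[j].t > lastT {
				out = append(out, b[j])
				lastT = b[j].t
			}
			j++
		default:
			// Followed replica is exhausted; fall back to the other one.
			useA = !useA
		}
	}
	return out
}

func main() {
	// Two HA-style replicas scraping the same target at slightly shifted
	// times; replica a has a gap between t=30 and t=90 that b fills in.
	a := []sample{{10, 1}, {20, 1}, {30, 1}, {90, 1}}
	b := []sample{{12, 1}, {22, 1}, {32, 1}, {42, 1}, {52, 1}, {92, 1}}
	fmt.Println(penaltyMerge(a, b, 15))
	// Output: [{10 1} {20 1} {22 1} {32 1} {42 1} {52 1} {92 1}]
}
```

Run against two HA-style replicas with a gap in one of them, this merger stitches the streams together (roughly one sample per scrape interval) instead of interleaving every duplicated sample the way the naive merge would.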
Review discussion:

Member: Could we please clarify in the docs in exactly which 1% of the cases the penalty-based deduplication algorithm doesn't work?

Contributor (author): I am not sure about the 1% case. I only know it is related to counter reset / rate functions. Is this #2890 the right issue for it? @bwplotka @kakkoyun for more input.

Member: Maybe let's say something like: "even though the deduplication algorithm has been battle-tested for quite a long time but still very few issues come up from time to time such as https://github.com/thanos-io/thanos/issues/2890"? It would be more concrete. And it wouldn't sound like we know that it is bad in some 1% of cases & we aren't fixing it 😄

Contributor (author): Updated docs. PTAL.


#### Vertical Compaction Risks

@@ -115,6 +115,8 @@ The main risk is the **irreversible** implications of potential configuration er
* If you accidentally upload block with the same external labels but produced by totally different Prometheus for totally different applications, some metrics can overlap
and potentially can merge together making such series useless.
* If you merge disjoint series in multiple of blocks together, there is currently no easy way to split them back.
* The `penalty` offline deduplication algorithm has its own limitations. Even though it has been battle-tested for quite a long time, a few issues still come up from time to time,
such as https://github.com/thanos-io/thanos/issues/2890. If you'd like to enable this deduplication algorithm, please be aware of the risk and make sure you back up your data.

#### Enabling Vertical Compaction

@@ -143,6 +145,8 @@ external_labels: {cluster="us1", receive="true", environment="staging"}

On next compaction multiple streams' blocks will be compacted into one.

If you need a different deduplication algorithm, use the `--deduplication.func` flag. The default value is the original `one-to-one` deduplication.

## Enforcing Retention of Data

By default, there is NO retention set for object storage data. This means that you store data for unlimited time, which is a valid and recommended way of running Thanos.
@@ -349,6 +353,30 @@ Flags:
consistency-delay and 48h0m0s will be removed.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--deduplication.func= Experimental. Deduplication algorithm for
merging overlapping blocks. Possible values are:
"", "penalty". If no value is specified, the
default compact deduplication merger is used,
which performs 1:1 deduplication for samples.
When set to penalty, the penalty based
deduplication algorithm will be used. At least
one replica label has to be set via
--deduplication.replica-label flag.
--deduplication.replica-label=DEDUPLICATION.REPLICA-LABEL ...
Label to treat as a replica indicator of blocks
that can be deduplicated (repeated flag). This
will merge multiple replica blocks into one.
This process is irreversible. Experimental. When
one or more labels are set, compactor will
ignore the given labels so that vertical
compaction can merge the blocks. Please note
that by default this uses a NAIVE algorithm for
merging which works well for deduplication of
blocks with **precisely the same samples** like
produced by Receiver replication. If you need a
different deduplication algorithm (e.g. one that
works well with Prometheus replicas), please set
it via --deduplication.func.
--delete-delay=48h Time before a block marked for deletion is
deleted from bucket. If delete-delay is non
zero, blocks will be marked for deletion and
6 changes: 6 additions & 0 deletions pkg/compact/compact.go
@@ -41,6 +41,12 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

const (
// DedupAlgorithmPenalty is the penalty based compactor series merge algorithm.
// This is the same as the online deduplication used by the querier, except for counter reset handling.
DedupAlgorithmPenalty = "penalty"
)

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
16 changes: 14 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -22,9 +22,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
@@ -167,7 +170,16 @@ func MetricCount(c prometheus.Collector) int {
return mCount
}

func TestGroup_Compact_e2e(t *testing.T) {
func TestGroupCompactE2E(t *testing.T) {
testGroupCompactE2e(t, nil)
}

// Penalty based merger should get the same result as the blocks don't have overlap.
func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
testGroupCompactE2e(t, dedup.NewChunkSeriesMerger())
}

func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
@@ -194,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})