Skip to content

Commit

Permalink
address review comment
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed May 27, 2021
1 parent 82dac08 commit 6316e8a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 28 deletions.
11 changes: 5 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func runCompact(
var mergeFunc storage.VerticalChunkSeriesMergeFunc
switch conf.dedupFunc {
case compact.DedupAlgorithmPenalty:
mergeFunc = dedup.NewDedupChunkSeriesMerger()
mergeFunc = dedup.NewChunkSeriesMerger()

if len(conf.dedupReplicaLabels) == 0 {
return errors.New("penalty based deduplication needs at least one replica label specified")
Expand Down Expand Up @@ -638,20 +638,19 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("48h").SetValue(&cc.deleteDelay)

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
"Please note that this uses a NAIVE algorithm for merging. If you need a smarter deduplication algorithm, please set it via -- compact.dedup-func."+
"Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.dedup-func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")

// Update this. This flag works for both dedup version compactor and the original compactor.
cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
"Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+
"If you need a smarter deduplication algorithm, please set it via --deduplication.func.").
Hidden().StringsVar(&cc.dedupReplicaLabels)

// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
Expand Down
28 changes: 15 additions & 13 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ which compacts overlapping blocks into single one. This is mainly used for **bac

In Thanos, it works similarly, but on bigger scale and using external labels for grouping as explained in [Compaction section](#compaction).

In both systems, series with the same labels are merged together. Merging samples is **naive**. It works by deduplicating samples within
exactly the same timestamps. Otherwise samples are added in sorted by time order.
In both systems, series with the same labels are merged together. In Prometheus, merging samples is **naive**. It works by deduplicating samples with
exactly the same timestamps. Otherwise samples are added in sorted-by-time order. Thanos also supports a new penalty-based samples merger, which is explained in [Vertical Compaction Use Cases](#vertical-compaction-use-cases).

> **NOTE:** Both Prometheus and Thanos default behaviour is to fail compaction if any overlapping blocks are spotted. (For Thanos, within the same external labels).

Expand All @@ -101,12 +101,12 @@ There can be few valid use cases for vertical compaction:
* Races between multiple compactions, for example multiple compactors or between compactor and Prometheus compactions. While this will have extra
computation overhead for Compactor it's safe to enable vertical compaction for this case.
* Backfilling. If you want to add blocks of data to any stream where there is existing data already there for the time range, you will need enabled vertical compaction.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common series duplications, `one-to-one` and `realistic`:
* `one-to-one` duplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
* Offline deduplication of series. It's very common to have the same data replicated into multiple streams. We can distinguish two common deduplication modes, `one-to-one` and `penalty`:
* `one-to-one` deduplication is when same series (series with the same labels from different blocks) for the same range have **exactly** the same samples: Same values and timestamps.
This is very common while using [Receivers](../components/receive.md) with replication greater than 1 as receiver replication copies exactly the same timestamps and values to different receive instances.
* `realistic` duplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
* `penalty` deduplication is when same series data is **logically duplicated**. For example, it comes from the same application, but scraped by two different Prometheus-es. Ideally
this requires more complex deduplication algorithms. For example one that is used to [deduplicate on the fly on the Querier](query.md#run-time-deduplication-of-ha-groups). This is common
case when Prometheus HA replicas are used. [Offline deduplication for this is in progress](https://github.com/thanos-io/thanos/issues/1014).
case when Prometheus HA replicas are used. You can enable this deduplication via the `--deduplication.func=penalty` flag.

#### Vertical Compaction Risks

Expand Down Expand Up @@ -143,6 +143,8 @@ external_labels: {cluster="us1", receive="true", environment="staging"}

On next compaction multiple streams' blocks will be compacted into one.

If you need a different deduplication algorithm, use the `--deduplication.func` flag. The default value is the original `one-to-one` deduplication.

## Enforcing Retention of Data

By default, there is NO retention set for object storage data. This means that you store data for unlimited time, which is a valid and recommended way of running Thanos.
Expand Down Expand Up @@ -343,7 +345,13 @@ Flags:
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--compact.dedup-func= Experimental. Deduplication algorithm for
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed
blocks older than the maximum of
consistency-delay and 48h0m0s will be removed.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--deduplication.func= Experimental. Deduplication algorithm for
merging overlapping blocks. Possible values are:
"", "penalty". If no value is specified, the
default compact deduplication merger is used,
Expand All @@ -352,12 +360,6 @@ Flags:
algorithm will be used. At least one replica
label has to be set via
--deduplication.replica-label flag.
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed
blocks older than the maximum of
consistency-delay and 48h0m0s will be removed.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--delete-delay=48h Time before a block marked for deletion is
deleted from bucket. If delete-delay is non
zero, blocks will be marked for deletion and
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestGroupCompactE2E(t *testing.T) {

// Penalty based merger should get the same result as the blocks don't have overlap.
func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger())
testGroupCompactE2e(t, dedup.NewChunkSeriesMerger())
}

func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// NewDedupChunkSeriesMerger merges several chunk series into one.
// NewChunkSeriesMerger merges several chunk series into one.
// Deduplication is based on penalty based deduplication algorithm without handling counter reset.
func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
func NewChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
return func(series ...storage.ChunkSeries) storage.ChunkSeries {
if len(series) == 0 {
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func TestDedupChunkSeriesMerger(t *testing.T) {
m := NewDedupChunkSeriesMerger()
m := NewChunkSeriesMerger()

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
),
},
{
name: "150 overlapping samples, split chunk",
name: "150 overlapping samples, no chunk splitting due to penalty deduplication",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90)
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150)
Expand All @@ -144,15 +144,15 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
}

func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
m := NewDedupChunkSeriesMerger()
m := NewChunkSeriesMerger()

defaultLabels := labels.FromStrings("bar", "baz")
emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{})
// Samples are created with step 1m. So the 5m downsampled chunk has 2 samples.
samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000))
// Non overlapping samples with samples1. 5m downsampled chunk has 2 samples.
samples2 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(600000, 10, 60*1000))

// Overlapped with samples1.
samples3 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(120000, 10, 60*1000))

for _, tc := range []struct {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {

extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica"}
if penaltyDedup {
extArgs = append(extArgs, "--compact.dedup-func=penalty")
extArgs = append(extArgs, "--deduplication.func=penalty")
}

// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
Expand Down Expand Up @@ -740,7 +740,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) {
extArgs := []string{"--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s"}
if penaltyDedup {
extArgs = append(extArgs, "--compact.dedup-func=penalty")
extArgs = append(extArgs, "--deduplication.func=penalty")
}
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...)
testutil.Ok(t, err)
Expand Down

0 comments on commit 6316e8a

Please sign in to comment.