Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: Add offline deduplication #4239

Merged
merged 12 commits into from
Jun 16, 2021
4 changes: 2 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h").SetValue(&cc.deleteDelay)

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more. "+
"Please note that by default this uses a NAIVE algorithm for merging. If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)
Expand All @@ -653,7 +653,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")

cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
"Experimental. When it is set to true, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
"Please note that by default this uses a NAIVE algorithm for merging which works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."+
"If you need a different deduplication algorithm (e.g one that works well with Prometheus replicas), please set it via --deduplication.func.").
StringsVar(&cc.dedupReplicaLabels)
Expand Down
20 changes: 10 additions & 10 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,16 @@ Flags:
that can be deduplicated (repeated flag). This
will merge multiple replica blocks into one.
This process is irreversible.Experimental. When
it is set to true, compactor will ignore the
given labels so that vertical compaction can
merge the blocks.Please note that by default
this uses a NAIVE algorithm for merging which
works well for deduplication of blocks with
**precisely the same samples** like produced by
Receiver replication.If you need a different
deduplication algorithm (e.g one that works well
with Prometheus replicas), please set it via
--deduplication.func.
one or more labels are set, compactor will
ignore the given labels so that vertical
compaction can merge the blocks.Please note that
by default this uses a NAIVE algorithm for
merging which works well for deduplication of
blocks with **precisely the same samples** like
produced by Receiver replication.If you need a
different deduplication algorithm (e.g one that
works well with Prometheus replicas), please set
it via --deduplication.func.
--delete-delay=48h Time before a block marked for deletion is
deleted from bucket. If delete-delay is non
zero, blocks will be marked for deletion and
Expand Down
2 changes: 1 addition & 1 deletion docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ Flags:
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--tmp.dir="/tmp/thanos-rewrite"
--tmp.dir="/var/folders/0v/x6r6mgjx14ldyx7tfmpmx3gw0000gn/T/thanos-rewrite"
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
Working directory for temporary files
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
Expand Down
49 changes: 15 additions & 34 deletions pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ func newAggrChunkIterator(iters [5]chunkenc.Iterator) chunks.Iterator {

func (a *aggrChunkIterator) Next() bool {
if !a.countChkIter.Next() {
if err := a.countChkIter.Err(); err != nil {
a.err = err
}
return false
}

Expand All @@ -266,40 +269,15 @@ func (a *aggrChunkIterator) Next() bool {
)

chks[downsample.AggrCount] = countChk.Chunk
chk, err = a.toChunk(downsample.AggrSum, mint, maxt)
if err != nil {
a.err = err
return false
}
if chk != nil {
chks[downsample.AggrSum] = chk.Chunk
}

chk, err = a.toChunk(downsample.AggrMin, mint, maxt)
if err != nil {
a.err = err
return false
}
if chk != nil {
chks[downsample.AggrMin] = chk.Chunk
}

chk, err = a.toChunk(downsample.AggrMax, mint, maxt)
if err != nil {
a.err = err
return false
}
if chk != nil {
chks[downsample.AggrMax] = chk.Chunk
}

chk, err = a.toChunk(downsample.AggrCounter, mint, maxt)
if err != nil {
a.err = err
return false
}
if chk != nil {
chks[downsample.AggrCounter] = chk.Chunk
for i := downsample.AggrSum; i <= downsample.AggrCounter; i++ {
chk, err = a.toChunk(i, mint, maxt)
if err != nil {
a.err = err
return false
}
if chk != nil {
chks[i] = chk.Chunk
}
}

a.curr = chunks.Meta{
Expand Down Expand Up @@ -338,6 +316,9 @@ func (a *aggrChunkIterator) toChunk(at downsample.AggrType, minTime, maxTime int
lastT, lastV = it.At()
appender.Append(lastT, lastV)
}
if err := it.Err(); err != nil {
return nil, err
}

// No sample in the required time range.
if lastT == 0 && lastV == 0 {
Expand Down