Skip to content

Commit

Permalink
Preserve reset hint during histogram deduplication
Browse files Browse the repository at this point in the history
Penalty based deduplication can over-skip samples because when switching
between replicas, because it uses a penalty of 2 times the estimated scrape interval.
This works okay for float counters because the reset is detected based on the value of the counter.
With native histograms, detecting a reset is much more expensive which is why there is a
hint stored in the first sample of a chunk, which indicates whether the chunk has been created
because of a reset.

It can easily happen that the dedup iterator switches replicas, and skips the first sample of a chunk
because of the added penalty. In this case we will not read the hint and will assume that no
histogram reset happened.

This commit fixes the issue by explicitly calling DetectReset if a replica stream has been switched.
The result is then stored and set in the histogram returned by AtHistogram and AtFloatHistogram.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jan 26, 2024
1 parent fce0fe2 commit f8cf2fe
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 116 deletions.
64 changes: 32 additions & 32 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
{
name: "single series",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}}),
},
{
name: "two empty series",
Expand All @@ -49,66 +49,66 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
{
name: "two non overlapping",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 7, f: 7}, sample{t: 9, f: 9}}, []chunks.Sample{sample{t: 10, f: 10}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{5, 5}}, []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}, sample{t: 5, f: 5}}, []chunks.Sample{sample{t: 7, f: 7}, sample{t: 9, f: 9}}, []chunks.Sample{sample{t: 10, f: 10}}),
},
{
name: "two overlapping",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{8, 8}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}, sample{t: 8, f: 8}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 7, f: 7}, sample{t: 9, f: 9}}, []chunks.Sample{sample{t: 10, f: 10}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{8, 8}}, []chunks.Sample{sample{10, 10}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 3, f: 3}, sample{t: 8, f: 8}}, []chunks.Sample{sample{t: 10, f: 10}}),
},
{
name: "two overlapping with large time diff",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{2, 2}, sample{5008, 5008}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}}, []chunks.Sample{sample{t: 2, f: 2}, sample{t: 5008, f: 5008}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 7, f: 7}, sample{t: 9, f: 9}}, []chunks.Sample{sample{t: 10, f: 10}}),
},
// sample{5008, 5008} is added to the result due to its large timestamp.
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 5008, f: 5008}}),
},
{
name: "two duplicated",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
},
{
name: "three overlapping",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{4, 4}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 6, f: 6}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 0, f: 0}, sample{t: 4, f: 4}}),
},
// only samples from the last series are retained due to high penalty.
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{4, 4}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 0, f: 0}, sample{t: 4, f: 4}}),
},
{
name: "three in chained overlap",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{4, 4}, sample{6, 66}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{6, 6}, sample{10, 10}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 4, f: 4}, sample{t: 6, f: 66}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 6, f: 6}, sample{t: 10, f: 10}}),
},
// only samples from the last series are retained due to high penalty.
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 1, f: 1}, sample{t: 2, f: 2}, sample{t: 3, f: 3}, sample{t: 5, f: 5}}),
},
{
name: "three in chained overlap complex",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{5, 5}}, []chunks.Sample{sample{10, 10}, sample{15, 15}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{20, 20}}, []chunks.Sample{sample{25, 25}, sample{30, 30}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{18, 18}, sample{26, 26}}, []chunks.Sample{sample{31, 31}, sample{35, 35}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 0, f: 0}, sample{t: 5, f: 5}}, []chunks.Sample{sample{t: 10, f: 10}, sample{t: 15, f: 15}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 2, f: 2}, sample{t: 20, f: 20}}, []chunks.Sample{sample{t: 25, f: 25}, sample{t: 30, f: 30}}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{t: 18, f: 18}, sample{t: 26, f: 26}}, []chunks.Sample{sample{t: 31, f: 31}, sample{t: 35, f: 35}}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]chunks.Sample{sample{0, 0}, sample{5, 5}},
[]chunks.Sample{sample{31, 31}, sample{35, 35}},
[]chunks.Sample{sample{t: 0, f: 0}, sample{t: 5, f: 5}},
[]chunks.Sample{sample{t: 31, f: 31}, sample{t: 35, f: 35}},
),
},
{
Expand Down Expand Up @@ -288,11 +288,11 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
Lset: defaultLabels,
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
samples := [][]chunks.Sample{
{sample{299999, 3}, sample{540000, 5}},
{sample{299999, 540000}, sample{540000, 2100000}},
{sample{299999, 120000}, sample{540000, 300000}},
{sample{299999, 240000}, sample{540000, 540000}},
{sample{299999, 240000}, sample{299999, 240000}},
{sample{t: 299999, f: 3}, sample{t: 540000, f: 5}},
{sample{t: 299999, f: 540000}, sample{t: 540000, f: 2100000}},
{sample{t: 299999, f: 120000}, sample{t: 540000, f: 300000}},
{sample{t: 299999, f: 240000}, sample{t: 540000, f: 540000}},
{sample{t: 299999, f: 240000}, sample{t: 299999, f: 240000}},
}
var chks [5]chunkenc.Chunk
for i, s := range samples {
Expand Down
41 changes: 39 additions & 2 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ type adjustableSeriesIterator interface {
// adjustAtValue allows to adjust value by implementation if needed knowing the last value. This is used by counter
// implementation which can adjust for obsolete counter value.
adjustAtValue(lastFloatValue float64)

// forceHistogramResetDetection can be used to forces an UnknownCounterReset to be returned in the next histogram.
forceHistogramResetDetection(force bool)
}

type noopAdjustableSeriesIterator struct {
Expand All @@ -379,6 +382,8 @@ type noopAdjustableSeriesIterator struct {

func (it noopAdjustableSeriesIterator) adjustAtValue(float64) {}

func (it noopAdjustableSeriesIterator) forceHistogramResetDetection(bool) {}

// counterErrAdjustSeriesIterator is extendedSeriesIterator used when we deduplicate counter.
// It makes sure we always adjust for the latest seen last counter value for all replicas.
// Let's consider following example:
Expand All @@ -404,7 +409,8 @@ func (it noopAdjustableSeriesIterator) adjustAtValue(float64) {}
type counterErrAdjustSeriesIterator struct {
chunkenc.Iterator

errAdjust float64
errAdjust float64
forceResetDetection bool
}

func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastFloatValue float64) {
Expand All @@ -415,11 +421,31 @@ func (it *counterErrAdjustSeriesIterator) adjustAtValue(lastFloatValue float64)
}
}

func (it *counterErrAdjustSeriesIterator) forceHistogramResetDetection(force bool) {
it.forceResetDetection = force
}

func (it *counterErrAdjustSeriesIterator) At() (int64, float64) {
t, v := it.Iterator.At()
return t, v + it.errAdjust
}

func (it *counterErrAdjustSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
t, h := it.Iterator.AtHistogram()
if it.forceResetDetection {
h.CounterResetHint = histogram.UnknownCounterReset
}
return t, h
}

func (it *counterErrAdjustSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
t, fh := it.Iterator.AtFloatHistogram()
if it.forceResetDetection {
fh.CounterResetHint = histogram.UnknownCounterReset
}
return t, fh
}

type dedupSeriesIterator struct {
a, b adjustableSeriesIterator

Expand Down Expand Up @@ -449,7 +475,13 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
lastFloatVal, isFloatVal := it.lastFloatVal()
lastUseA := it.useA
defer func() {
if it.useA != lastUseA && isFloatVal {
switchedReplica := it.useA != lastUseA
if !switchedReplica {
it.forceHistogramResetDetection(false)
return
}
it.forceHistogramResetDetection(true)
if isFloatVal {
// We switched replicas.
// Ensure values are correct bases on value before At.
// TODO(rabenhorst): Investigate if we also need to implement adjusting histograms here.
Expand Down Expand Up @@ -544,6 +576,11 @@ func (it *dedupSeriesIterator) adjustAtValue(lastFloatValue float64) {
}
}

func (it *dedupSeriesIterator) forceHistogramResetDetection(force bool) {
it.a.forceHistogramResetDetection(force)
it.b.forceHistogramResetDetection(force)
}

func (it *dedupSeriesIterator) Seek(t int64) chunkenc.ValueType {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
Expand Down
Loading

0 comments on commit f8cf2fe

Please sign in to comment.