Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: properly treat native histogram deduplication in chunk series merger #7164

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (d *dedupChunksIterator) At() chunks.Meta {
}

// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615.
// The difference is that it handles both XOR and Aggr chunk Encoding.
// The difference is that it handles XOR, Histogram, FloatHistogram and Aggr chunk encodings.
func (d *dedupChunksIterator) Next() bool {
if d.h == nil {
for _, iter := range d.iterators {
Expand Down Expand Up @@ -151,17 +151,21 @@ func (h *chunkIteratorHeap) Pop() interface{} {
}

type overlappingMerger struct {
xorIterators []chunkenc.Iterator
aggrIterators [5][]chunkenc.Iterator
xorIterators []chunkenc.Iterator
histIterators []chunkenc.Iterator
floatHistIterators []chunkenc.Iterator
aggrIterators [5][]chunkenc.Iterator

samplesMergeFunc func(a, b chunkenc.Iterator) chunkenc.Iterator
}

func newOverlappingMerger() *overlappingMerger {
return &overlappingMerger{
samplesMergeFunc: func(a, b chunkenc.Iterator) chunkenc.Iterator {
it := noopAdjustableSeriesIterator{a}
return newDedupSeriesIterator(it, noopAdjustableSeriesIterator{b})
return newDedupSeriesIterator(
noopAdjustableSeriesIterator{a},
noopAdjustableSeriesIterator{b},
)
},
}
}
Expand All @@ -170,20 +174,26 @@ func (o *overlappingMerger) addChunk(chk chunks.Meta) {
switch chk.Chunk.Encoding() {
case chunkenc.EncXOR:
o.xorIterators = append(o.xorIterators, chk.Chunk.Iterator(nil))
case chunkenc.EncFloatHistogram:
o.floatHistIterators = append(o.floatHistIterators, chk.Chunk.Iterator(nil))
case chunkenc.EncHistogram:
o.histIterators = append(o.histIterators, chk.Chunk.Iterator(nil))
case downsample.ChunkEncAggr:
aggrChk := chk.Chunk.(*downsample.AggrChunk)
for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ {
if c, err := aggrChk.Get(i); err == nil {
o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil))
}
}
case chunkenc.EncNone:
default:
// exhausted options for chunk
return
}
}

func (o *overlappingMerger) empty() bool {
// OverlappingMerger only contains either raw chunks (xor, histogram or float histogram) or aggr chunks.
// If any raw chunks are present then we don't need to check aggr chunks.
if len(o.xorIterators) > 0 {
if len(o.xorIterators) > 0 || len(o.histIterators) > 0 || len(o.floatHistIterators) > 0 {
return false
}
return len(o.aggrIterators[downsample.AggrCount]) == 0
Expand All @@ -204,6 +214,28 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
return it
}}).Iterator(nil)

case chunkenc.EncHistogram:
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.histIterators {
it = o.samplesMergeFunc(it, i)
}
return it
},
}).Iterator(nil)

case chunkenc.EncFloatHistogram:
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.floatHistIterators {
it = o.samplesMergeFunc(it, i)
}
return it
},
}).Iterator(nil)

case downsample.ChunkEncAggr:
// If Aggr encoding, each aggregated chunks need to be expanded and deduplicated,
// then re-encoded into Aggr chunks.
Expand All @@ -225,10 +257,9 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
}

return newAggrChunkIterator(samplesIter)
default:
return nil
}

// Impossible for now.
return nil
}

type aggrChunkIterator struct {
Expand Down
138 changes: 136 additions & 2 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package dedup
import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/compact/downsample"
)

Expand Down Expand Up @@ -132,7 +132,9 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
merged := m(tc.input...)
testutil.Equals(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := storage.ExpandChunks(merged.Iterator(nil))
Expand Down Expand Up @@ -321,6 +323,138 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
}
}

// histoSample is the sample type the histogram merger tests feed into
// storage.NewListChunkSeriesFromSamples. It pairs a timestamp with a float
// value and two optional histogram pointers; Type decides which of them
// counts as the sample's value.
type histoSample struct {
	t  int64
	f  float64
	h  *histogram.Histogram
	fh *histogram.FloatHistogram
}

// T returns the sample timestamp.
func (s histoSample) T() int64 { return s.t }

// F returns the float value stored in the sample.
func (s histoSample) F() float64 { return s.f }

// H returns the integer histogram pointer, which may be nil.
func (s histoSample) H() *histogram.Histogram { return s.h }

// FH returns the float histogram pointer, which may be nil.
func (s histoSample) FH() *histogram.FloatHistogram { return s.fh }

// Type reports the value type of the sample. A non-nil float histogram takes
// precedence over a non-nil integer histogram; when both pointers are nil the
// sample is treated as a plain float.
func (s histoSample) Type() chunkenc.ValueType {
	switch {
	case s.fh != nil:
		return chunkenc.ValFloatHistogram
	case s.h != nil:
		return chunkenc.ValHistogram
	default:
		return chunkenc.ValFloat
	}
}

// histogramSample is the one integer histogram value that every histogram
// sample in TestDedupChunkSeriesMerger_Histogram points at. Only the zero
// bucket and the negative side are populated; no positive spans or buckets
// are set.
var histogramSample = &histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
// Two spans covering 4 + 1 = 5 buckets, matching the 5 entries in NegativeBuckets.
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
// NOTE(review): presumably delta-encoded as in Prometheus' integer histogram
// format (absolute counts 1, 3, 1, 2, 1 = 8, which together with ZeroCount 12
// matches Count 20) — confirm against the histogram package docs.
NegativeBuckets: []int64{1, 2, -2, 1, -1},
CounterResetHint: histogram.UnknownCounterReset,
}

func TestDedupChunkSeriesMerger_Histogram(t *testing.T) {
scrapeIntervalMilli := int64(30_000)

testCases := []struct {
name string
input []storage.ChunkSeries
expected storage.ChunkSeries
}{
{
name: "two overlapping - Histogram and Histogram",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 0 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 4 * scrapeIntervalMilli, h: histogramSample},
}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 0 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 1 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 4 * scrapeIntervalMilli, h: histogramSample},
}),
},
{
name: "overlapping mixed - XOR then Histogram - panic repro case",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 8 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 9 * scrapeIntervalMilli, h: histogramSample},
},
),
},
expected: storage.NewListChunkSeriesFromSamples(
labels.FromStrings("bar", "baz"),
[]chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
},
),
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
m := NewChunkSeriesMerger()
merged := m(tc.input...)
testutil.Equals(t, labels.FromStrings("bar", "baz"), merged.Labels())
actChks, actErr := storage.ExpandChunks(merged.Iterator(nil))
testutil.Ok(t, actErr)

expChks, expErr := storage.ExpandChunks(tc.expected.Iterator(nil))
testutil.Ok(t, expErr)
testutil.Equals(t, expChks, actChks)
})
}
}

func createSamplesWithStep(start, numOfSamples, step int) []chunks.Sample {
res := make([]chunks.Sample, numOfSamples)
cur := start
Expand Down
Loading