Skip to content

Commit

Permalink
*: properly treat native histogram deduplication in chunk series merger
Browse files Browse the repository at this point in the history
We have detected a problem in the chunk series merger: it panics
when it encounters native histogram chunks.
I am using thanos as a library for a project and wanted to use the
penalty function to dedup blocks from Prometheus instances.

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed Feb 27, 2024
1 parent 75152c4 commit 0bf2ad8
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 13 deletions.
53 changes: 42 additions & 11 deletions pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (d *dedupChunksIterator) At() chunks.Meta {
}

// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615.
// The difference is that it handles both XOR and Aggr chunk Encoding.
// The difference is that it handles XOR, Histogram, FloatHistogram and Aggr chunk encodings.
func (d *dedupChunksIterator) Next() bool {
if d.h == nil {
for _, iter := range d.iterators {
Expand Down Expand Up @@ -151,17 +151,21 @@ func (h *chunkIteratorHeap) Pop() interface{} {
}

// overlappingMerger holds the sample iterators of the chunks handed to addChunk,
// kept in one slice per chunk encoding, plus the function used to merge two
// sample iterators into one.
//
// NOTE(review): this diff view has no +/- markers, so the field list below shows
// the pre-change xorIterators/aggrIterators lines followed by their post-change
// versions together with the newly added histogram fields.
type overlappingMerger struct {
xorIterators []chunkenc.Iterator
aggrIterators [5][]chunkenc.Iterator
// Iterators over XOR-encoded chunks.
xorIterators []chunkenc.Iterator
// Iterators over Histogram-encoded chunks.
histIterators []chunkenc.Iterator
// Iterators over FloatHistogram-encoded chunks.
floatHistIterators []chunkenc.Iterator
// Iterators over the sub-chunks of Aggr-encoded chunks, indexed by aggregate type.
aggrIterators [5][]chunkenc.Iterator

// samplesMergeFunc combines two sample iterators into a single iterator.
samplesMergeFunc func(a, b chunkenc.Iterator) chunkenc.Iterator
}

// newOverlappingMerger returns an overlappingMerger with no iterators collected
// yet. Its samplesMergeFunc wraps both inputs in noopAdjustableSeriesIterator and
// combines them with newDedupSeriesIterator.
func newOverlappingMerger() *overlappingMerger {
return &overlappingMerger{
samplesMergeFunc: func(a, b chunkenc.Iterator) chunkenc.Iterator {
// NOTE(review): the next two lines are the pre-change form and the multi-line
// return after them is the post-change form of the same statement in this diff view.
it := noopAdjustableSeriesIterator{a}
return newDedupSeriesIterator(it, noopAdjustableSeriesIterator{b})
return newDedupSeriesIterator(
noopAdjustableSeriesIterator{a},
noopAdjustableSeriesIterator{b},
)
},
}
}
Expand All @@ -170,20 +174,26 @@ func (o *overlappingMerger) addChunk(chk chunks.Meta) {
switch chk.Chunk.Encoding() {
case chunkenc.EncXOR:
o.xorIterators = append(o.xorIterators, chk.Chunk.Iterator(nil))
case chunkenc.EncFloatHistogram:
o.floatHistIterators = append(o.floatHistIterators, chk.Chunk.Iterator(nil))
case chunkenc.EncHistogram:
o.histIterators = append(o.histIterators, chk.Chunk.Iterator(nil))
case downsample.ChunkEncAggr:
aggrChk := chk.Chunk.(*downsample.AggrChunk)
for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ {
if c, err := aggrChk.Get(i); err == nil {
o.aggrIterators[i] = append(o.aggrIterators[i], c.Iterator(nil))
}
}
case chunkenc.EncNone:
default:
// exhausted options for chunk
return
}
}

func (o *overlappingMerger) empty() bool {
// overlappingMerger only contains either XOR/Histogram/FloatHistogram chunks or aggr chunks.
// If any of the former are present then we don't need to check aggr chunks.
if len(o.xorIterators) > 0 {
if len(o.xorIterators) > 0 || len(o.histIterators) > 0 || len(o.floatHistIterators) > 0 {
return false
}
return len(o.aggrIterators[downsample.AggrCount]) == 0
Expand All @@ -204,6 +214,28 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
return it
}}).Iterator(nil)

case chunkenc.EncHistogram:
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.histIterators {
it = o.samplesMergeFunc(it, i)
}
return it
},
}).Iterator(nil)

case chunkenc.EncFloatHistogram:
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.floatHistIterators {
it = o.samplesMergeFunc(it, i)
}
return it
},
}).Iterator(nil)

case downsample.ChunkEncAggr:
// If Aggr encoding, each aggregated chunks need to be expanded and deduplicated,
// then re-encoded into Aggr chunks.
Expand All @@ -225,10 +257,9 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
}

return newAggrChunkIterator(samplesIter)
default:
return nil
}

// Impossible for now.
return nil
}

type aggrChunkIterator struct {
Expand Down
137 changes: 135 additions & 2 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package dedup
import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/compact/downsample"
)

Expand Down Expand Up @@ -133,6 +133,7 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
merged := m(tc.input...)

Check failure on line 137 in pkg/dedup/chunk_iter_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

loopclosure: loop variable tc captured by func literal (govet)
testutil.Equals(t, tc.expected.Labels(), merged.Labels())

Check failure on line 138 in pkg/dedup/chunk_iter_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

loopclosure: loop variable tc captured by func literal (govet)
actChks, actErr := storage.ExpandChunks(merged.Iterator(nil))
Expand Down Expand Up @@ -321,6 +322,138 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
}
}

// histoSample is a test-only sample: a timestamp together with a float value,
// an integer histogram, or a float histogram. Its accessor methods let it be
// placed in []chunks.Sample fixtures.
type histoSample struct {
	t  int64
	f  float64
	h  *histogram.Histogram
	fh *histogram.FloatHistogram
}

// T returns the sample's timestamp.
func (s histoSample) T() int64 { return s.t }

// F returns the sample's float value.
func (s histoSample) F() float64 { return s.f }

// H returns the sample's integer histogram, or nil if none was set.
func (s histoSample) H() *histogram.Histogram { return s.h }

// FH returns the sample's float histogram, or nil if none was set.
func (s histoSample) FH() *histogram.FloatHistogram { return s.fh }

// Type reports which kind of value the sample carries. A float histogram takes
// precedence over an integer histogram; with neither set the sample is a float.
func (s histoSample) Type() chunkenc.ValueType {
	switch {
	case s.fh != nil:
		return chunkenc.ValFloatHistogram
	case s.h != nil:
		return chunkenc.ValHistogram
	default:
		return chunkenc.ValFloat
	}
}

// histogramSample is the single histogram value (shared by pointer) used for
// every histogram sample in the TestDedupChunkSeriesMerger_Histogram fixtures.
// Its two negative spans have lengths 4 and 1, matching the five entries in
// NegativeBuckets; no positive spans or buckets are set.
var histogramSample = &histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
CounterResetHint: histogram.UnknownCounterReset,
}

func TestDedupChunkSeriesMerger_Histogram(t *testing.T) {
scrapeIntervalMilli := int64(30_000)

testCases := []struct {
name string
input []storage.ChunkSeries
expected storage.ChunkSeries
}{
{
name: "two overlapping - Histogram and Histogram",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 0 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 4 * scrapeIntervalMilli, h: histogramSample},
}),
},
expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 0 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 1 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 2 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 3 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 4 * scrapeIntervalMilli, h: histogramSample},
}),
},
{
name: "overlapping mixed - XOR then Histogram - panic repro case",
input: []storage.ChunkSeries{
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
}),
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 8 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 9 * scrapeIntervalMilli, h: histogramSample},
},
),
},
expected: storage.NewListChunkSeriesFromSamples(
labels.FromStrings("bar", "baz"),
[]chunks.Sample{
histoSample{t: 1 * scrapeIntervalMilli, f: 1},
histoSample{t: 2 * scrapeIntervalMilli, f: 1},
}, []chunks.Sample{
histoSample{t: 5 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 6 * scrapeIntervalMilli, h: histogramSample},
histoSample{t: 7 * scrapeIntervalMilli, h: histogramSample},
},
),
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
m := NewChunkSeriesMerger()
merged := m(tc.input...)
testutil.Equals(t, labels.FromStrings("bar", "baz"), merged.Labels())
actChks, actErr := storage.ExpandChunks(merged.Iterator(nil))
testutil.Ok(t, actErr)

expChks, expErr := storage.ExpandChunks(tc.expected.Iterator(nil))
testutil.Ok(t, expErr)
testutil.Equals(t, expChks, actChks)
})
}
}

func createSamplesWithStep(start, numOfSamples, step int) []chunks.Sample {
res := make([]chunks.Sample, numOfSamples)
cur := start
Expand Down

0 comments on commit 0bf2ad8

Please sign in to comment.