Skip to content

Commit

Permalink
Moved err adjust to be per replica, not per already deduped value.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed May 12, 2020
1 parent 8d5b0a2 commit caac262
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 86 deletions.
1 change: 1 addition & 0 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ func (it *ApplyCounterResetsSeriesIterator) At() (t int64, v float64) {
}

func (it *ApplyCounterResetsSeriesIterator) Seek(x int64) bool {
// Don't use underlying Seek, but iterate over next to not miss counter resets.
for {
if t, _ := it.At(); t >= x {
return true
Expand Down
218 changes: 141 additions & 77 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,65 +477,179 @@ func (s *dedupSeries) Labels() labels.Labels {
return s.lset
}

func (s *dedupSeries) Iterator() (it storage.SeriesIterator) {
it = s.replicas[0].Iterator()
func (s *dedupSeries) Iterator() storage.SeriesIterator {
var it extendedSeriesIterator = &seriesIteratorToExtendedAdapter{SeriesIterator: s.replicas[0].Iterator()}

for _, o := range s.replicas[1:] {
it = newDedupSeriesIterator(it, o.Iterator())
var replicaIter extendedSeriesIterator
if s.isCounter {
replicaIter = &counterErrAdjustSeriesIterator{SeriesIterator: o.Iterator()}
} else {
replicaIter = &seriesIteratorToExtendedAdapter{SeriesIterator: o.Iterator()}
}

it = newDedupSeriesIterator(it, replicaIter)
}
if s.isCounter {
return newCounterDedupAdjustSeriesIterator(it)
return &extendedToSeriesIteratorAdapter{extendedSeriesIterator: it}
}

// extendedSeriesIterator iterates over the data of a time series.
// It extends the plain series-iterator contract with a lastValue hint on Seek
// and Next so that counter-aware implementations can correct for counter error.
type extendedSeriesIterator interface {
	// Seek advances the iterator forward to the value at or after
	// the given timestamp. It allows passing lastValue assessed by the consumer, so implementations handling counters can
	// adjust for potential counter error.
	Seek(t int64, lastValue float64) bool
	// At returns the current timestamp/value pair.
	At() (t int64, v float64)
	// Next advances the iterator by one. It allows passing lastValue assessed by the consumer, so implementations handling counters can
	// adjust for potential counter error.
	Next(lastValue float64) bool
	// Err returns the current error.
	Err() error
}

// seriesIteratorToExtendedAdapter lifts a plain storage.SeriesIterator into the
// extendedSeriesIterator interface. The lastValue hint has no meaning for
// non-counter series, so both methods discard it and delegate unchanged.
type seriesIteratorToExtendedAdapter struct {
	storage.SeriesIterator
}

// Seek drops the lastValue hint and forwards to the wrapped iterator.
func (it *seriesIteratorToExtendedAdapter) Seek(t int64, _ float64) bool {
	return it.SeriesIterator.Seek(t)
}

// Next drops the lastValue hint and forwards to the wrapped iterator.
func (it *seriesIteratorToExtendedAdapter) Next(_ float64) bool {
	return it.SeriesIterator.Next()
}

// extendedToSeriesIteratorAdapter exposes an extendedSeriesIterator through the
// plain storage.SeriesIterator interface. Callers of the plain interface cannot
// supply a lastValue hint, so the noLastValue sentinel is passed instead; it is
// low enough that counter-aware iterators never apply an adjustment for it.
type extendedToSeriesIteratorAdapter struct {
	extendedSeriesIterator
}

// noLastValue signals "no previously observed value" to counter-aware iterators.
const noLastValue = float64(math.MinInt64)

func (it *extendedToSeriesIteratorAdapter) Seek(t int64) bool {
	return it.extendedSeriesIterator.Seek(t, noLastValue)
}

func (it *extendedToSeriesIteratorAdapter) Next() bool {
	return it.extendedSeriesIterator.Next(noLastValue)
}

// counterErrAdjustSeriesIterator is an extendedSeriesIterator used when we deduplicate counters.
// It makes sure we always adjust for the latest seen last counter value for all replicas.
// Let's consider the following example:
//
// Replica 1 counter scrapes: 20 30 40 Nan - 0 5
// Replica 2 counter scrapes: 25 35 45 Nan - 2
//
// Now for downsampling purposes we are accounting for the resets, so our replicas before going to the dedup iterator look like this:
//
// Replica 1 counter total: 20 30 40 - - 40 45
// Replica 2 counter total: 25 35 45 - - 47
//
// Now if at any point we switch our focus from replica 2 to replica 1 we will experience a lower value than the previous one,
// which will trigger a false positive counter reset in PromQL.
//
// We mitigate this by always adjusting the "behind" replica value to be not smaller than the highest sample seen.
// This is also what is closest to the truth (last seen counter value on this target).
//
// This is a short-term solution to mitigate https://github.com/thanos-io/thanos/issues/2401.
// TODO(bwplotka): Find a better deduplication algorithm that does not require knowledge of whether the given
// series is a counter or not: https://github.com/thanos-io/thanos/issues/2547.
type counterErrAdjustSeriesIterator struct {
	storage.SeriesIterator

	// errAdjust accumulates the positive offset applied by At so this replica's
	// values never fall below the highest lastValue the consumer has seen.
	errAdjust float64
}

// Next advances the underlying iterator and, if the adjusted current value is
// still below the consumer-provided lastValue, grows errAdjust to close the gap.
func (it *counterErrAdjustSeriesIterator) Next(lastValue float64) bool {
	if !it.SeriesIterator.Next() {
		return false
	}
	// At already applies the current errAdjust, so compare against the adjusted value.
	if _, v := it.At(); lastValue > v {
		it.errAdjust += lastValue - v
	}
	return true
}

// Seek advances the underlying iterator to t (or later) and, if the adjusted
// current value is still below the consumer-provided lastValue, grows errAdjust
// to close the gap.
func (it *counterErrAdjustSeriesIterator) Seek(t int64, lastValue float64) bool {
	if it.SeriesIterator.Seek(t) {
		// Get current value with the current error adjust applied.
		_, v := it.At()
		if lastValue > v {
			it.errAdjust += lastValue - v
		}
		return true
	}
	// BUG FIX: a stray `return it` statement sat here before `return false`;
	// it was unreachable and ill-typed (returning the receiver as a bool).
	return false
}

// At returns the current sample with the accumulated counter-error
// adjustment added to its value.
func (it *counterErrAdjustSeriesIterator) At() (int64, float64) {
	t, v := it.SeriesIterator.At()
	v += it.errAdjust
	return t, v
}

type dedupSeriesIterator struct {
a, b storage.SeriesIterator
a, b extendedSeriesIterator

aok, bok bool

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64
lastV float64

aok, bok bool
lastT int64
penA, penB int64
useA bool
}

func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator {
func newDedupSeriesIterator(a, b extendedSeriesIterator) *dedupSeriesIterator {
lastV := float64(math.MinInt64)
return &dedupSeriesIterator{
a: a,
b: b,
lastT: math.MinInt64,
aok: a.Next(),
bok: b.Next(),
lastV: lastV,
aok: a.Next(lastV),
bok: b.Next(lastV),
}
}

func (it *dedupSeriesIterator) Next() bool {
func (it *dedupSeriesIterator) Next(lastValue float64) bool {
// This dedup iterator can be deduplicated with yet another replica. Make sure we adapt to the biggest last Value
// seen across all replicas. This is used only if underlying implementations are on counters.
if lastValue > it.lastV {
it.lastV = lastValue
}

// Advance both iterators to at least the next highest timestamp plus the potential penalty.
if it.aok {
it.aok = it.a.Seek(it.lastT + 1 + it.penA)
it.aok = it.a.Seek(it.lastT+1+it.penA, it.lastV)
}
if it.bok {
it.bok = it.b.Seek(it.lastT + 1 + it.penB)
it.bok = it.b.Seek(it.lastT+1+it.penB, it.lastV)
}

// Handle basic cases where one iterator is exhausted before the other.
if !it.aok {
it.useA = false
if it.bok {
it.lastT, _ = it.b.At()
it.lastT, it.lastV = it.b.At()
it.penB = 0
}
return it.bok
}
if !it.bok {
it.useA = true
it.lastT, _ = it.a.At()
it.lastT, it.lastV = it.a.At()
it.penA = 0
return true
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty potentially already skipped potential samples already
// that would have resulted in exaggerated sampling frequency.
ta, _ := it.a.At()
tb, _ := it.b.At()
ta, va := it.a.At()
tb, vb := it.b.At()

it.useA = ta <= tb

Expand All @@ -546,35 +660,38 @@ func (it *dedupSeriesIterator) Next() bool {
// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenality = 5000
const initialPenalty = 5000

if it.useA {
if it.lastT != math.MinInt64 {
it.penB = 2 * (ta - it.lastT)
} else {
it.penB = initialPenality
it.penB = initialPenalty
}
it.penA = 0
it.lastT = ta
it.lastV = va
return true
}
if it.lastT != math.MinInt64 {
it.penA = 2 * (tb - it.lastT)
} else {
it.penA = initialPenality
it.penA = initialPenalty
}
it.penB = 0
it.lastT = tb
it.lastV = vb
return true
}

func (it *dedupSeriesIterator) Seek(t int64) bool {
func (it *dedupSeriesIterator) Seek(t int64, lastValue float64) bool {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
if ts > 0 && ts >= t {
return true
}
if !it.Next() {
if !it.Next(lastValue) {
return false
}
}
Expand All @@ -593,56 +710,3 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

// counterDedupAdjustSeriesIterator is used when we deduplicate counters.
// It makes sure we always adjust for the latest seen last counter value for all replicas.
// Let's consider the following example:
//
// Replica 1 counter scrapes: 20 30 40 Nan - 0 5
// Replica 2 counter scrapes: 25 35 45 Nan - 2
//
// Now for downsampling purposes we are accounting for the resets, so our replicas before going to the dedup iterator look like this:
//
// Replica 1 counter total: 20 30 40 - - 40 45
// Replica 2 counter total: 25 35 45 - - 47
//
// Now if at any point we switch our focus from replica 2 to replica 1 we will experience a lower value than the previous one,
// which will trigger a false positive counter reset in PromQL.
//
// We mitigate this by always adjusting the "behind" replica value to be not smaller than the highest sample seen.
// This is also what is closest to the truth (last seen counter value on this target).
//
// This is a short-term solution to mitigate https://github.com/thanos-io/thanos/issues/2401.
// TODO(bwplotka): Find a better deduplication algorithm that does not require knowledge of whether the given
// series is a counter or not: https://github.com/thanos-io/thanos/issues/2547.
type counterDedupAdjustSeriesIterator struct {
	storage.SeriesIterator

	// lastV is the highest raw value seen so far from the wrapped iterator.
	lastV float64
	// adjust accumulates the positive offset applied by At so values never decrease.
	adjust float64
}

// newCounterDedupAdjustSeriesIterator wraps iter so that deduplicated counter
// values never decrease. lastV starts at the lowest float so the first real
// sample never triggers an adjustment.
func newCounterDedupAdjustSeriesIterator(iter storage.SeriesIterator) storage.SeriesIterator {
	return &counterDedupAdjustSeriesIterator{
		SeriesIterator: iter,
		lastV:          -1 * math.MaxFloat64,
	}
}

// Next advances the wrapped iterator; whenever the raw value drops below the
// highest value seen so far, the difference is folded into the adjustment.
func (it *counterDedupAdjustSeriesIterator) Next() bool {
	if !it.SeriesIterator.Next() {
		return false
	}
	_, v := it.SeriesIterator.At()
	if it.lastV > v {
		// Raw value went backwards: grow the offset so At stays monotone.
		it.adjust += it.lastV - v
	}
	it.lastV = v
	return true
}

// At returns the current sample shifted by the accumulated adjustment.
func (it *counterDedupAdjustSeriesIterator) At() (int64, float64) {
	t, v := it.SeriesIterator.At()
	v += it.adjust
	return t, v
}
27 changes: 18 additions & 9 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,21 +1104,30 @@ func TestDedupSeriesSet(t *testing.T) {
{10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0.
{20000, 9.0}, // Same. CurrValue = 9.0.
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
{50001, 9 + 1.0}}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
{50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
{60000, 9 + 2.0},
{70000, 9 + 3.0},
{80000, 9 + 4.0},
{90000, 9 + 5.0}, // This should be now taken, and we expect 14 to be correct value now.
{100000, 9 + 6.0},
},
}, {
lset: labels.Labels{{Name: "replica", Value: "02"}},
samples: []sample{
{10001, 8.0}, // Penalty 5000 will be added.
// 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. Penalty 2 * (20000 - 10000) will be added.
// 30001 no sample. Within penalty, ignored.
{45001, 8.0 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
{45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
{55001, 8 + 1.5},
{65001, 8 + 2.5},
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
},
},
},
exp: []series{
{
lset: labels.Labels{},
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}},
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {55001, 10}, {65001, 11}, {90000, 14}, {100000, 15}},
},
},
dedupLabels: map[string]struct{}{
Expand Down Expand Up @@ -1253,24 +1262,24 @@ func TestDedupSeriesIterator(t *testing.T) {
for i, c := range cases {
t.Logf("case %d:", i)
it := newDedupSeriesIterator(
newMockedSeriesIterator(c.a),
newMockedSeriesIterator(c.b),
&seriesIteratorToExtendedAdapter{newMockedSeriesIterator(c.a)},
&seriesIteratorToExtendedAdapter{newMockedSeriesIterator(c.b)},
)
res := expandSeries(t, it)
res := expandSeries(t, &extendedToSeriesIteratorAdapter{it})
testutil.Equals(t, c.exp, res)
}
}

func BenchmarkDedupSeriesIterator(b *testing.B) {
run := func(b *testing.B, s1, s2 []sample) {
it := newDedupSeriesIterator(
newMockedSeriesIterator(s1),
newMockedSeriesIterator(s2),
&seriesIteratorToExtendedAdapter{newMockedSeriesIterator(s1)},
&seriesIteratorToExtendedAdapter{newMockedSeriesIterator(s2)},
)
b.ResetTimer()
var total int64

for it.Next() {
for it.Next(float64(math.MinInt64)) {
t, _ := it.At()
total += t
}
Expand Down

0 comments on commit caac262

Please sign in to comment.