Skip to content

Commit

Permalink
store/proxy: Deduplicate chunks on StoreAPI level. Recommend chunk so…
Browse files Browse the repository at this point in the history
…rting for StoreAPI. (#2603)

* Deduplicate chunk dups on proxy StoreAPI level. Recommend chunk sorting for StoreAPI.

Also: Merge same series together on proxy level instead select. This allows better dedup efficiency.

Partially fixes: #2303

Cases like overlapped data from store and sidecar and 1:1 duplicates are optimized as soon as it's possible.
This case was highly visible on GitLab repro data and exists in most of Thanos setup.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Optimized algorithm to combine series only on start.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Optimized chunk comparision for overlaps.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored May 18, 2020
1 parent 34859f1 commit 53e69bd
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [2513](https://github.com/thanos-io/thanos/pull/2513) Tools: Moved `thanos bucket` commands to `thanos tools bucket`, also
moved `thanos check rules` to `thanos tools rules-check`. `thanos tools rules-check` also takes rules by `--rules` repeated flag not argument
anymore.
- [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA).

## [v0.12.2](https://github.com/thanos-io/thanos/releases/tag/v0.12.2) - 2020.04.30

Expand Down
5 changes: 2 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,9 +981,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()

// Merge series set into an union of all block sets. This exposes all blocks are single seriesSet.
// Chunks of returned series might be out of order w.r.t to their time range.
// This must be accounted for later by clients.
// NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by
// blockSeries method. In worst case deduplication logic won't deduplicate correctly, which will be accounted later.
set := storepb.MergeSeriesSets(res...)
for set.Next() {
var series storepb.Series
Expand Down
1 change: 1 addition & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
}
return s.currSeries.Labels, s.currSeries.Chunks
}

func (s *streamSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
Expand Down
12 changes: 4 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,11 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "a"}},
chunks: [][]sample{{{5, 4}}},
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}, {{5, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge.
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{2, 2}, {3, 3}, {4, 4}}},
},
{
lset: []storepb.Label{{Name: "a", Value: "c"}},
Expand Down Expand Up @@ -343,7 +339,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries: []rawSeries{
{
lset: []storepb.Label{{Name: "a", Value: "b"}},
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}},
chunks: [][]sample{{{1, 11}, {2, 22}, {3, 33}}, {{1, 1}, {2, 2}, {3, 3}}},
},
},
},
Expand Down Expand Up @@ -1220,7 +1216,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got: %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)

for i, series := range got {
testutil.Equals(t, expected[i].lset, series.Labels)
Expand Down
201 changes: 176 additions & 25 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package storepb

import (
"bytes"
"strings"
"unsafe"

Expand Down Expand Up @@ -45,6 +46,7 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}

// CompareLabels compares two sets of labels.
// After lexicographical order, the set with fewer labels comes first.
func CompareLabels(a, b []Label) int {
l := len(a)
if len(b) < l {
Expand All @@ -58,7 +60,7 @@ func CompareLabels(a, b []Label) int {
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.

return len(a) - len(b)
}

Expand All @@ -73,13 +75,25 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}

// MergeSeriesSets returns a new series set that is the union of the input sets.
// MergeSeriesSets takes all series sets and returns as a union single series set.
// It assumes series are sorted by labels within single SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such case, if the single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes in a "best effort" way that chunks are sorted by min time. It's done as an optimization only, so if input
// series' chunks are NOT sorted, the only consequence is that the duplicates might be not correctly removed. This is double checked
// which on just-before PromQL level as well, so the only consequence is increased network bandwidth.
// If all chunks were sorted, MergeSeriesSet ALSO returns sorted chunks by min time.
//
// Chunks within the same series can also overlap (within all SeriesSet
// as well as single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlaps will be appended into on chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
return emptySeriesSet{}
case 1:
return all[0]
return newUniqueSeriesSet(all[0])
}
h := len(all) / 2

Expand All @@ -106,11 +120,6 @@ type mergedSeriesSet struct {
adone, bdone bool
}

// newMergedSeriesSet takes two series sets as a single series set.
// Series that occur in both sets should have disjoint time ranges.
// If the ranges overlap b samples are appended to a samples.
// If the single SeriesSet returns same series within many iterations,
// merge series set will not try to merge those.
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
Expand Down Expand Up @@ -150,33 +159,175 @@ func (s *mergedSeriesSet) Next() bool {
}

d := s.compare()

// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
return true
}
if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
} else {
// Concatenate chunks from both series sets. They may be expected of order
// w.r.t to their time range. This must be accounted for later.
lset, chksA := s.a.At()
_, chksB := s.b.At()

s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side an create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
s.chunks = append(s.chunks, chksA...)
s.chunks = append(s.chunks, chksB...)
return true
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
// Both a and b contains the same series. Go through all chunks, remove duplicates and concatenate chunks from both
// series sets. We best effortly assume chunks are sorted by min time. If not, we will not detect all deduplicate which will
// be account on select layer anyway. We do it still for early optimization.
lset, chksA := s.a.At()
_, chksB := s.b.At()
s.lset = lset

// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side an create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))

b := 0
Outer:
for a := range chksA {
for {
if b >= len(chksB) {
// No more b chunks.
s.chunks = append(s.chunks, chksA[a:]...)
break Outer
}

cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}

// Exact duplicated chunks, discard one from b.
b++
}
}

if b < len(chksB) {
s.chunks = append(s.chunks, chksB[b:]...)
}

s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return true
}

// uniqueSeriesSet takes one series set and ensures each iteration contains single, full series.
type uniqueSeriesSet struct {
SeriesSet
done bool

peek *Series

lset []Label
chunks []AggrChunk
}

func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet {
return &uniqueSeriesSet{SeriesSet: wrapped}
}

func (s *uniqueSeriesSet) At() ([]Label, []AggrChunk) {
return s.lset, s.chunks
}

func (s *uniqueSeriesSet) Next() bool {
if s.Err() != nil {
return false
}

for !s.done {
if s.done = !s.SeriesSet.Next(); s.done {
break
}
lset, chks := s.SeriesSet.At()
if s.peek == nil {
s.peek = &Series{Labels: lset, Chunks: chks}
continue
}

if CompareLabels(lset, s.peek.Labels) != 0 {
s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
s.peek = &Series{Labels: lset, Chunks: chks}
return true
}

// We assume non-overlapping, sorted chunks. This is best effort only, if it's otherwise it
// will just be duplicated, but well handled by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)
}

if s.peek == nil {
return false
}

s.lset, s.chunks = s.peek.Labels, s.peek.Chunks
s.peek = nil
return true
}

// Compare returns positive 1 if chunk is smaller -1 if larger than b by min time, then max time.
// It returns 0 if chunks are exactly the same.
func (m AggrChunk) Compare(b AggrChunk) int {
if m.MinTime < b.MinTime {
return 1
}
if m.MinTime > b.MinTime {
return -1
}

// Same min time.
if m.MaxTime < b.MaxTime {
return 1
}
if m.MaxTime > b.MaxTime {
return -1
}

// We could use proto.Equal, but we need ordering as well.
for _, cmp := range []func() int{
func() int { return m.Raw.Compare(b.Raw) },
func() int { return m.Count.Compare(b.Count) },
func() int { return m.Sum.Compare(b.Sum) },
func() int { return m.Min.Compare(b.Min) },
func() int { return m.Max.Compare(b.Max) },
func() int { return m.Counter.Compare(b.Counter) },
} {
if c := cmp(); c == 0 {
continue
} else {
return c
}
}
return 0
}

// Compare returns positive 1 if chunk is smaller -1 if larger.
// It returns 0 if chunks are exactly the same.
func (m *Chunk) Compare(b *Chunk) int {
if m == nil && b == nil {
return 0
}
if b == nil {
return 1
}
if m == nil {
return -1
}

if m.Type < b.Type {
return 1
}
if m.Type > b.Type {
return -1
}
return bytes.Compare(m.Data, b.Data)
}

// LabelsToPromLabels converts Thanos proto labels to Prometheus labels in type safe manner.
func LabelsToPromLabels(lset []Label) labels.Labels {
ret := make(labels.Labels, len(lset))
Expand Down
Loading

0 comments on commit 53e69bd

Please sign in to comment.