Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Improve Merge performance #531

Merged
merged 1 commit into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions head_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,61 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(b, err)
defer h.Close()
defer func() {
testutil.Ok(b, h.Close())
}()

// TODO: vary number of series
for i := 0; i < 1000000; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
var hash uint64
for n := 0; n < 10; n++ {
for i := 0; i < 100000; i++ {
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "foo"))
hash++
// Have some series that won't be matched, to properly test inverted matches.
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "bar"))
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
hash++
}
}

b.ResetTimer()
n1 := labels.NewEqualMatcher("n", "1")

all, _ := labels.NewRegexpMatcher("a", ".*")
jFoo := labels.NewEqualMatcher("j", "foo")
jNotFoo := labels.Not(jFoo)

for i := 0; i < b.N; i++ {
_, err := PostingsForMatchers(h.indexRange(0, 1000), all)
testutil.Ok(b, err)
iStar := labels.NewMustRegexpMatcher("i", "^.*$")
iPlus := labels.NewMustRegexpMatcher("i", "^.+$")
i1Plus := labels.NewMustRegexpMatcher("i", "^1.+$")
iEmptyRe := labels.NewMustRegexpMatcher("i", "^$")
iNotEmpty := labels.Not(labels.NewEqualMatcher("i", ""))
iNot2 := labels.Not(labels.NewEqualMatcher("n", "2"))
iNot2Star := labels.Not(labels.NewMustRegexpMatcher("i", "^2.*$"))

cases := []struct {
name string
matchers []labels.Matcher
}{
{`n="1"`, []labels.Matcher{n1}},
{`n="1",j="foo"`, []labels.Matcher{n1, jFoo}},
{`j="foo",n="1"`, []labels.Matcher{jFoo, n1}},
{`n="1",j!="foo"`, []labels.Matcher{n1, jNotFoo}},
{`i=~".*"`, []labels.Matcher{iStar}},
{`i=~".+"`, []labels.Matcher{iPlus}},
{`i=~""`, []labels.Matcher{iEmptyRe}},
{`i!=""`, []labels.Matcher{iNotEmpty}},
{`n="1",i=~".*",j="foo"`, []labels.Matcher{n1, iStar, jFoo}},
{`n="1",i=~".*",i!="2",j="foo"`, []labels.Matcher{n1, iStar, iNot2, jFoo}},
{`n="1",i!="",j="foo"`, []labels.Matcher{n1, iNotEmpty, jFoo}},
{`n="1",i=~".+",j="foo"`, []labels.Matcher{n1, iPlus, jFoo}},
{`n="1",i=~"1.+",j="foo"`, []labels.Matcher{n1, i1Plus, jFoo}},
{`n="1",i=~".+",i!="2",j="foo"`, []labels.Matcher{n1, iPlus, iNot2, jFoo}},
{`n="1",i=~".+",i!~"2.*",j="foo"`, []labels.Matcher{n1, iPlus, iNot2Star, jFoo}},
}

for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := PostingsForMatchers(h.indexRange(0, 1000), c.matchers...)
testutil.Ok(b, err)
}
})
}
}
141 changes: 126 additions & 15 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package index

import (
"container/heap"
"encoding/binary"
"runtime"
"sort"
Expand Down Expand Up @@ -365,25 +366,132 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 {
return its[0]
}
// All the uses of this function immediately expand it, so
// collect everything in a map. This is more efficient
// when there's 100ks of postings, compared to
// having a tree of merge objects.
pm := make(map[uint64]struct{}, len(its))
for _, it := range its {
for it.Next() {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
pm[it.At()] = struct{}{}
return newMergedPostings(its)
}

type postingsHeap []Postings

func (h postingsHeap) Len() int { return len(h) }
func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }

func (h *postingsHeap) Push(x interface{}) {
*h = append(*h, x.(Postings))
}

func (h *postingsHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

type mergedPostings struct {
h postingsHeap
initilized bool
heaped bool
cur uint64
err error
}

func newMergedPostings(p []Postings) *mergedPostings {
ph := make(postingsHeap, 0, len(p))
for _, it := range p {
if it.Next() {
ph = append(ph, it)
} else {
if it.Err() != nil {
return &mergedPostings{err: it.Err()}
}
}
}
return &mergedPostings{h: ph}
}

func (it *mergedPostings) Next() bool {
if it.h.Len() == 0 || it.err != nil {
return false
}

if !it.heaped {
heap.Init(&it.h)
it.heaped = true
}
// The user must issue an initial Next.
if !it.initilized {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
it.cur = it.h[0].At()
it.initilized = true
return true
}

for {
cur := it.h[0]
if !cur.Next() {
heap.Pop(&it.h)
if cur.Err() != nil {
it.err = cur.Err()
return false
}
if it.h.Len() == 0 {
return false
}
} else {
// Value of top of heap has changed, re-heapify.
heap.Fix(&it.h, 0)
}
if it.Err() != nil {
return ErrPostings(it.Err())

if it.h[0].At() != it.cur {
it.cur = it.h[0].At()
return true
}
}
}

func (it *mergedPostings) Seek(id uint64) bool {
if it.h.Len() == 0 || it.err != nil {
return false
}
if !it.initilized {
if !it.Next() {
return false
}
}
pl := make([]uint64, 0, len(pm))
for p := range pm {
pl = append(pl, p)
if it.cur >= id {
return true
}
sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] })
return newListPostings(pl)
// Heapifying when there is lots of Seeks is inefficient,
// mark to be re-heapified on the Next() call.
it.heaped = false
newH := make(postingsHeap, 0, len(it.h))
lowest := ^uint64(0)
for _, i := range it.h {
if i.Seek(id) {
newH = append(newH, i)
if i.At() < lowest {
lowest = i.At()
}
} else {
if i.Err() != nil {
it.err = i.Err()
return false
}
}
}
it.h = newH
if len(it.h) == 0 {
return false
}
it.cur = lowest
return true
}

func (it mergedPostings) At() uint64 {
return it.cur
}

func (it mergedPostings) Err() error {
return it.err
}

// Without returns a new postings list that contains all elements from the full list that
Expand Down Expand Up @@ -498,6 +606,9 @@ func (it *listPostings) Seek(x uint64) bool {
if it.cur >= x {
return true
}
if len(it.list) == 0 {
return false
}

// Do binary search between current position and end.
i := sort.Search(len(it.list), func(i int) bool {
Expand Down
14 changes: 13 additions & 1 deletion querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,23 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti
rit = append(rit, it)
}

merged := index.Merge(rit...)
// With many many postings, it's best to pre-calculate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// the merged list via next rather than have a ton of seeks
// in Without/Intersection.
if len(rit) > 100 {
pl, err := index.ExpandPostings(merged)
if err != nil {
return nil, err
}
merged = index.NewListPostings(pl)
}

allPostings, err := ix.Postings(index.AllPostingsKey())
if err != nil {
return nil, err
}
return index.Without(allPostings, index.Merge(rit...)), nil
return index.Without(allPostings, merged), nil
}

func mergeStrings(a, b []string) []string {
Expand Down