Skip to content

Commit

Permalink
Reduce allocations in TSI TagSets implementation
Browse files Browse the repository at this point in the history
Since all tag sets are materialised to strings before this method
returns, a large number of allocations can be avoided by carefully
resuing buffers and containers.

This commit reduces allocations by about 75%, which can be very
significant for high cardinality workloads.

The benchmark results shown below are for a benchmark that asks for all
series keys matching `tag5=value0'.

name                                               old time/op    new time/op    delta
Index_ConcurrentWriteQuery/inmem/queries_100000-8     5.66s ± 4%     5.70s ± 5%     ~     (p=0.739 n=10+10)
Index_ConcurrentWriteQuery/tsi1/queries_100000-8      26.5s ± 8%     26.8s ±12%     ~     (p=0.579 n=10+10)
IndexSet_TagSets/1M_series/inmem-8                   11.9ms ±18%    10.4ms ± 2%  -12.81%  (p=0.000 n=10+10)
IndexSet_TagSets/1M_series/tsi1-8                    23.4ms ± 5%    18.9ms ± 1%  -19.07%  (p=0.000 n=10+9)

name                                               old alloc/op   new alloc/op   delta
Index_ConcurrentWriteQuery/inmem/queries_100000-8    2.50GB ± 0%    2.50GB ± 0%     ~     (p=0.315 n=10+10)
Index_ConcurrentWriteQuery/tsi1/queries_100000-8     32.6GB ± 0%    32.6GB ± 0%     ~     (p=0.247 n=10+10)
IndexSet_TagSets/1M_series/inmem-8                   3.56MB ± 0%    3.56MB ± 0%     ~     (all equal)
IndexSet_TagSets/1M_series/tsi1-8                    12.7MB ± 0%     5.2MB ± 0%  -59.02%  (p=0.000 n=10+10)

name                                               old allocs/op  new allocs/op  delta
Index_ConcurrentWriteQuery/inmem/queries_100000-8     24.0M ± 0%     24.0M ± 0%     ~     (p=0.353 n=10+10)
Index_ConcurrentWriteQuery/tsi1/queries_100000-8      96.6M ± 0%     96.7M ± 0%     ~     (p=0.579 n=10+10)
IndexSet_TagSets/1M_series/inmem-8                     51.0 ± 0%      51.0 ± 0%     ~     (all equal)
IndexSet_TagSets/1M_series/tsi1-8                     80.4k ± 0%     20.4k ± 0%  -74.65%  (p=0.000 n=10+10)
  • Loading branch information
e-dard committed Aug 10, 2018
1 parent 45a596f commit 15100c0
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 12 deletions.
16 changes: 12 additions & 4 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,6 +2548,11 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
maxSeriesN = int(^uint(0) >> 1)
}

// The tag sets require a string for each series key in the set, The series
// file formatted keys need to be parsed into models format. Since they will
// end up as strings we can re-use an intermediate buffer for this process.
var keyBuf []byte
var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
for {
se, err := itr.Next()
if err != nil {
Expand Down Expand Up @@ -2575,14 +2580,15 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
}

_, tags := ParseSeriesKey(key)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tags) {
// NOTE - must not escape this loop iteration.
_, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
continue
}

var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = MakeTagsKey(dims, tags)
tagsAsKey = MakeTagsKey(dims, tagsBuf)
}

tagSet, ok := tagSets[string(tagsAsKey)]
Expand All @@ -2595,7 +2601,9 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
}

// Associate the series and filter with the Tagset.
tagSet.AddFilter(string(models.MakeKey(name, tags)), se.Expr)
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
tagSet.AddFilter(string(keyBuf), se.Expr)
keyBuf = keyBuf[:0]

// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
Expand Down
8 changes: 4 additions & 4 deletions tsdb/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ func (i *Index) Close() error {
//
// Typical results on an i7 laptop.
//
// BenchmarkIndexSet_TagSets/1M_series/inmem-8 100 12377082 ns/op 3556728 B/op 51 allocs/op
// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 50 24705967 ns/op 12740609 B/op 80375 allocs/op
// BenchmarkIndexSet_TagSets/1M_series/inmem-8 100 10430732 ns/op 3556728 B/op 51 allocs/op
// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op
func BenchmarkIndexSet_TagSets(b *testing.B) {
// Read line-protocol and coerce into tsdb format.
keys := make([][]byte, 0, 1e6)
Expand Down Expand Up @@ -539,8 +539,8 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
//
// Typical results for an i7 laptop
//
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000-8 1 6334019648 ns/op 2499546744 B/op 23963221 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000-8 1 31291015688 ns/op 32656096728 B/op 96879549 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000-8 1 5866592461 ns/op 2499768464 B/op 23964591 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000-8 1 30059490078 ns/op 32582973824 B/op 96705317 allocs/op
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
// Read line-protocol and coerce into tsdb format.
keys := make([][]byte, 0, 1e6)
Expand Down
29 changes: 25 additions & 4 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,39 @@ func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) {

// ParseSeriesKey extracts the name & tags from a series key.
func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) {
return parseSeriesKey(data, nil)
}

// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into
// dstTags, which is then returened.
//
// The returned dstTags may have a different length and capacity.
func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {
return parseSeriesKey(data, dstTags)
}

// parseSeriesKey extracts the name and tags from data, attempting to re-use the
// provided tags value rather than allocating. The returned tags may have a
// different length and capacity to those provided.
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
name, data = ReadSeriesKeyMeasurement(data)

tagN, data := ReadSeriesKeyTagN(data)
tags = make(models.Tags, tagN)

dst = dst[:cap(dst)] // Grow dst to use full capacity
if got, want := len(dst), tagN; got < want {
dst = append(dst, make(models.Tags, want-got)...)
}
dst = dst[:tagN]

for i := 0; i < tagN; i++ {
var key, value []byte
key, value, data = ReadSeriesKeyTag(data)
tags[i] = models.Tag{Key: key, Value: value}
dst[i].Key, dst[i].Value = key, value
}

return name, tags
return name, dst
}

func CompareSeriesKeys(a, b []byte) int {
Expand Down
38 changes: 38 additions & 0 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsdb_test

import (
"bytes"
"fmt"
"io/ioutil"
"os"
Expand All @@ -11,6 +12,43 @@ import (
"github.com/influxdata/influxdb/tsdb"
)

func TestParseSeriesKeyInto(t *testing.T) {
name := []byte("cpu")
tags := models.NewTags(map[string]string{"region": "east", "server": "a"})
key := tsdb.AppendSeriesKey(nil, name, tags)

dst := make(models.Tags, 0)
gotName, gotTags := tsdb.ParseSeriesKeyInto(key, dst)

if !bytes.Equal(gotName, name) {
t.Fatalf("got %q, expected %q", gotName, name)
}

if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}

dst = make(models.Tags, 0, 5)
_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := cap(gotTags), 5; got != exp {
t.Fatalf("got tags capacity %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}

dst = make(models.Tags, 1)
_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
if got, exp := len(gotTags), 2; got != exp {
t.Fatalf("got tags length %d, expected %d", got, exp)
} else if got, exp := gotTags, tags; !got.Equal(exp) {
t.Fatalf("got tags %v, expected %v", got, exp)
}
}

// Ensure series file contains the correct set of series.
func TestSeriesFile_Series(t *testing.T) {
sfile := MustOpenSeriesFile()
Expand Down
Binary file modified tsdb/testdata/line-protocol-1M.txt.gz
Binary file not shown.

0 comments on commit 15100c0

Please sign in to comment.