Skip to content

Commit

Permalink
fix(tsi1): index defect with negated equality filters
Browse files Browse the repository at this point in the history
Fixes influxdata#15859

This commit fixes a defect in the TSI index where a filter using the
negated equality operator would result in no matching series being
returned for series stored within the `IndexFile` portions of the index.

The root cause was missing legacy-handling code in the index for this
particular iterator.
  • Loading branch information
e-dard committed Nov 13, 2019
1 parent 16c041d commit cbae66e
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 24 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
v1.7.10 [unreleased]
--------------------
-------------------

### Bugfixes

- [#15881](https://github.com/influxdata/influxdb/pull/15881): fix(tsm1): make digest safe for concurrent use.
- [#15863](https://github.com/influxdata/influxdb/pull/15863): fix(tsi1): index defect with negated equality filters

v1.7.9 [2019-10-27]
-------------------
Expand Down
110 changes: 110 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
module github.com/influxdata/influxdb

go 1.12

require (
cloud.google.com/go v0.47.0
collectd.org v0.3.0
github.com/BurntSushi/toml v0.0.0-20170626110600-a368813c5e64
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf
github.com/apache/arrow v0.0.0-20191024131854-af6fa24be0db
github.com/apex/log v1.1.0
github.com/aws/aws-sdk-go v1.25.16
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40
github.com/boltdb/bolt v1.3.1
github.com/c-bata/go-prompt v0.2.1
github.com/caarlos0/ctrlc v1.0.0
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e
github.com/cespare/xxhash v1.0.0
github.com/davecgh/go-spew v1.1.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/fatih/color v1.5.0
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.1.1
github.com/golang/groupcache v0.0.0-20191002201903-404acd9df4cc
github.com/golang/protobuf v1.1.0
github.com/golang/snappy v0.0.0-20160529050041-d9eb7a3d35ec
github.com/google/go-cmp v0.2.0
github.com/google/go-github v0.0.0-20181009003523-dd29b543e14c
github.com/google/go-querystring v1.0.0
github.com/googleapis/gax-go v1.0.3
github.com/goreleaser/archive v1.1.3
github.com/goreleaser/goreleaser v0.79.2
github.com/goreleaser/nfpm v0.9.7
github.com/imdario/mergo v0.3.6
github.com/influxdata/changelog v0.0.0-20180330035926-d2664f8a12e3
github.com/influxdata/flux v0.50.2
github.com/influxdata/influxql v1.0.1
github.com/influxdata/line-protocol v0.0.0-20190220025226-a3afd890113f
github.com/influxdata/roaring v0.0.0-20180809181101-fc520f41fab6
github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/jstemmer/go-junit-report v0.9.1
github.com/jsternberg/markdownfmt v0.0.0-20180204232022-c2a5702991e3
github.com/jsternberg/zap-logfmt v1.0.0
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef
github.com/kisielk/gotool v1.0.0
github.com/klauspost/compress v1.4.0
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6
github.com/klauspost/pgzip v0.0.0-20170402124221-0bf5dcad4ada
github.com/lib/pq v1.0.0
github.com/masterminds/semver v1.4.2
github.com/mattn/go-colorable v0.0.9
github.com/mattn/go-isatty v0.0.4
github.com/mattn/go-runewidth v0.0.2
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104
github.com/mattn/go-zglob v0.0.1
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/go-homedir v1.0.0
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae
github.com/opentracing/opentracing-go v0.0.0-20180606204148-bd9c31933947
github.com/paulbellamy/ratecounter v0.2.0
github.com/peterh/liner v0.0.0-20180619022028-8c1271fcf47f
github.com/philhofer/fwd v1.0.0
github.com/pkg/errors v0.8.0
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5
github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1
github.com/prometheus/procfs v0.0.0-20180705121852-ae68e2d4c00f
github.com/retailnext/hllpp v0.0.0-20180308014038-101a6d2f8b52
github.com/satori/go.uuid v1.2.0
github.com/segmentio/kafka-go v0.2.2
github.com/shurcooL/go v0.0.0-20190704215121-7189cc372560
github.com/shurcooL/sanitized_anchor_name v1.0.0
github.com/spf13/cast v1.3.0
github.com/tinylib/msgp v1.0.2
github.com/willf/bitset v1.1.3
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6
go.opencensus.io v0.22.1
go.uber.org/atomic v1.3.2
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.9.0
golang.org/x/crypto v0.0.0-20180718160520-a2144134853f
golang.org/x/exp v0.0.0-20191014171548-69215a2ee97e
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20180715085529-ac767d655b30
golang.org/x/text v0.3.0
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
golang.org/x/tools v0.0.0-20181101071927-45ff765b4815
gonum.org/v1/gonum v0.6.0
google.golang.org/api v0.5.0
google.golang.org/appengine v1.2.0
google.golang.org/genproto v0.0.0-20180718234121-fedd2861243f
google.golang.org/grpc v1.13.0
gopkg.in/russross/blackfriday.v2 v2.0.1
gopkg.in/yaml.v2 v2.2.1
honnef.co/go/tools v0.0.0-20180110233758-d73ab98e7c39
)
12 changes: 7 additions & 5 deletions tsdb/index/tsi1/file_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,17 @@ func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, r
}

// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
func (fs *FileSet) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
for _, f := range fs.files {
itr := f.TagKeySeriesIDIterator(name, key)
if itr != nil {
itr, err := f.TagKeySeriesIDIterator(name, key)
if err != nil {
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return tsdb.MergeSeriesIDIterators(a...)
return tsdb.MergeSeriesIDIterators(a...), nil
}

// HasTagKey returns true if the tag key exists.
Expand Down Expand Up @@ -458,7 +460,7 @@ type File interface {

// Series iteration.
MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error)
TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)

// Sketches for cardinality estimation
Expand Down
6 changes: 4 additions & 2 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,8 +982,10 @@ func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error
func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
for _, p := range i.partitions {
itr := p.TagKeySeriesIDIterator(name, key)
if itr != nil {
itr, err := p.TagKeySeriesIDIterator(name, key)
if err != nil {
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
Expand Down
21 changes: 15 additions & 6 deletions tsdb/index/tsi1/index_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,27 +291,36 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator {

// TagKeySeriesIDIterator returns a series iterator for a tag key. It returns a
// nil iterator if the measurement or tag key does not exist in the file.
func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
tblk := f.tblks[string(name)]
if tblk == nil {
return nil
return nil, nil
}

// Find key element.
ke := tblk.TagKeyElem(key)
if ke == nil {
return nil
return nil, nil
}

// Merge all value series iterators together.
vitr := ke.TagValueIterator()

var itrs []tsdb.SeriesIDIterator
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
sitr := &rawSeriesIDIterator{data: ve.(*TagBlockValueElem).series.data}
itrs = append(itrs, sitr)
tblk, ok := ve.(*TagBlockValueElem)
if !ok {
return nil, fmt.Errorf("got type %T for iterator, expected %T", ve, TagBlockValueElem{})
}

ss, err := tblk.SeriesIDSet()
if err != nil {
return nil, err
}
itrs = append(itrs, tsdb.NewSeriesIDSetIterator(ss))
}

return tsdb.MergeSeriesIDIterators(itrs...)
return tsdb.MergeSeriesIDIterators(itrs...), nil
}

// TagValueSeriesIDSet returns a series id set for a tag value.
Expand Down
50 changes: 50 additions & 0 deletions tsdb/index/tsi1/index_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package tsi1_test

import (
"bytes"
"reflect"
"testing"
"time"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -30,6 +32,54 @@ func TestCreateIndexFile(t *testing.T) {
}
}

// TestIndexFile_TagKeySeriesIDIterator ensures that iterating the series for a
// tag key in an index file yields the series of the requested measurement that
// carry the key, and none belonging to other measurements.
func TestIndexFile_TagKeySeriesIDIterator(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Two measurements share the "region" key; only the "cpu" series are
	// expected back from the iterator below.
	series := []Series{
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
	}

	idxFile, err := CreateIndexFile(sfile.SeriesFile, series)
	if err != nil {
		t.Fatal(err)
	}
	defer idxFile.Close()

	measurement, tagKey := []byte("cpu"), []byte("region")
	itr, err := idxFile.TagKeySeriesIDIterator(measurement, tagKey)
	if err != nil {
		t.Fatal(err)
	}
	defer itr.Close()

	// NOTE(edd): the series keys end up being emitted in this order because the
	// series were written to different partitions in the _series file_. As such,
	// the key with region=west ends up with a lower series ID than the
	// region=east series, even though it was written later. When the series id
	// sets for each tag block in the index file are merged together and
	// iterated, the roaring bitmap library sorts the series ids, resulting in
	// the series keys being emitted in a different order to that in which they
	// were written.
	want := []string{"cpu,region=west", "cpu,region=east"}

	var keys []string
	for {
		elem, err := itr.Next()
		if err != nil {
			t.Fatal(err)
		} else if elem.SeriesID == 0 {
			// The loop treats a zero series ID as the end of iteration.
			break
		}

		seriesKey := sfile.SeriesKey(elem.SeriesID)
		name, tags := tsdb.ParseSeriesKey(seriesKey)

		// Build a throwaway point purely to obtain its "name,tags" key.
		pt := models.MustNewPoint(string(name), tags, models.Fields{"a": "a"}, time.Time{})
		keys = append(keys, string(pt.Key()))
	}

	if !reflect.DeepEqual(keys, want) {
		t.Fatalf("got keys %v, expected %v", keys, want)
	}
}

// Ensure index file generation can be successfully built.
func TestGenerateIndexFile(t *testing.T) {
sfile := MustOpenSeriesFile()
Expand Down
8 changes: 4 additions & 4 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,18 @@ func (f *LogFile) DeleteMeasurement(name []byte) error {
}

// TagKeySeriesIDIterator returns a series iterator for a tag key.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
f.mu.RLock()
defer f.mu.RUnlock()

mm, ok := f.mms[string(name)]
if !ok {
return nil
return nil, nil
}

tk, ok := mm.tagSet[string(key)]
if !ok {
return nil
return nil, nil
}

// Combine iterators across all tag keys.
Expand All @@ -348,7 +348,7 @@ func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
}
}

return tsdb.MergeSeriesIDIterators(itrs...)
return tsdb.MergeSeriesIDIterators(itrs...), nil
}

// TagKeyIterator returns a value iterator for a measurement.
Expand Down
16 changes: 10 additions & 6 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,18 +770,21 @@ func (p *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
}

// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (p *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
func (p *Partition) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
fs, err := p.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
return nil, err
}

itr := fs.TagKeySeriesIDIterator(name, key)
if itr == nil {
itr, err := fs.TagKeySeriesIDIterator(name, key)
if err != nil {
fs.Release()
return nil
return nil, err
} else if itr == nil {
fs.Release()
return nil, nil
}
return newFileSetSeriesIDIterator(fs, itr)
return newFileSetSeriesIDIterator(fs, itr), nil
}

// TagValueSeriesIDIterator returns a series iterator for a single key value.
Expand All @@ -793,6 +796,7 @@ func (p *Partition) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Seri

itr, err := fs.TagValueSeriesIDIterator(name, key, value)
if err != nil {
fs.Release()
return nil, err
} else if itr == nil {
fs.Release()
Expand Down

0 comments on commit cbae66e

Please sign in to comment.