Add roaring bitmaps to TSI index files. #10122

Merged (8 commits) on Jul 31, 2018

Changes from 5 commits
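For orientation, a minimal sketch of the bitmap operations this change builds on, using the upstream github.com/RoaringBitmap/roaring API (the PR vendors a fork of that library as github.com/influxdata/roaring). The IDs and names below are illustrative only, not taken from the PR.

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	// Series IDs that a hypothetical tag value maps to.
	ids := roaring.New()
	ids.AddMany([]uint32{1, 5, 9, 200000})

	// Series IDs tombstoned by a newer file.
	tombstones := roaring.New()
	tombstones.Add(5)

	// Drop tombstoned series without materializing an ID slice.
	live := roaring.AndNot(ids, tombstones)

	fmt.Println(live.GetCardinality()) // 3
	fmt.Println(live.Contains(9))      // true
	fmt.Println(live.Contains(5))      // false
}

Compressed bitmap sets like this are what the tsdb.SeriesIDSet type used throughout the diffs below wraps.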
2 changes: 1 addition & 1 deletion Godeps
@@ -1,6 +1,5 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/RoaringBitmap/roaring d6540aab65a17321470b1661bfc52da1823871e9
github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
github.com/bmizerany/pat 6226ea591a40176dd3ff9cd8eff81ed6ca721a00
github.com/boltdb/bolt 2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8
@@ -14,6 +13,7 @@ github.com/golang/protobuf b4deda0973fb4c70b50d226b1af49f3da59f5265
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/google/go-cmp 3af367b6b30c263d47e8895973edcca9a49cf029
github.com/influxdata/influxql 5e999e6a81820d4450f2a1f35c5597b569258f01
github.com/influxdata/roaring d6540aab65a17321470b1661bfc52da1823871e9
github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jsternberg/zap-logfmt ac4bd917e18a4548ce6e0e765b29a4e7f397b0b6
github.com/jwilder/encoding b4e1701a28efcc637d9afcca7d38e495fe909a09
13 changes: 6 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Gopkg.toml
@@ -7,8 +7,8 @@
branch = "master"

[[constraint]]
name = "github.com/RoaringBitmap/roaring"
version = "0.4.3"
name = "github.com/influxdata/roaring"
revision = "ec86e26aba5545a1819e1ad68e9faa0f1745fff5"

[[constraint]]
name = "github.com/boltdb/bolt"
2 changes: 1 addition & 1 deletion LICENSE_OF_DEPENDENCIES.md
@@ -2,7 +2,7 @@
- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE)
- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
- github.com/BurntSushi/toml [MIT LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING)
- github.com/RoaringBitmap/roaring [APACHE LICENSE](https://github.com/RoaringBitmap/roaring/blob/master/LICENSE)
- github.com/influxdata/roaring [APACHE LICENSE](https://github.com/influxdata/roaring/blob/master/LICENSE)
- github.com/beorn7/perks [MIT LICENSE](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license)
- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
5 changes: 4 additions & 1 deletion cmd/influx_inspect/dumptsi/dumptsi.go
@@ -353,7 +353,10 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet

// Iterate over each series.
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
itr := fs.TagValueSeriesIDIterator(name, key, value)
itr, err := fs.TagValueSeriesIDIterator(name, key, value)
if err != nil {
return err
}
for {
e, err := itr.Next()
if err != nil {
48 changes: 25 additions & 23 deletions tsdb/index/tsi1/file_set.go
@@ -187,21 +187,6 @@ func (fs *FileSet) LastContiguousIndexFilesByLevel(level int) []*IndexFile {
return a
}

/*
// SeriesIDIterator returns an iterator over all series in the index.
func (fs *FileSet) SeriesIDIterator() tsdb.SeriesIDIterator {
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
for _, f := range fs.files {
itr := f.SeriesIDIterator()
if itr == nil {
continue
}
a = append(a, itr)
}
return FilterUndeletedSeriesIterator(MergeSeriesIterators(a...))
}
*/

// Measurement returns a measurement by name.
func (fs *FileSet) Measurement(name []byte) MeasurementElem {
for _, f := range fs.files {
@@ -395,15 +380,32 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator {
}

// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
for _, f := range fs.files {
itr := f.TagValueSeriesIDIterator(name, key, value)
if itr != nil {
a = append(a, itr)
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
ss := tsdb.NewSeriesIDSet()

var ftss *tsdb.SeriesIDSet
for i := len(fs.files) - 1; i >= 0; i-- {
f := fs.files[i]

// Remove tombstones set in previous file.
if ftss != nil && ftss.Cardinality() > 0 {
ss = ss.AndNot(ftss)
}

// Fetch tag value series set for this file and merge into overall set.
fss, err := f.TagValueSeriesIDSet(name, key, value)
if err != nil {
return nil, err
} else if fss != nil {
ss.Merge(fss)
}

// Fetch tombstone set to be processed on next file.
if ftss, err = f.TombstoneSeriesIDSet(); err != nil {
return nil, err
}
}
return tsdb.MergeSeriesIDIterators(a...)
return tsdb.NewSeriesIDSetIterator(ss), nil
}

// MeasurementsSketches returns the merged measurement sketches for the FileSet.
@@ -453,7 +455,7 @@ type File interface {
// Series iteration.
MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator
TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)

// Sketches for cardinality estimation
MergeMeasurementsSketches(s, t estimator.Sketch) error
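The rewritten FileSet.TagValueSeriesIDIterator above merges one set per file and applies each file's tombstones before merging the next file's set. A standalone sketch of those semantics (not the PR's code), assuming fs.files is ordered newest-first so the loop walks from the oldest file to the newest: a file's tombstones mask series contributed by itself and older files, while a still-newer file can re-add a deleted series.

package main

import "fmt"

// file is a stand-in for one TSI file's view of a single tag value
// (illustrative only; the real files hold roaring-backed sets).
type file struct {
	series     []uint64 // series IDs this file associates with the tag value
	tombstones []uint64 // series IDs this file marks as deleted
}

// mergeTagValueSeries replays files from oldest to newest: apply the
// tombstones of the file merged in the previous iteration, then merge
// the current file's series.
func mergeTagValueSeries(newestFirst []file) map[uint64]bool {
	out := map[uint64]bool{}
	var pending []uint64 // tombstones carried over from the last file merged
	for i := len(newestFirst) - 1; i >= 0; i-- {
		for _, id := range pending {
			delete(out, id)
		}
		for _, id := range newestFirst[i].series {
			out[id] = true
		}
		pending = newestFirst[i].tombstones
	}
	return out
}

func main() {
	files := []file{
		{series: []uint64{3}},       // newest: re-creates series 3
		{tombstones: []uint64{3}},   // middle: deletes series 3
		{series: []uint64{1, 2, 3}}, // oldest: original series
	}
	fmt.Println(mergeTagValueSeries(files)) // map[1:true 2:true 3:true]
}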
6 changes: 4 additions & 2 deletions tsdb/index/tsi1/index.go
@@ -838,8 +838,10 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
for _, p := range i.partitions {
itr := p.TagValueSeriesIDIterator(name, key, value)
if itr != nil {
itr, err := p.TagValueSeriesIDIterator(name, key, value)
if err != nil {
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
19 changes: 9 additions & 10 deletions tsdb/index/tsi1/index_file.go
@@ -329,22 +329,21 @@ func (f *IndexFile) TagKeySeriesIDIterat
return tsdb.MergeSeriesIDIterators(itrs...)
}

// TagValueSeriesIDIterator returns a series iterator for a tag value and a flag
// indicating if a tombstone exists on the measurement, key, or value.
func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
// TagValueSeriesIDSet returns a series id set for a tag value.
func (f *IndexFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
tblk := f.tblks[string(name)]
if tblk == nil {
return nil
return nil, nil
}

// Find value element.
n, data := tblk.TagValueSeriesData(key, value)
if n == 0 {
return nil
var valueElem TagBlockValueElem
if !tblk.DecodeTagValueElem(key, value, &valueElem) {
return nil, nil
} else if valueElem.SeriesN() == 0 {
return nil, nil
}

// Create an iterator over value's series.
return &rawSeriesIDIterator{n: n, data: data}
return valueElem.SeriesIDSet()
}

// TagKey returns a tag key.
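IndexFile.TagValueSeriesIDSet above now returns a decoded series ID set rather than an iterator over the raw (legacy uvarint-encoded) series data. As a rough illustration of the serialization side of that idea, and not the PR's actual tag block layout, a roaring bitmap round-trips through bytes like this with the upstream library:

package main

import (
	"bytes"
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	// Encode a set of series IDs into a compact byte slice, roughly what an
	// index writer would embed in a tag block (layout here is illustrative).
	in := roaring.BitmapOf(1, 2, 3, 500000)
	var buf bytes.Buffer
	if _, err := in.WriteTo(&buf); err != nil {
		panic(err)
	}

	// Decode it back when reading the file, without expanding to an ID slice.
	out := roaring.New()
	if _, err := out.ReadFrom(bytes.NewReader(buf.Bytes())); err != nil {
		panic(err)
	}
	fmt.Println(out.GetCardinality(), out.Contains(500000)) // 4 true
}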
25 changes: 25 additions & 0 deletions tsdb/index/tsi1/index_file_test.go
@@ -49,6 +49,31 @@ func TestGenerateIndexFile(t *testing.T) {
}
}

// Ensure index file generated with uvarint encoding can be loaded.
func TestGenerateIndexFile_Uvarint(t *testing.T) {
// Load previously generated series file.
sfile := tsdb.NewSeriesFile("testdata/uvarint/_series")
if err := sfile.Open(); err != nil {
t.Fatal(err)
}
defer sfile.Close()

// Open the legacy index file from the testdata path.
f := tsi1.NewIndexFile(sfile)
f.SetPath("testdata/uvarint/index")
if err := f.Open(); err != nil {
t.Fatal(err)
}
defer f.Close()

// Verify that tag/value series can be fetched.
if e := f.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); e == nil {
t.Fatal("expected element")
} else if n := e.(*tsi1.TagBlockValueElem).SeriesN(); n == 0 {
t.Fatal("expected series")
}
}

// Ensure MeasurementHasSeries returns false when all series are tombstoned.
func TestIndexFile_MeasurementHasSeries_Tombstoned(t *testing.T) {
sfile := MustOpenSeriesFile()
59 changes: 26 additions & 33 deletions tsdb/index/tsi1/index_files.go
@@ -138,17 +138,17 @@ func (p IndexFiles) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
return tsdb.MergeSeriesIDIterators(a...)
}

// TagValueSeriesIDIterator returns an iterator that merges series across all files.
func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
a := make([]tsdb.SeriesIDIterator, 0, len(p))

// TagValueSeriesIDSet returns a set of series IDs for a tag value, merged across all files.
func (p IndexFiles) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
ss := tsdb.NewSeriesIDSet()
for i := range p {
itr := p[i].TagValueSeriesIDIterator(name, key, value)
if itr != nil {
a = append(a, itr)
if fss, err := p[i].TagValueSeriesIDSet(name, key, value); err != nil {
return nil, err
} else if fss != nil {
ss.Merge(fss)
}
}
return tsdb.MergeSeriesIDIterators(a...)
return ss, nil
}

// CompactTo merges all index files and writes them to w.
@@ -185,6 +185,13 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
return n, err
}

// Ensure block is word aligned.
// if offset := n % 8; offset != 0 {
Contributor comment: Can we remove the commented code if we don't need it?

// if err := writeTo(bw, make([]byte, 8-offset), &n); err != nil {
// return n, err
// }
// }

// Write measurement block.
t.MeasurementBlock.Offset = n
if err := p.writeMeasurementBlockTo(bw, &info, &n); err != nil {
@@ -289,12 +296,18 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
default:
}

// Ensure block is word aligned.
// if offset := (*n) % 8; offset != 0 {
// if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
// return err
// }
// }

kitr, err := p.TagKeyIterator(name)
if err != nil {
return err
}

var seriesN int
enc := NewTagBlockEncoder(w)
for ke := kitr.Next(); ke != nil; ke = kitr.Next() {
// Encode key.
@@ -309,31 +322,11 @@

// Merge all series together.
if err := func() error {
sitr := p.TagValueSeriesIDIterator(name, ke.Key(), ve.Value())
if sitr != nil {
defer sitr.Close()
for {
se, err := sitr.Next()
if err != nil {
return err
} else if se.SeriesID == 0 {
break
}
seriesIDs = append(seriesIDs, se.SeriesID)

// Check for cancellation periodically.
if seriesN++; seriesN%1000 == 0 {
select {
case <-info.cancel:
return ErrCompactionInterrupted
default:
}
}
}
ss, err := p.TagValueSeriesIDSet(name, ke.Key(), ve.Value())
if err != nil {
return err
}

// Encode value.
return enc.EncodeValue(ve.Value(), ve.Deleted(), seriesIDs)
return enc.EncodeValue(ve.Value(), ve.Deleted(), ss)
}(); err != nil {
return nil
}
30 changes: 22 additions & 8 deletions tsdb/index/tsi1/log_file.go
@@ -450,29 +450,29 @@ func (f *LogFile) DeleteTagKey(name, key []byte) error {
return f.FlushAndSync()
}

// TagValueSeriesIDIterator returns a series iterator for a tag value.
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
// TagValueSeriesIDSet returns the set of series IDs for a tag value.
func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
f.mu.RLock()
defer f.mu.RUnlock()

mm, ok := f.mms[string(name)]
if !ok {
return nil
return nil, nil
}

tk, ok := mm.tagSet[string(key)]
if !ok {
return nil
return nil, nil
}

tv, ok := tk.tagValues[string(value)]
if !ok {
return nil
return nil, nil
} else if tv.cardinality() == 0 {
return nil
return nil, nil
}

return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())
return tv.seriesIDSet(), nil
}

// MeasurementN returns the total number of measurements.
@@ -846,6 +846,13 @@ func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n
return n, err
}

// Ensure block is word aligned.
// if offset := n % 8; offset != 0 {
// if err := writeTo(bw, make([]byte, 8-offset), &n); err != nil {
// return n, err
// }
// }

// Write measurement block.
t.MeasurementBlock.Offset = n
if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
@@ -924,6 +931,13 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn
default:
}

// Ensure block is word aligned.
// if offset := (*n) % 8; offset != 0 {
// if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
// return err
// }
// }

enc := NewTagBlockEncoder(w)
var valueN int
for _, k := range mm.keys() {
@@ -946,7 +960,7 @@
// Add each value.
for _, v := range values {
value := tag.tagValues[v]
if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil {
if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
return err
}
