
Merge pull request #10424 from influxdata/BP1.5-er-nil-shard
Fix panic in IndexSet
e-dard authored Oct 29, 2018
2 parents 92f56d8 + 11bf6ec commit 9691113
Showing 5 changed files with 317 additions and 9 deletions.
3 changes: 3 additions & 0 deletions tsdb/index.go
@@ -24,6 +24,9 @@ const (
TSI1IndexName = "tsi1"
)

// ErrIndexClosing can be returned from an Index method if the index is currently closing.
var ErrIndexClosing = errors.New("index is closing")

type Index interface {
Open() error
Close() error
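The new ErrIndexClosing value is a classic Go sentinel error: because it is a single package-level variable, callers can compare against it with ==, which is exactly what the tests added later in this commit do. A small, self-contained sketch of the pattern follows; all names in it are illustrative and not part of this diff.

package main

import (
	"errors"
	"fmt"
)

// ErrClosing plays the role of tsdb.ErrIndexClosing: a package-level
// sentinel that callers can compare against with ==.
var ErrClosing = errors.New("index is closing")

// lookup stands in for any Index method that can fail while the index closes.
func lookup(closing bool) ([]string, error) {
	if closing {
		return nil, ErrClosing
	}
	return []string{"cpu", "mem"}, nil
}

func main() {
	if _, err := lookup(true); err == ErrClosing {
		fmt.Println("index closing; treat as transient and retry or skip")
	}
}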
2 changes: 1 addition & 1 deletion tsdb/index/tsi1/partition.go
@@ -388,7 +388,7 @@ func (i *Partition) FieldSet() *tsdb.MeasurementFieldSet {
func (i *Partition) RetainFileSet() (*FileSet, error) {
select {
case <-i.closing:
return nil, errors.New("index is closing")
return nil, tsdb.ErrIndexClosing
default:
i.mu.RLock()
defer i.mu.RUnlock()
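RetainFileSet keeps its existing guard, a select on the partition's closing channel; only the returned error changes to the shared sentinel. A stripped-down, runnable sketch of that guard pattern follows, with a simplified partition type standing in for the real tsi1.Partition.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosing = errors.New("index is closing")

type partition struct {
	mu      sync.RWMutex
	closing chan struct{} // closed when the partition starts shutting down
	files   []string
}

// retainFileSet refuses to hand out files once closing has been signalled.
func (p *partition) retainFileSet() ([]string, error) {
	select {
	case <-p.closing:
		return nil, errClosing
	default:
		p.mu.RLock()
		defer p.mu.RUnlock()
		return p.files, nil
	}
}

func main() {
	p := &partition{closing: make(chan struct{}), files: []string{"index-file-1"}}
	fmt.Println(p.retainFileSet()) // [index-file-1] <nil>
	close(p.closing)               // simulate Close()
	fmt.Println(p.retainFileSet()) // [] index is closing
}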
6 changes: 4 additions & 2 deletions tsdb/shard.go
@@ -408,7 +408,9 @@ func (s *Shard) Index() (Index, error) {
return s.index, nil
}

func (s *Shard) seriesFile() (*SeriesFile, error) {
// SeriesFile returns a reference to the underlying series file. It returns an error
// if the series file is nil.
func (s *Shard) SeriesFile() (*SeriesFile, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if err := s.ready(); err != nil {
@@ -1262,7 +1264,7 @@ func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOpti
idxs = append(idxs, idx)
}
if sfile == nil {
sfile, _ = sh.seriesFile()
sfile, _ = sh.SeriesFile()
}
}

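Exporting SeriesFile gives other packages (here, Store) the same protection that Shard.Index already provides: take the shard's read lock, check that the shard is usable, and return an error rather than a nil pointer. A minimal sketch of that accessor shape, using simplified stand-in types instead of the real Shard and SeriesFile:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errEngineClosed = errors.New("engine is closed")

type seriesFile struct{ path string }

type shard struct {
	mu    sync.RWMutex
	open  bool
	sfile *seriesFile
}

// SeriesFile returns the underlying series file, or an error while the shard
// is not usable, so callers never dereference a nil pointer.
func (s *shard) SeriesFile() (*seriesFile, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.open || s.sfile == nil {
		return nil, errEngineClosed
	}
	return s.sfile, nil
}

func main() {
	sh := &shard{} // a closed shard: open == false, sfile == nil
	if _, err := sh.SeriesFile(); err != nil {
		fmt.Println("guarded:", err) // guarded: engine is closed
	}
}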
30 changes: 26 additions & 4 deletions tsdb/store.go
@@ -1332,10 +1332,20 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
}

if is.SeriesFile == nil {
is.SeriesFile = shard.sfile
sfile, err := shard.SeriesFile()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.SeriesFile = sfile
}

is.Indexes = append(is.Indexes, shard.index)
index, err := shard.Index()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.Indexes = append(is.Indexes, index)
}
s.mu.RUnlock()

@@ -1488,9 +1498,21 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
}

if is.SeriesFile == nil {
is.SeriesFile = shard.sfile
sfile, err := shard.SeriesFile()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.SeriesFile = sfile
}

index, err := shard.Index()
if err != nil {
s.mu.RUnlock()
return nil, err
}
is.Indexes = append(is.Indexes, shard.index)

is.Indexes = append(is.Indexes, index)
}
s.mu.RUnlock()
is = is.DedupeInmemIndexes()
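In both TagKeys and TagValues the old code read shard.sfile and shard.index directly; for a shard that is concurrently closed and reopened those references can be nil, and a nil entry in the resulting IndexSet is the likely source of the panic named in the commit title. The new code asks the shard through SeriesFile() and Index() and returns early on error, releasing the store's read lock first. A condensed, runnable sketch of that early-return-under-RLock shape, with simplified stand-in types rather than the real Store:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errEngineClosed = errors.New("engine is closed")

type index struct{ name string }

// shard mimics the relevant slice of tsdb.Shard: Index errors out instead of
// returning a nil index when the shard is closed.
type shard struct{ idx *index }

func (s *shard) Index() (*index, error) {
	if s.idx == nil {
		return nil, errEngineClosed
	}
	return s.idx, nil
}

type store struct {
	mu     sync.RWMutex
	shards map[uint64]*shard
}

// tagKeys gathers shard indexes under the read lock and unlocks before every
// error return, matching the shape of Store.TagKeys in this commit.
func (s *store) tagKeys(ids []uint64) ([]*index, error) {
	var indexes []*index
	s.mu.RLock()
	for _, id := range ids {
		sh, ok := s.shards[id]
		if !ok {
			continue
		}
		idx, err := sh.Index()
		if err != nil {
			s.mu.RUnlock() // release before returning, as the real code does
			return nil, err
		}
		indexes = append(indexes, idx)
	}
	s.mu.RUnlock()
	return indexes, nil
}

func main() {
	s := &store{shards: map[uint64]*shard{
		0: {idx: &index{name: "tsi1"}},
		1: {}, // closed shard with no index
	}}
	_, err := s.tagKeys([]uint64{0, 1})
	fmt.Println(err) // engine is closed
}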
285 changes: 283 additions & 2 deletions tsdb/store_test.go
@@ -3,6 +3,7 @@ package tsdb_test
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"math"
@@ -16,15 +17,15 @@ import (
"testing"
"time"

"github.com/influxdata/influxdb/tsdb/index/inmem"

"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
)

@@ -1529,6 +1530,286 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
return out
}

func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
defer s.Close()

shardN := 10
for i := 0; i < shardN; i++ {
// Create new shards with some data
s.MustCreateShardWithData("db0", "rp0", i,
`cpu,host=serverA value=1 30`,
`mem,region=west value=2 40`, // skip: wrong source
`cpu,host=serverC value=3 60`,
)
}

done := make(chan struct{})
errC := make(chan error, 2)

// Randomly close and open the shards.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
i := uint64(rand.Intn(int(shardN)))
if sh := s.Shard(i); sh == nil {
errC <- errors.New("shard should not be nil")
return
} else {
if err := sh.Close(); err != nil {
errC <- err
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
errC <- err
return
}
}
}
}
}()

// Attempt to get measurement names from the store.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
names, err := s.MeasurementNames(nil, "db0", nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}

if err != nil {
errC <- err
return
}

if got, exp := names, slices.StringsToBytes("cpu", "mem"); !reflect.DeepEqual(got, exp) {
errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
return
}
}
}
}()

// Run for 500ms
time.Sleep(500 * time.Millisecond)
close(done)

// Check for errors.
if err := <-errC; err != nil {
t.Fatal(err)
}
if err := <-errC; err != nil {
t.Fatal(err)
}
}
}

func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
defer s.Close()

shardN := 10
for i := 0; i < shardN; i++ {
// Create new shards with some data
s.MustCreateShardWithData("db0", "rp0", i,
`cpu,host=serverA value=1 30`,
`mem,region=west value=2 40`, // skip: wrong source
`cpu,host=serverC value=3 60`,
)
}

done := make(chan struct{})
errC := make(chan error, 2)

// Randomly close and open the shards.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
i := uint64(rand.Intn(int(shardN)))
if sh := s.Shard(i); sh == nil {
errC <- errors.New("shard should not be nil")
return
} else {
if err := sh.Close(); err != nil {
errC <- err
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
errC <- err
return
}
}
}
}
}()

// Attempt to get tag keys from the shards.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
keys, err := s.TagKeys(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}

if err != nil {
errC <- err
return
}

if got, exp := keys[0].Keys, []string{"host"}; !reflect.DeepEqual(got, exp) {
errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
return
}

if got, exp := keys[1].Keys, []string{"region"}; !reflect.DeepEqual(got, exp) {
errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
return
}
}
}
}()

// Run for 500ms
time.Sleep(500 * time.Millisecond)

close(done)

// Check for errors
if err := <-errC; err != nil {
t.Fatal(err)
}
if err := <-errC; err != nil {
t.Fatal(err)
}
}
}

func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
s := MustOpenStore(index)
defer s.Close()

shardN := 10
for i := 0; i < shardN; i++ {
// Create new shards with some data
s.MustCreateShardWithData("db0", "rp0", i,
`cpu,host=serverA value=1 30`,
`mem,region=west value=2 40`, // skip: wrong source
`cpu,host=serverC value=3 60`,
)
}

done := make(chan struct{})
errC := make(chan error, 2)

// Randomly close and open the shards.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
i := uint64(rand.Intn(int(shardN)))
if sh := s.Shard(i); sh == nil {
errC <- errors.New("shard should not be nil")
return
} else {
if err := sh.Close(); err != nil {
errC <- err
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
errC <- err
return
}
}
}
}
}()

// Attempt to get tag values from the shards.
go func() {
for {
select {
case <-done:
errC <- nil
return
default:
stmt, err := influxql.ParseStatement(`SHOW TAG VALUES WITH KEY = "host"`)
if err != nil {
t.Fatal(err)
}
rewrite, err := query.RewriteStatement(stmt)
if err != nil {
t.Fatal(err)
}

cond := rewrite.(*influxql.ShowTagValuesStatement).Condition
values, err := s.TagValues(nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
continue // These errors are expected
}

if err != nil {
errC <- err
return
}

exp := tsdb.TagValues{
Measurement: "cpu",
Values: []tsdb.KeyValue{
tsdb.KeyValue{Key: "host", Value: "serverA"},
tsdb.KeyValue{Key: "host", Value: "serverC"},
},
}

if got := values[0]; !reflect.DeepEqual(got, exp) {
errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
return
}
}
}
}()

// Run for 500ms
time.Sleep(500 * time.Millisecond)

close(done)

// Check for errors
if err := <-errC; err != nil {
t.Fatal(err)
}
if err := <-errC; err != nil {
t.Fatal(err)
}
}
}

func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
for _, index := range tsdb.RegisteredIndexes() {
store := NewStore(index)
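The three new regression tests race shard close/reopen against metadata queries, so they are most useful under Go's race detector. Assuming a checkout of the repository root, a command along these lines should run just those tests (the exact package path may vary between branches):

go test -race -run 'TestStore_.*_ConcurrentDropShard' ./tsdb/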
