Skip to content

Commit

Permalink
fix(storage/flux): fix the empty call for storage/flux (#18446)
Browse files Browse the repository at this point in the history
The tables produced by `storage/flux` didn't previously pass our table
tests. The `Empty()` call is supposed to return false if the table was
ever not empty, but reading the table or calling `Done()` would cause
the table implementations here to return that they were always empty.
This messes up the csv encoder which then believes that it just emitted
an empty table.

The table tests for valid table implementations states that this is an
error for the table implementation. This change introduces a simple test
for `ReadFilter` and also runs the table tests on the filter iterator.
  • Loading branch information
jsternberg authored Jun 11, 2020
1 parent e8fe3c9 commit 1d5a7bf
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 19 deletions.
30 changes: 15 additions & 15 deletions storage/flux/table.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newFloatTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func newFloatWindowTable(
t.nextTS = start + (every - start%every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func newFloatGroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -445,7 +445,7 @@ func newIntegerTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -535,7 +535,7 @@ func newIntegerWindowTable(
t.nextTS = start + (every - start%every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -730,7 +730,7 @@ func newIntegerGroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -843,7 +843,7 @@ func newUnsignedTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -933,7 +933,7 @@ func newUnsignedWindowTable(
t.nextTS = start + (every - start%every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func newUnsignedGroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1241,7 +1241,7 @@ func newStringTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func newStringWindowTable(
t.nextTS = start + (every - start%every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1526,7 +1526,7 @@ func newStringGroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1639,7 +1639,7 @@ func newBooleanTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1729,7 +1729,7 @@ func newBooleanWindowTable(
t.nextTS = start + (every - start%every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -1924,7 +1924,7 @@ func newBooleanGroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down
6 changes: 3 additions & 3 deletions storage/flux/table.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func new{{.Name}}Table(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func new{{.Name}}WindowTable(
t.nextTS = start + (every - start % every)
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func new{{.Name}}GroupTable(
cur: cur,
}
t.readTags(tags)
t.advance()
t.init(t.advance)

return t
}
Expand Down
7 changes: 6 additions & 1 deletion storage/flux/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type table struct {
done chan struct{}

colBufs *colReader
empty bool

err error

Expand Down Expand Up @@ -59,7 +60,7 @@ func newTable(
func (t *table) Key() flux.GroupKey { return t.key }
func (t *table) Cols() []flux.ColMeta { return t.cols }
func (t *table) Err() error { return t.err }
func (t *table) Empty() bool { return t.colBufs == nil || t.colBufs.l == 0 }
func (t *table) Empty() bool { return t.empty }

func (t *table) Cancel() {
atomic.StoreInt32(&t.cancelled, 1)
Expand All @@ -69,6 +70,10 @@ func (t *table) isCancelled() bool {
return atomic.LoadInt32(&t.cancelled) != 0
}

func (t *table) init(advance func() bool) {
t.empty = !advance()
}

func (t *table) do(f func(flux.ColReader) error, advance func() bool) error {
// Mark this table as having been used. If this doesn't
// succeed, then this has already been invoked somewhere else.
Expand Down
7 changes: 7 additions & 0 deletions storage/flux/table_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package storageflux

import "sync/atomic"

func (t *table) IsDone() bool {
return atomic.LoadInt32(&t.used) != 0
}
181 changes: 181 additions & 0 deletions storage/flux/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,187 @@ func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.Read
return wr.ReadWindowAggregate(ctx, spec, alloc)
}

func TestStorageReader_ReadFilter(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
return gen.NewCounterByteSequence("a-%s", 0, 3)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
Count: math.MaxInt32,
Delta: 10 * time.Second,
},
DataType: models.Float,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeFloatValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewFloatArrayValuesSequence([]float64{1.0, 2.0, 3.0}),
)
},
},
},
},
}
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:00:30Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()

mem := &memory.Allocator{}
ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
}, mem)
if err != nil {
t.Fatal(err)
}

makeTable := func(t0 string) *executetest.Table {
start, stop := reader.Bounds.Start, reader.Bounds.Stop
return &executetest.Table{
KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
ColMeta: []flux.ColMeta{
{Label: "_start", Type: flux.TTime},
{Label: "_stop", Type: flux.TTime},
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TFloat},
{Label: "_field", Type: flux.TString},
{Label: "_measurement", Type: flux.TString},
{Label: "t0", Type: flux.TString},
},
Data: [][]interface{}{
{start, stop, Time("2019-11-25T00:00:00Z"), 1.0, "f0", "m0", t0},
{start, stop, Time("2019-11-25T00:00:10Z"), 2.0, "f0", "m0", t0},
{start, stop, Time("2019-11-25T00:00:20Z"), 3.0, "f0", "m0", t0},
},
}
}

want := []*executetest.Table{
makeTable("a-0"),
makeTable("a-1"),
makeTable("a-2"),
}
executetest.NormalizeTables(want)
sort.Sort(executetest.SortedTables(want))

var got []*executetest.Table
if err := ti.Do(func(table flux.Table) error {
t, err := executetest.ConvertTable(table)
if err != nil {
return err
}
got = append(got, t)
return nil
}); err != nil {
t.Fatal(err)
}
executetest.NormalizeTables(got)
sort.Sort(executetest.SortedTables(got))

// compare these two
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
}

func TestStorageReader_Table(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Tags: []*gen.TagValuesSpec{
{
TagKey: "t0",
Values: func() gen.CountableSequence {
return gen.NewCounterByteSequence("a-%s", 0, 3)
},
},
},
}
spec := gen.Spec{
OrgID: org,
BucketID: bucket,
Measurements: []gen.MeasurementSpec{
{
Name: "m0",
TagsSpec: tagsSpec,
FieldValuesSpec: &gen.FieldValuesSpec{
Name: "f0",
TimeSequenceSpec: gen.TimeSequenceSpec{
Count: math.MaxInt32,
Delta: 10 * time.Second,
},
DataType: models.Float,
Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
return gen.NewTimeFloatValuesSequence(
spec.Count,
gen.NewTimestampSequenceFromSpec(spec),
gen.NewFloatArrayValuesSequence([]float64{1.0, 2.0, 3.0}),
)
},
},
},
},
}
tr := gen.TimeRange{
Start: mustParseTime("2019-11-25T00:00:00Z"),
End: mustParseTime("2019-11-25T00:00:30Z"),
}
return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
})
defer reader.Close()

for _, tc := range []struct {
name string
newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator
}{
{
name: "ReadFilter",
newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator {
ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
}, alloc)
if err != nil {
t.Fatal(err)
}
return ti
},
},
} {
t.Run(tc.name, func(t *testing.T) {
executetest.RunTableTests(t, executetest.TableTest{
NewFn: tc.newFn,
IsDone: func(table flux.Table) bool {
return table.(interface {
IsDone() bool
}).IsDone()
},
})
})
}
}

func TestStorageReader_ReadWindowAggregate(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{
Expand Down

0 comments on commit 1d5a7bf

Please sign in to comment.