Skip to content

Commit

Permalink
feat(query): Reader implementation for WindowAggregate (influxdata#17885
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ethanyzhang authored Apr 29, 2020
1 parent 19ba69b commit 3c2ab1b
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 122 deletions.
18 changes: 9 additions & 9 deletions mock/store_reader.go → mock/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ import (
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
)

type StoreReader struct {
type StorageReader struct {
ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
CloseFn func()
}

func (s *StoreReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (s *StorageReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return s.ReadFilterFn(ctx, spec, alloc)
}

func (s *StoreReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (s *StorageReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return s.ReadGroupFn(ctx, spec, alloc)
}

func (s *StoreReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (s *StorageReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return s.ReadTagKeysFn(ctx, spec, alloc)
}

func (s *StoreReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (s *StorageReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return s.ReadTagValuesFn(ctx, spec, alloc)
}

func (s *StoreReader) Close() {
func (s *StorageReader) Close() {
// Only invoke the close function if it is set.
// We want this to be a no-op and work without
// explicitly setting up a close function.
Expand All @@ -41,12 +41,12 @@ func (s *StoreReader) Close() {
}

type WindowAggregateStoreReader struct {
*StoreReader
HasWindowAggregateCapabilityFn func(ctx context.Context) bool
*StorageReader
HasWindowAggregateCapabilityFn func(ctx context.Context, capability ...*influxdb.WindowAggregateCapability) bool
ReadWindowAggregateFn func(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
}

func (s *WindowAggregateStoreReader) HasWindowAggregateCapability(ctx context.Context) bool {
func (s *WindowAggregateStoreReader) HasWindowAggregateCapability(ctx context.Context, capability ...*influxdb.WindowAggregateCapability) bool {
// Use the function if it exists.
if s.HasWindowAggregateCapabilityFn != nil {
return s.HasWindowAggregateCapabilityFn(ctx)
Expand Down
2 changes: 1 addition & 1 deletion query/stdlib/influxdata/influxdb/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type ReadWindowAggregatePhysSpec struct {
ReadRangePhysSpec

WindowEvery int64
Aggregates []plan.ProcedureKind
Aggregates []string
}

func (s *ReadWindowAggregatePhysSpec) Kind() plan.ProcedureKind {
Expand Down
2 changes: 1 addition & 1 deletion query/stdlib/influxdata/influxdb/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestReadWindowAggregateSource(t *testing.T) {
BucketID: bucketID.String(),
},
WindowEvery: 10,
Aggregates: []plan.ProcedureKind{
Aggregates: []string{
universe.SumKind,
},
}
Expand Down
3 changes: 1 addition & 2 deletions query/stdlib/influxdata/influxdb/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/prom"
Expand Down Expand Up @@ -148,7 +147,7 @@ type TableIterator interface {
type ReadWindowAggregateSpec struct {
ReadFilterSpec
WindowEvery int64
Aggregates []plan.ProcedureKind
Aggregates []string
}

// WindowAggregateCapability describes what is supported by WindowAggregateReader.
Expand Down
85 changes: 85 additions & 0 deletions storage/flux/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
storage "github.com/influxdata/influxdb/v2/storage/reads"
Expand Down Expand Up @@ -79,6 +80,23 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec
}, nil
}

func (r *storeReader) HasWindowAggregateCapability(ctx context.Context, capability ...*influxdb.WindowAggregateCapability) bool {
if aggStore, ok := r.s.(storage.WindowAggregateStore); ok {
return aggStore.HasWindowAggregateCapability(ctx)
}
return false
}

func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return &windowAggregateIterator{
ctx: ctx,
s: r.s,
spec: spec,
cache: newTagsCache(0),
alloc: alloc,
}, nil
}

func (r *storeReader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
var predicate *datatypes.Predicate
if spec.Predicate != nil {
Expand Down Expand Up @@ -527,6 +545,73 @@ func groupKeyForGroup(kv [][]byte, spec *influxdb.ReadGroupSpec, bnds execute.Bo
return execute.NewGroupKey(cols, vs)
}

type windowAggregateIterator struct {
ctx context.Context
s storage.Store
spec influxdb.ReadWindowAggregateSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
}

func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats }

func (wai *windowAggregateIterator) Do(f func(flux.Table) error) error {
src := wai.s.GetSource(
uint64(wai.spec.OrganizationID),
uint64(wai.spec.BucketID),
)

// Setup read request
any, err := types.MarshalAny(src)
if err != nil {
return err
}

var predicate *datatypes.Predicate
if wai.spec.Predicate != nil {
p, err := toStoragePredicate(wai.spec.Predicate)
if err != nil {
return err
}
predicate = p
}

var req datatypes.ReadWindowAggregateRequest
req.ReadSource = any
req.Predicate = predicate
req.Range.Start = int64(wai.spec.Bounds.Start)
req.Range.End = int64(wai.spec.Bounds.Stop)

req.WindowEvery = wai.spec.WindowEvery
req.Aggregate = make([]*datatypes.Aggregate, len(wai.spec.Aggregates))
for i, aggKind := range wai.spec.Aggregates {
if agg, err := determineAggregateMethod(aggKind); err != nil {
return err
} else if agg != datatypes.AggregateTypeNone {
req.Aggregate[i] = &datatypes.Aggregate{Type: agg}
}
}

if aggStore, ok := wai.s.(storage.WindowAggregateStore); !ok {
return errors.New("storage does not support window aggregate.")
} else {
rs, err := aggStore.WindowAggregate(wai.ctx, &req)
if err != nil {
return err
}

if rs == nil {
return nil
}
return wai.handleRead(f, rs)
}
}

func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error {
return nil
}

type tagKeysIterator struct {
ctx context.Context
bounds execute.Bounds
Expand Down
Loading

0 comments on commit 3c2ab1b

Please sign in to comment.