diff --git a/flags.yml b/flags.yml index f369c921e03..d721460d590 100644 --- a/flags.yml +++ b/flags.yml @@ -28,13 +28,13 @@ expose: true - name: Push Down Window Aggregate Count - description: Enable Count variant of PushDownWindowAggregateRule + description: Enable Count variant of PushDownWindowAggregateRule and PushDownBareAggregateRule key: pushDownWindowAggregateCount default: false contact: Query Team - name: Push Down Window Aggregate Rest - description: Enable non-Count variants of PushDownWindowAggregateRule (stage 2) + description: Enable non-Count variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2) key: pushDownWindowAggregateRest default: false contact: Query Team diff --git a/kit/feature/list.go b/kit/feature/list.go index a4832f7a29f..a2bce7d1838 100644 --- a/kit/feature/list.go +++ b/kit/feature/list.go @@ -39,7 +39,7 @@ var pushDownWindowAggregateCount = MakeBoolFlag( false, ) -// PushDownWindowAggregateCount - Enable Count variant of PushDownWindowAggregateRule +// PushDownWindowAggregateCount - Enable Count variant of PushDownWindowAggregateRule and PushDownBareAggregateRule func PushDownWindowAggregateCount() BoolFlag { return pushDownWindowAggregateCount } @@ -53,7 +53,7 @@ var pushDownWindowAggregateRest = MakeBoolFlag( false, ) -// PushDownWindowAggregateRest - Enable non-Count variants of PushDownWindowAggregateRule (stage 2) +// PushDownWindowAggregateRest - Enable non-Count variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2) func PushDownWindowAggregateRest() BoolFlag { return pushDownWindowAggregateRest } diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go index 9f89334d3ce..201f0a98eed 100644 --- a/query/stdlib/influxdata/influxdb/rules.go +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -2,6 +2,7 @@ package influxdb import ( "context" + "math" "github.com/influxdata/flux" "github.com/influxdata/flux/ast" @@ -24,9 +25,10 @@ func init() { PushDownReadTagKeysRule{}, PushDownReadTagValuesRule{}, SortedPivotRule{}, - // For this rule to take effect the appropriate capabilities must be + // For the following two rules to take effect the appropriate capabilities must be // added AND feature flags must be enabled. // PushDownWindowAggregateRule{}, + // PushDownBareAggregateRule{}, // For this rule to take effect the corresponding feature flags must be // enabled. @@ -658,82 +660,90 @@ func (PushDownWindowAggregateRule) Name() string { return "PushDownWindowAggregateRule" } +var windowPushableAggs = []plan.ProcedureKind{ + universe.MinKind, + universe.MaxKind, + universe.MeanKind, + universe.CountKind, + universe.SumKind, +} + func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern { - return plan.OneOf( - []plan.ProcedureKind{ - universe.MinKind, - universe.MaxKind, - universe.MeanKind, - universe.CountKind, - universe.SumKind, - }, + return plan.OneOf(windowPushableAggs, plan.Pat(universe.WindowKind, plan.Pat(ReadRangePhysKind))) } -func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) { +func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool { // Check Capabilities reader := GetStorageDependencies(ctx).FromDeps.Reader windowAggregateReader, ok := reader.(query.WindowAggregateReader) if !ok { - return pn, false, nil + return false } caps := windowAggregateReader.GetWindowAggregateCapability(ctx) if caps == nil { - return pn, false, nil + return false } // Check the aggregate function spec. Require operation on _value. There // are two feature flags covering all cases. One specifically for Count, // and another for the rest. There are individual capability tests for all // cases. - fnNode := pn switch fnNode.Kind() { case universe.MinKind: if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMin() { - return pn, false, nil + return false } minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec) if minSpec.Column != execute.DefaultValueColLabel { - return pn, false, nil + return false } case universe.MaxKind: if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMax() { - return pn, false, nil + return false } maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec) if maxSpec.Column != execute.DefaultValueColLabel { - return pn, false, nil + return false } case universe.MeanKind: if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMean() { - return pn, false, nil + return false } meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec) if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel { - return pn, false, nil + return false } case universe.CountKind: if !feature.PushDownWindowAggregateCount().Enabled(ctx) || !caps.HaveCount() { - return pn, false, nil + return false } countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec) if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel { - return pn, false, nil + return false } case universe.SumKind: if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveSum() { - return pn, false, nil + return false } sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec) if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel { - return pn, false, nil + return false } } + return true +} + +func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) { + fnNode := pn + if !canPushWindowedAggregate(ctx, fnNode) { + return pn, false, nil + } windowNode := fnNode.Predecessors()[0] windowSpec := windowNode.ProcedureSpec().(*universe.WindowProcedureSpec) @@ -769,6 +779,35 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p }), true, nil } +// PushDownBareAggregateRule is a rule that allows pushing down of aggregates +// that are directly over a ReadRange source. +type PushDownBareAggregateRule struct{} + +func (p PushDownBareAggregateRule) Name() string { + return "PushDownWindowAggregateRule" +} + +func (p PushDownBareAggregateRule) Pattern() plan.Pattern { + return plan.OneOf(windowPushableAggs, + plan.Pat(ReadRangePhysKind)) +} + +func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) { + fnNode := pn + if !canPushWindowedAggregate(ctx, fnNode) { + return pn, false, nil + } + + fromNode := fnNode.Predecessors()[0] + fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec) + + return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{ + ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec), + Aggregates: []plan.ProcedureKind{fnNode.Kind()}, + WindowEvery: math.MaxInt64, + }), true, nil +} + // // Push Down of group aggregates. // ReadGroupPhys |> { count } diff --git a/query/stdlib/influxdata/influxdb/rules_test.go b/query/stdlib/influxdata/influxdb/rules_test.go index 808b0d8fe72..a42d28cff0f 100644 --- a/query/stdlib/influxdata/influxdb/rules_test.go +++ b/query/stdlib/influxdata/influxdb/rules_test.go @@ -2,6 +2,7 @@ package influxdb_test import ( "context" + "math" "testing" "time" @@ -16,11 +17,11 @@ import ( "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/stdlib/universe" "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb/v2/kit/feature" + "github.com/influxdata/influxdb/v2/mock" "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" - "github.com/influxdata/influxdb/v2/mock" - "github.com/influxdata/influxdb/v2/kit/feature" ) // A small mock reader so we can indicate if rule-related capabilities are @@ -31,7 +32,7 @@ type mockReaderCaps struct { } func (caps mockReaderCaps) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability { - return mockWAC{ Have: caps.Have } + return mockWAC{Have: caps.Have} } func (caps mockReaderCaps) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { @@ -43,11 +44,11 @@ type mockWAC struct { Have bool } -func (m mockWAC) HaveMin() bool { return m.Have } -func (m mockWAC) HaveMax() bool { return m.Have } -func (m mockWAC) HaveMean() bool { return m.Have } +func (m mockWAC) HaveMin() bool { return m.Have } +func (m mockWAC) HaveMax() bool { return m.Have } +func (m mockWAC) HaveMean() bool { return m.Have } func (m mockWAC) HaveCount() bool { return m.Have } -func (m mockWAC) HaveSum() bool { return m.Have } +func (m mockWAC) HaveSum() bool { return m.Have } func fluxTime(t int64) flux.Time { return flux.Time{ @@ -1152,9 +1153,9 @@ func TestReadTagValuesRule(t *testing.T) { // func TestPushDownWindowAggregateRule(t *testing.T) { // Turn on all variants. - flagger := mock.NewFlagger(map[feature.Flag] interface{}{ + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ feature.PushDownWindowAggregateCount(): true, - feature.PushDownWindowAggregateRest(): true, + feature.PushDownWindowAggregateRest(): true, }) withFlagger, _ := feature.Annotate(context.Background(), flagger) @@ -1163,7 +1164,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { deps := func(have bool) influxdb.StorageDependencies { return influxdb.StorageDependencies{ FromDeps: influxdb.FromDependencies{ - Reader: mockReaderCaps{ Have: have }, + Reader: mockReaderCaps{Have: have}, Metrics: influxdb.NewMetrics(nil), }, } @@ -1186,7 +1187,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { durNeg, _ := values.ParseDuration("-60s") dur1y, _ := values.ParseDuration("1y") - window := func(dur values.Duration) universe.WindowProcedureSpec{ + window := func(dur values.Duration) universe.WindowProcedureSpec { return universe.WindowProcedureSpec{ Window: plan.WindowSpec{ Every: dur, @@ -1208,7 +1209,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { tests := make([]plantest.RuleTestCase, 0) // construct a simple plan with a specific window and aggregate function - simplePlanWithWindowAgg := func( window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec { + simplePlanWithWindowAgg := func(window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec) *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), @@ -1223,13 +1224,13 @@ func TestPushDownWindowAggregateRule(t *testing.T) { } // construct a simple result - simpleResult := func( proc plan.ProcedureKind ) *plantest.PlanSpec { + simpleResult := func(proc plan.ProcedureKind) *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{ ReadRangePhysSpec: readRange, - Aggregates: []plan.ProcedureKind{proc}, - WindowEvery: 60000000000, + Aggregates: []plan.ProcedureKind{proc}, + WindowEvery: 60000000000, }), }, } @@ -1261,66 +1262,64 @@ func TestPushDownWindowAggregateRule(t *testing.T) { } } - // ReadRange -> window -> min => ReadWindowAggregate tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "SimplePassMin", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "min", minProcedureSpec() ), - After: simpleResult( "min" ), + Name: "SimplePassMin", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "min", minProcedureSpec()), + After: simpleResult("min"), }) // ReadRange -> window -> max => ReadWindowAggregate tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "SimplePassMax", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "max", maxProcedureSpec() ), - After: simpleResult( "max" ), + Name: "SimplePassMax", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "max", maxProcedureSpec()), + After: simpleResult("max"), }) // ReadRange -> window -> mean => ReadWindowAggregate tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "SimplePassMean", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "mean", meanProcedureSpec() ), - After: simpleResult( "mean" ), + Name: "SimplePassMean", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "mean", meanProcedureSpec()), + After: simpleResult("mean"), }) // ReadRange -> window -> count => ReadWindowAggregate tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "SimplePassCount", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ), - After: simpleResult( "count" ), + Name: "SimplePassCount", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "count", countProcedureSpec()), + After: simpleResult("count"), }) // ReadRange -> window -> sum => ReadWindowAggregate tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "SimplePassSum", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "sum", sumProcedureSpec() ), - After: simpleResult( "sum" ), + Name: "SimplePassSum", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "sum", sumProcedureSpec()), + After: simpleResult("sum"), }) - // Rewrite with successors // ReadRange -> window -> min -> count {2} => ReadWindowAggregate -> count {2} tests = append(tests, plantest.RuleTestCase{ Context: haveCaps, - Name: "WithSuccessor", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, + Name: "WithSuccessor", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), plan.CreateLogicalNode("window", &window1m), - plan.CreateLogicalNode("min", minProcedureSpec() ), - plan.CreateLogicalNode("count", countProcedureSpec() ), - plan.CreateLogicalNode("count", countProcedureSpec() ), + plan.CreateLogicalNode("min", minProcedureSpec()), + plan.CreateLogicalNode("count", countProcedureSpec()), + plan.CreateLogicalNode("count", countProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1333,11 +1332,11 @@ func TestPushDownWindowAggregateRule(t *testing.T) { Nodes: []plan.Node{ plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{ ReadRangePhysSpec: readRange, - Aggregates: []plan.ProcedureKind{"min"}, - WindowEvery: 60000000000, + Aggregates: []plan.ProcedureKind{"min"}, + WindowEvery: 60000000000, }), - plan.CreateLogicalNode("count", countProcedureSpec() ), - plan.CreateLogicalNode("count", countProcedureSpec() ), + plan.CreateLogicalNode("count", countProcedureSpec()), + plan.CreateLogicalNode("count", countProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1348,14 +1347,14 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // Helper that adds a test with a simple plan that does not pass due to a // specified bad window - simpleMinUnchanged := func( name string, window universe.WindowProcedureSpec ) { + simpleMinUnchanged := func(name string, window universe.WindowProcedureSpec) { // Note: NoChange is not working correctly for these tests. It is // expecting empty time, start, and stop column fields. - tests = append( tests, plantest.RuleTestCase{ - Name: name, - Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window, "min", countProcedureSpec() ), + tests = append(tests, plantest.RuleTestCase{ + Name: name, + Context: haveCaps, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window, "min", countProcedureSpec()), NoChange: true, }) } @@ -1363,47 +1362,47 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // Condition not met: period not equal to every badWindow1 := window1m badWindow1.Window.Period = dur2m - simpleMinUnchanged( "BadPeriod", badWindow1 ) + simpleMinUnchanged("BadPeriod", badWindow1) // Condition not met: offset non-zero badWindow2 := window1m badWindow2.Window.Offset = dur1m - simpleMinUnchanged( "BadOffset", badWindow2 ) + simpleMinUnchanged("BadOffset", badWindow2) // Condition not met: non-standard _time column badWindow3 := window1m badWindow3.TimeColumn = "_timmy" - simpleMinUnchanged( "BadTime", badWindow3 ) + simpleMinUnchanged("BadTime", badWindow3) // Condition not met: non-standard start column badWindow4 := window1m badWindow4.StartColumn = "_stooort" - simpleMinUnchanged( "BadStart", badWindow4 ) + simpleMinUnchanged("BadStart", badWindow4) // Condition not met: non-standard stop column badWindow5 := window1m badWindow5.StopColumn = "_stappp" - simpleMinUnchanged( "BadStop", badWindow5 ) + simpleMinUnchanged("BadStop", badWindow5) // Condition not met: createEmpty is not false badWindow6 := window1m badWindow6.CreateEmpty = true - simpleMinUnchanged( "BadCreateEmpty", badWindow6 ) + simpleMinUnchanged("BadCreateEmpty", badWindow6) // Condition not met: duration too long. - simpleMinUnchanged( "WindowTooLarge", window1y ) + simpleMinUnchanged("WindowTooLarge", window1y) // Condition not met: neg duration. - simpleMinUnchanged( "WindowNeg", windowNeg ) + simpleMinUnchanged("WindowNeg", windowNeg) // Bad min column // ReadRange -> window -> min => NO-CHANGE tests = append(tests, plantest.RuleTestCase{ - Name: "BadMinCol", + Name: "BadMinCol", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "min", &universe.MinProcedureSpec{ - SelectorConfig: execute.SelectorConfig{Column:"_valmoo"}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "min", &universe.MinProcedureSpec{ + SelectorConfig: execute.SelectorConfig{Column: "_valmoo"}, }), NoChange: true, }) @@ -1411,11 +1410,11 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // Bad max column // ReadRange -> window -> max => NO-CHANGE tests = append(tests, plantest.RuleTestCase{ - Name: "BadMaxCol", + Name: "BadMaxCol", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "max", &universe.MaxProcedureSpec{ - SelectorConfig: execute.SelectorConfig{Column:"_valmoo"}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "max", &universe.MaxProcedureSpec{ + SelectorConfig: execute.SelectorConfig{Column: "_valmoo"}, }), NoChange: true, }) @@ -1423,20 +1422,20 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // Bad mean columns // ReadRange -> window -> mean => NO-CHANGE tests = append(tests, plantest.RuleTestCase{ - Name: "BadMeanCol1", + Name: "BadMeanCol1", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{ - AggregateConfig: execute.AggregateConfig{Columns:[]string{"_valmoo"}}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "mean", &universe.MeanProcedureSpec{ + AggregateConfig: execute.AggregateConfig{Columns: []string{"_valmoo"}}, }), NoChange: true, }) tests = append(tests, plantest.RuleTestCase{ - Name: "BadMeanCol2", + Name: "BadMeanCol2", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{ - AggregateConfig: execute.AggregateConfig{Columns:[]string{"_value", "_valmoo"}}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "mean", &universe.MeanProcedureSpec{ + AggregateConfig: execute.AggregateConfig{Columns: []string{"_value", "_valmoo"}}, }), NoChange: true, }) @@ -1445,15 +1444,15 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // ReadRange -> window -> min // \-> min tests = append(tests, plantest.RuleTestCase{ - Name: "CollapsedWithSuccessor1", + Name: "CollapsedWithSuccessor1", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), plan.CreateLogicalNode("window", &window1m), - plan.CreateLogicalNode("min", minProcedureSpec() ), - plan.CreateLogicalNode("min", minProcedureSpec() ), + plan.CreateLogicalNode("min", minProcedureSpec()), + plan.CreateLogicalNode("min", minProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1468,14 +1467,14 @@ func TestPushDownWindowAggregateRule(t *testing.T) { // ReadRange -> window -> min // \-> window tests = append(tests, plantest.RuleTestCase{ - Name: "CollapsedWithSuccessor2", + Name: "CollapsedWithSuccessor2", Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), plan.CreateLogicalNode("window", &window1m), - plan.CreateLogicalNode("min", minProcedureSpec() ), + plan.CreateLogicalNode("min", minProcedureSpec()), plan.CreateLogicalNode("window", &window2m), }, Edges: [][2]int{ @@ -1487,7 +1486,6 @@ func TestPushDownWindowAggregateRule(t *testing.T) { NoChange: true, }) - // No pattern match // ReadRange -> filter -> window -> min -> NO-CHANGE pushableFn1 := executetest.FunctionExpression(t, `(r) => true`) @@ -1498,7 +1496,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { Fn: expr, } } - noPatternMatch1 := func() *plantest.PlanSpec{ + noPatternMatch1 := func() *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), @@ -1506,7 +1504,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { Fn: makeResolvedFilterFn(pushableFn1), }), plan.CreateLogicalNode("window", &window1m), - plan.CreateLogicalNode("min", minProcedureSpec() ), + plan.CreateLogicalNode("min", minProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1516,16 +1514,16 @@ func TestPushDownWindowAggregateRule(t *testing.T) { } } tests = append(tests, plantest.RuleTestCase{ - Name: "NoPatternMatch1", - Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: noPatternMatch1(), + Name: "NoPatternMatch1", + Context: haveCaps, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: noPatternMatch1(), NoChange: true, }) // No pattern match 2 // ReadRange -> window -> filter -> min -> NO-CHANGE - noPatternMatch2 := func() *plantest.PlanSpec{ + noPatternMatch2 := func() *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadRange", &readRange), @@ -1533,7 +1531,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) { plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{ Fn: makeResolvedFilterFn(pushableFn1), }), - plan.CreateLogicalNode("min", minProcedureSpec() ), + plan.CreateLogicalNode("min", minProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1543,20 +1541,20 @@ func TestPushDownWindowAggregateRule(t *testing.T) { } } tests = append(tests, plantest.RuleTestCase{ - Name: "NoPatternMatch2", - Context: haveCaps, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: noPatternMatch2(), + Name: "NoPatternMatch2", + Context: haveCaps, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: noPatternMatch2(), NoChange: true, }) // Fail due to no capabilities present. tests = append(tests, plantest.RuleTestCase{ - Context: noCaps, - Name: "FailNoCaps", - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ), - After: simpleResult( "count" ), + Context: noCaps, + Name: "FailNoCaps", + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: simplePlanWithWindowAgg(window1m, "count", countProcedureSpec()), + After: simpleResult("count"), NoChange: true, }) @@ -1569,28 +1567,138 @@ func TestPushDownWindowAggregateRule(t *testing.T) { } } +func TestPushDownBareAggregateRule(t *testing.T) { + // Turn on support for window aggregate count + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ + feature.PushDownWindowAggregateCount(): true, + }) + + withFlagger, _ := feature.Annotate(context.Background(), flagger) + + // Construct dependencies either with or without aggregate window caps. + deps := func(have bool) influxdb.StorageDependencies { + return influxdb.StorageDependencies{ + FromDeps: influxdb.FromDependencies{ + Reader: mockReaderCaps{Have: have}, + Metrics: influxdb.NewMetrics(nil), + }, + } + } + + haveCaps := deps(true).Inject(withFlagger) + noCaps := deps(false).Inject(withFlagger) + + readRange := &influxdb.ReadRangePhysSpec{ + Bucket: "my-bucket", + Bounds: flux.Bounds{ + Start: fluxTime(5), + Stop: fluxTime(10), + }, + } + + readWindowAggregate := &influxdb.ReadWindowAggregatePhysSpec{ + ReadRangePhysSpec: *(readRange.Copy().(*influxdb.ReadRangePhysSpec)), + WindowEvery: math.MaxInt64, + Aggregates: []plan.ProcedureKind{universe.CountKind}, + } + + minProcedureSpec := func() *universe.MinProcedureSpec { + return &universe.MinProcedureSpec{ + SelectorConfig: execute.SelectorConfig{Column: "_value"}, + } + } + countProcedureSpec := func() *universe.CountProcedureSpec { + return &universe.CountProcedureSpec{ + AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}}, + } + } + + testcases := []plantest.RuleTestCase{ + { + // successful push down + Context: haveCaps, + Name: "push down count", + Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}}, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", readRange), + plan.CreatePhysicalNode("count", countProcedureSpec()), + }, + Edges: [][2]int{ + {0, 1}, + }, + }, + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate), + }, + }, + }, + { + // capability not provided in storage layer + Context: noCaps, + Name: "no caps", + Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}}, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", readRange), + plan.CreatePhysicalNode("count", countProcedureSpec()), + }, + Edges: [][2]int{ + {0, 1}, + }, + }, + NoChange: true, + }, + { + // unsupported aggregate + Context: haveCaps, + Name: "no push down min", + Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}}, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", readRange), + plan.CreatePhysicalNode("count", minProcedureSpec()), + }, + Edges: [][2]int{ + {0, 1}, + }, + }, + NoChange: true, + }, + } + + for _, tc := range testcases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + plantest.PhysicalRuleTestHelper(t, &tc) + }) + } +} + // // Group Aggregate Testing // func TestPushDownGroupAggregateRule(t *testing.T) { // Turn on all flags - flagger := mock.NewFlagger(map[feature.Flag] interface{}{ + flagger := mock.NewFlagger(map[feature.Flag]interface{}{ feature.PushDownGroupAggregateCount(): true, }) ctx, _ := feature.Annotate(context.Background(), flagger) - readGroupAgg := func(aggregateMethod string) *influxdb.ReadGroupPhysSpec{ + readGroupAgg := func(aggregateMethod string) *influxdb.ReadGroupPhysSpec { return &influxdb.ReadGroupPhysSpec{ - ReadRangePhysSpec: influxdb.ReadRangePhysSpec { + ReadRangePhysSpec: influxdb.ReadRangePhysSpec{ Bucket: "my-bucket", Bounds: flux.Bounds{ Start: fluxTime(5), Stop: fluxTime(10), }, }, - GroupMode: flux.GroupModeBy, - GroupKeys: []string{"_measurement", "tag0", "tag1"}, + GroupMode: flux.GroupModeBy, + GroupKeys: []string{"_measurement", "tag0", "tag1"}, AggregateMethod: aggregateMethod, } } @@ -1601,7 +1709,7 @@ func TestPushDownGroupAggregateRule(t *testing.T) { tests := make([]plantest.RuleTestCase, 0) // construct a simple plan with a specific aggregate - simplePlanWithAgg := func( agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec { + simplePlanWithAgg := func(agg plan.NodeID, spec plan.ProcedureSpec) *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadGroup", readGroup()), @@ -1614,10 +1722,10 @@ func TestPushDownGroupAggregateRule(t *testing.T) { } // construct a simple result - simpleResult := func( proc string ) *plantest.PlanSpec { + simpleResult := func(proc string) *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ - plan.CreatePhysicalNode("ReadGroup", readGroupAgg(proc) ), + plan.CreatePhysicalNode("ReadGroup", readGroupAgg(proc)), }, } } @@ -1641,24 +1749,24 @@ func TestPushDownGroupAggregateRule(t *testing.T) { // ReadGroup -> count => ReadGroup tests = append(tests, plantest.RuleTestCase{ Context: ctx, - Name: "SimplePassCount", - Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}}, - Before: simplePlanWithAgg( "count", countProcedureSpec() ), - After: simpleResult( "count" ), + Name: "SimplePassCount", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("count", countProcedureSpec()), + After: simpleResult("count"), }) // Rewrite with successors // ReadGroup -> count -> sum {2} => ReadGroup -> count {2} tests = append(tests, plantest.RuleTestCase{ Context: ctx, - Name: "WithSuccessor1", - Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}}, + Name: "WithSuccessor1", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadGroup", readGroup()), - plan.CreateLogicalNode("count", countProcedureSpec() ), - plan.CreateLogicalNode("sum", sumProcedureSpec() ), - plan.CreateLogicalNode("sum", sumProcedureSpec() ), + plan.CreateLogicalNode("count", countProcedureSpec()), + plan.CreateLogicalNode("sum", sumProcedureSpec()), + plan.CreateLogicalNode("sum", sumProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1668,9 +1776,9 @@ func TestPushDownGroupAggregateRule(t *testing.T) { }, After: &plantest.PlanSpec{ Nodes: []plan.Node{ - plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ), - plan.CreateLogicalNode("sum", sumProcedureSpec() ), - plan.CreateLogicalNode("sum", sumProcedureSpec() ), + plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count")), + plan.CreateLogicalNode("sum", sumProcedureSpec()), + plan.CreateLogicalNode("sum", sumProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1684,13 +1792,13 @@ func TestPushDownGroupAggregateRule(t *testing.T) { // ReadGroup -> count -> count => ReadGroup -> count tests = append(tests, plantest.RuleTestCase{ Context: ctx, - Name: "WithSuccessor2", - Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}}, + Name: "WithSuccessor2", + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadGroup", readGroup()), - plan.CreateLogicalNode("count", countProcedureSpec() ), - plan.CreateLogicalNode("count", countProcedureSpec() ), + plan.CreateLogicalNode("count", countProcedureSpec()), + plan.CreateLogicalNode("count", countProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1699,8 +1807,8 @@ func TestPushDownGroupAggregateRule(t *testing.T) { }, After: &plantest.PlanSpec{ Nodes: []plan.Node{ - plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ), - plan.CreateLogicalNode("count", countProcedureSpec() ), + plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count")), + plan.CreateLogicalNode("count", countProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1711,27 +1819,27 @@ func TestPushDownGroupAggregateRule(t *testing.T) { // Bad count column // ReadGroup -> count => NO-CHANGE tests = append(tests, plantest.RuleTestCase{ - Name: "BadCountCol", + Name: "BadCountCol", Context: ctx, - Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}}, - Before: simplePlanWithAgg( "count", &universe.CountProcedureSpec{ + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, + Before: simplePlanWithAgg("count", &universe.CountProcedureSpec{ AggregateConfig: execute.AggregateConfig{Columns: []string{"_valmoo"}}, }), NoChange: true, }) // No match due to a collapsed node having a successor - // ReadGroup -> count + // ReadGroup -> count // \-> min tests = append(tests, plantest.RuleTestCase{ - Name: "CollapsedWithSuccessor", + Name: "CollapsedWithSuccessor", Context: ctx, - Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}}, + Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}}, Before: &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadGroup", readGroup()), - plan.CreateLogicalNode("count", countProcedureSpec() ), - plan.CreateLogicalNode("min", minProcedureSpec() ), + plan.CreateLogicalNode("count", countProcedureSpec()), + plan.CreateLogicalNode("min", minProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1751,14 +1859,14 @@ func TestPushDownGroupAggregateRule(t *testing.T) { Fn: expr, } } - noPatternMatch1 := func() *plantest.PlanSpec{ + noPatternMatch1 := func() *plantest.PlanSpec { return &plantest.PlanSpec{ Nodes: []plan.Node{ plan.CreateLogicalNode("ReadGroup", readGroup()), plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{ Fn: makeResolvedFilterFn(pushableFn1), }), - plan.CreateLogicalNode("count", countProcedureSpec() ), + plan.CreateLogicalNode("count", countProcedureSpec()), }, Edges: [][2]int{ {0, 1}, @@ -1767,10 +1875,10 @@ func TestPushDownGroupAggregateRule(t *testing.T) { } } tests = append(tests, plantest.RuleTestCase{ - Name: "NoPatternMatch", - Context: ctx, - Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}}, - Before: noPatternMatch1(), + Name: "NoPatternMatch", + Context: ctx, + Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}}, + Before: noPatternMatch1(), NoChange: true, })