Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: promote algorithm W to master #17900

Merged
merged 106 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
0f3aabc
refactor(query): let influxdb build against algo-w branch (#16673)
Jan 24, 2020
2bc8d49
chore(go.mod): update go.mod to point at latest flux@feat/use-algo-w
Jan 31, 2020
d25c56c
chore: merge commit 'e472b013d93d' into feat/use-algo-w
Feb 10, 2020
337595c
chore: update to latest flux@feat/use-algo-w
Feb 10, 2020
2e073fd
chore: update to latest flux@feat/use-algo-w
Feb 11, 2020
d676696
test(query): skip tests that are not passing in feat/use-algo-w (#16813)
Feb 11, 2020
16fc8df
build: remove -tags libflux and FLUX_PARSER_TYPE from build and circl…
Feb 19, 2020
766aa23
chore: update flux dependency
Feb 21, 2020
b01e194
chore: update flux dependency in promqltests
Feb 21, 2020
2c184ea
chore: update flux dependency and unskip notification tests
Feb 21, 2020
935180e
refactor: update flux for the runtime package (#17002)
jsternberg Feb 27, 2020
ff848bc
refactor(repl): remove the querier from the repl (#17031)
jsternberg Feb 27, 2020
25b0e37
feat: add support for pkg-config (#17036)
jsternberg Feb 28, 2020
0872af8
refactor: update semantic builtin lookups to use runtime now (#17066)
jsternberg Mar 2, 2020
03cfdba
fix(stdlib): unskip flux end to end tests
Mar 2, 2020
a84df74
feat(stdlib): add influxdb source (#17047)
jsternberg Mar 3, 2020
3250fb1
fix(stdlib): update flux dependency and unskip test
Mar 5, 2020
a907e05
refactor(http): modify query handler to use a language service (#17074)
jsternberg Mar 5, 2020
bcbb9df
refactor(task): tasks will now use the flux language service (#17104)
jsternberg Mar 5, 2020
0d6e4e3
Merge branch 'master' into feat/use-algo-w
jsternberg Mar 6, 2020
400d710
refactor(storage/reads): remove the storage dependency on libflux (#1…
jsternberg Mar 7, 2020
8d2ba69
refactor(http): remove the spec and update lang usage (#17148)
jsternberg Mar 9, 2020
4d68385
Merge branch 'master' into feat/use-algo-w
jsternberg Mar 9, 2020
92f121d
fix: change variable name to match new clockface variable names
hoorayimhelping Mar 9, 2020
2374717
refactor(kv): when no session length is set, use the default
jsternberg Mar 9, 2020
7012470
Merge branch 'master' into feat/use-algo-w
jsternberg Mar 10, 2020
07c9a7c
fix(stdlib): unskip stateChanges end to end test
Mar 12, 2020
790661c
test(query/stdlib/influxdata/influxdb): update rules_test to use algo…
Mar 16, 2020
5f30466
feat(query): update buckets and v1.databases calls (#17294)
jsternberg Mar 17, 2020
6c8125b
chore: update flux dependency
Mar 19, 2020
20c2439
refactor(http): change *ast.Package to json.RawMessage in query reque…
Mar 23, 2020
e9689b7
chore: update to latest flux algo-w (#17408)
Mar 24, 2020
e57c28d
chore: update to latest algo-w flux (#17422)
Mar 25, 2020
7d885fb
chore: update flux
Mar 26, 2020
4c5ef96
refactor: add pkg-config package to tools.go (#17493)
jsternberg Mar 30, 2020
469c584
Merge branch 'master' into feat/use-algo-w
jsternberg Apr 3, 2020
4ade845
chore: update flux to latest algow
Apr 3, 2020
27f7f4c
chore: update flux to latest algow
Apr 3, 2020
d3fa609
Merge branch 'master' into chore/merge-master
jsternberg Apr 6, 2020
bf15ba7
chore: update flux to latest algow
Apr 6, 2020
17c244d
chore: fix the promqltests go module file (#17631)
jsternberg Apr 6, 2020
1bb08ce
refactor(query/stdlib): modify storage filters to use the predicate d…
jsternberg Apr 7, 2020
687ab70
refactor(query): update semantic graph usage for function expression …
jsternberg Apr 7, 2020
5c99219
chore: update flux to latest algow
Apr 14, 2020
b9df72c
chore: update flux to latest algow
Apr 14, 2020
7541af8
chore: merge master into algow
Apr 15, 2020
0bac09d
Merge pull request #17759 from influxdata/chore/merge-master
Apr 15, 2020
90fb754
Merge branch 'master' into chore/merge-master
Apr 16, 2020
8e098b5
Merge pull request #17777 from influxdata/chore/merge-master
Apr 16, 2020
9857ae1
feat: release a nightly image of algow
Apr 16, 2020
9c67148
chore: update flux to latest algow revision
Apr 20, 2020
335968a
Merge branch 'master' into chore/merge-master
Apr 20, 2020
e076b61
fix: planner rewrite rules take a context
Apr 20, 2020
34bcbd8
Merge pull request #17807 from influxdata/chore/merge-master
Apr 20, 2020
63fd365
Merge branch 'master' into chore/merge-master-into-algo-w
Apr 22, 2020
54ac783
refactor(query): move ReadWindowAggregateSpec to query package
Apr 22, 2020
c6ffeac
test: skip flux end to end test for writing null values
Apr 22, 2020
ff2a2cc
chore: update flux to latest algo-w revision
Apr 23, 2020
d60b877
Merge branch 'master' into chore/merge-master-into-algo-w
Apr 23, 2020
77fb838
Merge pull request #17848 from influxdata/chore/merge-master-into-algo-w
Apr 23, 2020
3079d2a
Merge branch 'master' into chore/merge-master-into-algo-w
Apr 24, 2020
3f4368d
chore: update flux to latest algo-w revision
Apr 24, 2020
40398b7
Merge pull request #17867 from influxdata/chore/merge-master-into-algo-w
Apr 24, 2020
f5cf024
chore: update flux to latest algo-w revision
Apr 28, 2020
457637e
Merge branch 'master' into chore/merge-master
Apr 28, 2020
d6b4728
Merge pull request #17884 from influxdata/chore/merge-master
Apr 28, 2020
6d885c7
Merge branch 'master' into chore/update-flux
Apr 29, 2020
f2478a1
chore: update flux to latest revision
Apr 29, 2020
4884f71
Merge pull request #17904 from influxdata/chore/update-flux
Apr 29, 2020
2ef9999
Merge branch 'master' into chore/merge-master
Apr 30, 2020
1db0fcc
Merge pull request #17920 from influxdata/chore/merge-master
Apr 30, 2020
83b90c3
Merge branch 'master' into chore/merge-master
Apr 30, 2020
4fba49f
Merge pull request #17925 from influxdata/chore/merge-master
Apr 30, 2020
22c7d09
feat: frontend consumption of feature flags (#17926)
drdelambre Apr 30, 2020
2b58cbf
fix: safari rendering bug in checks (#17929)
drdelambre May 1, 2020
aac3a4e
fix: show buckets in alerting (#17930)
drdelambre May 1, 2020
89e3436
feat(pkger): add the ability to remove a stack and all its associated…
jsteenb2 May 1, 2020
28aa464
chore(httpc): refactor inputs to eliminate required path
jsteenb2 May 2, 2020
ffddeb4
fix(zero_value_checks): checks can now have a value set to 0 (#17933)
asalem1 May 4, 2020
1d027cf
feat(pkger): enforce metadata.name dns name compliance
jsteenb2 May 3, 2020
8bb5065
refactor: unify WindowAggregateCapability (#17901)
ethanyzhang May 5, 2020
e341a51
fix: fmt
ethanyzhang May 5, 2020
7c1b735
Merge branch 'master' into chore/update-flux
May 5, 2020
02c8e02
chore: update flux to latest revision
May 5, 2020
2faac6a
Merge pull request #17962 from influxdata/chore/update-flux
May 5, 2020
7cb599c
feat(storage): convert ResultSet to table stream for aggregate window
May 4, 2020
e51a2b8
feat: added PushDownWindowAggregate planner rewrite rule (#17898)
adrian-thurston May 6, 2020
6dd385d
chore: update flux to latest revision
May 6, 2020
4aec0dc
Merge branch 'master' into chore/update-flux
May 6, 2020
c380ff6
Merge pull request #17974 from influxdata/chore/update-flux
May 6, 2020
f3320c7
Merge branch 'master' into chore/update-flux
May 7, 2020
9a1e31a
chore: update flux to latest revision
May 7, 2020
8bf2d34
Merge pull request #18002 from influxdata/chore/update-flux
May 7, 2020
7379e78
fix(storage/flux): fix a race condition in the tags cache (#17977)
jsternberg May 7, 2020
34e581c
Merge branch 'master' into chore/merge-master
May 12, 2020
20a583d
chore: update flux to latest revision
May 12, 2020
2124d84
Merge pull request #18061 from influxdata/chore/merge-master
May 12, 2020
09b8e40
fix(query/stdlib): update to function to use the refactored row funct…
jsternberg May 13, 2020
b43f79b
fix: need to rebuild the query request before the second e2e test run…
adrian-thurston May 15, 2020
342c3c2
fix(query): do not free the column reader in the no content encoders …
jsternberg May 15, 2020
0c8d19e
Merge branch 'master' into feat/use-algo-w
jsternberg May 18, 2020
b776cf4
feat: added PushDownGroupAggregate planner rewrite rule (#18085)
adrian-thurston May 20, 2020
0bb847b
Merge branch 'master' into feat/use-algo-w
jsternberg May 21, 2020
53165bf
feat(query): add a planner rule to push down bare aggregates (#18144)
May 22, 2020
7db9f4c
Merge branch 'master' into chore/merge-master
May 26, 2020
de1b1b3
Merge pull request #18216 from influxdata/chore/merge-master
May 26, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: added PushDownGroupAggregate planner rewrite rule (#18085)
Added a (disabled) planner rule that matches:
   ReadGroupPhys -> { count }

It uses the same physical spec node for group to implement the aggregate. The
rule requires:
 * the pushDownGroupAggregateCount feature flag enabled
 * no existing aggregate present in the ReadGroup
 * use of the "_value" column only
  • Loading branch information
adrian-thurston authored May 20, 2020
commit b776cf46f52057b7b4ff1249ad73f388d67c9574
6 changes: 6 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@
default: false
contact: Lyon Hill
expose: true

- name: Push Down Group Aggregate Count
description: Enable the count variant of PushDownGroupAggregate planner rule
key: pushDownGroupAggregateCount
default: false
contact: Query Team
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 58 additions & 5 deletions query/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func init() {
// For this rule to take effect the appropriate capabilities must be
// added AND feature flags must be enabled.
// PushDownWindowAggregateRule{},

// For this rule to take effect the corresponding feature flags must be
// enabled.
// PushDownGroupAggregateRule{},
)
}

Expand Down Expand Up @@ -690,7 +694,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}

minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec)
if minSpec.Column != "_value" {
if minSpec.Column != execute.DefaultValueColLabel {
return pn, false, nil
}
case universe.MaxKind:
Expand All @@ -699,7 +703,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}

maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec)
if maxSpec.Column != "_value" {
if maxSpec.Column != execute.DefaultValueColLabel {
return pn, false, nil
}
case universe.MeanKind:
Expand All @@ -708,7 +712,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}

meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec)
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != "_value" {
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
}
case universe.CountKind:
Expand All @@ -717,7 +721,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}

countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != "_value" {
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
}
case universe.SumKind:
Expand All @@ -726,7 +730,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}

sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec)
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != "_value" {
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
}
}
Expand Down Expand Up @@ -764,3 +768,52 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
WindowEvery: window.Every.Nanoseconds(),
}), true, nil
}

//
// Push Down of group aggregates.
// ReadGroupPhys |> { count }
//
type PushDownGroupAggregateRule struct{}

func (PushDownGroupAggregateRule) Name() string {
return "PushDownGroupAggregateRule"
}

func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern {
return plan.OneOf(
[]plan.ProcedureKind{
universe.CountKind,
},
plan.Pat(ReadGroupPhysKind))
}

func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
// Check the aggregate function spec. Require operation on _value.
var aggregateMethod string
fnNode := pn
switch fnNode.Kind() {
case universe.CountKind:
if !feature.PushDownGroupAggregateCount().Enabled(ctx) {
return pn, false, nil
}

countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
}
aggregateMethod = universe.CountKind
}

groupNode := fnNode.Predecessors()[0]
groupSpec := groupNode.ProcedureSpec().(*ReadGroupPhysSpec)

// Group spec must not already have an aggregate method.
if len(groupSpec.AggregateMethod) > 0 {
return pn, false, nil
}

// Rule passes.
rewrite := *groupSpec.Copy().(*ReadGroupPhysSpec)
rewrite.AggregateMethod = aggregateMethod
return plan.CreatePhysicalNode("ReadGroup", &rewrite), true, nil
}
216 changes: 215 additions & 1 deletion query/stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {

tests := make([]plantest.RuleTestCase, 0)

// construct a simple plan with a specific window
// construct a simple plan with a specific window and aggregate function
simplePlanWithWindowAgg := func( window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
Expand Down Expand Up @@ -1568,3 +1568,217 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
})
}
}

//
// Group Aggregate Testing
//
func TestPushDownGroupAggregateRule(t *testing.T) {
// Turn on all flags
flagger := mock.NewFlagger(map[feature.Flag] interface{}{
feature.PushDownGroupAggregateCount(): true,
})

ctx, _ := feature.Annotate(context.Background(), flagger)

readGroupAgg := func(aggregateMethod string) *influxdb.ReadGroupPhysSpec{
return &influxdb.ReadGroupPhysSpec{
ReadRangePhysSpec: influxdb.ReadRangePhysSpec {
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
},
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement", "tag0", "tag1"},
AggregateMethod: aggregateMethod,
}
}
readGroup := func() *influxdb.ReadGroupPhysSpec {
return readGroupAgg("")
}

tests := make([]plantest.RuleTestCase, 0)

// construct a simple plan with a specific aggregate
simplePlanWithAgg := func( agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode(agg, spec),
},
Edges: [][2]int{
{0, 1},
},
}
}

// construct a simple result
simpleResult := func( proc string ) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg(proc) ),
},
}
}

minProcedureSpec := func() *universe.MinProcedureSpec {
return &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_value"},
}
}
countProcedureSpec := func() *universe.CountProcedureSpec {
return &universe.CountProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}
sumProcedureSpec := func() *universe.SumProcedureSpec {
return &universe.SumProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}

// ReadGroup -> count => ReadGroup
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "SimplePassCount",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg( "count", countProcedureSpec() ),
After: simpleResult( "count" ),
})

// Rewrite with successors
// ReadGroup -> count -> sum {2} => ReadGroup -> count {2}
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "WithSuccessor1",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{1, 3},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{0, 2},
},
},
})

// Cannot replace a ReadGroup that already has an aggregate. This exercises
// the check that ReadGroup aggregate is not set.
// ReadGroup -> count -> count => ReadGroup -> count
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "WithSuccessor2",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
},
},
})

// Bad count column
// ReadGroup -> count => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadCountCol",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg( "count", &universe.CountProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_valmoo"}},
}),
NoChange: true,
})

// No match due to a collapsed node having a successor
// ReadGroup -> count
// \-> min
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{0, 2},
},
},
NoChange: true,
})

// No pattern match
// ReadGroup -> filter -> min -> NO-CHANGE
pushableFn1 := executetest.FunctionExpression(t, `(r) => true`)

makeResolvedFilterFn := func(expr *semantic.FunctionExpression) interpreter.ResolvedFunction {
return interpreter.ResolvedFunction{
Scope: nil,
Fn: expr,
}
}
noPatternMatch1 := func() *plantest.PlanSpec{
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("count", countProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
},
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
NoChange: true,
})

for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}