Skip to content

Commit

Permalink
feat: add feature flag for fill() (#18445)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanyzhang authored Jun 10, 2020
1 parent 07c01cf commit 82185dd
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 2 deletions.
7 changes: 7 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,10 @@
default: false
contact: Ariel Salem / Monitoring Team
lifetime: temporary

- name: Memory Optimized Fill
description: Enable the memory optimized fill()
key: memoryOptimizedFill
default: false
contact: Query Team
lifetime: temporary
4 changes: 2 additions & 2 deletions kit/feature/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
// contact: My Name
// ```
//
// My flag type is inferred to be boolean by my defaulf of `false` when I run
// My flag type is inferred to be boolean by my default of `false` when I run
// `make flags` and the `feature` package now includes `func MyFeature() BoolFlag`.
//
// I use this to control my backend code with
Expand Down Expand Up @@ -69,7 +69,7 @@
// ```
//
// ```
// influxd --feature-flags flag1:value1,flag2:value2
// influxd --feature-flags flag1=value1,flag2=value2
// ```
//
package feature
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions query/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func init() {
PushDownWindowAggregateByTimeRule{},
PushDownBareAggregateRule{},
PushDownGroupAggregateRule{},
SwitchFillImplRule{},
)
}

Expand Down Expand Up @@ -1010,3 +1011,24 @@ func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
}
return false
}

type SwitchFillImplRule struct{}

func (SwitchFillImplRule) Name() string {
return "SwitchFillImplRule"
}

func (SwitchFillImplRule) Pattern() plan.Pattern {
return plan.Pat(universe.FillKind, plan.Any())
}

func (r SwitchFillImplRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
if !feature.MemoryOptimizedFill().Enabled(ctx) {
spec := pn.ProcedureSpec().Copy()
universe.UseDeprecatedImpl(spec)
if err := pn.ReplaceSpec(spec); err != nil {
return nil, false, err
}
}
return pn, false, nil
}
74 changes: 74 additions & 0 deletions query/stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,80 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}

func TestSwitchFillImplRule(t *testing.T) {
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.MemoryOptimizedFill(): true,
})
withFlagger, _ := feature.Annotate(context.Background(), flagger)
readRange := &influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
sourceSpec := &universe.DualImplProcedureSpec{
ProcedureSpec: &universe.FillProcedureSpec{
DefaultCost: plan.DefaultCost{},
Column: "_value",
Value: values.NewFloat(0),
UsePrevious: false,
},
UseDeprecated: false,
}
targetSpec := sourceSpec.Copy().(*universe.DualImplProcedureSpec)
universe.UseDeprecatedImpl(targetSpec)

testcases := []plantest.RuleTestCase{
{
Context: withFlagger,
Name: "enable memory optimized fill",
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", sourceSpec),
},
Edges: [][2]int{
{0, 1},
},
},
NoChange: true,
},
{
Context: context.Background(),
Name: "disable memory optimized fill",
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", sourceSpec),
},
Edges: [][2]int{
{0, 1},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("fill", targetSpec),
},
Edges: [][2]int{
{0, 1},
},
},
},
}

for _, tc := range testcases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

func TestPushDownBareAggregateRule(t *testing.T) {
// Turn on support for window aggregate count
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
Expand Down

0 comments on commit 82185dd

Please sign in to comment.