Skip to content

Commit 25bce2f

Browse files
beyond1920 authored and godfreyhe committed
[FLINK-21290][table-planner] Support project push down for window TVF
This closes apache#16689
1 parent d326b05 commit 25bce2f

File tree

20 files changed

+794
-1020
lines changed

20 files changed

+794
-1020
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.logical;
20+
21+
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
22+
import org.apache.flink.table.planner.functions.sql.SqlWindowTableFunction;
23+
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
24+
import org.apache.flink.table.planner.plan.utils.WindowUtil;
25+
import org.apache.flink.table.types.logical.LogicalType;
26+
27+
import org.apache.calcite.plan.RelOptCluster;
28+
import org.apache.calcite.plan.RelOptRule;
29+
import org.apache.calcite.plan.RelOptRuleCall;
30+
import org.apache.calcite.plan.RelOptUtil;
31+
import org.apache.calcite.rel.RelNode;
32+
import org.apache.calcite.rel.logical.LogicalProject;
33+
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
34+
import org.apache.calcite.rel.type.RelDataType;
35+
import org.apache.calcite.rex.RexCall;
36+
import org.apache.calcite.rex.RexInputRef;
37+
import org.apache.calcite.rex.RexNode;
38+
import org.apache.calcite.rex.RexShuttle;
39+
import org.apache.calcite.tools.RelBuilder;
40+
import org.apache.calcite.util.ImmutableBitSet;
41+
import org.apache.calcite.util.Pair;
42+
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.HashMap;
46+
import java.util.Iterator;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.stream.Collectors;
50+
import java.util.stream.IntStream;
51+
52+
/**
53+
* Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableFunctionScan} which
54+
* contains a Window table function call by splitting the projection into a projection on top of
55+
* child of the TableFunctionScan.
56+
*/
57+
public class ProjectWindowTableFunctionTransposeRule extends RelOptRule {
58+
59+
public static final ProjectWindowTableFunctionTransposeRule INSTANCE =
60+
new ProjectWindowTableFunctionTransposeRule();
61+
62+
public ProjectWindowTableFunctionTransposeRule() {
63+
super(
64+
operand(LogicalProject.class, operand(LogicalTableFunctionScan.class, any())),
65+
"ProjectWindowTableFunctionTransposeRule");
66+
}
67+
68+
@Override
69+
public boolean matches(RelOptRuleCall call) {
70+
LogicalTableFunctionScan scan = call.rel(1);
71+
return WindowUtil.isWindowTableFunctionCall(scan.getCall());
72+
}
73+
74+
@Override
75+
public void onMatch(RelOptRuleCall call) {
76+
LogicalProject project = call.rel(0);
77+
LogicalTableFunctionScan scan = call.rel(1);
78+
RelNode scanInput = scan.getInput(0);
79+
TimeAttributeWindowingStrategy windowingStrategy =
80+
WindowUtil.convertToWindowingStrategy(
81+
(RexCall) scan.getCall(), scanInput.getRowType());
82+
// 1. get fields to push down
83+
ImmutableBitSet projectFields = RelOptUtil.InputFinder.bits(project.getProjects(), null);
84+
int scanInputFieldCount = scanInput.getRowType().getFieldCount();
85+
ImmutableBitSet toPushFields =
86+
ImmutableBitSet.range(0, scanInputFieldCount)
87+
.intersect(projectFields)
88+
.set(windowingStrategy.getTimeAttributeIndex());
89+
if (toPushFields.cardinality() == scanInputFieldCount) {
90+
return;
91+
}
92+
93+
// 2. create new input of window table function scan
94+
RelBuilder relBuilder = call.builder();
95+
RelNode newScanInput = createInnerProject(relBuilder, scanInput, toPushFields);
96+
97+
// mapping origin field index to new field index, used to rewrite WindowTableFunction and
98+
// top project
99+
Map<Integer, Integer> mapping =
100+
getFieldMapping(
101+
scan.getRowType().getFieldCount(), scanInputFieldCount, toPushFields);
102+
103+
// 3. create new window table function scan
104+
LogicalTableFunctionScan newScan =
105+
createNewTableFunctionScan(
106+
relBuilder,
107+
scan,
108+
windowingStrategy.getTimeAttributeType(),
109+
newScanInput,
110+
mapping);
111+
112+
// 4. create top project
113+
RelNode topProject = createTopProject(relBuilder, project, newScan, mapping);
114+
call.transformTo(topProject);
115+
}
116+
117+
private Map<Integer, Integer> getFieldMapping(
118+
int scanFieldCount, int scanInputFieldCount, ImmutableBitSet toPushFields) {
119+
int toPushFieldCount = toPushFields.cardinality();
120+
Map<Integer, Integer> mapping = new HashMap<>();
121+
IntStream.range(0, scanFieldCount)
122+
.forEach(
123+
idx -> {
124+
int newPosition;
125+
if (idx < scanInputFieldCount) {
126+
newPosition = toPushFields.indexOf(idx);
127+
} else {
128+
newPosition = toPushFieldCount + idx - scanInputFieldCount;
129+
}
130+
mapping.put(idx, newPosition);
131+
});
132+
return mapping;
133+
}
134+
135+
private RelNode createInnerProject(
136+
RelBuilder relBuilder, RelNode scanInput, ImmutableBitSet toPushFields) {
137+
relBuilder.push(scanInput);
138+
List<RexInputRef> newProjects =
139+
toPushFields.toList().stream().map(relBuilder::field).collect(Collectors.toList());
140+
return relBuilder.project(newProjects).build();
141+
}
142+
143+
private LogicalTableFunctionScan createNewTableFunctionScan(
144+
RelBuilder relBuilder,
145+
LogicalTableFunctionScan oldScan,
146+
LogicalType timeAttributeType,
147+
RelNode newInput,
148+
Map<Integer, Integer> mapping) {
149+
relBuilder.push(newInput);
150+
RexNode newCall = rewriteWindowCall((RexCall) oldScan.getCall(), mapping, relBuilder);
151+
RelOptCluster cluster = oldScan.getCluster();
152+
FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory();
153+
RelDataType newScanOutputType =
154+
SqlWindowTableFunction.inferRowType(
155+
typeFactory,
156+
newInput.getRowType(),
157+
typeFactory.createFieldTypeFromLogicalType(timeAttributeType));
158+
return LogicalTableFunctionScan.create(
159+
cluster,
160+
new ArrayList<>(Collections.singleton(newInput)),
161+
newCall,
162+
oldScan.getElementType(),
163+
newScanOutputType,
164+
oldScan.getColumnMappings());
165+
}
166+
167+
private RexNode rewriteWindowCall(
168+
RexCall windowCall, Map<Integer, Integer> mapping, RelBuilder relBuilder) {
169+
List<RexNode> newOperands = new ArrayList<>();
170+
Iterator<RexNode> operandsItr = windowCall.getOperands().iterator();
171+
// Note: skip to rewrite the first operand of window table function because it is a special
172+
// ref to a table instead of a normal input ref, if process it as a regular input ref, an
173+
// exception would be thrown out. It's safe to use first operand of function because
174+
// framework never use it (or avoid to use it).
175+
newOperands.add(operandsItr.next());
176+
while (operandsItr.hasNext()) {
177+
newOperands.add(adjustInputRef(operandsItr.next(), mapping));
178+
}
179+
return relBuilder.call(windowCall.getOperator(), newOperands);
180+
}
181+
182+
private RelNode createTopProject(
183+
RelBuilder relBuilder,
184+
LogicalProject oldProject,
185+
LogicalTableFunctionScan newInput,
186+
Map<Integer, Integer> mapping) {
187+
List<Pair<RexNode, String>> newTopProjects =
188+
oldProject.getNamedProjects().stream()
189+
.map(r -> Pair.of(adjustInputRef(r.left, mapping), r.right))
190+
.collect(Collectors.toList());
191+
return relBuilder
192+
.push(newInput)
193+
.project(Pair.left(newTopProjects), Pair.right(newTopProjects))
194+
.build();
195+
}
196+
197+
private RexNode adjustInputRef(RexNode expr, Map<Integer, Integer> mapping) {
198+
return expr.accept(
199+
new RexShuttle() {
200+
201+
@Override
202+
public RexNode visitInputRef(RexInputRef inputRef) {
203+
Integer newIndex = mapping.get(inputRef.getIndex());
204+
return new RexInputRef(newIndex, inputRef.getType());
205+
}
206+
});
207+
}
208+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,9 @@ object FlinkStreamRuleSets {
215215
//removes constant keys from an Agg
216216
CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS,
217217
// push project through a Union
218-
CoreRules.PROJECT_SET_OP_TRANSPOSE
218+
CoreRules.PROJECT_SET_OP_TRANSPOSE,
219+
// push a projection to the child of a WindowTableFunctionScan
220+
ProjectWindowTableFunctionTransposeRule.INSTANCE
219221
)
220222

221223
val JOIN_REORDER_PREPARE_RULES: RuleSet = RuleSets.ofList(

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,6 @@
8686
"nullable" : true,
8787
"precision" : 3
8888
}
89-
}, {
90-
"kind" : "REX_CALL",
91-
"operator" : {
92-
"name" : "PROCTIME",
93-
"kind" : "OTHER_FUNCTION",
94-
"syntax" : "FUNCTION"
95-
},
96-
"operands" : [ ],
97-
"type" : {
98-
"timestampKind" : "PROCTIME",
99-
"typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
100-
"nullable" : false
101-
}
10289
} ],
10390
"condition" : null,
10491
"id" : 2,
@@ -125,16 +112,9 @@
125112
"precision" : 3,
126113
"kind" : "REGULAR"
127114
}
128-
}, {
129-
"proctime" : {
130-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
131-
"nullable" : false,
132-
"precision" : 3,
133-
"kind" : "PROCTIME"
134-
}
135115
} ]
136116
},
137-
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
117+
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime])"
138118
}, {
139119
"class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
140120
"watermarkExpr" : {
@@ -193,13 +173,6 @@
193173
"precision" : 3,
194174
"kind" : "ROWTIME"
195175
}
196-
}, {
197-
"proctime" : {
198-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
199-
"nullable" : false,
200-
"precision" : 3,
201-
"kind" : "PROCTIME"
202-
}
203176
} ]
204177
},
205178
"description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,6 @@
8686
"nullable" : true,
8787
"precision" : 3
8888
}
89-
}, {
90-
"kind" : "REX_CALL",
91-
"operator" : {
92-
"name" : "PROCTIME",
93-
"kind" : "OTHER_FUNCTION",
94-
"syntax" : "FUNCTION"
95-
},
96-
"operands" : [ ],
97-
"type" : {
98-
"timestampKind" : "PROCTIME",
99-
"typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
100-
"nullable" : false
101-
}
10289
} ],
10390
"condition" : null,
10491
"id" : 2,
@@ -125,16 +112,9 @@
125112
"precision" : 3,
126113
"kind" : "REGULAR"
127114
}
128-
}, {
129-
"proctime" : {
130-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
131-
"nullable" : false,
132-
"precision" : 3,
133-
"kind" : "PROCTIME"
134-
}
135115
} ]
136116
},
137-
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
117+
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime])"
138118
}, {
139119
"class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
140120
"watermarkExpr" : {
@@ -193,13 +173,6 @@
193173
"precision" : 3,
194174
"kind" : "ROWTIME"
195175
}
196-
}, {
197-
"proctime" : {
198-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
199-
"nullable" : false,
200-
"precision" : 3,
201-
"kind" : "PROCTIME"
202-
}
203176
} ]
204177
},
205178
"description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,6 @@
8686
"nullable" : true,
8787
"precision" : 3
8888
}
89-
}, {
90-
"kind" : "REX_CALL",
91-
"operator" : {
92-
"name" : "PROCTIME",
93-
"kind" : "OTHER_FUNCTION",
94-
"syntax" : "FUNCTION"
95-
},
96-
"operands" : [ ],
97-
"type" : {
98-
"timestampKind" : "PROCTIME",
99-
"typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
100-
"nullable" : false
101-
}
10289
} ],
10390
"condition" : null,
10491
"id" : 2,
@@ -125,16 +112,9 @@
125112
"precision" : 3,
126113
"kind" : "REGULAR"
127114
}
128-
}, {
129-
"proctime" : {
130-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
131-
"nullable" : false,
132-
"precision" : 3,
133-
"kind" : "PROCTIME"
134-
}
135115
} ]
136116
},
137-
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
117+
"description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime])"
138118
}, {
139119
"class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
140120
"watermarkExpr" : {
@@ -193,13 +173,6 @@
193173
"precision" : 3,
194174
"kind" : "ROWTIME"
195175
}
196-
}, {
197-
"proctime" : {
198-
"type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
199-
"nullable" : false,
200-
"precision" : 3,
201-
"kind" : "PROCTIME"
202-
}
203176
} ]
204177
},
205178
"description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"

0 commit comments

Comments
 (0)