Skip to content

Commit

Permalink
[CALCITE-6011] Add FilterWindowTransposeRule to push a Filter past a …
Browse files Browse the repository at this point in the history
…Window

Close apache#3439
  • Loading branch information
LakeShen authored and zabetak committed Oct 27, 2023
1 parent ef82a6c commit bdafeec
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 1 deletion.
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/calcite/plan/RelOptRules.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ private RelOptRules() {
CoreRules.EXCHANGE_REMOVE_CONSTANT_KEYS,
CoreRules.SORT_EXCHANGE_REMOVE_CONSTANT_KEYS,
CoreRules.SAMPLE_TO_FILTER,
CoreRules.FILTER_SAMPLE_TRANSPOSE);
CoreRules.FILTER_SAMPLE_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE);

static final List<RelOptRule> ABSTRACT_RULES =
ImmutableList.of(CoreRules.AGGREGATE_ANY_PULL_UP_CONSTANTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ private CoreRules() {}
public static final FilterSetOpTransposeRule FILTER_SET_OP_TRANSPOSE =
FilterSetOpTransposeRule.Config.DEFAULT.toRule();

/** Rule that pushes a {@link Filter} past a {@link org.apache.calcite.rel.core.Window}. */
public static final FilterWindowTransposeRule FILTER_WINDOW_TRANSPOSE =
FilterWindowTransposeRule.Config.DEFAULT.toRule();

/** Rule that reduces constants inside a {@link LogicalFilter}.
*
* @see #JOIN_REDUCE_EXPRESSIONS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.List;

/**
* Planner rule that pushes a {@link org.apache.calcite.rel.core.Filter}
* past a {@link org.apache.calcite.rel.core.Window}.
*
* <p> If {@code Filter} condition used columns belongs {@code Window} partition keys,
* then we could push the condition past the {@code Window}.
*
* <p> For example:
* <blockquote><pre>{@code
* LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
* LogicalFilter(condition=[>($1, 0)])
* LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
* LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
* LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
* }</pre></blockquote>
*
* <p> will convert to:
* <blockquote><pre>{@code
* LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
* LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
* LogicalFilter(condition=[>($0, 0)])
* LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
* }</pre></blockquote>
*
* @see CoreRules#FILTER_PROJECT_TRANSPOSE
* @see CoreRules#PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW
* @see CoreRules#FILTER_WINDOW_TRANSPOSE
*/
@Value.Enclosing
public class FilterWindowTransposeRule
extends RelRule<FilterWindowTransposeRule.Config>
implements TransformationRule {

protected FilterWindowTransposeRule(final Config config) {
super(config);
}

@Override public void onMatch(final RelOptRuleCall call) {
final Filter filterRel = call.rel(0);
final Window windowRel = call.rel(1);

// Get the window all groups
List<Window.Group> windowGroups = windowRel.groups;

// The window may have multi groups,now we could only
// deal one group case,so that we could know the partition keys.
if (windowGroups.size() != 1) {
return;
}

final List<RexNode> conditions =
RelOptUtil.conjunctions(filterRel.getCondition());
// The conditions which could be pushed to past window
final List<RexNode> pushedConditions = new ArrayList<>();
final List<RexNode> remainingConditions = new ArrayList<>();
final Window.Group group = windowGroups.get(0);
// Get the window partition keys
final ImmutableBitSet partitionKeys = group.keys;

for (RexNode condition : conditions) {
// Find the condition used columns
ImmutableBitSet rCols = RelOptUtil.InputFinder.bits(condition);
// If the window partition columns contains the condition used columns,
// then we could push the condition to past window.
if (partitionKeys.contains(rCols)) {
pushedConditions.add(condition);
} else {
remainingConditions.add(condition);
}
}

final RelBuilder builder = call.builder();
// Use the pushed conditions to create a new filter above the window's input.
RelNode rel = builder.push(windowRel.getInput()).filter(pushedConditions).build();
if (rel == windowRel.getInput(0)) {
return;
}
rel = windowRel.copy(windowRel.getTraitSet(), ImmutableList.of(rel));
rel = builder.push(rel).filter(remainingConditions).build();
call.transformTo(rel);
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {

Config DEFAULT = ImmutableFilterWindowTransposeRule.Config.of()
.withOperandSupplier(b0 ->
b0.operand(Filter.class).oneInput(b1 ->
b1.operand(Window.class).anyInputs()));

@Override default FilterWindowTransposeRule toRule() {
return new FilterWindowTransposeRule(this);
}
}
}
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,21 @@ private static boolean skipItem(RexNode expr) {
.checkUnchanged();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testNotPushFilterPastWindowWhenPredicateNotOnPartitionKey() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME) from dept) t\n"
+ "where DEPTNO = 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

@Test void testAddRedundantSemiJoinRule() {
final String sql = "select 1 from emp inner join dept on emp.deptno = dept.deptno";
sql(sql).withRule(CoreRules.JOIN_ADD_REDUNDANT_SEMI_JOIN).check();
Expand Down Expand Up @@ -951,6 +966,52 @@ private void checkJoinToMultiJoinDoesNotMatchSemiOrAntiJoin(JoinRelType type) {
sql(sql).withRule(CoreRules.FILTER_AGGREGATE_TRANSPOSE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithOnePartitionColumn() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by DEPTNO) from dept)\n"
+ "where DEPTNO > 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithTwoPartitionColumns() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) from dept)\n"
+ "where DEPTNO > 0";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

/** Test case for
* <a href="https://issues.apache.org/jira/browse/CALCITE-6011">[CALCITE-6011]
* Add the planner rule that pushes the Filter past a Window</a>. */
@Test void testPushFilterPastWindowWithDoubleWindows() {
final String sql = "select * from\n"
+ "(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) as cnt,\n"
+ "sum(1) over (partition by DEPTNO) as all_sum from dept) t\n"
+ "where DEPTNO = 1";
sql(sql)
.withPreRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW)
.withRule(
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_WINDOW_TRANSPOSE,
CoreRules.PROJECT_REMOVE).check();
}

private RelOptFixture basePushFilterPastAggWithGroupingSets() {
return sql("${sql}")
.withPreRule(CoreRules.PROJECT_MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7118,6 +7118,30 @@ LogicalProject(EXPR$0=[1])
LogicalJoin(condition=[AND(=($7, $9), IS NOT NULL($1))], joinType=[inner])
LogicalTableScan(table=[[CATALOG, SALES, EMP]])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testNotPushFilterPastWindowWhenPredicateNotOnPartitionKey">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME) from dept) t
where DEPTNO = 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[=($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalFilter(condition=[=($0, 0)])
LogicalWindow(window#0=[window(partition {1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -9907,6 +9931,79 @@ LogicalProject(NAME=[$1])
LogicalFilter(condition=[>($0, 10)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithDoubleWindows">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) as cnt,
sum(1) over (partition by DEPTNO) as all_sum from dept) t
where DEPTNO = 1]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], CNT=[$2], ALL_SUM=[$3])
LogicalFilter(condition=[=($1, 1)])
LogicalProject(NAME=[$1], DEPTNO=[$0], CNT=[$2], ALL_SUM=[$3])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])], window#1=[window(partition {0} aggs [SUM($2)])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], CNT=[$2], ALL_SUM=[$3])
LogicalFilter(condition=[=($0, 1)])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])], window#1=[window(partition {0} aggs [SUM($2)])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithOnePartitionColumn">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by DEPTNO) from dept)
where DEPTNO > 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[>($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0} aggs [COUNT()])])
LogicalFilter(condition=[>($0, 0)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
<TestCase name="testPushFilterPastWindowWithTwoPartitionColumns">
<Resource name="sql">
<![CDATA[select * from
(select NAME, DEPTNO, count(*) over (partition by NAME, DEPTNO) from dept)
where DEPTNO > 0]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(NAME=[$0], DEPTNO=[$1], EXPR$2=[$2])
LogicalFilter(condition=[>($1, 0)])
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalProject(NAME=[$1], DEPTNO=[$0], EXPR$2=[$2])
LogicalWindow(window#0=[window(partition {0, 1} aggs [COUNT()])])
LogicalFilter(condition=[>($0, 0)])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
]]>
</Resource>
</TestCase>
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/resources/sql/winagg.iq
Original file line number Diff line number Diff line change
Expand Up @@ -798,5 +798,46 @@ from emp;
+--------+-------+-------+
(9 rows)

!ok

# [CALCITE-6011] Add the planner rule that pushes the Filter past a Window
# Get the initial result which not push filter past window.
select gender, count(*) over(partition by gender order by ename) as count1 from emp;
+--------+--------+
| GENDER | COUNT1 |
+--------+--------+
| F | 1 |
| F | 2 |
| F | 3 |
| F | 4 |
| F | 5 |
| F | 6 |
| M | 1 |
| M | 2 |
| M | 3 |
+--------+--------+
(9 rows)

!ok

# Get the plan and result which push filter past window
select gender, count(*) over(partition by gender order by ename) as count1 from emp where gender = 'F';
EnumerableCalc(expr#0..3=[{inputs}], GENDER=[$t2], $1=[$t3])
EnumerableWindow(window#0=[window(partition {2} order by [0] aggs [COUNT()])])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=['F'], expr#4=[=($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
EnumerableValues(tuples=[[{ 'Jane', 10, 'F' }, { 'Bob', 10, 'M' }, { 'Eric', 20, 'M' }, { 'Susan', 30, 'F' }, { 'Alice', 30, 'F' }, { 'Adam', 50, 'M' }, { 'Eve', 50, 'F' }, { 'Grace', 60, 'F' }, { 'Wilma', null, 'F' }]])
!plan
+--------+--------+
| GENDER | COUNT1 |
+--------+--------+
| F | 1 |
| F | 2 |
| F | 3 |
| F | 4 |
| F | 5 |
| F | 6 |
+--------+--------+
(6 rows)

!ok
# End winagg.iq

0 comments on commit bdafeec

Please sign in to comment.