Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.calcite.rel.rules;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
Expand All @@ -32,14 +33,11 @@ private PinotQueryRuleSets() {
// do not instantiate.
}

public static final Collection<RelOptRule> LOGICAL_OPT_RULES =
public static final Collection<RelOptRule> BASIC_RULES =
Arrays.asList(EnumerableRules.ENUMERABLE_FILTER_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE,
EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_WINDOW_RULE,
EnumerableRules.ENUMERABLE_SORT_RULE, EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,

// ------------------------------------------------------------------
// Calcite core rules

// push a filter into a join
CoreRules.FILTER_INTO_JOIN,
// push filter through an aggregation
Expand Down Expand Up @@ -90,29 +88,39 @@ private PinotQueryRuleSets() {
PinotReduceAggregateFunctionsRule.INSTANCE,
CoreRules.AGGREGATE_REDUCE_FUNCTIONS,

// remove unnecessary sort rule
CoreRules.SORT_REMOVE,

// prune empty results rules
PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE, PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE,

// ------------------------------------------------------------------
// Pinot specific rules
// ------------------------------------------------------------------

// ---- rules apply before exchange insertion.
PinotFilterExpandSearchRule.INSTANCE,

// ---- rules that insert exchange.
// add an extra exchange for sort
PinotSortExchangeNodeInsertRule.INSTANCE,
// copy exchanges down, this must be done after SortExchangeNodeInsertRule
PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,

PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
PinotWindowExchangeNodeInsertRule.INSTANCE
);
// Expand all SEARCH nodes to simplified filter nodes. SEARCH nodes get created for queries with range
// predicates, in-clauses, etc.
PinotFilterExpandSearchRule.INSTANCE
);

// Filter pushdown rules run using a RuleCollection since we want to push down a filter as much as possible in a
// single HepInstruction.
public static final Collection<RelOptRule> FILTER_PUSHDOWN_RULES = ImmutableList.of(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: By default, Calcite runs these rules in depth-first order. It also won't do a "fullRestartAfterTransformation" unless we use HepMatchOrder.TOP_DOWN or HepMatchOrder.BOTTOM_UP.

I think using depth first order without doing full restarts after transformation should be fine but would be good if someone else also chimes in. Note that the match order can be changed for only this collection (it's a HepInstruction) so it doesn't need to be a global setting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be good for now. The HEP planner is used here to avoid a lengthy Volcano planner run that adds latency to the planning phase. As long as the plan results are deterministic, we can always change the way we configure the planner, IMO.

CoreRules.FILTER_INTO_JOIN,
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
CoreRules.FILTER_SET_OP_TRANSPOSE,
CoreRules.FILTER_PROJECT_TRANSPOSE
);

// Rules that merge or remove redundant operators (project merge, aggregate/sort removal) and prune empty results.
// QueryEnvironment adds this collection as a single HepInstruction, after the filter pushdown rules.
// NOTE(review): the planner currently sets HepMatchOrder.DEPTH_FIRST for the whole program; running this collection
// top-down (so that Calcite restarts from the root node after a transformation) is only a TODO in QueryEnvironment.
public static final Collection<RelOptRule> PRUNE_RULES = ImmutableList.of(
CoreRules.PROJECT_MERGE,
CoreRules.AGGREGATE_REMOVE,
CoreRules.SORT_REMOVE,
PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE, PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE
);

// Pinot specific rules that should be run after all other rules. QueryEnvironment adds each rule as its own
// HepInstruction, in the order listed here, so the order of the entries below matters.
public static final Collection<RelOptRule> PINOT_POST_RULES = ImmutableList.of(
// add an extra exchange for sort
PinotSortExchangeNodeInsertRule.INSTANCE,
// copy exchanges down, this must be done after SortExchangeNodeInsertRule
PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,

// remaining rules that insert exchange: join, aggregate and window
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
PinotWindowExchangeNodeInsertRule.INSTANCE
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.PinotCalciteCatalogReader;
Expand Down Expand Up @@ -79,7 +79,6 @@ public class QueryEnvironment {
private final HepProgram _hepProgram;

// Pinot extensions
private final Collection<RelOptRule> _logicalRuleSet;
private final WorkerManager _workerManager;
private final TableCache _tableCache;

Expand Down Expand Up @@ -110,12 +109,24 @@ public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, Worke
.addRelBuilderConfigTransform(c -> c.withAggregateUnique(true)))
.build();

// optimizer rules
_logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;

// optimizer
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
for (RelOptRule relOptRule : _logicalRuleSet) {
// Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
// best to be explicit.
hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
// First run the basic rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
// the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
for (RelOptRule relOptRule : PinotQueryRuleSets.BASIC_RULES) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
// Pushdown filters using a single HepInstruction.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);

// Prune duplicate/unnecessary nodes using a single HepInstruction.
// TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PRUNE_RULES);

// Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
hepProgramBuilder.addRuleInstance(relOptRule);
}
_hepProgram = hepProgramBuilder.build();
Expand Down
60 changes: 28 additions & 32 deletions pinot-query-planner/src/test/resources/queries/AggregatePlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
"sql": "EXPLAIN PLAN FOR SELECT AVG(a.col4) as avg FROM a WHERE a.col3 >= 0 AND a.col2 = 'pink floyd'",
"output": [
"Execution Plan",
"\nLogicalProject(avg=[/($0, $1)])",
"\n LogicalProject($f0=[CASE(=($1, 0), null:DECIMAL(1000, 0), $0)], $f1=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT()])",
"\n LogicalProject(col4=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)])",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what made merging these two projects possible after the rule changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because I am now doing the pruning/merging of redundant operators at the end, using a RuleCollection (if that's what you meant).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah. gotcha. this is great!

"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT()])",
"\n LogicalProject(col4=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -22,14 +21,13 @@
"sql": "EXPLAIN PLAN FOR SELECT AVG(a.col4) as avg, SUM(a.col4) as sum, MAX(a.col4) as max FROM a WHERE a.col3 >= 0 AND a.col2 = 'pink floyd'",
"output": [
"Execution Plan",
"\nLogicalProject(avg=[/($0, $1)], sum=[CASE(=($1, 0), null:DECIMAL(1000, 0), $2)], max=[$3])",
"\n LogicalProject($f0=[CASE(=($1, 0), null:DECIMAL(1000, 0), $0)], $f1=[$1], sum=[$0], max=[$2])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)], max=[MAX($2)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT()], max=[MAX($0)])",
"\n LogicalProject(col4=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)], sum=[CASE(=($1, 0), null:DECIMAL(1000, 0), $0)], max=[$2])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)], max=[MAX($2)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[COUNT()], max=[MAX($0)])",
"\n LogicalProject(col4=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -38,14 +36,13 @@
"sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) as avg, COUNT(*) as count FROM a WHERE a.col3 >= 0 AND a.col2 = 'pink floyd'",
"output": [
"Execution Plan",
"\nLogicalProject(avg=[/(CAST($0):DOUBLE, $1)], count=[$1])",
"\n LogicalProject($f0=[CASE(=($1, 0), null:INTEGER, $0)], $f1=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand Down Expand Up @@ -97,14 +94,13 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ skipLeafStageGroupByAggregation */ AVG(a.col3) as avg, COUNT(*) as count FROM a WHERE a.col3 >= 0 AND a.col2 = 'pink floyd'",
"output": [
"Execution Plan",
"\nLogicalProject(avg=[/(CAST($0):DOUBLE, $1)], count=[$1])",
"\n LogicalProject($f0=[CASE(=($1, 0), null:INTEGER, $0)], $f1=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand Down
Loading