Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.pinot.query.planner.logical.LiteralHintUtils;


/**
Expand All @@ -47,18 +46,6 @@ private PinotHintOptions() {

public static class InternalAggregateOptions {
public static final String AGG_TYPE = "agg_type";
/**
* agg call signature is used to store LITERAL inputs to the Aggregate Call. which is not supported in Calcite
* here
* 1. we store the Map of Pair[aggCallIdx, argListIdx] to RexLiteral to indicate the RexLiteral being passed into
* the aggregateCalls[aggCallIdx].operandList[argListIdx] is supposed to be a RexLiteral.
* 2. not all RexLiteral types are supported to be part of the input constant call signature.
* 3. RexLiteral are encoded as String and decoded as Pinot Literal objects.
*
* see: {@link LiteralHintUtils}.
* see: https://issues.apache.org/jira/projects/CALCITE/issues/CALCITE-5833
*/
public static final String AGG_CALL_SIGNATURE = "agg_call_signature";
}

public static class AggregateOptions {
Expand Down

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ private PinotQueryRuleSets() {
PruneEmptyRules.UNION_INSTANCE
);

// Pinot specific rules to run using a single RuleCollection since we attach aggregate info after optimizer.
public static final Collection<RelOptRule> PINOT_AGG_PROCESS_RULES = ImmutableList.of(
PinotAggregateLiteralAttachmentRule.INSTANCE
);

// Pinot specific rules that should be run AFTER all other rules
public static final Collection<RelOptRule> PINOT_POST_RULES = ImmutableList.of(
// Evaluate the Literal filter nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,6 @@ private static HepProgram getOptProgram() {
hepProgramBuilder.addRuleInstance(relOptRule);
}

// ----
// Run Pinot rule to attach aggregation auxiliary info
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PINOT_AGG_PROCESS_RULES);

// ----
// Pushdown filters using a single HepInstruction.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private static Expression compileFunctionExpression(RexExpression.FunctionCall r
}
break;
default:
functionName = functionKind.name();
functionName = canonicalizeFunctionName(functionKind.name());
break;
}
List<RexExpression> childNodes = rexCall.getFunctionOperands();
Expand Down Expand Up @@ -288,7 +288,7 @@ private static Expression compileOrExpression(RexExpression.FunctionCall orNode,

private static Expression getFunctionExpression(String canonicalName) {
Expression expression = new Expression(ExpressionType.FUNCTION);
Function function = new Function(canonicalizeFunctionName(canonicalName));
Function function = new Function(canonicalName);
expression.setFunctionCall(function);
return expression;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,10 @@ private static List<RexExpression> toFunctionOperands(RexInputRef rexInputRef, S
}

public static RexExpression fromAggregateCall(AggregateCall aggregateCall) {
List<RexExpression> operands =
aggregateCall.getArgList().stream().map(RexExpression.InputRef::new).collect(Collectors.toList());
List<RexExpression> operands = new ArrayList<>(aggregateCall.rexList.size());
for (RexNode rexNode : aggregateCall.rexList) {
operands.add(fromRexNode(rexNode));
}
return new RexExpression.FunctionCall(aggregateCall.getAggregation().getKind(),
RelToPlanNodeConverter.convertToColumnDataType(aggregateCall.getType()),
aggregateCall.getAggregation().getName(), operands, aggregateCall.isDistinct());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ public void testPlanQueryMultiThread()
public void testQueryWithHint() {
// Hinting the query to use final stage aggregation makes server directly return final result
// This is useful when data is already partitioned by col1
String query = "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ col1, COUNT(*) FROM b GROUP BY col1";
String query =
"SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ col1, COUNT(*) FROM b GROUP BY col1";
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
int numStages = stagePlans.size();
Expand Down
18 changes: 9 additions & 9 deletions pinot-query-planner/src/test/resources/queries/GroupByPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1, SUM(a.col3) FROM a GROUP BY a.col1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
Expand All @@ -128,7 +128,7 @@
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[/(CAST($1):DOUBLE NOT NULL, $2)], EXPR$3=[$3], EXPR$4=[$4])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[COUNT()], EXPR$3=[MAX($1)], EXPR$4=[MIN($1)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()], agg#2=[MAX($1)], agg#3=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
Expand All @@ -140,7 +140,7 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand All @@ -153,7 +153,7 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1, SUM(a.col3), MAX(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], EXPR$2=[MAX($1)])",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[MAX($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand All @@ -167,7 +167,7 @@
"notes": "TODO: Needs follow up. Project should only keep a.col1 since the other columns are pushed to the filter, but it currently keeps them all",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[COUNT()])",
"\nLogicalAggregate(group=[{0}], agg#0=[COUNT()])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand All @@ -181,7 +181,7 @@
"output": [
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
"\n LogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, _UTF-8'a'))])",
Expand All @@ -196,7 +196,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
"\n LogicalAggregate(group=[{0}], agg#0=[COUNT()], agg#1=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand All @@ -211,7 +211,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$1])",
"\n LogicalFilter(condition=[AND(>=($2, 0), <($3, 20), <=($1, 10), =(/(CAST($1):DOUBLE NOT NULL, $4), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[MAX($1)], agg#2=[MIN($1)], agg#3=[COUNT()])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[MAX($1)], agg#2=[MIN($1)], agg#3=[COUNT()])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand All @@ -226,7 +226,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], count=[COUNT()], SUM=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
"\n LogicalAggregate(group=[{0}], agg#0=[COUNT()], agg#1=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, _UTF-8'a'))])",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
Expand Down Expand Up @@ -121,7 +121,7 @@
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
Expand Down
Loading