Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2882,15 +2882,14 @@ public void testExplainPlanQueryV2()
assertEquals(response1, "{\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\","
+ "\"STRING\"]},\"rows\":[[\"EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS name FROM mytable "
+ "GROUP BY name ORDER BY 1\",\"Execution Plan\\n"
+ "LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])\\n"
+ "LogicalSort(sort0=[$0], dir0=[ASC])\\n"
+ " PinotLogicalSortExchange("
+ "distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])\\n"
+ " LogicalSort(sort0=[$0], dir0=[ASC])\\n"
+ " LogicalProject(count=[$1], name=[$0])\\n"
+ " LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])\\n"
+ " PinotLogicalExchange(distribution=[hash[0]])\\n"
+ " LogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
+ " LogicalTableScan(table=[[mytable]])\\n"
+ " LogicalProject(count=[$1], name=[$0])\\n"
+ " LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])\\n"
+ " PinotLogicalExchange(distribution=[hash[0]])\\n"
+ " LogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
+ " LogicalTableScan(table=[[mytable]])\\n"
+ "\"]]}");

// In the query below, FlightNum column has an inverted index and there is no data satisfying the predicate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {

public static final PinotSortExchangeCopyRule SORT_EXCHANGE_COPY =
PinotSortExchangeCopyRule.Config.DEFAULT.toRule();
private static final int DEFAULT_SORT_EXCHANGE_COPY_THRESHOLD = 10_000;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10_000 is a reasonable heuristic until we get a config/hint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth making this configurable as a query param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, but I haven't decided whether to use a hint or a query param. For now I will keep this as a default; if we need to support configuration, we will add it then.

private static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
private static final RexLiteral REX_ZERO = REX_BUILDER.makeLiteral(0,
Expand Down Expand Up @@ -87,6 +88,10 @@ public void onMatch(RelOptRuleCall call) {
int total = RexExpressionUtils.getValueAsInt(sort.fetch) + RexExpressionUtils.getValueAsInt(sort.offset);
fetch = REX_BUILDER.makeLiteral(total, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
}
// do not transform sort-exchange copy when there's no fetch limit, or fetch amount is larger than threshold
if (fetch == null || RexExpressionUtils.getValueAsInt(fetch) > DEFAULT_SORT_EXCHANGE_COPY_THRESHOLD) {
return;
}

final RelNode newExchangeInput = sort.copy(sort.getTraitSet(), exchange.getInput(), collation, null, fetch);
final RelNode exchangeCopy = exchange.copy(exchange.getTraitSet(), newExchangeInput, exchange.getDistribution());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,7 @@ public void shouldMatchSortOnly() {

// Then:
ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture());

RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);

LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
Assert.assertEquals(innerSort.getCollation(), collation);
Assert.assertNull((innerSort).offset);
Assert.assertNull((innerSort).fetch);
Mockito.verify(_call, Mockito.never()).transformTo(sortCopyCapture.capture());
}

@Test
Expand Down
38 changes: 18 additions & 20 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$6])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$6])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
},
Expand All @@ -25,17 +24,16 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, a.ts AS ts1, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$6])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n LogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$6])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
},
Expand Down
69 changes: 37 additions & 32 deletions pinot-query-planner/src/test/resources/queries/OrderByPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col1 FROM a ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -19,11 +18,10 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1 FROM a ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(value1=[$0])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalProject(value1=[$0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand Down Expand Up @@ -51,18 +49,28 @@
"\n"
]
},
{
"description": "Select * order by on 2 columns with super large limit",
"sql": "EXPLAIN PLAN FOR SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10000000",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], fetch=[10000000])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
},
{
"description": "Order by and group by",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -71,13 +79,12 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1, SUM(a.col3) FROM a GROUP BY a.col1 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -86,13 +93,12 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) AS sum FROM a GROUP BY a.col1 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
Expand All @@ -101,13 +107,12 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1 AS value1, SUM(a.col3) AS sum FROM a GROUP BY a.col1 ORDER BY a.col1",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
"\nLogicalSort(sort0=[$0], dir0=[ASC])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
}
Expand Down
Loading