[multistage] Enable runInBroker / useBrokerPruning by Default + Misc. Improvements#16204
Conversation
| return Boolean.parseBoolean(option); | ||
| } | ||
| } | ||
| if (aggRel.getAggCallList().isEmpty()) { |
There was a problem hiding this comment.
Note for Reviewers: For queries such as WITH tmp AS (SELECT DISTINCT col1, col2 FROM tbl LIMIT 1000) SELECT ..., at present we were not trimming groups by default which would have meant that the group-by could have grown really huge.
For such queries, we can always leverage group-trimming, and this change enables that.
.../java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
Show resolved
Hide resolved
.../java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
Outdated
Show resolved
Hide resolved
| }, | ||
| { | ||
| "description": "Example of query that avoids exchanges for aggregates", | ||
| "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR with teamOne as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), teamTwo as (select col2, percentile(col3, 50) as sum_of_runs from a group by col2), all as (select col2, sum_of_runs from teamOne union all select col2, sum_of_runs from teamTwo) select col2, percentile(sum_of_runs, 50) from all group by col2", |
There was a problem hiding this comment.
Query taken from AggregatePlans.json:
pinot/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
Lines 138 to 159 in 472c53d
The plan with v2 optimizer is better in the multiple ways:
- We automatically can detect that the group-by column,
col2, is the partitioning column. So the leaf stages execute the aggregate directly instead of splitting them. - We are automatically able to skip the exchange required for the final aggregation above the union.
In other words, what the existing optimizer is not able to achieve even with hints, we can achieve without any hints.
| "\n PhysicalSort(fetch=[100])", | ||
| "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL], limit=[100])", | ||
| "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1, 2]])", | ||
| "\n PhysicalAggregate(group=[{0, 1, 7}], aggType=[LEAF], limit=[100])", |
There was a problem hiding this comment.
Shows effectiveness of auto-enabling group-trim for group-by queries with no agg calls.
| @@ -90,6 +93,13 @@ public static RelCollation apply(RelCollation relCollation, PinotDistMapping map | |||
| return RelCollations.of(newFieldCollations); | |||
There was a problem hiding this comment.
have a question. Is it intentionally to select the newFieldIndices.get(0)?
public static RelCollation apply(RelCollation relCollation, PinotDistMapping mapping) {
if (relCollation.getKeys().isEmpty()) {
return relCollation;
}
List<RelFieldCollation> newFieldCollations = new ArrayList<>();
for (RelFieldCollation fieldCollation : relCollation.getFieldCollations()) {
List<Integer> newFieldIndices = mapping.getTargets(fieldCollation.getFieldIndex());
if (CollectionUtils.isEmpty(newFieldIndices)) {
break;
}
newFieldCollations.add(fieldCollation.withFieldIndex(newFieldIndices.get(0)));
}
return RelCollations.of(newFieldCollations);
There was a problem hiding this comment.
Yeah that was by design because I wanted to keep it simple at the time. For context, this can occur in scenarios such as follows (which are rare or unlikely):
Project(col1=$0, col2=$0)
Sort(collation=[order by $0 desc])
TableScan(col1)
In this case, the project could be said to be ordered by both col1 and col2, but for now I am only preserving one of the indices to keep things simple. But I think it should be okay to add all field indexes too. I can take that up in a follow-up.
pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
Show resolved
Hide resolved
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #16204 +/- ##
============================================
+ Coverage 62.90% 63.22% +0.32%
+ Complexity 1386 1366 -20
============================================
Files 2867 2953 +86
Lines 163354 170388 +7034
Branches 24952 26068 +1116
============================================
+ Hits 102755 107728 +4973
- Misses 52847 54502 +1655
- Partials 7752 8158 +406
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Summary
runInBrokeranduseBrokerPruningby default. Both of these are only applicable whenusePhysicalOptimizeris set andrunInBrokeris only applicable whenuseLiteModeis set too.PinotDataDistribution.apply(PinotDistMapping). Currently we were always dropping collation, but now we take in a boolean to allow callers to choose whether they want to drop collation or not.Test Plan
Added more unit-tests. We also have E2E query tests that run 100s of queries and matches results between general V2 engine and the physical optimized queries.