Use streaming aggregation for a correlated scalar subquery #10731
Conversation
Force-pushed 5a70194 to c40a872
Force-pushed c40a872 to bcdbd1f
@@ -45,6 +45,7 @@
    private final PlanNode source;
    private final Map<Symbol, Aggregation> aggregations;
    private final List<List<Symbol>> groupingSets;
    private final List<Symbol> preGroupedSymbols;
I like that. This is clever and more generic. I was just thinking about another type of aggregation AggregationNode.Step, e.g. STREAMING.
Is this like a trait but inside a plan node?
Yes, sort of. It's the same approach we took for WindowNode (prePartitionedInputs, preSortedOrderPrefix)
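For reference, a minimal sketch of how WindowNode carries these hints; the two field names come from the comment above, the types and comments are assumptions:

// inputs already partitioned on these symbols, so no local repartitioning is needed
private final Set<Symbol> prePartitionedInputs;
// how many leading sort keys are already sorted in the input
private final int preSortedOrderPrefix;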
@@ -295,6 +311,20 @@ public HashAggregationOperator(
    this.groupByTypes = ImmutableList.copyOf(groupByTypes);
    this.groupByChannels = ImmutableList.copyOf(groupByChannels);
    this.globalAggregationGroupIds = ImmutableList.copyOf(globalAggregationGroupIds);
    this.preGroupedChannels = ImmutableList.copyOf(preGroupedChannels);
    ImmutableList.Builder<Type> builder = ImmutableList.builder();
    for (Integer channel : preGroupedChannels) {
extract to a static method
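A sketch of the suggested extraction, assuming the loop looks up each pre-grouped channel's type among the group-by channels (the helper name and lookup details are illustrative, not from the PR):

// hypothetical static helper; checkArgument is Guava's Preconditions.checkArgument
private static List<Type> preGroupedTypes(List<Integer> preGroupedChannels, List<Integer> groupByChannels, List<Type> groupByTypes)
{
    ImmutableList.Builder<Type> types = ImmutableList.builder();
    for (int channel : preGroupedChannels) {
        // find the position of the pre-grouped channel among the group-by channels
        int index = groupByChannels.indexOf(channel);
        checkArgument(index >= 0, "pre-grouped channel %s is not a group-by channel", channel);
        types.add(groupByTypes.get(index));
    }
    return types.build();
}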
this.preGroupedChannels = ImmutableList.copyOf(preGroupedChannels);
ImmutableList.Builder<Type> builder = ImmutableList.builder();
for (Integer channel : preGroupedChannels) {
    boolean found = false;
found is unused
@@ -455,7 +528,7 @@ public Page getOutput()
    }

    // only flush if we are finishing or the aggregation builder is full
    if (!finishing && (aggregationBuilder == null || !aggregationBuilder.isFull())) {
    if (!finishing && (aggregationBuilder == null || !aggregationBuilder.isFull()) && pendingInput == null) {
This will work poorly if there are a lot of small groups (e.g. we will flush small pages and constantly resetAggregationBuilder).
An additional condition here should be that aggregationBuilder has accumulated enough groups.
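A sketch of the kind of guard being suggested; getGroupCount() and MIN_GROUPS_BEFORE_FLUSH are hypothetical names, not from the PR:

// sketch only: on a group boundary (pendingInput != null), flush only once the
// builder has accumulated enough groups to produce a reasonably sized page
if (!finishing
        && (aggregationBuilder == null || !aggregationBuilder.isFull())
        && (pendingInput == null || aggregationBuilder.getGroupCount() < MIN_GROUPS_BEFORE_FLUSH)) {
    return null;
}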
Probably not, as you would allocate aggregation builders per group. It is probably quite expensive, as those are complicated objects.
unfinishedWork = aggregationBuilder.processPage(page.getRegion(0, groupEnd));
pendingInput = page.getRegion(groupEnd, page.getPositionCount() - groupEnd);
}
else {
if groups always span the entire page, then you will never flush aggregates
@sopel39 Good catch. Thank you, Karol.
@@ -56,6 +59,7 @@
    private final List<Type> groupByTypes;
    private final List<Integer> groupByChannels;
    private final List<Integer> globalAggregationGroupIds;
    private final List<Integer> preGroupedChannels;
this is clever but it makes me wonder if we will use this feature of HashAggregationOperator?
I was rather thinking of a dedicated streaming aggregation operator that would allow us to avoid hash lookups, hash computations and so on (e.g. similar to AggregationOperator). Such a dedicated streaming aggregation operator should be more performant in the correlated-subqueries context than an extended HashAggregationOperator.
@sopel39 @martint Karol, I'm seeing that we currently use AggregationOperator for global aggregations (the set of grouping keys is empty) and HashAggregationOperator for all other aggregations. I'm also seeing that correlated subqueries generate aggregations with grouping keys made of unique and all the probe columns. Such aggregations prevent pruning of the probe columns and require unnecessary hashing. Alternatively, subqueries could use aggregations on just the unique column and apply the arbitrary grouping function to the probe columns. E.g.

select name, (select max(name) from region where regionkey = nation.regionkey) from nation;

could be rewritten to

- Aggregation [unique] -> arbitrary(nation.name), max(region.name)
  - LeftJoin (region.regionkey = nation.regionkey)
    - AssignUniqueId
      - TableScan(nation)
    - TableScan(region)

This way, the planner would see that the aggregation input is grouped on all grouping columns, conclude that hashing is unnecessary, and be able to plan a streaming aggregation without hashing. Otherwise, if the planner only sees that the aggregation input is grouped on a subset of the grouping columns, it can't automatically decide whether simple aggregation or hash aggregation is better. What do you think?
> Alternatively, sub-queries could be using aggregations on just the unique and apply arbitrary grouping function to the probe columns. E.g.

I think such a construct will confuse stats computation. We could improve TransformCorrelatedScalarAggregationToJoin, though, to not include unused symbols in the top aggregation. I think for that we would need to extend LateralJoinNode to include an explicit list of outputSymbols, as JoinNode does.
Note that pruning passes through LateralJoinNode, but correlated (yet later unused) symbols won't be pruned from the probe side and will take part in the aggregation.

> This way, the planner would see that aggregation input is grouped on all grouping columns and hashing is unnecessary and be able to plan streaming aggregation without hashing

I'm confused. I think AssignUniqueId makes all columns grouped (not only the uniqueId column). See: https://github.com/prestodb/presto/pull/10731/files/1b9e5c11be55febd887fc80bd1a6fbc68d08da26#diff-64cfd4c0cfe165b7dcc4934f126200e5
        Optional.empty()),
        child.getProperties());
return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
}

return planAndEnforceChildren(node, requiredProperties, requiredProperties);
StreamPreferredProperties childRequirements = parentPreferences
please move this to a separate rule; a similar rule (it computes local stream properties): https://github.com/starburstdata/presto/blob/3ec61ddcba8ece1cca5497799b5ec3b46e1177f2/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddAdaptiveExchangeBelowPartialAggregation.java
That way it makes it harder to migrate AddLocalExchanges to IterativeOptimizer. Maybe we should push harder towards traits.
@kokosing Grzegorz, what do you refer to by traits? Is there an example I could check out?
Traits is a concept of properties, but in the world of IterativeOptimizer or Memo (similar to stats or cost), where you store some information (a property) about a group reference (relation) in the memo. That way you don't need to recompute it per each rule invocation, and you could possibly use it in a rule pattern saying something like "this rule will trigger when the node is an aggregation and its source is already pre-aggregated by columns a, b, c".
There is some initial (POC) PR about this: #9691, but it requires changes to match the CBO.
ImmutableList.Builder<LocalProperty<Symbol>> localPropertiesBuilder = ImmutableList.builder();
localPropertiesBuilder.addAll(properties.getLocalProperties());
localPropertiesBuilder.add(new GroupingProperty<>(ImmutableSet.of(node.getIdColumn())));
I think we can make a stronger statement here, that we are grouped on all columns, no?
@sopel39 Karol, I think you are correct. Thanks for pointing it out. It might be the final piece I was missing.
We can say that the data is grouped on {unique + subset-of-source-symbols} for all subsets of the source symbols. This property will survive the join and will allow streaming aggregation without any hashing (as you suggested earlier). I'll implement StreamingAggregationOperator similar to AggregationOperator and drop the changes to HashAggregationOperator.
Force-pushed e17e174 to 30af132
@sopel39 Karol, I implemented a dedicated StreamingAggregationOperator.
{
    ImmutableList.Builder<Aggregator> builder = ImmutableList.builder();
    for (AccumulatorFactory factory : accumulatorFactories) {
        builder.add(new Aggregator(factory, step));
Do we want to support partial streaming aggregations now?
@sopel39 I don't have a use case for partial streaming aggregation yet and I'm not sure how I would implement it given the concerns you raised on the changes to HashAggregationOperator. E.g. we'd need a reusable/resettable HashAggregationBuilder, right? What are your thoughts?
@sopel39 Karol, I might have misunderstood your question. You are asking about Step#PARTIAL, right? I was thinking about the case when the data stream is grouped on a subset of grouping keys. StreamingAggregationOperator should work for Step#PARTIAL aggregations just fine. Is there anything special that needs to be taken care of?
@mbasmanova I was just wondering if we plan to support partial streaming aggregation at this moment. Both partial streaming aggregation and streaming hash aggregation seem like great features, but I wonder if there are practical use cases for them at this point.
> Do we want to support partial streaming aggregations now?

It occurred to me that such an imprecise sentence might be understood with a negative bias, which I did not intend.
private List<Aggregator> aggregates;
private final PageBuilder pageBuilder;
private Page pendingInput;
private Page previousPage;
Maybe instead of the whole page we should just store a single row from the group (it seems easier to grasp)
@sopel39 Is there an easy way to do that?
I think something similar to what you do, but just get a region of size 1
@sopel39 Isn't this what previousPage = page.getRegion(page.getPositionCount() - 1, 1); does already?
Yup. I would just change the field name then.
private void updateMemoryUsage()
{
    long memorySize = pageBuilder.getRetainedSizeInBytes();
you should account for the previous page too
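For instance, a sketch of the adjusted accounting (previousPage is the buffered page from the surrounding diff; Page exposes getRetainedSizeInBytes()):

// sketch: include the retained size of the buffered page holding the current group's row
long memorySize = pageBuilder.getRetainedSizeInBytes();
if (previousPage != null) {
    memorySize += previousPage.getRetainedSizeInBytes();
}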
requireNonNull(page, "page is null");

if (previousPage != null) {
    if (!rowEqualsRow(0, previousPage, 0, page)) {
there is an implicit assumption that the previous page contains only rows from a single group. Maybe consider keeping just a single row from the group (my other comment)
    }
}

private void processInput(Page page)
this seems like a rather complex flow. What would you say about something like this (pseudocode):
for (row in page) {
    if (previousGroupRow != null && !previousGroupRow.equals(row)) {
        evaluateAggregatesToPageBuilder();
        resetAggregators();
        previousGroupRow = null;
    }
    addRowToAggregates();
    if (previousGroupRow == null) {
        // first row from group
        previousGroupRow = row;
    }
}
and
void evaluateAggregatesToPageBuilder()
{
    if (pageBuilder.isFull()) {
        outputPages.add(pageBuilder.build());
        pageBuilder.reset();
    }
    ...
}
The idea is to store the produced output pages in order to simplify processing of the input pages.
You then wouldn't have to check

if (pageBuilder.getPositionCount() >= minOutputPageRowCount || pageBuilder.getRetainedSizeInBytes() >= minOutputPageSize.toBytes()) {

because you could write needsInput as:

boolean needsInput()
{
    return !finishing && outputPages.isEmpty();
}
In this snippet I used row-by-row iteration, but that is not how Accumulator works. We should operate on ranges, but the principles stay the same.
@sopel39 Karol, thanks for the suggestion. I used some of the ideas to simplify the logic, but couldn't get it to quite the shape you asked for. Any chance you could take another look?
I think the logic in this class (and this method in particular) would be easier to follow if instead of tracking "previousRow" you just kept track of "currentGroup" -- you only need to compare against that, ever.
Then, this method might be rewritten as follows (beware, I haven't tested it or made sure the logic is 100% correct):
private void processInput(Page page)
{
    requireNonNull(page, "page is null");

    if (currentGroup != null) {
        if (!rowEqualsRow(0, page, 0, currentGroup)) {
            // page starts with new group, so flush it
            evaluateAggregatesToPageBuilder(currentGroup, 0);
            currentGroup = page.getRegion(0, 1);
            if (pageBuilder.isFull()) {
                return;
            }
        }
    }

    int startPosition = 0;
    while (startPosition < page.getPositionCount()) {
        // may be equal to page.getPositionCount() if the end is not found in this page
        int nextGroupStart = findNextGroupStart(startPosition, page);
        addRowsToAggregates(page, startPosition, nextGroupStart - 1);
        if (nextGroupStart < page.getPositionCount()) {
            // current group stops somewhere in the middle of the page, so flush it
            evaluateAggregatesToPageBuilder(page, startPosition);
            currentGroup = page.getRegion(nextGroupStart, 1);
            if (pageBuilder.isFull()) {
                pendingInput = page.getRegion(nextGroupStart, page.getPositionCount() - nextGroupStart);
                return;
            }
        }
        startPosition = nextGroupStart;
    }
}
with findNextGroupStart as:
private int findNextGroupStart(int startPosition, Page page)
{
    for (int i = startPosition; i < page.getPositionCount(); i++) {
        if (!rowEqualsRow(i, page, 0, currentGroup)) {
            return i;
        }
    }
    return page.getPositionCount();
}
@@ -2339,6 +2340,10 @@ private PhysicalOperation planGroupByAggregation(
    }
}

boolean streaming = !node.getPreGroupedSymbols().isEmpty()
        && node.getGroupingSets().size() == 1
        && node.getPreGroupedSymbols().equals(Iterables.getOnlyElement(node.getGroupingSets()));
You probably should compare sets here instead of lists.
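A sketch of the order-insensitive comparison, using the same accessors as the diff above:

boolean streaming = !node.getPreGroupedSymbols().isEmpty()
        && node.getGroupingSets().size() == 1
        // compare as sets, since symbol order should not matter here
        && ImmutableSet.copyOf(node.getPreGroupedSymbols())
                .equals(ImmutableSet.copyOf(Iterables.getOnlyElement(node.getGroupingSets())));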
private boolean canSkipHashGeneration(AggregationNode node)
{
    if (!node.getPreGroupedSymbols().isEmpty() &&
            node.getGroupingSets().size() == 1
add an AggregationNode#isStreamingAggregation method. Use it in the local execution planner too.
@@ -156,6 +156,7 @@ private AggregationNode replaceAggregationSource(
    source,
    aggregation.getAggregations(),
    ImmutableList.of(groupingSet),
    ImmutableList.of(),
you shouldn't create a non-streaming partial aggregation from a streaming aggregation. This will hurt performance and might be incorrect too (partial aggregations might break grouping since they can flush in a nondeterministic way, e.g. mid-group).
@@ -215,6 +215,7 @@ private PlanNode split(AggregationNode node, Context context)
    node.getSource(),
    intermediateAggregation,
    node.getGroupingSets(),
    ImmutableList.of(),
you shouldn't create a non-streaming partial aggregation for a streaming aggregation. This will hurt performance and might be incorrect too (partial aggregations might break grouping since they can flush in a nondeterministic way).
{
    if (!node.getPreGroupedSymbols().isEmpty() &&
            node.getGroupingSets().size() == 1
            && node.getPreGroupedSymbols().equals(Iterables.getOnlyElement(node.getGroupingSets()))) {
You should also check that the only grouping set is non-empty.
Force-pushed 3b4e101 to 18019de
Optional: You could capture the pattern:
and then rewrite the aggregation to:
this way we would reduce row comparison overhead.
Is this ready for a final pass?
// (1) result.getHashSymbols() contains all requiredHashes;
// (2) result.getHashSymbols() is a superset of requiredHashes combined with preferredHashes (no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
preferenceSatisfied = resultHashes.containsAll(requiredHashes.getHashes())
what was the issue here?
For plans where a Join and a RemoteExchange are separated by AssignUniqueId, this logic prevented the hash on the join keys produced by the RemoteExchange from being reused by the Join.
This plan (created by the new PushUpAssignUniqueIdThroughRemoteExchange rule)

- Join
  - AssignUniqueId
    - RemoteExchange

was changed to

- Join
  - Project: (add hash on probe join key)
    - AssignUniqueId
      - Project: (drop hash on probe join key)
        - RemoteExchange
E.g. this query

SELECT name, (SELECT max(name) FROM region WHERE regionkey = nation.regionkey AND length(name) > length(nation.name)) FROM nation

was planned as

- Aggregate[<name, regionkey, unique>] => [name:varchar(25), regionkey:bigint, unique:bigint, max:varchar(25)]
        max := "max"("name_1")
    - LeftJoin[("regionkey" = "regionkey_0") AND ("length"("name_1") > "length"("name"))][$hashvalue_17, $hashvalue_18] => [name:varchar(25), regionkey:bigint, unique:bigint, name_1:varchar(25)]
            Distribution: PARTITIONED
        - Project[] => [name:varchar(25), regionkey:bigint, unique:bigint, $hashvalue_17:bigint]
                $hashvalue_17 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0))
            - AssignUniqueId => [name:varchar(25), regionkey:bigint, unique:bigint]
                - Project[] => [name:varchar(25), regionkey:bigint]
                    - RemoteExchange[REPARTITION][$hashvalue] => name:varchar(25), regionkey:bigint, $hashvalue:bigint
                        - ScanProject[table = local:tpch:nation:sf0.01, originalConstraint = true] => [name:varchar(25), regionkey:bigint, $hashvalue_16:bigint]

instead of

- Aggregate[<name, regionkey, unique>] => [name:varchar(25), regionkey:bigint, unique:bigint, max:varchar(25)]
        max := "max"("name_1")
    - LeftJoin[("regionkey" = "regionkey_0") AND ("length"("name_1") > "length"("name"))][$hashvalue, $hashvalue_17] => [name:varchar(25), regionkey:bigint, unique:bigint, name_1:varchar(25)]
            Distribution: PARTITIONED
        - AssignUniqueId => [name:varchar(25), regionkey:bigint, $hashvalue:bigint, unique:bigint]
            - RemoteExchange[REPARTITION][$hashvalue] => name:varchar(25), regionkey:bigint, $hashvalue:bigint
                - ScanProject[table = local:tpch:nation:sf0.01, originalConstraint = true] => [name:varchar(25), regionkey:bigint, $hashvalue_16:bigint]
// (2) result.getHashSymbols() is a superset of requiredHashes combined with preferredHashes (no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
preferenceSatisfied = resultHashes.containsAll(requiredHashes.getHashes())
        && ImmutableSet.builder()
extract union set to a separate variable
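A sketch of the extraction (the variable name matches the later revision quoted further below; preferredHashes exposing getHashes() like requiredHashes is an assumption):

// union of required and preferred hashes, extracted for readability
Set<HashComputation> requiredAndPreferredHashes = ImmutableSet.<HashComputation>builder()
        .addAll(requiredHashes.getHashes())
        .addAll(preferredHashes.getHashes())
        .build();
preferenceSatisfied = resultHashes.containsAll(requiredHashes.getHashes())
        && requiredAndPreferredHashes.containsAll(resultHashes);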
Force-pushed 2c9560f to d7be4f0
Yes, I think so.
Nice!
@@ -185,7 +198,7 @@ public boolean hasOrderings()
    @Override
    public PlanNode replaceChildren(List<PlanNode> newChildren)
    {
        return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, step, hashSymbol, groupIdSymbol);
        return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, preGroupedSymbols, step, hashSymbol, groupIdSymbol);
[just thinking] If I understand it correctly, then it is hard to say if newChildren is still grouped on preGroupedSymbols.
A rule should take care not to produce an invalid plan.
Maybe we should validate aggregations at the end of planning, like we validate default aggregations with com.facebook.presto.sql.planner.sanity.ValidateAggregationsWithDefaultValues?
@@ -222,6 +235,16 @@ public boolean hasSingleNodeExecutionPreference(FunctionRegistry functionRegistry)
    return (hasEmptyGroupingSet() && !hasNonEmptyGroupingSet()) || (hasDefaultOutput() && !isDecomposable(functionRegistry));
}

public boolean isStreamable()
{
    if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
don't you need to check that aggregations.isEmpty()?
@kokosing I might be missing something, but aggregations provide the computation to perform on each group (sum, count, arbitrary). We don't want these to be empty.
Sure, I misunderstood what streamable aggregations mean. Thanks.
PlanWithProperties child = planAndEnforce(node.getSource(), childRequirements, childRequirements);

List<Symbol> preGroupedSymbols;
if (!groupingKeys.isEmpty()) {
groupingKeys is a union of all symbols in groupingSets. As I understand it, there is currently no point in finding a plan with preGroupedSymbols matching groupingKeys if there are multiple entries in groupingSets. Maybe you should collect this only when groupingSets.size() == 1?
@Test
public void testStreamingAggregationForCorrelatedSubquery()
{
    assertPlanWithSession("SELECT name, (SELECT max(name) FROM region WHERE regionkey = nation.regionkey AND length(name) > length(nation.name)) FROM nation",
please put each argument on a separate line, like:

method(
        arg1,
        arg2,
        ...,
        lastArg);
@kokosing I believe I'm doing this except in cases where it consumes too much vertical space without adding any value. I think this is consistent with the rest of the code in this file. Is there a specific change you'd like me to make? Could you point it out?
@@ -670,7 +670,16 @@ private PlanWithProperties planAndEnforce(

boolean preferenceSatisfied;
if (pruneExtraHashSymbols) {
    preferenceSatisfied = result.getHashSymbols().keySet().equals(requiredHashes.getHashes());
    // Make sure that
shouldn't this be separated into a different commit? Can you add a test for that?
// (1) result has all required hashes
// (2) any extra hashes are preferred hashes (e.g. no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
ImmutableSet<HashComputation> requiredAndPreferredHashes = ImmutableSet.<HashComputation>builder()
declare it as Set
reviewed up to: Optimize aggregation in a correlated scalar subquery
{
    if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
        List<Symbol> groupingSet = Iterables.getOnlyElement(groupingSets);
        return !groupingSet.isEmpty() && Sets.newHashSet(preGroupedSymbols).equals(Sets.newHashSet(groupingSet));
You don't need to check !groupingSet.isEmpty() (it is already implied by !preGroupedSymbols.isEmpty()).
Maybe use ImmutableSet.copyOf instead of Sets.newHashSet.
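i.e., a sketch of the simplified check:

return ImmutableSet.copyOf(preGroupedSymbols).equals(ImmutableSet.copyOf(groupingSet));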
@sopel39 I'm moving these checks to AddLocalExchange and adding a sanity check to the AggregationNode constructor. This should address this comment and the "Rule should take care not to produce an invalid plan" comment above.
The validator makes sure that other rules didn't break the input properties of the streaming aggregation.
{
    if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
        List<Symbol> groupingSet = Iterables.getOnlyElement(groupingSets);
        return !groupingSet.isEmpty() && Sets.newHashSet(preGroupedSymbols).equals(Sets.newHashSet(groupingSet));
note: the aggregation is also streamable when groupingSet contains preGroupedSymbols (but is not equal to it); we just don't support that yet (via HashAggregationOperator).
@sopel39 That's right
if (!groupingKeys.isEmpty()) {
    List<LocalProperty<Symbol>> desiredProperties = ImmutableList.of(new GroupingProperty<>(groupingKeys));
    Iterator<Optional<LocalProperty<Symbol>>> matchIterator = LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).iterator();
    Optional<LocalProperty<Symbol>> groupingRequirement = matchIterator.next();
use getOnlyElement
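A sketch with Guava's Iterables.getOnlyElement in place of the explicit iterator:

Optional<LocalProperty<Symbol>> groupingRequirement = Iterables.getOnlyElement(
        LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties));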
List<Symbol> preGroupedSymbols;
if (!groupingKeys.isEmpty()) {
    List<LocalProperty<Symbol>> desiredProperties = ImmutableList.of(new GroupingProperty<>(groupingKeys));
extract this logic to a separate method
return leftProperties
        .withUnspecifiedPartitioning()
optional: I would extract it to a separate commit and maybe add a test
@sopel39 Before I spend too much time building out testing infrastructure, could you tell me if there is an existing test I could use as an example? I don't see any unit tests for StreamPropertyDerivations.
When I improved the AssignUniqueId node properties I added a test like TestLogicalPlanner#testUsesDistributedJoinIfNaturallyPartitionedOnProbeSymbols (I synthesized a query for which the improvements made a difference).
If it is not possible to find such a query for your changes, maybe they are unused? We could also skip the test for now if it's really hard to find a proper test case.
@sopel39 In isolation, the StreamPropertyDerivations changes for LEFT join are not needed. However, they are necessary to propagate the newly generated properties of AssignUniqueId. Without the LEFT join changes, the newly added test TestLogicalPlanner#testStreamingAggregationForCorrelatedSubquery fails.
I can move these changes into a separate commit, but I don't know how to test them without testing the changes to AssignUniqueId as well.
@@ -431,7 +429,14 @@ public StreamProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<St
    @Override
    public StreamProperties visitAssignUniqueId(AssignUniqueId node, List<StreamProperties> inputProperties)
    {
        return Iterables.getOnlyElement(inputProperties);
        StreamProperties properties = Iterables.getOnlyElement(inputProperties);
        if (properties.getPartitioningColumns().isPresent()) {
add a comment, e.g:
// preserve input (possibly preferred) partitioning
node(ValuesNode.class)))
.withAlias("NON_NULL", expression("true"))))))))),
plan -> assertEquals(countFinalAggregationNodes(plan), extraAggregation ? 2 : 1));
aggregation(ImmutableMap.of("COUNT", functionCall("count", ImmutableList.of("NON_NULL"))),
optional: we could explicitly test aggregation step here.
anyTree(tableScan("nation", ImmutableMap.of("n_name", "name", "n_regionkey", "regionkey"))))), | ||
anyTree(tableScan("region", ImmutableMap.of("r_name", "name"))))))); | ||
|
||
assertPlanWithSession("SELECT name, (SELECT max(name) FROM region WHERE regionkey > nation.regionkey) FROM nation", |
add a comment on what the difference from the previous test is
this.getQueryRunner().getDefaultSession(),
false,
anyTree(
        aggregation(ImmutableList.of(ImmutableList.of("n_name", "n_regionkey", "unique")),
please break lines after each argument.
Force-pushed d7be4f0 to 161ad36
Force-pushed 857bc70 to 04957e2
Force-pushed 04957e2 to c1c42cf
@@ -613,6 +633,20 @@ private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperti
    }
}

private List<Symbol> computePreGroupedSymbols(List<List<Symbol>> groupingSets, List<Symbol> groupingKeys, PlanWithProperties childWithProperties)
The name of this method is misleading. It doesn't really compute pre-grouped columns. Rather, it validates that the provided list is pre-grouped (it's an all-or-nothing).
I suspect this is so that we can extend it in the future to return a subset, but it makes reasoning about the code harder. My preference would be to replace it with an implementation that is more precise about the current intent. In fact, you can probably just replace this whole thing with this in the caller:
List<Symbol> preGroupedSymbols = ImmutableList.of();
if (LocalProperties.match(childWithProperties.getProperties().getLocalProperties(), new GroupingProperty<>(groupingKeys)).isPresent()) {
    preGroupedSymbols = groupingKeys;
}
At a minimum, there should be a comment in the method explaining why it's abstracted that way (but IMO, that's premature over-abstraction)
@martint I like the proposed code, but... it doesn't look that nice after I fix compilation issues. Here is what I get:

List<Symbol> preGroupedSymbols = ImmutableList.of();
if (!Iterables.getOnlyElement(LocalProperties.match(child.getProperties().getLocalProperties(), ImmutableList.of(new GroupingProperty<>(groupingKeys)))).isPresent()) {
    preGroupedSymbols = groupingKeys;
}

Iterables.getOnlyElement and ImmutableList.of make it a lot more verbose, and having "! match" is counterintuitive.
I can make it less verbose by adding a match override to LocalProperties:

public static <T> Optional<LocalProperty<T>> match(List<LocalProperty<T>> actuals, LocalProperty<T> desired)
{
    return Iterables.getOnlyElement(match(actuals, ImmutableList.of(desired)));
}

but that brings ! and match closer to each other, so it actually reads as "if no match, then ...":

if (!LocalProperties.match(child.getProperties().getLocalProperties(), new GroupingProperty<>(groupingKeys)).isPresent()) {
    preGroupedSymbols = groupingKeys;
}

Would renaming this method to extractPreGroupedSymbols work better? Or do you have any other suggestion?
Hmm... yeah, that doesn't read that nicely.
If we're going to keep the method I'd suggest something like:

List<Symbol> preGroupedSymbols = ImmutableList.of();
if (isGroupedOn(groupingKeys, child.getProperties().getLocalProperties())) {
    preGroupedSymbols = groupingKeys;
}

and

private boolean isGroupedOn(List<Symbol> columns, List<LocalProperty<Symbol>> properties)
{
    // !isPresent() indicates the property was satisfied completely
    return !LocalProperties.match(properties, LocalProperties.grouped(columns)).get(0).isPresent();
}

Although, you could just do that inline and have a comment to clarify the behavior.
@martint I could do that, but... just to clarify, are you implying that I should add support for multiple grouping sets? (The name and implementation of isGroupedOn suggest that.) One of the challenges in doing that would be finding queries for testing.
I'm just saying that from the point of view of the AggregationNode, one or many grouping sets doesn't matter. The computation of the grouping sets is handled by GroupIdNode, which produces an additional synthetic column the aggregation has to group on.
@martint I looked into how multiple grouping sets are implemented. Like you mentioned, we use an auxiliary GroupIdNode that

...for every input row, emits rows for each grouping set specified. The row generated will contain NULLs in the channels corresponding to the columns present in the union of all grouping columns but not present in the current grouping set.

I couldn't find where and how the rows with different values in the group_id column are redirected to the right aggregators. But, assuming this happens somehow, the implementation of GroupIdOperator doesn't seem to preserve any local properties on the grouping keys and therefore doesn't allow for streaming aggregations.
Given that the presence of GroupIdNode prevents streaming aggregation, what would be the benefit of updating StreamingAggregationOperator to handle multiple grouping sets and a group_id column?
I'm not saying we should add support for multiple grouping sets to the operator, but from the point of view of deriving properties, we shouldn't have to special-case any knowledge about multiple grouping sets. "Pregrouped symbols" should just indicate that certain symbols are pre-grouped. The decision of whether to use streaming should be made by the rule that turns an aggregation that has pregrouped group-by columns and a single grouping set into a streaming agg.
@martint I can move the groupingSets.size() == 1 && !Iterables.getOnlyElement(groupingSets).isEmpty() check to AggregationNode#isStreamable and inline the isGroupedOn logic you suggested here. Would that work?
Yup, that would be perfect.
private List<Symbol> computePreGroupedSymbols(List<List<Symbol>> groupingSets, List<Symbol> groupingKeys, PlanWithProperties childWithProperties)
{
    // TODO Add streaming aggregation support for multiple grouping sets
    if (groupingSets.size() == 1 && !Iterables.getOnlyElement(groupingSets).isEmpty()) {
Why the special casing for 1 vs many grouping sets? From the point of view of the aggregation there should be no difference in how they execute. There's just an extra column containing the group id.
@martint Primarily because I don't have a use case for multiple grouping sets. I didn't want to add non-trivial functionality that I can't test.
@@ -383,6 +383,16 @@ public ActualProperties visitJoin(JoinNode node, List<ActualProperties> inputPro
    constants.putAll(probeProperties.getConstants());
    constants.putAll(buildProperties.getConstants());

    if (node.isCrossJoin()) {
        // Cross join preserves only constants from probe and build sides.
        // Cross join doesn't preserve sorting or grouping local properties on either side.
That's interesting. It used to preserve sorting/grouping on the left side, but I guess that broke when we implemented the optimized execution strategy that generates RLE blocks for the smaller page on either side. It also means that the NestedLoopsJoin operator is now misnamed, since it doesn't behave like a NLJ anymore -- it's now truly a CrossJoinOperator.
@@ -106,6 +106,7 @@ public String toString()
    return toStringHelper(this)
            .omitNullValues()
            .add("type", joinType)
Unrelated change?
@martint Yes and no. I added this while debugging test failures. Do you want me to put it into a separate commit?
That'd be ideal. The way I think about these kinds of changes is "if I had to revert the main change due to a bug, do I also need to lose this?"
@@ -183,13 +183,23 @@ public ActualProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<Ac
    public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProperties> inputProperties)
    {
        ActualProperties properties = Iterables.getOnlyElement(inputProperties);

        ImmutableList.Builder<LocalProperty<Symbol>> localPropertiesBuilder = ImmutableList.builder();
Maybe name this "newProperties" or "newLocalProperties"? It makes the intent clearer. Also, the "builder" part in the name is irrelevant (sometimes you have to do it to deconflict with other variables, but this is not such a case).
{
    AssignUniqueId assignUniqueId = captures.get(ASSIGN_UNIQUE_ID);
    PartitioningScheme partitioningScheme = node.getPartitioningScheme();
    if (partitioningScheme.getPartitioning().getColumns().contains(assignUniqueId.getIdColumn())) {
Add a comment explaining that the partitioning being done by the remote exchange depends on the results of the id column added by AssignUniqueId, so we can't move it above.
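A sketch of the comment in place, assuming the rule bails out with Result.empty() when it does not apply, as Presto's iterative optimizer rules do:

// The remote exchange partitions on the id column produced by AssignUniqueId,
// so AssignUniqueId cannot be pushed above this exchange.
if (partitioningScheme.getPartitioning().getColumns().contains(assignUniqueId.getIdColumn())) {
    return Result.empty();
}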
@@ -183,13 +183,23 @@ public ActualProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<Ac
    public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProperties> inputProperties)
The commit message is still off. It mentions HashGenerationOptimizer, but that seems to be changed elsewhere.
    }
}

private static Page rearrangePage(Page page, int[] channels)
Maybe call this "extractColumns" ? rearrangePage is a bit too vague and doesn't convey what's the purpose of the method
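A sketch of the renamed helper; the body mirrors the surrounding diff (Block and Page are Presto's page/block types):

private static Page extractColumns(Page page, int[] channels)
{
    // copy only the requested channels into a new page, preserving position count
    Block[] newBlocks = new Block[channels.length];
    for (int i = 0; i < channels.length; i++) {
        newBlocks[i] = page.getBlock(channels[i]);
    }
    return new Page(page.getPositionCount(), newBlocks);
}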
    return new Page(page.getPositionCount(), newBlocks);
}

private void addRowsToAggregates(Page page, int startPosition, int endPosition)
It'd be clearer if endPosition were inclusive
Force-pushed d634be7 to 7f74701
@martint addressed/replied to comments
Force-pushed 7f74701 to 3333ab1
@martint comments addressed
Looks good, but take a look at the test failures.
@@ -56,6 +56,11 @@

private static boolean isSupportedAggregationNode(AggregationNode aggregationNode)
{
    // Don't split streaming aggregations
make it part of the rule pattern
@sopel39 Isn't it already so?

private static final Pattern<AggregationNode> PATTERN = aggregation()
        .matching(PushPartialAggregationThroughJoin::isSupportedAggregationNode)
        .with(source().matching(join().capturedAs(JOIN_NODE)));
@@ -255,7 +255,7 @@ public PlanWithProperties visitAggregation(AggregationNode node, StreamPreferred
    return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
}

StreamPreferredProperties requiredProperties = parentPreferences.withDefaultParallelism(session).withPartitioning(node.getGroupingKeys());
commit title is too long.
{
    // "orders" table is naturally grouped on orderkey
    // this grouping should survive an inner join and allow for streaming aggregation later
    // this grouping should not survive a cross join
move 364 comment to d288aee#diff-42e96520332fe6de8a8ff605e9f8db12R380
// "orders" table is naturally grouped on orderkey | ||
// this grouping should survive an inner join and allow for streaming aggregation later | ||
// this grouping should not survive a cross join | ||
assertPlan("SELECT o.orderkey, count(*) FROM orders o, lineitem l WHERE o.orderkey=l.orderkey GROUP BY 1", |
Why do you need GROUP BY 1? How can you project unaggregated o.orderkey in that case?
@sopel39 GROUP BY 1 here is equivalent to GROUP BY o.orderkey.
if (properties.getNodePartitioning().isPresent()) {
    // preserve input (possibly preferred) partitioning
    return properties;
}
return ActualProperties.builderFrom(properties)
Make it explicit:

ActualProperties.builder()
        // preserve input (possibly preferred) partitioning
        .global(properties)
        .constants(properties.getConstants())
        .local(newLocalProperties.build())
        .build()
}

if (step.isOutputPartial()) {
    systemMemoryContext.setBytes(memorySize);
this line is never reached in the current code.
this.groupByTypes = ImmutableList.copyOf(requireNonNull(groupByTypes, "groupByTypes is null"));
this.groupByChannels = Ints.toArray(requireNonNull(groupByChannels, "groupByChannels is null"));
this.accumulatorFactories = requireNonNull(accumulatorFactories, "accumulatorFactories is null");
this.step = requireNonNull(step, "step is null");
check state that step is FINAL?
if (nextGroupStart < page.getPositionCount()) {
    // current group stops somewhere in the middle of the page, so flush it
    evaluateAndFlushGroup(page, startPosition);

remove empty line
while (true) {
    // may be equal to page.getPositionCount() if the end is not found in this page
    int nextGroupStart = findNextGroupStart(startPosition, groupByPage);

remove empty line
@@ -92,6 +93,11 @@ public final void destroyPlanTest()
    queryRunner = null;
}

public ConnectorId getCurrentConnectorId()
Use it in com.facebook.presto.sql.planner.sanity.TestValidateAggregationsWithDefaultValues#setup too.
Force-pushed 3333ab1 to ebe3291
Also, fixed a dormant bug in local property derivations for cross joins.
Scalar subqueries are often rewritten as follows:

- Aggregation(unique, a, b)
  - LeftJoin
    - AssignUniqueId(unique)
      - probe (a, b)
    - build

In this plan, the aggregation input is grouped on all grouping keys and allows for single-step streaming execution. This commit updates PropertyDerivations and StreamPropertyDerivations to derive the partitioned_on(unique) global property and {grouped(unique), constant(a), constant(b)} local properties for the aggregation input. These changes allow AddLocalExchange to plan streaming aggregations. Specific changes are:
- Update PropertyDerivations to set {grouped(unique), constant(a), constant(b)} local properties for the output of the AssignUniqueId node;
- Update StreamPropertyDerivations to set partitioned_on(unique) for the output of the AssignUniqueId node.
Append (STREAMING) to the Aggregate node when printing a query plan. For example,

EXPLAIN (TYPE DISTRIBUTED) SELECT (SELECT count(*) FROM region r2 WHERE r2.regionkey > r1.regionkey) FROM region r1;

- Aggregate(STREAMING)[regionkey, unique] => [regionkey:bigint, unique:bigint, count:bigint]
        count := "count"("non_null")
    - LeftJoin[("regionkey_0" > "regionkey")] => [regionkey:bigint, unique:bigint, non_null:boolean]
            Distribution: REPLICATED
            SortExpression["regionkey_0"]
        - AssignUniqueId => [regionkey:bigint, unique:bigint]
            - TableScan[tpch:tpch:region:sf0.01, originalConstraint = true] => [regionkey:bigint]
Scalar subqueries are often rewritten as follows:

- Aggregation(unique, a, b)
  - LeftJoin
    - RemoteExchange (REPARTITION)
      - AssignUniqueId(unique)
        - probe (a, b)
    - RemoteExchange (REPARTITION)
      - build

In these plans, streaming aggregation is not applied because the grouped(unique, a, b) local property generated by the AssignUniqueId node can't reach the Aggregation node, as it gets dropped when passing through the RemoteExchange node. This commit adds an optimizer rule to push up the AssignUniqueId node through the RemoteExchange to allow for streaming aggregation. It also updates HashGenerationOptimizer to avoid generating unnecessary hashes for streaming aggregations. The modified plan looks like this:

- Aggregation(unique, a, b)
  - LeftJoin
    - AssignUniqueId(unique)
      - RemoteExchange (REPARTITION)
        - probe (a, b)
    - RemoteExchange (REPARTITION)
      - build
StreamingAggregationOperator applies only when the aggregation source is already grouped on the grouping keys, e.g. aggregations over joins generated for scalar subqueries. In these cases, StreamingAggregationOperator is more efficient than HashAggregationOperator, as it uses less CPU and a lot less memory. Add a benchmark to compare StreamingAggregationOperator and HashAggregationOperator for pre-grouped data sources.

(operatorType)  (rowsPerGroup)  Mode  Cnt    Score     Error  Units
streaming                    1  avgt    30  379.203 ±  9.431  ms/op
streaming                   10  avgt    30   68.950 ±  3.164  ms/op
streaming                 1000  avgt    30   27.623 ±  0.776  ms/op
hash                         1  avgt    30  451.901 ± 15.327  ms/op
hash                        10  avgt    30   94.244 ±  2.437  ms/op
hash                      1000  avgt    30   53.038 ±  0.980  ms/op
Force-pushed ebe3291 to cc916c4
🎉
Optimize aggregation in a correlated scalar subquery to remove unnecessary local exchange and partial aggregation and enable streaming mode.
A correlated subquery like

select name, (select max(name) from region where regionkey = nation.regionkey) from nation;

is rewritten to a left join with an aggregation over a synthetic row ID column generated by the AssignUniqueId node:

Since the "unique" column is unique, the aggregation input is partitioned on the grouping keys and therefore the aggregation can be executed in a single step without any exchanges. Furthermore, since the output of the join is grouped on the "unique" column, the aggregation doesn't need to accumulate all of the input in memory before emitting results. It can generate results every time the value of the "unique" column changes.

This PR implements streaming support in HashAggregationOperator, updates StreamPropertyDerivations and PropertyDerivations to set partitioned_on(unique) and grouped(unique) properties for the output of the AssignUniqueId node, and uses these properties in AddLocalExchanges to enable streaming aggregation.

Fixes #8171