Use streaming aggregation for a correlated scalar subquery #10731

Merged
merged 10 commits into from
Jun 20, 2018


@mbasmanova mbasmanova commented May 31, 2018

Optimize aggregation in a correlated scalar subquery to remove unnecessary local exchange and partial aggregation and enable streaming mode.

A correlated subquery like

select name, (select max(name) from region where regionkey = nation.regionkey) from nation;

is rewritten to a left join with an aggregation over a synthetic row ID column generated by an AssignUniqueId node:

- Aggregation (unique, probe.*)
  - LeftJoin
     - AssignUniqueId (unique)
        - probe
     - build

Since the "unique" column is unique, the aggregation input is already partitioned on the grouping keys, so the aggregation can be executed in a single step without any exchanges. Furthermore, since the output of the join is grouped on the "unique" column, the aggregation doesn't need to accumulate all of its input in memory before emitting results; it can emit a result every time the value of the "unique" column changes.
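As a rough illustration of the "emit a result every time the value of the unique column changes" idea, here is a minimal, self-contained Java sketch. It is not the Presto operator: the class name `StreamingMaxSketch`, the array-based input, and the `id=max` string output are all assumptions made for the example. It computes max(name) per group over rows that are already grouped on the unique id.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the operator added in this PR.
final class StreamingMaxSketch
{
    // uniqueIds must be grouped: equal ids are adjacent. names[i] may be null
    // (a left join row without a match on the build side).
    static List<String> aggregate(long[] uniqueIds, String[] names)
    {
        List<String> output = new ArrayList<>();
        boolean hasGroup = false;
        long currentUnique = 0;
        String currentMax = null;

        for (int i = 0; i < uniqueIds.length; i++) {
            if (hasGroup && uniqueIds[i] != currentUnique) {
                // The id changed, so the previous group is complete: emit it and reset.
                output.add(currentUnique + "=" + currentMax);
                currentMax = null;
            }
            hasGroup = true;
            currentUnique = uniqueIds[i];
            if (names[i] != null && (currentMax == null || names[i].compareTo(currentMax) > 0)) {
                currentMax = names[i];
            }
        }
        if (hasGroup) {
            // Emit the last group once the input is exhausted.
            output.add(currentUnique + "=" + currentMax);
        }
        return output;
    }
}
```

Only the state of one group is held at a time, which is the memory benefit described above.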

This PR implements streaming support in HashAggregationOperator, updates StreamPropertyDerivations and PropertyDerivations to set partitioned_on(unique) and grouped(unique) properties for the output of the AssignUniqueId node, and uses these properties in AddLocalExchanges to enable streaming aggregation.

Fixes #8171

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch 2 times, most recently from 5a70194 to c40a872 Compare June 2, 2018 00:23
@mbasmanova mbasmanova changed the title [WIP] Streaming aggregation operator [WIP] Optimize aggregation in correlated scalar subquery Jun 2, 2018
@mbasmanova mbasmanova changed the title [WIP] Optimize aggregation in correlated scalar subquery [WIP] Optimize aggregation in a correlated scalar subquery Jun 2, 2018
@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from c40a872 to bcdbd1f Compare June 2, 2018 01:19
@mbasmanova mbasmanova changed the title [WIP] Optimize aggregation in a correlated scalar subquery Optimize aggregation in a correlated scalar subquery Jun 2, 2018
@mbasmanova mbasmanova requested a review from sopel39 June 2, 2018 05:17
@@ -45,6 +45,7 @@
private final PlanNode source;
private final Map<Symbol, Aggregation> aggregations;
private final List<List<Symbol>> groupingSets;
private final List<Symbol> preGroupedSymbols;
Contributor

I like that. This is clever and more generic. I was just thinking about adding another AggregationNode.Step value, e.g. STREAMING.

Contributor

Is this like a trait but inside a plan node?

Contributor

Yes, sort of. It's the same approach we took for WindowNode (prePartitionedInputs, preSortedOrderPrefix)

@@ -295,6 +311,20 @@ public HashAggregationOperator(
this.groupByTypes = ImmutableList.copyOf(groupByTypes);
this.groupByChannels = ImmutableList.copyOf(groupByChannels);
this.globalAggregationGroupIds = ImmutableList.copyOf(globalAggregationGroupIds);
this.preGroupedChannels = ImmutableList.copyOf(preGroupedChannels);
ImmutableList.Builder<Type> builder = ImmutableList.builder();
for (Integer channel : preGroupedChannels) {
Contributor

extract to a static method

this.preGroupedChannels = ImmutableList.copyOf(preGroupedChannels);
ImmutableList.Builder<Type> builder = ImmutableList.builder();
for (Integer channel : preGroupedChannels) {
boolean found = false;
Contributor

found is unused

@@ -455,7 +528,7 @@ public Page getOutput()
}

// only flush if we are finishing or the aggregation builder is full
- if (!finishing && (aggregationBuilder == null || !aggregationBuilder.isFull())) {
+ if (!finishing && (aggregationBuilder == null || !aggregationBuilder.isFull()) && pendingInput == null) {
Contributor

This will work poorly if there are a lot of small groups (e.g. we will flush small pages and constantly resetAggregationBuilder).

An additional condition here should be that aggregationBuilder has accumulated enough groups.

Contributor Author

@findepi @sopel39 Karol, I was wondering about this and was thinking of using MergingPageOutput to avoid producing tiny pages. Do you think that will help?

Contributor

Probably not as you would allocate aggregation builders per group. It is probably quite expensive as those are complicated objects

unfinishedWork = aggregationBuilder.processPage(page.getRegion(0, groupEnd));
pendingInput = page.getRegion(groupEnd, page.getPositionCount() - groupEnd);
}
else {
Contributor

if groups always span entire page, then you will never flush aggregates

Contributor Author

@sopel39 Good catch. Thank you, Karol.

@@ -56,6 +59,7 @@
private final List<Type> groupByTypes;
private final List<Integer> groupByChannels;
private final List<Integer> globalAggregationGroupIds;
private final List<Integer> preGroupedChannels;
Contributor

this is clever but it makes me wonder if we will use this feature of HashAggregationOperator?

I was rather thinking of a dedicated streaming aggregation operator that would allow us to avoid hash lookups, hash computations and so on (e.g. similar to AggregationOperator). Such a dedicated streaming aggregation operator should be more performant in the correlated-subquery context than an extended HashAggregationOperator.

Contributor Author

@sopel39 @martint Karol, I'm seeing that we currently use AggregationOperator for global aggregations (set of grouping keys is empty) and HashAggregationOperator for all other aggregations. I'm also seeing that correlated subqueries generate aggregations with grouping keys made of unique and all the probe columns. Such aggregations prevent pruning of probe columns and require unnecessary hashing. Alternatively, sub-queries could be using aggregations on just the unique and apply arbitrary grouping function to the probe columns. E.g.

select name, (select max(name) from region where regionkey = nation.regionkey) from nation;

could be re-written to

- Aggregation [unique] -> arbitrary(nation.name), max(region.name)
   - LeftJoin (region.regionkey = nation.regionkey)
      - AssignUniqueId
         - TableScan(nation)
      - TableScan(region)

This way, the planner would see that aggregation input is grouped on all grouping columns and hashing is unnecessary and be able to plan streaming aggregation without hashing. Otherwise, if the planner only sees that aggregation input is grouped on a subset of grouping columns, it can't automatically decide whether simple aggregation or hash aggregation is better. What do you think?

Contributor

> Alternatively, sub-queries could be using aggregations on just the unique and apply arbitrary grouping function to the probe columns. E.g.

I think such a construct will confuse stats computation. We could improve TransformCorrelatedScalarAggregationToJoin, though, to not include unused symbols in the top aggregation. I think for that we would need to extend LateralJoinNode to include an explicit list of outputSymbols, as JoinNode does.

Note that pruning passes through LateralJoinNode, but correlated (but unused later on) symbols won't be pruned from the probe side and will take part in the aggregation.

> This way, the planner would see that aggregation input is grouped on all grouping columns and hashing is unnecessary and be able to plan streaming aggregation without hashing

I'm confused. I think AssignUniqueId makes all columns grouped (not only uniqueId column). See: https://github.com/prestodb/presto/pull/10731/files/1b9e5c11be55febd887fc80bd1a6fbc68d08da26#diff-64cfd4c0cfe165b7dcc4934f126200e5

Optional.empty()),
child.getProperties());
return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
}

return planAndEnforceChildren(node, requiredProperties, requiredProperties);
StreamPreferredProperties childRequirements = parentPreferences
Contributor Author

@sopel39 @martint Karol, as I was planning these changes I asked Martin if this should be a separate rule. At the time Martin suggested keeping it in AddLocalExchanges (similar to the logic in visitWindow) to avoid traversing the tree and re-computing the properties.

Contributor

That way it makes it harder to migrate AddLocalExchanges to IterativeOptimizer. Maybe we should push harder towards traits.

Contributor Author

@kokosing Grzegorz, what do you refer to by traits? Is there an example I could check out?

Contributor

Traits are a concept of properties, but in the world of IterativeOptimizer or Memo (similar to stats or cost), where you store some information (a property) about a group reference (relation) in the memo. That way you don't need to recompute it on each rule invocation, and you could also use it in a rule pattern, saying something like "this rule will trigger when the node is an aggregation and its source is already pre-aggregated by columns a, b, c".

There is some initial (POC) PR about this: #9691, but it requires changes to match the CBO.

@mbasmanova mbasmanova requested a review from findepi June 4, 2018 17:06

ImmutableList.Builder<LocalProperty<Symbol>> localPropertiesBuilder = ImmutableList.builder();
localPropertiesBuilder.addAll(properties.getLocalProperties());
localPropertiesBuilder.add(new GroupingProperty<>(ImmutableSet.of(node.getIdColumn())));
Contributor

I think we can make a stronger statement here: that we are grouped on all columns, no?

Contributor Author

@sopel39 Karol, I think you are correct. Thanks for pointing it out. It might be the final piece I was missing.

We can say that the data is grouped on {unique + subset-of-source-symbols} for all subsets of the source symbols. This property will survive the join and will allow streaming aggregation without any hashing (as you suggested earlier). I'll implement StreamingAggregationOperator similar to AggregationOperator and drop changes to HashAggregationOperator.
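To make the "grouped on {unique + subset-of-source-symbols}" claim concrete, here is a small self-contained Java sketch. The helper `isGroupedOn` and the sample rows are made up for illustration and are not part of Presto. The helper checks that rows with equal values in the chosen columns are adjacent. Because the unique id already determines every source column, adding any source columns to the key keeps the stream grouped, while a source column on its own need not be grouped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; the rows and the helper are assumptions for this example.
final class GroupedOnSketch
{
    // Returns true if all rows sharing the same values in `columns` are adjacent.
    static boolean isGroupedOn(List<Object[]> rows, int... columns)
    {
        Set<List<Object>> finishedKeys = new HashSet<>();
        List<Object> currentKey = null;
        for (Object[] row : rows) {
            List<Object> key = new ArrayList<>();
            for (int column : columns) {
                key.add(row[column]);
            }
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    finishedKeys.add(currentKey);
                }
                if (finishedKeys.contains(key)) {
                    // The key showed up again after its group had ended.
                    return false;
                }
                currentKey = key;
            }
        }
        return true;
    }
}
```

With columns laid out as (unique, name, regionkey), a stream grouped on column 0 is also grouped on {0, 1} and {0, 2}, which is what lets the aggregation keep the probe columns as grouping keys and still stream.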

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch 4 times, most recently from e17e174 to 30af132 Compare June 6, 2018 03:59
@mbasmanova
Contributor Author

@sopel39 Karol, I implemented a dedicated StreamingAggregationOperator similar to AggregationOperator and dropped changes to HashAggregationOperator. Would you take another look?

{
ImmutableList.Builder<Aggregator> builder = ImmutableList.builder();
for (AccumulatorFactory factory : accumulatorFactories) {
builder.add(new Aggregator(factory, step));
Contributor

Do we want to support partial streaming aggregations now?

Contributor Author

@sopel39 I don't have a use case for partial streaming aggregation yet and I'm not sure how I would implement it given the concerns you raised on the changes to HashAggregationOperator. E.g. we'd need a reusable/resettable HashAggregationBuilder , right? What are your thoughts?

Contributor Author

@sopel39 Karol, I might have misunderstood your question. You are asking about Step#PARTIAL, right? I was thinking about the case when data stream is grouped on a subset of grouping keys.

StreamingAggregationOperator should work for Step#PARTIAL aggregations just fine. Is there anything special that needs to be taken care of?

Contributor

@sopel39 sopel39 Jun 6, 2018

@mbasmanova I was just wondering if we plan to support partial streaming aggregation at this moment. Both partial streaming aggregation and streaming hash aggregation seem like great features, but I wonder if there are practical use cases for them at this point.

Contributor

> Do we want to support partial streaming aggregations now?

It occurred to me that such an imprecise sentence might be read with a negative bias, which I did not intend.

private List<Aggregator> aggregates;
private final PageBuilder pageBuilder;
private Page pendingInput;
private Page previousPage;
Contributor

Maybe instead of whole page we should just store single row from group (seems easier to grasp)

Contributor Author

@sopel39 Is there an easy way to do that?

Contributor

I think similar to what you do but just get region of size 1

Contributor Author

@sopel39 Isn't this what previousPage = page.getRegion(page.getPositionCount() - 1, 1); does already?

Contributor

Yup. I would just change field name then


private void updateMemoryUsage()
{
long memorySize = pageBuilder.getRetainedSizeInBytes();
Contributor

You should account for the previous page too.

requireNonNull(page, "page is null");

if (previousPage != null) {
if (!rowEqualsRow(0, previousPage, 0, page)) {
Contributor

There is an implicit assumption that the previous page contains only rows from a single group. Maybe consider keeping just a single row from the group (my other comment).

}
}

private void processInput(Page page)
Contributor

@sopel39 sopel39 Jun 6, 2018

This seems a rather complex flow. What would you say about something like (pseudo code):

for (row in page) {
  if (previousGroupRow != null && !previousGroupRow.equals(row)) {
    evaluateAggregatesToPageBuilder();
    resetAggregators();
    previousGroupRow = null;
  }

  addRowToAggregates();
  if (previousGroupRow == null) {
    // first row from group
    previousGroupRow = row;
  }
}

and

void evaluateAggregatesToPageBuilder() {
  if (pageBuilder.isFull()) {
    outputPages.add(pageBuilder.build());
    pageBuilder.reset();
  }
  ...
}

The idea is to store produced output pages in order to simplify processing of input pages.

You wouldn't then have to check
if (pageBuilder.getPositionCount() >= minOutputPageRowCount || pageBuilder.getRetainedSizeInBytes() >= minOutputPageSize.toBytes()) {

because you could write needsInput as:

boolean needsInput() {
  return !finishing && outputPages.isEmpty();
}
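The output-queue idea in this comment could look roughly like the following self-contained Java sketch. Everything here is an assumption for illustration: the class `OutputQueueSketch`, the use of `List<String>` as a stand-in for a page, and the row-count threshold are not Presto APIs. Unlike the pseudo code above, the sketch checks for a full page after appending a row rather than before.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only; List<String> stands in for a Page.
final class OutputQueueSketch
{
    private final int maxRowsPerPage;
    private final Queue<List<String>> outputPages = new ArrayDeque<>();
    private List<String> pageBuilder = new ArrayList<>();
    private boolean finishing;

    OutputQueueSketch(int maxRowsPerPage)
    {
        this.maxRowsPerPage = maxRowsPerPage;
    }

    // The operator accepts more input only while no finished page is waiting.
    boolean needsInput()
    {
        return !finishing && outputPages.isEmpty();
    }

    // Called once per completed group with that group's result row.
    void emitGroupResult(String row)
    {
        pageBuilder.add(row);
        if (pageBuilder.size() >= maxRowsPerPage) {
            flushPageBuilder();
        }
    }

    void finish()
    {
        finishing = true;
        if (!pageBuilder.isEmpty()) {
            flushPageBuilder();
        }
    }

    // Returns the next finished page, or null if none is ready.
    List<String> getOutput()
    {
        return outputPages.poll();
    }

    private void flushPageBuilder()
    {
        outputPages.add(pageBuilder);
        pageBuilder = new ArrayList<>();
    }
}
```

Because finished pages are parked in the queue, input processing never has to stop mid-page just because the page builder filled up.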

Contributor

In this snippet I used row-by-row iteration, but that is not how Accumulators work. We should operate on ranges, but the principles stay the same.

Contributor Author

@sopel39 Karol, thanks for the suggestion. I used some of the ideas to simplify the logic, but couldn't get it to quite the shape you asked for. Any chance you could take another look?

Contributor

I think the logic in this class (and this method in particular) would be easier to follow if instead of tracking "previousRow" you just kept track of "currentGroup" -- you only need to compare against that, ever.

Then, this method might be rewritten as follows (beware, I haven't tested it or made sure the logic is 100% correct):

private void processInput(Page page)
{
    requireNonNull(page, "page is null");

    if (currentGroup != null) {
        if (!rowEqualsRow(0, page, 0, currentGroup)) {
            // page starts with new group, so flush it
            evaluateAggregatesToPageBuilder(currentGroup, 0);

            currentGroup = page.getRegion(0, 1);

            if (pageBuilder.isFull()) {
                return;
            }
        }
    }

    int startPosition = 0;
    while (startPosition < page.getPositionCount()) {
        // may be equal to page.getPositionCount() if the end is not found in this page
        int nextGroupStart = findNextGroupStart(startPosition, page);

        addRowsToAggregates(page, startPosition, nextGroupStart - 1);

        if (nextGroupStart < page.getPositionCount()) {
            // current group stops somewhere in the middle of the page, so flush it
            evaluateAggregatesToPageBuilder(page, startPosition);

            currentGroup = page.getRegion(nextGroupStart, 1);

            if (pageBuilder.isFull()) {
                pendingInput = page.getRegion(nextGroupStart, page.getPositionCount() - nextGroupStart);
                return;
            }
        }

        startPosition = nextGroupStart;
    }
}

with findNextGroupStart as:

private int findNextGroupStart(int startPosition, Page page)
{
    for (int i = startPosition; i < page.getPositionCount(); i++) {
        if (!rowEqualsRow(i, page, 0, currentGroup)) {
            return i;
        }
    }

    return page.getPositionCount();
}
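For readers who want something runnable, here is a simplified, self-contained Java sketch of the same "track the current group, process ranges" idea. It is an illustration under stated assumptions, not the code from this PR: pages are modelled as parallel `long[]` arrays of group keys and values, the only aggregate is a sum, and the page-builder-full and `pendingInput` handling from the snippet above is left out. The class name `RangeStreamingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; a "page" is a pair of parallel arrays.
final class RangeStreamingSketch
{
    private final List<String> output = new ArrayList<>();
    private boolean hasCurrentGroup;
    private long currentGroup;
    private long currentSum;

    void addPage(long[] keys, long[] values)
    {
        int start = 0;
        while (start < keys.length) {
            if (hasCurrentGroup && keys[start] != currentGroup) {
                // A new group starts here, so the previous one is complete.
                flushCurrentGroup();
            }
            if (!hasCurrentGroup) {
                currentGroup = keys[start];
                hasCurrentGroup = true;
            }
            // May equal keys.length if the group continues into the next page.
            int nextGroupStart = findNextGroupStart(keys, start);
            for (int i = start; i < nextGroupStart; i++) {
                currentSum += values[i];
            }
            start = nextGroupStart;
        }
    }

    void finish()
    {
        if (hasCurrentGroup) {
            flushCurrentGroup();
        }
    }

    List<String> getOutput()
    {
        return output;
    }

    private int findNextGroupStart(long[] keys, int start)
    {
        for (int i = start; i < keys.length; i++) {
            if (keys[i] != currentGroup) {
                return i;
            }
        }
        return keys.length;
    }

    private void flushCurrentGroup()
    {
        output.add(currentGroup + "=" + currentSum);
        currentSum = 0;
        hasCurrentGroup = false;
    }
}
```

The only comparison ever made is against the current group, including across page boundaries, which is the simplification suggested in the comment.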

@@ -2339,6 +2340,10 @@ private PhysicalOperation planGroupByAggregation(
}
}

boolean streaming = !node.getPreGroupedSymbols().isEmpty()
&& node.getGroupingSets().size() == 1
&& node.getPreGroupedSymbols().equals(Iterables.getOnlyElement(node.getGroupingSets()));
Contributor

You probably should compare sets here instead of lists.
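A short, self-contained Java sketch of why the comparison matters: `List.equals` is order-sensitive, so a list-based check rejects an aggregation whose pre-grouped symbols and grouping set contain the same symbols in a different order. The class and method names are hypothetical, and symbols are modelled as plain strings rather than Presto `Symbol` objects.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration only; symbols are modelled as strings.
final class StreamingCheckSketch
{
    // Mirrors the shape of the check in the diff: compares lists, so order matters.
    static boolean listBasedCheck(List<String> preGroupedSymbols, List<List<String>> groupingSets)
    {
        return !preGroupedSymbols.isEmpty()
                && groupingSets.size() == 1
                && preGroupedSymbols.equals(groupingSets.get(0));
    }

    // Compares the same symbols as sets, so order no longer matters.
    static boolean setBasedCheck(List<String> preGroupedSymbols, List<List<String>> groupingSets)
    {
        return !preGroupedSymbols.isEmpty()
                && groupingSets.size() == 1
                && new HashSet<>(preGroupedSymbols).equals(new HashSet<>(groupingSets.get(0)));
    }
}
```

For pre-grouped symbols [unique, name] and a single grouping set [name, unique], the list-based check returns false while the set-based check returns true.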

private boolean canSkipHashGeneration(AggregationNode node)
{
if (!node.getPreGroupedSymbols().isEmpty() &&
node.getGroupingSets().size() == 1
Contributor

add AggregationNode#isStreamingAggregation method. Use it in local execution planner too

@@ -156,6 +156,7 @@ private AggregationNode replaceAggregationSource(
source,
aggregation.getAggregations(),
ImmutableList.of(groupingSet),
ImmutableList.of(),
Contributor

@sopel39 sopel39 Jun 6, 2018

You shouldn't create a non-streaming partial aggregation from a streaming aggregation. This will hurt performance and might be incorrect too (partial aggregations might break grouping, since they can flush in a nondeterministic way, e.g. mid-group).

@@ -215,6 +215,7 @@ private PlanNode split(AggregationNode node, Context context)
node.getSource(),
intermediateAggregation,
node.getGroupingSets(),
ImmutableList.of(),
Contributor

@sopel39 sopel39 Jun 6, 2018

You shouldn't create a non-streaming partial aggregation for a streaming aggregation. This will hurt performance and might be incorrect too (partial aggregations might break grouping, since they can flush in a nondeterministic way).

{
if (!node.getPreGroupedSymbols().isEmpty() &&
node.getGroupingSets().size() == 1
&& node.getPreGroupedSymbols().equals(Iterables.getOnlyElement(node.getGroupingSets()))) {
Contributor

You should also check that the only grouping set is non-empty.

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch 3 times, most recently from 3b4e101 to 18019de Compare June 7, 2018 03:15
Contributor

sopel39 commented Jun 7, 2018

Optional: You could capture pattern:

aggregation
   - left join
      - assign unique id

and then rewrite aggregation to:

aggregation[STREAMING](keys=allProbeSymbols, preGroupedKeys=unique)

this way we would reduce row comparison overhead

Contributor

@sopel39 sopel39 left a comment

is this ready for final pass?

// (1) result.getHashSymbols() contains all requiredHashes;
// (2) result.getHashSymbols() is a superset of requiredHashes combined with preferredHashes (no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
preferenceSatisfied = resultHashes.containsAll(requiredHashes.getHashes())
Contributor

what was the issue here?

Contributor Author

@sopel39

For plans where Join and RemoteExchange are separated by AssignUniqueId, this logic prevented the hash on join keys produced by RemoteExchange from being reused by the Join.

This plan (created by the new PushUpAssignUniqueIdThroughRemoteExchange rule)

- Join
   - AssignUniqueId
      - RemoteExchange

was changed to

- Join
   - Project: (add hash on probe join key)
      - AssignUniqueId
          - Project: (drop hash on probe join key)
             - RemoteExchange

E.g.

This query

SELECT name, (SELECT max(name) FROM region WHERE regionkey = nation.regionkey AND length(name) > length(nation.name)) FROM nation

was planned as

- Aggregate[<name, regionkey, unique>] => [name:varchar(25), regionkey:bigint, unique:bigint, max:varchar(25)]
        max := "max"("name_1")
    - LeftJoin[("regionkey" = "regionkey_0") AND ("length"("name_1") > "length"("name"))][$hashvalue_17, $hashvalue_18] => [name:varchar(25), regionkey:bigint, unique:bigint, name_1:varchar(25)]
            Distribution: PARTITIONED
        - Project[] => [name:varchar(25), regionkey:bigint, unique:bigint, $hashvalue_17:bigint]
                $hashvalue_17 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0))
            - AssignUniqueId => [name:varchar(25), regionkey:bigint, unique:bigint]
                - Project[] => [name:varchar(25), regionkey:bigint]
                    - RemoteExchange[REPARTITION][$hashvalue] => name:varchar(25), regionkey:bigint, $hashvalue:bigint
                        - ScanProject[table = local:tpch:nation:sf0.01, originalConstraint = true] => [name:varchar(25), regionkey:bigint, $hashvalue_16:bigint]

instead of

- Aggregate[<name, regionkey, unique>] => [name:varchar(25), regionkey:bigint, unique:bigint, max:varchar(25)]
        max := "max"("name_1")
    - LeftJoin[("regionkey" = "regionkey_0") AND ("length"("name_1") > "length"("name"))][$hashvalue, $hashvalue_17] => [name:varchar(25), regionkey:bigint, unique:bigint, name_1:varchar(25)]
            Distribution: PARTITIONED
        - AssignUniqueId => [name:varchar(25), regionkey:bigint, $hashvalue:bigint, unique:bigint]
            - RemoteExchange[REPARTITION][$hashvalue] => name:varchar(25), regionkey:bigint, $hashvalue:bigint
                - ScanProject[table = local:tpch:nation:sf0.01, originalConstraint = true] => [name:varchar(25), regionkey:bigint, $hashvalue_16:bigint]

// (2) result.getHashSymbols() is a superset of requiredHashes combined with preferredHashes (no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
preferenceSatisfied = resultHashes.containsAll(requiredHashes.getHashes())
&& ImmutableSet.builder()
Contributor

extract union set to a separate variable

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch 2 times, most recently from 2c9560f to d7be4f0 Compare June 7, 2018 18:00
@mbasmanova
Contributor Author

@sopel39

> is this ready for final pass?

Yes, I think so.

Contributor

@kokosing kokosing left a comment

Nice!

@@ -185,7 +198,7 @@ public boolean hasOrderings()
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
- return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, step, hashSymbol, groupIdSymbol);
+ return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, preGroupedSymbols, step, hashSymbol, groupIdSymbol);
Contributor

[just thinking] If I understand it correctly, then it is hard to say whether newChildren is still grouped on preGroupedSymbols.

Contributor

The rule should take care not to produce an invalid plan.

Maybe we should validate aggregations at the end of planning like we validate default aggregations with: com.facebook.presto.sql.planner.sanity.ValidateAggregationsWithDefaultValues?

@@ -222,6 +235,16 @@ public boolean hasSingleNodeExecutionPreference(FunctionRegistry functionRegistr
return (hasEmptyGroupingSet() && !hasNonEmptyGroupingSet()) || (hasDefaultOutput() && !isDecomposable(functionRegistry));
}

public boolean isStreamable()
{
if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
Contributor

don't you need to check that aggregations.isEmpty()?

Contributor Author

@kokosing I might be missing something, but aggregations provide the computation to perform on each group (sum, count, arbitrary). We don't want these to be empty.

Contributor

Sure, I misunderstood what streamable aggregations mean. Thanks.

PlanWithProperties child = planAndEnforce(node.getSource(), childRequirements, childRequirements);

List<Symbol> preGroupedSymbols;
if (!groupingKeys.isEmpty()) {
Contributor

groupingKeys is a union of all symbols in groupingSets. As I understand it, there is currently no point in finding a plan with preGroupedSymbols matching groupingKeys if there are multiple entries in groupingSets. Maybe you should collect this only when groupingSets.size() == 1?

@Test
public void testStreamingAggregationForCorrelatedSubquery()
{
assertPlanWithSession("SELECT name, (SELECT max(name) FROM region WHERE regionkey = nation.regionkey AND length(name) > length(nation.name)) FROM nation",
Contributor

please put each argument in separate line like:

method(
    arg1,
    arg2,
    ...,
    lastArg);

Contributor Author

@kokosing I believe I'm doing this except in cases where it consumes too much vertical space without adding any value. I think this is consistent with the rest of the code in this file. Is there a specific change you'd like me to make? Could you point it out?

@@ -670,7 +670,16 @@ private PlanWithProperties planAndEnforce(

boolean preferenceSatisfied;
if (pruneExtraHashSymbols) {
preferenceSatisfied = result.getHashSymbols().keySet().equals(requiredHashes.getHashes());
// Make sure that
Contributor

Shouldn't this be separated into a different commit? Can you add a test for that?

Contributor Author

@kokosing @sopel39 I don't see any unit tests for HashGenerationOptimizer and I cannot use BaseRuleTest as it is an old-style rule. What should I do? Is there an example I could follow?

// (1) result has all required hashes
// (2) any extra hashes are preferred hashes (e.g. no pruning is needed)
Set<HashComputation> resultHashes = result.getHashSymbols().keySet();
ImmutableSet<HashComputation> requiredAndPreferredHashes = ImmutableSet.<HashComputation>builder()
Contributor

declare it as Set
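
The two conditions in the code comments above can be read as a pair of set-containment checks. The following is a minimal sketch under that reading, not the code from this PR: hashes are modelled as plain strings, and the class and method names (`HashPreferenceCheck`, `preferenceSatisfied`) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class HashPreferenceCheck
{
    private HashPreferenceCheck() {}

    // Hypothetical helper: returns true when
    // (1) the result contains every required hash, and
    // (2) every extra hash in the result is a preferred hash, so nothing needs pruning.
    static boolean preferenceSatisfied(Set<String> resultHashes, Set<String> requiredHashes, Set<String> preferredHashes)
    {
        Set<String> requiredAndPreferredHashes = new HashSet<>(requiredHashes);
        requiredAndPreferredHashes.addAll(preferredHashes);
        return resultHashes.containsAll(requiredHashes)
                && requiredAndPreferredHashes.containsAll(resultHashes);
    }

    public static void main(String[] args)
    {
        // the extra hash h(b) is preferred, so no pruning is needed
        System.out.println(preferenceSatisfied(Set.of("h(a)", "h(b)"), Set.of("h(a)"), Set.of("h(b)")));
        // the extra hash h(c) is neither required nor preferred
        System.out.println(preferenceSatisfied(Set.of("h(a)", "h(c)"), Set.of("h(a)"), Set.of("h(b)")));
    }
}
```

Declaring the combined set as `Set` rather than a concrete type, as the review comment suggests, keeps the check independent of how the set is built.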

Contributor

@sopel39 left a comment

reviewed up to: Optimize aggregation in a correlated scalar subquery

@@ -185,7 +198,7 @@ public boolean hasOrderings()
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren)
{
return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, step, hashSymbol, groupIdSymbol);
return new AggregationNode(getId(), Iterables.getOnlyElement(newChildren), aggregations, groupingSets, preGroupedSymbols, step, hashSymbol, groupIdSymbol);
Contributor

The rule should take care not to produce an invalid plan.

Maybe we should validate aggregations at the end of planning like we validate default aggregations with: com.facebook.presto.sql.planner.sanity.ValidateAggregationsWithDefaultValues?

{
if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
List<Symbol> groupingSet = Iterables.getOnlyElement(groupingSets);
return !groupingSet.isEmpty() && Sets.newHashSet(preGroupedSymbols).equals(Sets.newHashSet(groupingSet));
Contributor

You don't need to check !groupingSet.isEmpty() (it is already implied by !preGroupedSymbols.isEmpty()).

Maybe use ImmutableSet.copyOf instead of Sets.newHashSet

Contributor Author

@sopel39 I'm moving these checks to AddLocalExchanges and adding a sanity check to the AggregationNode constructor. This should address both this comment and the comment above about rules not producing invalid plans.

Contributor

Validator makes sure that other rules didn't break input properties of streaming aggregation.

{
if (!preGroupedSymbols.isEmpty() && groupingSets.size() == 1) {
List<Symbol> groupingSet = Iterables.getOnlyElement(groupingSets);
return !groupingSet.isEmpty() && Sets.newHashSet(preGroupedSymbols).equals(Sets.newHashSet(groupingSet));
Contributor

Note: the aggregation is also streamable when groupingSet contains preGroupedSymbols without being equal to them, but we just don't support that yet (via HashAggregationOperator).

Contributor Author

@sopel39 That's right
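
The streamability condition discussed in this thread can be sketched in isolation as follows. This is a simplified stand-in, not the AggregationNode code: symbols are modelled as strings, the class name `StreamableCheck` is hypothetical, and the redundant emptiness check on the grouping set is dropped as suggested above.

```java
import java.util.HashSet;
import java.util.List;

final class StreamableCheck
{
    private StreamableCheck() {}

    // Streamable only when there is exactly one grouping set and the pre-grouped
    // symbols cover it exactly. The "pre-grouped symbols are a strict subset of the
    // grouping set" case is deliberately rejected, matching the note above that it
    // is not supported yet.
    static boolean isStreamable(List<String> preGroupedSymbols, List<List<String>> groupingSets)
    {
        if (preGroupedSymbols.isEmpty() || groupingSets.size() != 1) {
            return false;
        }
        List<String> groupingSet = groupingSets.get(0);
        return new HashSet<>(preGroupedSymbols).equals(new HashSet<>(groupingSet));
    }

    public static void main(String[] args)
    {
        System.out.println(isStreamable(List.of("unique"), List.of(List.of("unique"))));
        System.out.println(isStreamable(List.of("a"), List.of(List.of("a", "b"))));
    }
}
```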

if (!groupingKeys.isEmpty()) {
List<LocalProperty<Symbol>> desiredProperties = ImmutableList.of(new GroupingProperty<>(groupingKeys));
Iterator<Optional<LocalProperty<Symbol>>> matchIterator = LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).iterator();
Optional<LocalProperty<Symbol>> groupingRequirement = matchIterator.next();
Contributor

use getOnlyElement


List<Symbol> preGroupedSymbols;
if (!groupingKeys.isEmpty()) {
List<LocalProperty<Symbol>> desiredProperties = ImmutableList.of(new GroupingProperty<>(groupingKeys));
Contributor

extract this logic to a separate method

return leftProperties
.withUnspecifiedPartitioning()
Contributor

optional: I would extract it to separate commit and maybe add a test

Contributor Author

@sopel39 Before I spend too much time building out testing infrastructure, could you tell me whether there is an existing test I could use as an example? I don't see any unit tests for StreamPropertyDerivations.

Contributor

@sopel39 Jun 12, 2018

When I improved the AssignUniqueId node properties, I added a test like TestLogicalPlanner#testUsesDistributedJoinIfNaturallyPartitionedOnProbeSymbols (I synthesized a query for which the improvements made a difference).

If it is not possible to find such a query for your changes, maybe they are unused? We could also skip the test for now if it's really hard to find a proper test case.

Contributor Author

@sopel39 In isolation, StreamPropertyDerivations changes for LEFT join are not needed. However, they are necessary to propagate newly generated properties of AssignUniqueId. Without LEFT join changes, newly added test TestLogicalPlanner#testStreamingAggregationForCorrelatedSubquery fails.

I can move these changes into a separate commit, but I don't know how to test them without testing changes to AssignUniqueId as well.

@@ -431,7 +429,14 @@ public StreamProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<St
@Override
public StreamProperties visitAssignUniqueId(AssignUniqueId node, List<StreamProperties> inputProperties)
{
return Iterables.getOnlyElement(inputProperties);
StreamProperties properties = Iterables.getOnlyElement(inputProperties);
if (properties.getPartitioningColumns().isPresent()) {
Contributor

add a comment, e.g:

                // preserve input (possibly preferred) partitioning

node(ValuesNode.class)))
.withAlias("NON_NULL", expression("true"))))))))),
plan -> assertEquals(countFinalAggregationNodes(plan), extraAggregation ? 2 : 1));
aggregation(ImmutableMap.of("COUNT", functionCall("count", ImmutableList.of("NON_NULL"))),
Contributor

optional: we could explicitly test aggregation step here.

anyTree(tableScan("nation", ImmutableMap.of("n_name", "name", "n_regionkey", "regionkey"))))),
anyTree(tableScan("region", ImmutableMap.of("r_name", "name")))))));

assertPlanWithSession("SELECT name, (SELECT max(name) FROM region WHERE regionkey > nation.regionkey) FROM nation",
Contributor

Add a comment explaining how this differs from the previous test.

this.getQueryRunner().getDefaultSession(),
false,
anyTree(
aggregation(ImmutableList.of(ImmutableList.of("n_name", "n_regionkey", "unique")),
Contributor

please break lines after each argument.

@findepi findepi changed the title Optimize aggregation in a correlated scalar subquery Use streaming aggregation for a correlated scalar subquery Jun 11, 2018
@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from d7be4f0 to 161ad36 Compare June 12, 2018 01:53
@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from 857bc70 to 04957e2 Compare June 14, 2018 23:50
@mbasmanova
Contributor Author

@martint @sopel39 Martin, Karol, thank you for reviews. I addressed your comments and updated the PR.

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from 04957e2 to c1c42cf Compare June 15, 2018 17:49
@@ -613,6 +633,20 @@ private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperti
}
}

private List<Symbol> computePreGroupedSymbols(List<List<Symbol>> groupingSets, List<Symbol> groupingKeys, PlanWithProperties childWithProperties)
Contributor

The name of this method is misleading. It doesn't really compute pre-grouped columns. Rather, it validates that the provided list is pre-grouped (it's an all-or-nothing).

I suspect this is so that we can extend it in the future to return a subset, but it makes reasoning about the code harder. My preference would be to replace it with an implementation that is more precise about the current intent. In fact, you can probably just replace this whole thing with this in the caller:

List<Symbol> preGroupedSymbols = ImmutableList.of();

if (LocalProperties.match(childWithProperties.getProperties().getLocalProperties(), new GroupingProperty<>(groupingKeys)).isPresent()) {
    preGroupedSymbols = groupingKeys;
}

At a minimum, there should be a comment in the method explaining why it's abstracted that way (but IMO, that's premature over-abstraction)

Contributor Author

@martint I like the proposed code, but... it doesn't look that nice after I fix the compilation issues. Here is what I get:

List<Symbol> preGroupedSymbols = ImmutableList.of();

if (!Iterables.getOnlyElement(LocalProperties.match(child.getProperties().getLocalProperties(), ImmutableList.of(new GroupingProperty<>(groupingKeys)))).isPresent()) {
    preGroupedSymbols = groupingKeys;
}

Iterables.getOnlyElement and ImmutableList.of make it a lot more verbose and having "! match" is counterintuitive.

I can make it less verbose by adding a match overload to LocalProperties:

public static <T> Optional<LocalProperty<T>> match(List<LocalProperty<T>> actuals, LocalProperty<T> desired) {
    return Iterables.getOnlyElement(match(actuals, ImmutableList.of(desired)));
}

but that brings ! and match closer to each other so it actually reads as "if no match, then ..."

if (!LocalProperties.match(child.getProperties().getLocalProperties(), new GroupingProperty<>(groupingKeys)).isPresent()) {
    preGroupedSymbols = groupingKeys;
}

Would renaming this method to extractPreGroupedSymbols work better? Or do you have any other suggestion?

Contributor

Hmm... yeah, that doesn't read that nicely.

If we're going to keep the method I'd suggest something like:

List<Symbol> preGroupedSymbols = ImmutableList.of();
if (isGroupedOn(groupingKeys, child.getProperties().getLocalProperties())) {
    preGroupedSymbols = groupingKeys;
}

and

private boolean isGroupedOn(List<Symbol> columns, List<LocalProperty<Symbol>> properties)
{
    // !isPresent() indicates the property was satisfied completely
    return !LocalProperties.match(properties, LocalProperties.grouped(columns)).get(0).isPresent();
}

Although, you could just do that inline and have a comment to clarify the behavior.

Contributor Author

@martint I could do that, but... just to clarify, are you implying that I should add support for multiple grouping sets? (The name and implementation of isGroupedOn suggest that.) One of the challenges in doing that would be finding queries for testing.

Contributor

I'm just saying that from the point of view of the AggregationNode, one or many grouping sets doesn't matter. The computation of the grouping sets is handled by GroupIdNode, which produces an additional synthetic column the aggregation has to group on.

Contributor Author

@martint I looked into how multiple grouping sets are implemented. Like you mentioned, we use an auxiliary GroupIdNode that

...for every input row, emits rows for each
  grouping set specified. The row generated will contain NULLs in the
  channels corresponding to the columns present in the union of all
  grouping columns but not present in the current grouping set.

I couldn't find where and how the rows with different values in group_id column are redirected to the right aggregators. But, assuming this happens somehow, the implementation of GroupIdOperator doesn't seem to preserve any local properties on the grouping keys and therefore doesn't allow for streaming aggregations.

Given that the presence of GroupIdNode prevents streaming aggregation, what would be the benefit of updating the StreamingAggregationOperator to handle multiple grouping sets and a group_id column?
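
The row expansion quoted above can be illustrated with a toy model. This is only a sketch of the described behavior, not GroupIdOperator itself: rows are maps from column name to value, aggregation argument columns are omitted, and the names `GroupIdExpansion`, `expand`, and the `groupid` output column are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupIdExpansion
{
    private GroupIdExpansion() {}

    // For one input row, emit one output row per grouping set. Columns that are in the
    // union of all grouping columns but not in the current grouping set become null,
    // and a synthetic "groupid" column records which grouping set produced the row.
    static List<Map<String, Object>> expand(Map<String, Object> row, List<List<String>> groupingSets)
    {
        // union of all grouping columns, in first-seen order
        List<String> allGroupingColumns = new ArrayList<>();
        for (List<String> groupingSet : groupingSets) {
            for (String column : groupingSet) {
                if (!allGroupingColumns.contains(column)) {
                    allGroupingColumns.add(column);
                }
            }
        }

        List<Map<String, Object>> output = new ArrayList<>();
        for (int groupId = 0; groupId < groupingSets.size(); groupId++) {
            Map<String, Object> outputRow = new LinkedHashMap<>();
            for (String column : allGroupingColumns) {
                boolean inCurrentSet = groupingSets.get(groupId).contains(column);
                outputRow.put(column, inCurrentSet ? row.get(column) : null);
            }
            outputRow.put("groupid", (long) groupId);
            output.add(outputRow);
        }
        return output;
    }

    public static void main(String[] args)
    {
        // GROUPING SETS ((a, b), (a)) applied to a single input row
        System.out.println(expand(Map.of("a", 1, "b", 2), List.of(List.of("a", "b"), List.of("a"))));
    }
}
```

Because each input row fans out into rows for different grouping sets, rows that share the same (a, b, groupid) combination are generally no longer adjacent in the output, which is consistent with the observation above that grouping properties do not survive this node.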

Contributor

I'm not saying we should add support for multiple grouping sets to the operator, but from the point of view of deriving properties, we shouldn't have to special case any knowledge about multiple grouping sets. "pregrouped symbols" should just indicate that certain symbols are pre-grouped. The decision of whether to use streaming should be made by the rule that turns an aggregation that has pregrouped group-by columns + a single grouping set into a streaming agg.

Contributor Author

@martint I can move groupingSets.size() == 1 && !Iterables.getOnlyElement(groupingSets).isEmpty() check to AggregationNode#isStreamable and inline isGroupedOn's logic you suggested here. Would that work?

Contributor

Yup, that would be perfect.
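
The "empty result means satisfied" convention that makes `!match(...).isPresent()` read backwards can be shown with a toy matcher. This is a simplified stand-in and not Presto's LocalProperties: it models only grouping properties (no constants or sort orders), represents each known grouping as a set of column names, and all names here (`LocalPropertyMatchSketch`, `matchGrouping`) are hypothetical. Only the `isGroupedOn` wrapper mirrors the shape suggested in the thread.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class LocalPropertyMatchSketch
{
    private LocalPropertyMatchSketch() {}

    // actualGroupings lists the stream's known groupings, outermost first.
    // Returns Optional.empty() when the desired grouping is fully satisfied,
    // otherwise the desired columns that are still not grouped.
    static Optional<Set<String>> matchGrouping(List<Set<String>> actualGroupings, Set<String> desiredColumns)
    {
        Set<String> remaining = new LinkedHashSet<>(desiredColumns);
        Set<String> fixed = new HashSet<>();
        for (Set<String> actual : actualGroupings) {
            Set<String> needed = new HashSet<>(actual);
            needed.removeAll(fixed);
            // A known grouping helps only if all of its columns are desired:
            // being grouped on (a, b) does not imply being grouped on (a) alone.
            if (!desiredColumns.containsAll(needed)) {
                break;
            }
            fixed.addAll(actual);
            remaining.removeAll(actual);
            if (remaining.isEmpty()) {
                break;
            }
        }
        return remaining.isEmpty() ? Optional.empty() : Optional.of(remaining);
    }

    static boolean isGroupedOn(Set<String> columns, List<Set<String>> actualGroupings)
    {
        // !isPresent() indicates the property was satisfied completely
        return !matchGrouping(actualGroupings, columns).isPresent();
    }

    public static void main(String[] args)
    {
        System.out.println(isGroupedOn(Set.of("unique"), List.of(Set.of("unique"))));
        System.out.println(isGroupedOn(Set.of("a"), List.of(Set.of("a", "b"))));
    }
}
```

Wrapping the negation in a positively named method, as suggested above, keeps the inverted convention in one commented place.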

private List<Symbol> computePreGroupedSymbols(List<List<Symbol>> groupingSets, List<Symbol> groupingKeys, PlanWithProperties childWithProperties)
{
// TODO Add streaming aggregation support for multiple grouping sets
if (groupingSets.size() == 1 && !Iterables.getOnlyElement(groupingSets).isEmpty()) {
Contributor

Why the special casing for 1 vs many grouping sets? From the point of view of the aggregation there should be no difference in how they execute. There's just an extra column containing the group id.

Contributor Author

@martint Primarily because I don't have a use case for multiple grouping sets. I didn't want to add non-trivial functionality that I can't test.

@@ -383,6 +383,16 @@ public ActualProperties visitJoin(JoinNode node, List<ActualProperties> inputPro
constants.putAll(probeProperties.getConstants());
constants.putAll(buildProperties.getConstants());

if (node.isCrossJoin()) {
// Cross join preserves only constants from probe and build sides.
// Cross join doesn't preserve sorting or grouping local properties on either side.
Contributor

That's interesting. It used to preserve sorting/grouping on the left side, but I guess that broke when we implemented the optimized execution strategy that generates RLE blocks for the smaller page on either side. It also means that the NestedLoopsJoin operator is now misnamed, since it doesn't behave like a NLJ anymore -- it's now truly a CrossJoinOperator.

@@ -106,6 +106,7 @@ public String toString()
{
return toStringHelper(this)
.omitNullValues()
.add("type", joinType)
Contributor

Unrelated change?

Contributor Author

@martint Yes and no. I added this while debugging test failures. Do you want me to put it into a separate commit?

Contributor

That'd be ideal. The way I think about these kinds of changes is "if I had to revert the main change due to a bug, do I also need to lose this?"

@@ -183,13 +183,23 @@ public ActualProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<Ac
public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProperties> inputProperties)
{
ActualProperties properties = Iterables.getOnlyElement(inputProperties);

ImmutableList.Builder<LocalProperty<Symbol>> localPropertiesBuilder = ImmutableList.builder();
Contributor

Maybe name this "newProperties" or "newLocalProperties" ? It makes the intent clearer. Also the "builder" part in the name is irrelevant (sometimes you have to do it to deconflict with other variables, but this is not such a case)

{
AssignUniqueId assignUniqueId = captures.get(ASSIGN_UNIQUE_ID);
PartitioningScheme partitioningScheme = node.getPartitioningScheme();
if (partitioningScheme.getPartitioning().getColumns().contains(assignUniqueId.getIdColumn())) {
Contributor

Add a comment explaining that the partitioning being done by the remote exchange depends on the results of the id column added by AssignUniqueId, so we can't move it above.

@@ -183,13 +183,23 @@ public ActualProperties visitEnforceSingleRow(EnforceSingleRowNode node, List<Ac
public ActualProperties visitAssignUniqueId(AssignUniqueId node, List<ActualProperties> inputProperties)
Contributor

The commit message is still off. It mentions HashGenerationOptimizer, but that seems to be changed elsewhere.

}
}

private static Page rearrangePage(Page page, int[] channels)
Contributor

Maybe call this "extractColumns" ? rearrangePage is a bit too vague and doesn't convey what's the purpose of the method

return new Page(page.getPositionCount(), newBlocks);
}

private void addRowsToAggregates(Page page, int startPosition, int endPosition)
Contributor

It'd be clearer if endPosition were inclusive

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch 2 times, most recently from d634be7 to 7f74701 Compare June 15, 2018 20:24
@mbasmanova
Contributor Author

@martint addressed/replied to comments

@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from 7f74701 to 3333ab1 Compare June 18, 2018 18:24
@mbasmanova
Contributor Author

@martint comments addressed

Contributor

@martint left a comment

Looks good, but take a look at the test failures.

@mbasmanova
Contributor Author

@martint Thank you, Martin.

@sopel39 Karol, is this good to go?

@@ -56,6 +56,11 @@

private static boolean isSupportedAggregationNode(AggregationNode aggregationNode)
{
// Don't split streaming aggregations
Contributor

make it part of rule pattern

Contributor Author

@sopel39 Isn't it already so?

    private static final Pattern<AggregationNode> PATTERN = aggregation()
            .matching(PushPartialAggregationThroughJoin::isSupportedAggregationNode)
            .with(source().matching(join().capturedAs(JOIN_NODE)));

@@ -255,7 +255,7 @@ public PlanWithProperties visitAggregation(AggregationNode node, StreamPreferred
return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
}

StreamPreferredProperties requiredProperties = parentPreferences.withDefaultParallelism(session).withPartitioning(node.getGroupingKeys());
Contributor

commit title is too long.

{
// "orders" table is naturally grouped on orderkey
// this grouping should survive an inner join and allow for streaming aggregation later
// this grouping should not survive a cross join
Contributor

// "orders" table is naturally grouped on orderkey
// this grouping should survive an inner join and allow for streaming aggregation later
// this grouping should not survive a cross join
assertPlan("SELECT o.orderkey, count(*) FROM orders o, lineitem l WHERE o.orderkey=l.orderkey GROUP BY 1",
Contributor

Why do you need GROUP BY 1? How can you project unaggregated o.orderkey in that case?

Contributor Author

@sopel39 GROUP BY 1 here is equivalent to GROUP BY o.orderkey

if (properties.getNodePartitioning().isPresent()) {
// preserve input (possibly preferred) partitioning
return properties;
return ActualProperties.builderFrom(properties)
Contributor

Make it explicit:

ActualProperties.builder()
  // preserve input (possibly preferred) partitioning
  .global(properties)
  .constants(properties.getConstants())
  .local(newLocalProperties.build())
  .build()

}

if (step.isOutputPartial()) {
systemMemoryContext.setBytes(memorySize);
Contributor

this line is never reached in current code.

this.groupByTypes = ImmutableList.copyOf(requireNonNull(groupByTypes, "groupByTypes is null"));
this.groupByChannels = Ints.toArray(requireNonNull(groupByChannels, "groupByChannels is null"));
this.accumulatorFactories = requireNonNull(accumulatorFactories, "accumulatorFactories is null");
this.step = requireNonNull(step, "step is null");
Contributor

check state that step is FINAL?

if (nextGroupStart < page.getPositionCount()) {
// current group stops somewhere in the middle of the page, so flush it
evaluateAndFlushGroup(page, startPosition);

Contributor

remove empty line

while (true) {
// may be equal to page.getPositionCount() if the end is not found in this page
int nextGroupStart = findNextGroupStart(startPosition, groupByPage);

Contributor

remove empty line
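
The page-processing loop in the snippets above follows a common pattern for streaming aggregation over pre-grouped input: accumulate rows until the group key changes, emit the finished group, and carry the last open group across page boundaries. The following is a minimal sketch of that pattern, not the operator from this PR. It assumes a single bigint grouping key and a max aggregate, models a page as an array of {key, value} rows, and uses the hypothetical class name `StreamingMaxSketch`.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamingMaxSketch
{
    private final List<long[]> output = new ArrayList<>(); // finished groups as {key, max}
    private boolean hasCurrentGroup;
    private long currentKey;
    private long currentMax;

    // Each row is {groupKey, value}. Rows with equal keys must be adjacent,
    // possibly spanning consecutive pages.
    void addInput(long[][] page)
    {
        int startPosition = 0;
        while (startPosition < page.length) {
            long key = page[startPosition][0];
            if (hasCurrentGroup && key != currentKey) {
                // the group carried over from the previous page ended at the page boundary
                flushCurrentGroup();
            }
            if (!hasCurrentGroup) {
                hasCurrentGroup = true;
                currentKey = key;
                currentMax = Long.MIN_VALUE;
            }
            // may be equal to page.length if the end is not found in this page
            int nextGroupStart = findNextGroupStart(page, startPosition);
            for (int position = startPosition; position < nextGroupStart; position++) {
                currentMax = Math.max(currentMax, page[position][1]);
            }
            if (nextGroupStart < page.length) {
                // current group stops somewhere in the middle of the page, so flush it
                flushCurrentGroup();
            }
            startPosition = nextGroupStart;
        }
    }

    List<long[]> finish()
    {
        if (hasCurrentGroup) {
            flushCurrentGroup();
        }
        return output;
    }

    private void flushCurrentGroup()
    {
        output.add(new long[] {currentKey, currentMax});
        hasCurrentGroup = false;
    }

    private static int findNextGroupStart(long[][] page, int startPosition)
    {
        long key = page[startPosition][0];
        int position = startPosition + 1;
        while (position < page.length && page[position][0] == key) {
            position++;
        }
        return position;
    }

    public static void main(String[] args)
    {
        StreamingMaxSketch operator = new StreamingMaxSketch();
        operator.addInput(new long[][] {{1, 5}, {1, 7}, {2, 3}});
        operator.addInput(new long[][] {{2, 9}, {3, 1}}); // group 2 spans both pages
        for (long[] group : operator.finish()) {
            System.out.println(group[0] + " -> " + group[1]);
        }
    }
}
```

Only the state of the single open group is held at any time, which is the property that lets a streaming aggregation use far less memory than a hash-based one.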

@@ -92,6 +93,11 @@ public final void destroyPlanTest()
queryRunner = null;
}

public ConnectorId getCurrentConnectorId()
Contributor

Use it in com.facebook.presto.sql.planner.sanity.TestValidateAggregationsWithDefaultValues#setup too

Also, fixed dormant bug in local property derivations for cross joins.
Scalar subqueries are often rewritten as follows:

- Aggregation(unique, a, b)
  - LeftJoin
    - AssignUniqueId(unique)
      - probe (a, b)
    - build

In this plan, aggregation input is grouped on all grouping keys and allows for
single-step streaming execution. This commit updates PropertyDerivations and
StreamPropertyDerivations to derive partitioned_on(unique) global property and
{grouped(unique), constant(a), constant(b)} local properties for aggregation
input. These changes allow AddLocalExchange to plan streaming aggregations.

Specific changes are:

- Update PropertyDerivations to
  - set {grouped(unique), constant(a), constant(b)} local properties for the
    output of AssignUniqueId node;

- Update StreamPropertyDerivations to
  - set partitioned_on(unique) for the output of AssignUniqueId node;

Append (STREAMING) to Aggregate node when printing query plan.

For example,

EXPLAIN (TYPE DISTRIBUTED)
SELECT (SELECT count(*) FROM region r2 WHERE r2.regionkey > r1.regionkey)
FROM region r1;

- Aggregate(STREAMING)[regionkey, unique] => [regionkey:bigint, unique:bigint, count:bigint]
       count := "count"("non_null")
   - LeftJoin[("regionkey_0" > "regionkey")] => [regionkey:bigint, unique:bigint, non_null:boolean]
           Distribution: REPLICATED
           SortExpression["regionkey_0"]
       - AssignUniqueId => [regionkey:bigint, unique:bigint]
           - TableScan[tpch:tpch:region:sf0.01, originalConstraint = true] => [regionkey:bigint]

Scalar subqueries are often rewritten as follows:

- Aggregation(unique, a, b)
  - LeftJoin
    - RemoteExchange (REPARTITION)
      - AssignUniqueId(unique)
        - probe (a, b)
    - RemoteExchange (REPARTITION)
      - build

In these plans, streaming aggregation is not applied because
grouped(unique, a, b) local property generated by the AssignUniqueId node
can't reach the Aggregation node as it gets dropped when passing through the
RemoteExchange node.

This commit adds an optimizer rule to push up AssignUniqueId node through the
RemoteExchange to allow for streaming aggregation. It also updates
HashGenerationOptimizer to avoid generating unnecessary hashes for streaming
aggregations. The modified plan looks like this:

- Aggregation(unique, a, b)
  - LeftJoin
    - AssignUniqueId(unique)
      - RemoteExchange (REPARTITION)
        - probe (a, b)
    - RemoteExchange (REPARTITION)
      - build
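
The rewrite described above, together with the guard discussed in the review (do not move AssignUniqueId when the exchange partitions on the id column), can be sketched on a toy plan tree. This is an illustration only and not the optimizer rule from this PR: the node classes below are hypothetical stand-ins, and output layouts, which a real rule would also have to adjust, are ignored.

```java
import java.util.List;

final class PushUpAssignUniqueIdSketch
{
    private PushUpAssignUniqueIdSketch() {}

    interface Node {}

    static final class Scan implements Node
    {
        final String table;

        Scan(String table)
        {
            this.table = table;
        }
    }

    static final class AssignUniqueId implements Node
    {
        final String idColumn;
        final Node source;

        AssignUniqueId(String idColumn, Node source)
        {
            this.idColumn = idColumn;
            this.source = source;
        }
    }

    static final class RemoteExchange implements Node
    {
        final List<String> partitioningColumns;
        final Node source;

        RemoteExchange(List<String> partitioningColumns, Node source)
        {
            this.partitioningColumns = partitioningColumns;
            this.source = source;
        }
    }

    // Rewrites RemoteExchange(AssignUniqueId(x)) into AssignUniqueId(RemoteExchange(x)).
    // Returns the node unchanged when the pattern does not match or the rewrite is unsafe.
    static Node pushUp(Node node)
    {
        if (!(node instanceof RemoteExchange)) {
            return node;
        }
        RemoteExchange exchange = (RemoteExchange) node;
        if (!(exchange.source instanceof AssignUniqueId)) {
            return node;
        }
        AssignUniqueId assignUniqueId = (AssignUniqueId) exchange.source;
        // The exchange's partitioning depends on the id column produced by AssignUniqueId,
        // so AssignUniqueId cannot be moved above it.
        if (exchange.partitioningColumns.contains(assignUniqueId.idColumn)) {
            return node;
        }
        return new AssignUniqueId(
                assignUniqueId.idColumn,
                new RemoteExchange(exchange.partitioningColumns, assignUniqueId.source));
    }

    public static void main(String[] args)
    {
        Node plan = new RemoteExchange(List.of("a"), new AssignUniqueId("unique", new Scan("probe")));
        System.out.println(pushUp(plan) instanceof AssignUniqueId);
    }
}
```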
StreamingAggregationOperator applies only when aggregation source is already
grouped on the grouping keys, e.g. aggregations over joins generated for the
scalar subqueries. In these cases, StreamingAggregationOperator is
more efficient than HashAggregationOperator as it uses less CPU and a lot less
memory.

Add a benchmark to compare StreamingAggregationOperator and
HashAggregationOperator for pre-grouped data sources.

  (operatorType)  (rowsPerGroup)  Mode  Cnt    Score    Error  Units
       streaming               1  avgt   30  379.203 ±  9.431  ms/op
       streaming              10  avgt   30   68.950 ±  3.164  ms/op
       streaming            1000  avgt   30   27.623 ±  0.776  ms/op
            hash               1  avgt   30  451.901 ± 15.327  ms/op
            hash              10  avgt   30   94.244 ±  2.437  ms/op
            hash            1000  avgt   30   53.038 ±  0.980  ms/op
@mbasmanova mbasmanova force-pushed the streaming-aggregation branch from ebe3291 to cc916c4 Compare June 20, 2018 16:13
@mbasmanova mbasmanova merged commit c50646e into prestodb:master Jun 20, 2018
@mbasmanova mbasmanova deleted the streaming-aggregation branch June 20, 2018 20:33
@findepi
Contributor

findepi commented Jun 20, 2018

🎉
