Partial aggregation is missing when filter reduces the data to a single group #10971

Closed
mbasmanova opened this issue Jul 3, 2018 · 11 comments

@mbasmanova
Contributor

The plan for the following query is missing a partial aggregation, so all rows are aggregated on a single node, which is very slow. This looks like a regression caused by #10731.

explain select sum(totalprice), orderstatus from orders where orderstatus='O' group by orderstatus;

 - Output[_col0, orderstatus] => [sum:double, orderstatus:varchar(1)]                                                                                                                
         Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 220010.00}                                                                                                   
         _col0 := sum                                                                                                                                                                
     - RemoteExchange[GATHER] => orderstatus:varchar(1), sum:double                                                                                                                  
             Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 220010.00}                                                                                               
         - Aggregate(STREAMING)[orderstatus] => [orderstatus:varchar(1), sum:double]                                                                                                 
                 Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 219990.00}                                                                                           
                 sum := "sum"("totalprice")                                                                                                                                          
             - LocalExchange[HASH][$hashvalue] ("orderstatus") => orderstatus:varchar(1), totalprice:double, $hashvalue:bigint                                                       
                     Cost: {rows: 7333 (214.83kB), cpu: 806630.00, memory: 0.00, network: 219990.00}                                                                                 
                 - RemoteExchange[REPARTITION][$hashvalue_9] => orderstatus:varchar(1), totalprice:double, $hashvalue_9:bigint                                                       
                         Cost: {rows: 7333 (214.83kB), cpu: 586640.00, memory: 0.00, network: 219990.00}                                                                             
                     - ScanProject[table = tpch:tpch:orders:sf0.01, originalConstraint = ("orderstatus" = 'O')] => [orderstatus:varchar(1), totalprice:double, $hashvalue_10:bigint] 
                             Cost: {rows: 7333 (143.22kB), cpu: 146660.00, memory: 0.00, network: 0.00}/{rows: 7333 (214.83kB), cpu: 366650.00, memory: 0.00, network: 0.00}         
                             $hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("orderstatus"), 0))                                                          
                             orderstatus := tpch:orderstatus                                                                                                                         
                                 :: [[O]]                                                                                                                                            
                             totalprice := tpch:totalprice                                                                                                                           
@mbasmanova
Contributor Author

This is because the PushPartialAggregationThroughExchange rule doesn't apply to streaming aggregations.

@mbasmanova
Contributor Author

CC @sopel39

@mbasmanova
Contributor Author

I'm seeing that AddExchanges#visitAggregation inserts a RemoteExchange under the AggregationNode to satisfy the partitioned_on(orderstatus) property:

- Aggregation (SINGLE, orderstatus)
  - TableScan 

becomes

- Aggregation (SINGLE, orderstatus)
  - RemoteExchange (HASH, orderstatus)
     - TableScan 

Then AddLocalExchanges sets AggregationNode#preGroupedSymbols to {orderstatus} based on the Constant(orderstatus) stream property, so PushPartialAggregationThroughExchange is skipped.
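The interaction can be illustrated with a minimal Python sketch. It assumes a drastically simplified plan model: `AggregationNode`, `mark_pre_grouped` and `push_partial_applies` below are hypothetical stand-ins, not Presto's Java classes, and the real rule checks more conditions than the single guard modelled here.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List


@dataclass
class AggregationNode:
    # Hypothetical, simplified stand-in for the planner's aggregation node.
    step: str                                   # "SINGLE", "PARTIAL" or "FINAL"
    grouping_keys: List[str]
    pre_grouped_symbols: List[str] = field(default_factory=list)


def mark_pre_grouped(agg: AggregationNode, constant_symbols: FrozenSet[str]) -> None:
    # Models the AddLocalExchanges step described above: when every grouping key
    # is constant in the input stream, the stream is trivially grouped on those
    # keys, so the aggregation is marked as pre-grouped (streaming).
    if agg.grouping_keys and all(k in constant_symbols for k in agg.grouping_keys):
        agg.pre_grouped_symbols = list(agg.grouping_keys)


def push_partial_applies(agg: AggregationNode) -> bool:
    # Models the guard that skips streaming aggregations: only a SINGLE,
    # non-pre-grouped aggregation is split into PARTIAL + FINAL.
    return agg.step == "SINGLE" and not agg.pre_grouped_symbols


agg = AggregationNode(step="SINGLE", grouping_keys=["orderstatus"])
print(push_partial_applies(agg))  # True
mark_pre_grouped(agg, frozenset({"orderstatus"}))  # orderstatus = 'O' is constant
print(push_partial_applies(agg))  # False: no partial aggregation is added
```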

@mbasmanova
Contributor Author

A streaming aggregation over a repartitioning or gather exchange implies that all grouping keys are constant. One option is to rewrite it as a global aggregation. That would be my preference, but I think global aggregations currently don't support constant columns, so we'd have to wrap the grouping keys in arbitrary(k). Another option is to extend PushPartialAggregationThroughExchange to streaming aggregations as follows:

- Aggregation (SINGLE, STREAMING)
   - Exchange

becomes

- Aggregation (FINAL)
   - Exchange
      - Aggregation (PARTIAL, STREAMING)

We can drop streaming from the partial aggregation for simplicity.
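To see why the split matters for this query, here is a minimal Python sketch of SINGLE versus PARTIAL/FINAL aggregation across an exchange, using `sum` as the aggregate. The function names and the three-node toy data are hypothetical. Because `sum` can be computed from per-node partial sums, both plans produce the same result, but with the split only one intermediate row per group per source node crosses the exchange instead of every input row.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, float]  # (orderstatus, totalprice)


def single_aggregation(rows: List[Row]) -> Dict[str, float]:
    # SINGLE step: every input row must cross the exchange first.
    out: Dict[str, float] = defaultdict(float)
    for key, price in rows:
        out[key] += price
    return dict(out)


def partial_aggregation(rows: List[Row]) -> List[Row]:
    # PARTIAL step: runs on each source node before the exchange and emits one
    # intermediate row per group seen locally.
    return list(single_aggregation(rows).items())


def final_aggregation(partials: List[Row]) -> Dict[str, float]:
    # FINAL step: merges the intermediate states; for sum this is another sum.
    return single_aggregation(partials)


# Hypothetical data: three source nodes, all rows already filtered to 'O'.
nodes = [[("O", 10.0), ("O", 5.0)], [("O", 1.5)], [("O", 2.5), ("O", 4.0)]]

all_rows = [r for node in nodes for r in node]
partials = [p for node in nodes for p in partial_aggregation(node)]

print(single_aggregation(all_rows))  # {'O': 23.0}
print(final_aggregation(partials))   # {'O': 23.0}
print(len(all_rows), len(partials))  # 5 3  (rows crossing the exchange)
```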

@martint @sopel39 Thoughts?

@sopel39
Contributor

sopel39 commented Jul 4, 2018

> Another option is to extend PushPartialAggregationThroughExchange to streaming aggregations as follows:

Maybe we could simply always add a partial streaming aggregation, e.g. rewrite:

- Aggregation (SINGLE, STREAMING)
   - Exchange

into

- Aggregation (FINAL, STREAMING)
   - Exchange
     - Aggregation (PARTIAL, STREAMING)

Is there a case where such a rewrite is invalid? As you noted, a streaming aggregation above an exchange implies constant pre-grouped symbols.

Alternatively, we could rewrite it to:

- Aggregation (FINAL, STREAMING)
   - Exchange
     - Aggregation (PARTIAL, HASH)

> One option is to rewrite it as a global aggregation

A global aggregation produces a default value (exactly one output row, even for empty input), which we don't want here.
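The default-value difference can be shown with a small Python sketch that models SQL semantics. The function names are hypothetical and SQL NULL is modelled as `None`: a grouped aggregation over empty input returns no rows, while a global aggregation always returns exactly one row.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, float]  # (orderstatus, totalprice)


def grouped_sum(rows: List[Row]) -> List[Tuple[str, float]]:
    # SELECT orderstatus, sum(totalprice) ... GROUP BY orderstatus:
    # one output row per distinct key, so empty input produces no rows.
    sums: Dict[str, float] = {}
    for key, price in rows:
        sums[key] = sums.get(key, 0.0) + price
    return list(sums.items())


def global_sum(rows: List[Row]) -> List[Tuple[Optional[float]]]:
    # SELECT sum(totalprice) ... (no GROUP BY): always exactly one row;
    # sum() over zero rows is NULL, modelled here as None.
    if not rows:
        return [(None,)]
    return [(sum(price for _, price in rows),)]


no_rows: List[Row] = []  # e.g. a filter that matches nothing
print(grouped_sum(no_rows))  # []
print(global_sum(no_rows))   # [(None,)]
```

So if the filter matched no rows, a global-aggregation rewrite would return a single row of NULLs where the original query returns an empty result.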

@mbasmanova
Contributor Author

@sopel39

> Global aggregation produce default value, which we don't want here.

That's right. Thanks for pointing it out.

> Maybe we could simply add partial streaming aggregation always

That should work. It should be a pretty simple change.

@mbasmanova
Contributor Author

@sopel39 @martint Martin and I discussed this some more. We believe the assertion that a streaming aggregation above an exchange implies constant pre-grouped symbols is true today, but it may change later, at which point we may have a regression. To properly decide whether a streaming aggregation can be split over a remote exchange, the rule needs to know the properties of the data; ideally, we need traits. For now, the proposal is to update AddLocalExchanges to populate AggregationNode#preGroupedKeys only if they are not constant, i.e. if we expect multiple groups.
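A minimal sketch of the proposed AddLocalExchanges change, in a simplified Python model. The function and its parameters are hypothetical; the real logic derives grouping from full stream properties and can mark a subset of the keys as pre-grouped, which this sketch ignores.

```python
from typing import FrozenSet, List


def pre_grouped_symbols(
    grouping_keys: List[str],
    grouped_on: FrozenSet[str],
    constant_symbols: FrozenSet[str],
) -> List[str]:
    # grouped_on: symbols the input stream is known to be grouped on.
    # A constant symbol is trivially grouped, since it has a single value.
    known = grouped_on | constant_symbols
    if not grouping_keys or not all(k in known for k in grouping_keys):
        return []  # input is not pre-grouped on the keys: hash aggregation
    if all(k in constant_symbols for k in grouping_keys):
        # Proposed change: at most one group is expected, so leave the keys
        # unset and let PushPartialAggregationThroughExchange split the node.
        return []
    return list(grouping_keys)


print(pre_grouped_symbols(["orderstatus"], frozenset(), frozenset({"orderstatus"})))  # []
print(pre_grouped_symbols(["custkey"], frozenset({"custkey"}), frozenset()))  # ['custkey']
```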

@sopel39
Contributor

sopel39 commented Jul 6, 2018

> We believe streaming aggregation above exchange implies constant pre-grouped symbols assertion is true today, but may change later, at which point we may have a regression.

Still, I think that the sources of a GATHER/REPARTITION exchange need to be grouped by the same symbols as the output of the exchange. What would be a case where this doesn't apply?

In any case, we would catch a regression via ValidateStreamingAggregations.

> To properly make a decision about whether a streaming aggregation can be split over remote exchange, we need the rule to know the properties of the data, e.g. ideally we need traits

Traits would be a useful abstraction, but they are not required. ValidateStreamingAggregations, for instance, derives properties directly. In this case I'm not sure we need to check the properties of the exchange source, since we can infer that it's grouped by the same symbols as the exchange output.

> For now, the proposal is to update AddLocalExchanges to populate AggregationNode#preGroupedKeys only if they are not constant, e.g. if we expect multiple groups.

That would add slower hash exchange when grouping symbols are constant. Do we want that?

@mbasmanova
Contributor Author

@sopel @martin

Karol, thanks for sharing your thoughts. My thinking on this evolved a bit since yesterday.

The PushPartialAggregationThroughExchange rule works on an aggregation over an exchange:

- Aggregation (SINGLE, STREAMING)
  - Exchange (REMOTE)

The question here is whether a streaming aggregation can be split over the exchange.

I believe it is always safe to drop streaming and split the aggregation:

- Aggregation (FINAL)
  - Exchange (REMOTE)
    - Aggregation (PARTIAL)

This does "add slower hash exchange when grouping symbols are constant" and we'd rather avoid it.

It is also safe to not split the aggregation, but the loss of performance in this case is not acceptable.

To keep streaming aggregation for the final step, the partial step, or both, we need to either derive properties or consider all possible cases.

It is a given that the exchange output is pre-grouped. There are two possibilities: (1) the exchange input is pre-grouped and the exchange preserves that grouping; (2) the exchange itself enforces grouping (e.g. a merging exchange enforces order).

In case (1), it is safe to split the aggregation into a streaming partial and a streaming final aggregation. In case (2), we can't safely add a streaming partial aggregation, so we'd add a hash aggregation instead. Since we don't really know how the exchange enforces grouping, and whether it relies on some properties of its input stream to do that, we can't assume that it will be able to enforce grouping on the output of a partial hash aggregation. Therefore, we can't assume that the exchange output will be grouped, and we can't safely use a streaming final aggregation. To summarize, in case (2) the only safe option is to drop streaming when splitting.

Without deriving properties, we can't know whether we have case (1) or case (2), so we have to drop streaming.
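Why a streaming final aggregation is unsafe when grouping isn't guaranteed can be seen in a small Python sketch. `itertools.groupby` behaves like a streaming aggregation: it only merges adjacent rows with equal keys. The data and the concatenating "exchange" are hypothetical; they stand in for an exchange that forwards partial hash-aggregation output from several sources without preserving grouping.

```python
from itertools import groupby
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (grouping key, partial sum)


def streaming_sum(rows: List[Row]) -> List[Row]:
    # Streaming aggregation: closes a group as soon as the key changes, so it is
    # only correct when rows with equal keys are adjacent (pre-grouped input).
    return [(k, sum(v for _, v in grp)) for k, grp in groupby(rows, key=lambda r: r[0])]


def hash_sum(rows: List[Row]) -> List[Row]:
    # Hash aggregation: accumulates per key, correct for any input order.
    acc: Dict[str, int] = {}
    for k, v in rows:
        acc[k] = acc.get(k, 0) + v
    return list(acc.items())


# Hypothetical partial hash-aggregation outputs on two source nodes.
source_a = [("a", 3), ("b", 4)]
source_b = [("a", 1), ("b", 2)]
exchanged = source_a + source_b  # grouping across sources is not preserved

print(streaming_sum(exchanged))  # [('a', 3), ('b', 4), ('a', 1), ('b', 2)] (duplicate groups)
print(hash_sum(exchanged))       # [('a', 4), ('b', 6)]
```

With constant grouping keys, as in this issue, every source emits a single group, so the concatenated stream is still grouped; it is the general case that requires knowing the exchange's properties.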

That said, I prefer this change over updating AddLocalExchanges to not populate AggregationNode#preGroupedKeys when the pre-grouped keys are constant. It seems cleaner to keep the change in the same rule whose limitation (lack of properties) requires it.

At the same time, I think it would be valuable to avoid "slower hash exchange when grouping symbols are constant". I see a couple of options to achieve that:

  • Change PushPartialAggregationThroughExchange to derive properties (similar to ValidateStreamingAggregations and some other rules) and add logic to preserve streaming if the exchange input is pre-grouped.

  • Convince ourselves that, in practice, we won't have case (2) above, and rely on the ValidateStreamingAggregations runtime check to catch any regressions.

My preference would be the following:

(1) submit a PR to always drop streaming when splitting to fix the regression in the upcoming release;
(2) submit a follow-up PR to derive properties in PushPartialAggregationThroughExchange and keep streaming if properties allow.

Martin, Karol, what do you think?

@martint
Contributor

martint commented Jul 6, 2018

That seems reasonable, but "(2) submit a follow-up PR to derive properties" may be harder than it seems, or may introduce things we want to move away from. The rule-based optimizer is not set up to make it easy for rules to arbitrarily walk the plan tree (to derive properties), which is why I've mentioned that we might need support for traits and first-class properties in the rule-based optimization framework before we can do that.

As to "slower hash exchange when grouping symbols are constant", we might want to measure the actual impact. It may turn out to be small enough that adding short-term hacks or unwanted complexity is not warranted.

@sopel39
Contributor

sopel39 commented Jul 9, 2018

Just to note:

> In case of (2) we can't safely add streaming partial aggregation, hence, we'll add hash aggregation
> (2) exchange itself enforces grouping (e.g. merging exchange enforces order).

A merging exchange does preserve the grouping property, as its sources are themselves ordered.
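This can be checked with Python's `heapq.merge`, used here as a stand-in for a merging exchange; the data and helper names are hypothetical. Merging ordered sources yields grouped output, so a streaming aggregation on top is correct. If a source is not ordered, as with unsorted partial hash-aggregation output, the merge no longer guarantees grouping.

```python
import heapq
from itertools import groupby
from typing import List, Tuple

Row = Tuple[str, int]  # (grouping key, partial sum)


def streaming_sum(rows: List[Row]) -> List[Row]:
    # Streaming aggregation: only merges adjacent rows with equal keys.
    return [(k, sum(v for _, v in grp)) for k, grp in groupby(rows, key=lambda r: r[0])]


def merging_exchange(*sources: List[Row]) -> List[Row]:
    # Merges sources by key; heapq.merge assumes each source is already sorted.
    return list(heapq.merge(*sources, key=lambda r: r[0]))


ordered = merging_exchange([("a", 3), ("b", 4)], [("a", 1), ("b", 2)])
print(streaming_sum(ordered))  # [('a', 4), ('b', 6)]

unordered = merging_exchange([("b", 4), ("a", 3)], [("a", 1), ("b", 2)])
print(streaming_sum(unordered))  # [('a', 1), ('b', 4), ('a', 3), ('b', 2)]
```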

mbasmanova added a commit to mbasmanova/presto that referenced this issue Jul 9, 2018
Streaming aggregation requires pre-grouped input. Splitting the
aggregation and pushing partial aggregation through the exchange may
or may not preserve grouping properties of the input. See
prestodb#10971 for more details.
mbasmanova added a commit that referenced this issue Jul 10, 2018