[SPARK-12978][SQL] Skip unnecessary final group-by when input data already clustered with group-by keys #10896
Conversation
Test build #49985 has finished for PR 10896 at commit
Force-pushed from 5ab19c1 to 1b7e3d8.
Test build #49988 has finished for PR 10896 at commit
@marmbrus Could you review this and give me suggestions?
@yhuai would be better to review this, but neither of those plans looks great to me. Why are we not partially aggregating before a shuffle? It seems like that will ship a lot of data around for no reason.
I guess that exchange is added because there is a
Yes, it is. The input query is:
Okay, but that code doesn't actually produce an exchange, right? Since it's captured by the cache?
Either way, I'll let @yhuai sign off on this.
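For reference, the pattern under discussion can be reproduced with something like the sketch below (a hedged reconstruction; the PR's actual input query is not shown in this excerpt, and the column name is only illustrative). The exchange introduced by `repartition` ends up inside the cached plan, so the subsequent group-by already sees data clustered by its keys:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("pr10896-sketch").getOrCreate()
import spark.implicits._

// The shuffle happens here and is captured by the cache.
val df = spark.range(0, 1000000).toDF("key").repartition($"key").cache()
df.count()  // materialize the cache

// Grouping by the same key: the cached data is already clustered by `key`,
// so no new exchange is needed; this PR aims to also skip the redundant
// final group-by in such cases.
df.groupBy($"key").count().explain()
```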
@@ -86,20 +86,40 @@ object Utils {
      aggregateExpressions: Seq[AggregateExpression],
      aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute],
      resultExpressions: Seq[NamedExpression],
      skipUnnecessaryAggregate: Boolean,
I think it would be clearer if you called this partialAggregation. It's not unnecessary; it's an optimization in most cases.
Ah, yes... the code produces no exchange because of the cache.
As @marmbrus said, we also need to push down partial aggregation under an exchange:

into
@yhuai ping
Test build #50812 has finished for PR 10896 at commit
@yhuai ping
I probably will not be able to take a close look at this PR until later this month. I have a question regarding the approach of this PR. Right now, we always plan partial aggregation operators first (in SparkStrategies) and then add Exchange operators (in EnsureRequirements). Another approach would be to not add partial aggregation operators in SparkStrategies; then, after we figure out where we need exchange operators, we add partial aggregation operators. This approach probably needs more code changes, but I feel it is a cleaner approach.
@yhuai The second approach sounds good to me, though I'm not exactly sure how to remove the unnecessary final aggregation covered in this PR with it. IMO these kinds of partial aggregation optimizations seem to be similar to
Force-pushed from 140da25 to 9d77c90.
Test build #56883 has finished for PR 10896 at commit
Test build #56884 has finished for PR 10896 at commit
cc @hvanhovell can you review this?
@hvanhovell ping
@maropu I'll take a look today. Is the description up-to-date?
@hvanhovell yeah, it is up to date.
@@ -81,20 +81,38 @@ object Utils {
      groupingExpressions: Seq[NamedExpression],
      aggregateExpressions: Seq[AggregateExpression],
      resultExpressions: Seq[NamedExpression],
      partialAggregation: Boolean,
Partial aggregation IMO implies that we add a partial aggregation step. What do you think?
@@ -257,10 +257,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          planLater(child))
      }
    } else if (functionsWithDistinct.isEmpty) {
      // Check if the child operator satisfies the group-by distribution requirements
Why not move this code block into aggregate.Utils.planAggregateWithoutDistinct?
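For context, the check under discussion boils down to something like the sketch below (hedged; the helper name and `childPlan` parameter are illustrative, while `ClusteredDistribution` and `Partitioning.satisfies` are the Catalyst APIs the planner uses for this kind of test):

```scala
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.execution.SparkPlan

// True when the child's output partitioning already clusters the data by the
// group-by keys, i.e. the case where the extra shuffle and final group-by are redundant.
def childSatisfiesGroupBy(childPlan: SparkPlan, groupingExpressions: Seq[NamedExpression]): Boolean =
  groupingExpressions.nonEmpty &&
    childPlan.outputPartitioning.satisfies(ClusteredDistribution(groupingExpressions))
```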
Thanks for your comments! I'll check them in a few days.
Force-pushed from 2b1bea6 to 36553bc.
def unapply(plan: SparkPlan): Option[Distribution] = plan match {
  case agg: AggregateExec
      if agg.aggregateExpressions.map(_.aggregateFunction).forall(_.supportsPartial) =>
Put this in a function. This check appears a few times in the code.
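A minimal sketch of the extracted helper the reviewer asks for (the method name is illustrative; `aggregateFunction` and `supportsPartial` are the fields used in the snippet above, as of the Spark version targeted by this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

// Centralizes the repeated "all aggregate functions support partial aggregation" check.
def supportsPartialAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions.map(_.aggregateFunction).forall(_.supportsPartial)
```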
yea, okay.
okay, done
Test build #64322 has finished for PR 10896 at commit
Force-pushed from 8a81e23 to d5e0ed3.
Test build #64324 has finished for PR 10896 at commit
Test build #64388 has finished for PR 10896 at commit
@hvanhovell could you also give me comments on #13852?
LGTM - merging to master. Thanks!
After this PR, we create the partial aggregate operator in EnsureRequirements. I have a simpler idea: add a new rule which is run after EnsureRequirements.
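To make the proposed split concrete, here is a self-contained toy sketch (toy plan classes, not Spark's; exact key equality stands in for the real distribution check): the planner first produces the naive partial + exchange + final shape, and a separate rule run afterwards collapses it when the input is already clustered by the grouping keys.

```scala
object CollapseAggregatesToy extends App {
  sealed trait Plan
  case class Scan(clusteredKeys: Seq[String]) extends Plan
  case class Exchange(keys: Seq[String], child: Plan) extends Plan
  case class PartialAgg(keys: Seq[String], child: Plan) extends Plan
  case class FinalAgg(keys: Seq[String], child: Plan) extends Plan
  case class CompleteAgg(keys: Seq[String], child: Plan) extends Plan

  // Keys a node's output is clustered by, in this toy model.
  def clusteredBy(plan: Plan): Seq[String] = plan match {
    case Scan(keys)           => keys
    case Exchange(keys, _)    => keys
    case PartialAgg(_, child) => clusteredBy(child)
    case FinalAgg(keys, _)    => keys
    case CompleteAgg(keys, _) => keys
  }

  // The post-planning rule: if the data below the exchange is already clustered by
  // the grouping keys, drop the exchange and merge partial + final into one pass.
  def collapse(plan: Plan): Plan = plan match {
    case FinalAgg(keys, Exchange(_, PartialAgg(_, child))) if clusteredBy(child) == keys =>
      CompleteAgg(keys, collapse(child))
    case Exchange(k, c)    => Exchange(k, collapse(c))
    case PartialAgg(k, c)  => PartialAgg(k, collapse(c))
    case FinalAgg(k, c)    => FinalAgg(k, collapse(c))
    case CompleteAgg(k, c) => CompleteAgg(k, collapse(c))
    case s: Scan           => s
  }

  // Naive plan for "GROUP BY key" over input already clustered by "key".
  val naive = FinalAgg(Seq("key"), Exchange(Seq("key"), PartialAgg(Seq("key"), Scan(Seq("key")))))
  println(collapse(naive))  // CompleteAgg(List(key), Scan(List(key)))
}
```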
 * - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
 * - PartialMerge (now there is at most 1 tuple per group)
 * - StateStoreSave (saves the tuple for the next batch)
 * - Complete (output the current result of the aggregation)
 *
 * If the first aggregation needs a shuffle to satisfy its distribution, a map-side partial
 * aggregation and a shuffle are added in `EnsureRequirements`.
 */
def planStreamingAggregation(
Have we tested the streaming aggregation with the optimization?
Yes, it is a bit risky to touch this part.
+1 for @cloud-fan's proposal. Instead of creating a performant plan using tricky code, it's clearer to create a naive but correct physical plan first and then optimize it.
You could also argue the other way around: planning a partial aggregate is also a premature optimization, and that the planning of such an

I do think things could be simplified even more, and that either pruning an unneeded partial aggregate or planning one in a new rule both have merit.
@cloud-fan @liancheng yea, adding a new rule after EnsureRequirements sounds good to me.
I agree that partial aggregation is also a kind of optimization, and it's tricky to put it in the planner. I think it makes sense to clean it up after we have sufficient discussion and come to a consensus, but not within this optimization. For this particular optimization, I think it's much simpler to add an extra rule to merge the partial and final aggregates than to spread the aggregation stuff to EnsureRequirements.

cc @yhuai too
Sorry for my bad explanation. Yes, I agree that we remove the aggregation stuff from EnsureRequirements.
@maropu Thank you for working on this. Sorry that I did not get time to look at it after you updated the PR. I looked at it today. I think this optimization deserves a feature flag, since it affects whether we can generate a valid physical plan. We can enable it by default, but we will have the flexibility to disable it when there is an issue. After looking at the code, I am not sure it is a good approach to put the logic of adding partial aggregate operators in EnsureRequirements. Originally, I thought we could have an individual rule to add partial aggregate operators and then either extract the logic in EnsureRequirements as a utility function or run EnsureRequirements again. Also, due to the complexity of the logic for planning aggregations, it seems that after the change it is hard to track the planner logic. So, it seems it will be good to try your original proposal by adding a rule to remove unnecessary operators (like @cloud-fan implemented in #14876). In this way, it will also be very easy to add the feature flag and keep the optimization rule in a single place. Later, we can revisit this approach if we can clean up the planner logic for aggregation. What do you think?
@yhuai Thanks for your comment, and I agree with you. We'll keep discussing.
## What changes were proposed in this pull request?

According to the discussion in the original PR #10896 and the new approach PR #14876, we decided to revert these 2 PRs and go with the new approach.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14909 from cloud-fan/revert.
This ticket targets the optimization to skip an unnecessary group-by operation below:

Without opt.:

With opt.:
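The plans from the PR description are not reproduced here, but the kind of query affected can be sketched as follows (hedged; the table name is hypothetical, the snippet assumes a spark-shell session where `spark` is predefined, and it assumes a Spark build where bucketed scans expose their hash partitioning to the planner):

```scala
// Write a table bucketed by the future group-by key (bucketBy requires saveAsTable).
spark.range(0, 1000000)
  .selectExpr("id % 100 AS key", "id AS value")
  .write.bucketBy(8, "key").saveAsTable("bucketed_tbl")

// The scan reports data already clustered by `key`, so the group-by needs no
// extra exchange; with this optimization the now-unnecessary final group-by
// step is skipped as well.
spark.table("bucketed_tbl").groupBy("key").sum("value").explain()
```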