-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-37455][SQL] Replace hash with sort aggregate if child is already sorted #34702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cc @cloud-fan could you help take a look when you have time? Thanks! |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #145594 has finished for PR 34702 at commit
|
@@ -423,6 +423,9 @@ object QueryExecution { | |||
PlanSubqueries(sparkSession), | |||
RemoveRedundantProjects, | |||
EnsureRequirements(), | |||
// `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it because the planner is top-down so we don't know the child ordering during planning? Then we have to add a new rule to change the agg algorithm in a post-hoc way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is. If we change our planning to bottom-up and propagate each node output ordering info during planning, then we can run this rule during planning. For now, we have to add it after EnsureRequirements
.
if (SortOrder.orderingSatisfies( | ||
partialAgg.child.outputOrdering, sortAgg.requiredChildOrdering.head)) { | ||
sortAgg.copy( | ||
aggregateExpressions = sortAgg.aggregateExpressions.map(_.copy(mode = Complete)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it always right? I think we also need to check the output partitioning to see if we can eliminate the partial agg.
An example is df.sortWithinPartitions
. It does not cluster the data, just sort it within each partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - I don't think we need to check output partitioning, as we are matching a pair of final and partial hash agg, without shuffle in between:
HashAggregate(final)
| SortAggregate(complete)
HashAggregate(partial) => |
| child
child
So child
must already have proper output partitioning for SortAggregate
, o.w. it cannot satisfy original HashAggregate(final)
's required distribution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah ok, if there is a shuffle in the middle, we can't optimize? This looks quite limited, as having a shuffle in the middle is very common.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if there is a shuffle in the middle, we can't optimize?
We can, and the rule here also does pattern matching for single HashAggregate
below. I added a unit test case in ReplaceHashWithSortAggSuite.scala
to demonstrate replacing partial aggregate - "replace partial hash aggregate with sort aggregate"
. But I think it would be rare to be able to replace final aggregate (though this rule also covers it), as final aggregate is almostly always immediately after a shuffle, so there's no sort ordering before final aggregate.
Spark native shuffle does not guarantee any sort orders, for Cosco (a remote shuffle service we are running in-house), we support sorted shuffle, so final aggregate can also be possible to replace.
Kubernetes integration test starting |
Test build #145649 has finished for PR 34702 at commit
|
Kubernetes integration test status failure |
@@ -694,7 +694,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils | |||
} | |||
|
|||
test("SPARK-25497: LIMIT within whole stage codegen should not consume all the inputs") { | |||
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { | |||
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", | |||
SQLConf.REPLACE_HASH_WITH_SORT_AGG_ENABLED.key -> "false") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unit test is testing multiple limit operators and hash aggregate operators in one single stage. Disable the rule here because sort aggregate does not support code-gen now, and will break the test logic.
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #145672 has finished for PR 34702 at commit
|
This PR is ready for review again thanks @cloud-fan. |
/** | ||
* Check if `partialAgg` to be partial aggregate of `finalAgg`. | ||
*/ | ||
private def isPartialAgg(partialAgg: HashAggregateExec, finalAgg: HashAggregateExec): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like reverse enginering the AggUtils
. Could we just link the partial and final agg when they are constructed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tanelk - yeah I agree this is mostly reverse engineering and we can do a better job here. I tried link partial and final agg in AggUtils
and check linked physical plan to be same or not. This does not quite work due to we are doing top-down planning, and the linked partial agg not being same as planned partial agg (having PlanLater
operator in linked partial agg).
I found a more elegant way to do it, by checking the linked logical plan of both aggs to be same. Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @cloud-fan for review, thanks.
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #145825 has finished for PR 34702 at commit
|
* | ||
* HashAggregate(t1.i, SUM, final) | ||
* | SortAggregate(t1.i, SUM, complete) | ||
* HashAggregate(t1.i, SUM, partial) => | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like an orthogonal optimization: we can merge adjacent partial and final aggregates (no shuffle between them) into one complete aggregate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think we can add a rule later to optimize it. I vaguely remember someone proposed this in OSS before but seems impact is not high.
The code change LGTM. Since there are quite some TPCDS queries that get plan changes, can we run a TPCDS benchmark to verify performance improvement? |
@cloud-fan - sure. Today I ran the TPCDS benchmark (sf=1) on one AWS Then I tried with sf=5, but the benchmark has task failure with no space left on device, so the benchmark cannot be conducted on single machine. Do you recommend disabling this rule by default? After adding sort aggregate code-gen, we can do more large scale testing to enable it. WDYT? |
Discussed offline with @cloud-fan, we decide to disable the rule by default now. After adding sort aggregate code-gen, a large-scale TPCDS benchmark can be done later. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #145880 has finished for PR 34702 at commit
|
thanks, merging to master! |
Thank you @cloud-fan and @tanelk for review! |
*/ | ||
private def replaceHashAgg(plan: SparkPlan): SparkPlan = { | ||
plan.transformDown { | ||
case hashAgg: HashAggregateExec if hashAgg.groupingExpressions.nonEmpty => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, shall we handle ObjectHashAggregateExec
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - yeah I agree. Don't see a problem why we cannot do it. Created https://issues.apache.org/jira/browse/SPARK-37557 for followup. Will do it shortly, thanks.
…s already sorted ### What changes were proposed in this pull request? This is a follow up of #34702 (comment) , where we can replace object hash aggregate with sort aggregate as well. This PR is to handle object hash aggregate. ### Why are the changes needed? Increase coverage of rule by handling object hash aggregate as well. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified unit test in `ReplaceHashWithSortAggSuite.scala` to cover object hash aggregate (by using aggregate expression `COLLECT_LIST`). Closes #34824 from c21/agg-rule-followup. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
In the query plan, if the child of hash aggregate is already sorted on group-by columns, we can replace hash aggregate with sort aggregate for better performance, as sort aggregate does not have hashing overhead of hash aggregate. Add a physical plan rule
ReplaceHashWithSortAgg
here, and can be disabled by configspark.sql.execution.replaceHashWithSortAgg
.In addition, to help review as this PR triggers several TPCDS plan files change. The files below are having the real code change:
SQLConf.scala
QueryExecution.scala
ReplaceHashWithSortAgg.scala
AdaptiveSparkPlanExec.scala
HashAggregateExec.scala
ReplaceHashWithSortAggSuite.scala
SQLMetricsSuite.scala
Why are the changes needed?
To get better query performance by leveraging sort ordering in query plan.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in
ReplaceHashWithSortAggSuite.scala
.