
[SPARK-37455][SQL] Replace hash with sort aggregate if child is already sorted #34702


Closed
wants to merge 5 commits

Conversation

@c21 (Contributor) commented Nov 24, 2021

What changes were proposed in this pull request?

In the query plan, if the child of a hash aggregate is already sorted on the group-by columns, we can replace the hash aggregate with a sort aggregate for better performance, as sort aggregate avoids the hashing overhead of hash aggregate. This PR adds a physical plan rule, ReplaceHashWithSortAgg, which can be disabled via the config spark.sql.execution.replaceHashWithSortAgg.
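For intuition, here is a minimal sketch of the single-aggregate case. This is not the actual implementation: `toSortAgg` is a hypothetical helper standing in for building the equivalent SortAggregateExec; the real logic lives in ReplaceHashWithSortAgg.scala.

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

def replaceHashAggSketch(plan: SparkPlan): SparkPlan = plan.transformDown {
  case hashAgg: HashAggregateExec if hashAgg.groupingExpressions.nonEmpty =>
    // `toSortAgg` is a hypothetical helper that builds the equivalent
    // SortAggregateExec for this hash aggregate.
    val sortAgg = toSortAgg(hashAgg)
    // Replace only if the child's output ordering already satisfies the
    // ordering a sort aggregate would require on the grouping keys, so no
    // extra sort is needed.
    if (SortOrder.orderingSatisfies(
        hashAgg.child.outputOrdering, sortAgg.requiredChildOrdering.head)) {
      sortAgg
    } else {
      hashAgg
    }
}
```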

In addition, to help review: this PR changes several TPCDS plan files, but only the files below contain real code changes:

  • SQLConf.scala
  • QueryExecution.scala
  • ReplaceHashWithSortAgg.scala
  • AdaptiveSparkPlanExec.scala
  • HashAggregateExec.scala
  • ReplaceHashWithSortAggSuite.scala
  • SQLMetricsSuite.scala

Why are the changes needed?

To get better query performance by leveraging the sort ordering already present in the query plan.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in ReplaceHashWithSortAggSuite.scala.

github-actions bot added the SQL label Nov 24, 2021
@c21 (Contributor Author) commented Nov 24, 2021

cc @cloud-fan, could you help take a look when you have time? Thanks!

SparkQA commented Nov 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50066/

SparkQA commented Nov 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50066/

SparkQA commented Nov 25, 2021

Test build #145594 has finished for PR 34702 at commit 6448864.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -423,6 +423,9 @@ object QueryExecution {
     PlanSubqueries(sparkSession),
     RemoveRedundantProjects,
     EnsureRequirements(),
+    // `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the
Contributor

Is it because the planner is top-down so we don't know the child ordering during planning? Then we have to add a new rule to change the agg algorithm in a post-hoc way.

Contributor Author

Yes, it is. If we changed our planning to bottom-up and propagated each node's output ordering during planning, then we could run this rule during planning. For now, we have to add it after EnsureRequirements.

if (SortOrder.orderingSatisfies(
partialAgg.child.outputOrdering, sortAgg.requiredChildOrdering.head)) {
sortAgg.copy(
aggregateExpressions = sortAgg.aggregateExpressions.map(_.copy(mode = Complete)),
Contributor

Is it always right? I think we also need to check the output partitioning to see if we can eliminate the partial agg.

An example is df.sortWithinPartitions: it does not cluster the data, it just sorts it within each partition.

Contributor Author

@cloud-fan - I don't think we need to check output partitioning, as we are matching a pair of final and partial hash aggregates with no shuffle in between:

  HashAggregate(final)
          |                                  SortAggregate(complete)
  HashAggregate(partial)          =>                  |
          |                                         child
        child

So the child must already have the proper output partitioning for SortAggregate; otherwise it could not satisfy the original HashAggregate(final)'s required distribution.

Contributor

ah ok, if there is a shuffle in the middle, we can't optimize? This looks quite limited, as having a shuffle in the middle is very common.

Contributor Author

> if there is a shuffle in the middle, we can't optimize?

We can, and the rule here also pattern-matches a single HashAggregate (in addition to the final/partial pair). I added a unit test case in ReplaceHashWithSortAggSuite.scala to demonstrate replacing a partial aggregate: "replace partial hash aggregate with sort aggregate". But I think it would be rare to be able to replace a final aggregate (though this rule also covers it), as a final aggregate almost always comes immediately after a shuffle, so there is no sort ordering before it.

Spark's native shuffle does not guarantee any sort order. For Cosco (a remote shuffle service we run in-house), we support sorted shuffle, so the final aggregate can also be replaced.
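To make the partial-aggregate case concrete, here is a hypothetical repro, loosely based on the unit test described above (it assumes the rule is enabled; the config name is from this PR's description):

```scala
spark.conf.set("spark.sql.execution.replaceHashWithSortAgg", "true")

val df = spark.range(100)
  .selectExpr("id % 10 AS k", "id AS v")
  .sortWithinPartitions("k") // the partial aggregate's child is sorted on the key
  .groupBy("k")
  .sum("v")

// The partial aggregate should show up as SortAggregate in the plan.
df.explain()
```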

SparkQA commented Nov 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50120/

SparkQA commented Nov 26, 2021

Test build #145649 has finished for PR 34702 at commit a683137.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 26, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50120/

@@ -694,7 +694,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}

test("SPARK-25497: LIMIT within whole stage codegen should not consume all the inputs") {
-    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+      SQLConf.REPLACE_HASH_WITH_SORT_AGG_ENABLED.key -> "false") {
Contributor Author

This unit test exercises multiple limit operators and hash aggregate operators in one single stage. We disable the rule here because sort aggregate does not support codegen yet, and replacing the hash aggregates would break the test logic.

SparkQA commented Nov 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50142/

SparkQA commented Nov 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50142/

SparkQA commented Nov 27, 2021

Test build #145672 has finished for PR 34702 at commit e8609fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21 (Contributor Author) commented Nov 29, 2021

This PR is ready for review again, thanks @cloud-fan.

/**
 * Check if `partialAgg` is the partial aggregate of `finalAgg`.
 */
private def isPartialAgg(partialAgg: HashAggregateExec, finalAgg: HashAggregateExec): Boolean = {
Contributor

This looks like reverse engineering AggUtils. Could we just link the partial and final agg when they are constructed?

Contributor Author

@tanelk - yeah, I agree this is mostly reverse engineering and we can do a better job here. I tried linking the partial and final agg in AggUtils and checking whether the linked physical plans are the same. That does not quite work because we do top-down planning, so the linked partial agg is not the same as the planned partial agg (the linked partial agg contains a PlanLater operator).

I found a more elegant way to do it: check that the linked logical plans of both aggs are the same. Updated.
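A rough sketch of what that logical-link check could look like (the aggregate-mode guard and the sameResult comparison are my reading of the description, not necessarily the exact merged code):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Two hash aggregates form a partial/final pair if their aggregate modes
// match and both were planned from the same logical Aggregate node.
def isPartialAgg(partialAgg: HashAggregateExec, finalAgg: HashAggregateExec): Boolean = {
  partialAgg.aggregateExpressions.forall(_.mode == Partial) &&
    finalAgg.aggregateExpressions.forall(_.mode == Final) &&
    ((partialAgg.logicalLink, finalAgg.logicalLink) match {
      case (Some(partialLink), Some(finalLink)) => partialLink.sameResult(finalLink)
      case _ => false
    })
}
```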

Contributor Author

cc @cloud-fan for review, thanks.

SparkQA commented Dec 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50300/

SparkQA commented Dec 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50300/

SparkQA commented Dec 2, 2021

Test build #145825 has finished for PR 34702 at commit cff1424.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 *
 *  HashAggregate(t1.i, SUM, final)
 *           |                           SortAggregate(t1.i, SUM, complete)
 *  HashAggregate(t1.i, SUM, partial) =>           |
Contributor

This seems like an orthogonal optimization: we can merge adjacent partial and final aggregates (no shuffle between them) into one complete aggregate.

Contributor Author

Yeah, I think we can add a rule later to optimize it. I vaguely remember someone proposed this in OSS before, but it seems the impact is not high.
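For illustration, the orthogonal merge could look roughly like this (a sketch only, assuming the pair has already been matched as partial/final, e.g. with something like isPartialAgg above; parameter handling is simplified):

```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.Complete

// Collapse an adjacent partial/final pair (no shuffle in between) into one
// Complete-mode aggregate that reads the partial aggregate's child directly.
val mergedAgg = finalAgg.copy(
  aggregateExpressions = finalAgg.aggregateExpressions.map(_.copy(mode = Complete)),
  initialInputBufferOffset = 0, // Complete mode consumes raw input rows, not partial buffers
  child = partialAgg.child)
```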

@cloud-fan (Contributor)
The code change LGTM. Since quite a few TPCDS queries get plan changes, can we run a TPCDS benchmark to verify the performance improvement?

@c21 (Contributor Author) commented Dec 3, 2021

> Since quite a few TPCDS queries get plan changes, can we run a TPCDS benchmark to verify the performance improvement?

@cloud-fan - sure. Today I ran the TPCDS benchmark (sf=1) on one AWS r3.xlarge (same as #26049). I don't see much performance difference between enabling and disabling this rule:

Then I tried sf=5, but the benchmark hit task failures with "no space left on device", so it cannot be run on a single machine.

Do you recommend disabling this rule by default? After adding sort aggregate codegen, we can do larger-scale testing and then enable it. WDYT?

@c21 (Contributor Author) commented Dec 3, 2021

Discussed offline with @cloud-fan: we decided to disable the rule by default for now. After sort aggregate codegen is added, a large-scale TPCDS benchmark can be done later.
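With the rule off by default, tests and experiments have to opt in explicitly. Mirroring the SQLMetricsSuite snippet quoted earlier, but flipped on (a usage sketch inside a test suite, not code from this PR):

```scala
withSQLConf(SQLConf.REPLACE_HASH_WITH_SORT_AGG_ENABLED.key -> "true") {
  // Queries whose hash aggregates sit on sorted children should now be
  // planned with SortAggregate instead of HashAggregate.
}
```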

SparkQA commented Dec 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50355/

SparkQA commented Dec 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50355/

SparkQA commented Dec 3, 2021

Test build #145880 has finished for PR 34702 at commit 8ce7d27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)
thanks, merging to master!

cloud-fan closed this in 544865d on Dec 3, 2021
@c21 (Contributor Author) commented Dec 3, 2021

Thank you @cloud-fan and @tanelk for review!

c21 deleted the agg-rule branch December 3, 2021 21:58
 */
private def replaceHashAgg(plan: SparkPlan): SparkPlan = {
  plan.transformDown {
    case hashAgg: HashAggregateExec if hashAgg.groupingExpressions.nonEmpty =>
Contributor

BTW, shall we handle ObjectHashAggregateExec as well?

Contributor Author

@cloud-fan - yeah, I agree; I don't see a reason why we cannot do it. Created https://issues.apache.org/jira/browse/SPARK-37557 as a follow-up. Will do it shortly, thanks.
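A hedged sketch of how the match might be widened for that follow-up (both operators share BaseAggregateExec; `toSortAgg` is the same hypothetical helper as in the earlier sketch, not the actual API):

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}

def replaceAggSketch(plan: SparkPlan): SparkPlan = plan.transformDown {
  // Match either hash-based aggregate via their common parent class.
  case agg: BaseAggregateExec
      if (agg.isInstanceOf[HashAggregateExec] ||
          agg.isInstanceOf[ObjectHashAggregateExec]) &&
        agg.groupingExpressions.nonEmpty =>
    val sortAgg = toSortAgg(agg) // hypothetical conversion helper
    if (SortOrder.orderingSatisfies(
        agg.child.outputOrdering, sortAgg.requiredChildOrdering.head)) {
      sortAgg
    } else {
      agg
    }
}
```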

cloud-fan pushed a commit that referenced this pull request Dec 7, 2021
…s already sorted

### What changes were proposed in this pull request?

This is a follow-up of #34702 (comment), where it was suggested that we can replace object hash aggregate with sort aggregate as well. This PR handles object hash aggregate.

### Why are the changes needed?

Increase the coverage of the rule by handling object hash aggregate as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified unit test in `ReplaceHashWithSortAggSuite.scala` to cover object hash aggregate (by using aggregate expression `COLLECT_LIST`).

Closes #34824 from c21/agg-rule-followup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>