Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable columnar shuffle by default #250

Merged
merged 32 commits into from
May 9, 2024

Conversation

viirya
Copy link
Member

@viirya viirya commented Apr 9, 2024

Which issue does this PR close?

Closes #95.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya closed this Apr 10, 2024
@viirya viirya reopened this Apr 10, 2024
@codecov-commenter
Copy link

codecov-commenter commented Apr 10, 2024

Codecov Report

Attention: Patch coverage is 50.00000% with 4 lines in your changes are missing coverage. Please review.

Project coverage is 33.50%. Comparing base (9ab6c75) to head (ace91fe).
Report is 9 commits behind head on main.

Files Patch % Lines
.../scala/org/apache/spark/sql/comet/util/Utils.scala 0.00% 2 Missing ⚠️
...ain/scala/org/apache/comet/vector/NativeUtil.scala 0.00% 1 Missing ⚠️
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 0.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #250      +/-   ##
============================================
+ Coverage     33.47%   33.50%   +0.03%     
- Complexity      795      798       +3     
============================================
  Files           110      110              
  Lines         37533    37541       +8     
  Branches       8215     8217       +2     
============================================
+ Hits          12563    12579      +16     
+ Misses        22322    22321       -1     
+ Partials       2648     2641       -7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@viirya
Copy link
Member Author

viirya commented Apr 10, 2024

Hmm, all tests in TPCDSQuerySuite` are passed locally with this PR. I need to look at the CI failure.

@viirya
Copy link
Member Author

viirya commented Apr 11, 2024

Observed several Spark SQL test failures regarding aggregation: #260

@@ -1414,6 +1424,7 @@ index ed2e309fa07..4cfe0093da7 100644
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observed that Comet is unable to acquire enough memory for columnar shuffle when doing Spark SQL tests:

For example, DatasetPrimitiveSuite:

  Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 37.0 failed 1 times, most recent failure: Lost task 1.0 in stage 37.0 (TID 75) (e4773b5abe7e executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108848 bytes of memory, got 96 bytes. Available: 96
[info] 	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocate(CometShuffleMemoryAllocator.java:132)
[info] 	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocatePage(CometShuffleMemoryAllocator.java:119)
[info] 	at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.initialCurrentPage(SpillWriter.java:158)
[info] 	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.insertRow(CometDiskBlockWriter.java:284)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased Comet memoryoverhead to overcome it.

@viirya viirya force-pushed the columnar_shuffle_default branch 2 times, most recently from 54da021 to 7465384 Compare April 16, 2024 18:55
@viirya viirya force-pushed the columnar_shuffle_default branch from 37c8186 to b37070d Compare May 4, 2024 21:59
@viirya
Copy link
Member Author

viirya commented May 5, 2024

I fixed all Spark SQL test failures. Now waiting for #380 to be merged.

@viirya
Copy link
Member Author

viirya commented May 6, 2024

cc @sunchao @andygrove

Comment on lines +34 to +35
protected val aliasCandidateLimit: Int =
conf.getConfString("spark.sql.optimizer.expressionProjectionCandidateLimit", "100").toInt
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some Spark tests tune this config. So we need to get the configured value.

@viirya
Copy link
Member Author

viirya commented May 8, 2024

@andygrove @sunchao Could you take a look and see if you have any comments on this? Thanks.

Co-authored-by: Andy Grove <andygrove73@gmail.com>
Co-authored-by: Andy Grove <andygrove73@gmail.com>
assert(
- collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec => e }.size == expected)
+ collect(df.queryExecution.executedPlan) {
+ case _: ShuffleExchangeExec | _: CometShuffleExchangeExec => 1 }.size == expected)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just check for ShuffleExchangeLike instead (and even push that change upstream?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. For upstream, not sure if it is accepted, but we can try.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to check ShuffleExchangeLike instead for the places which is possible.

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with all of the Spark tests in the patch, but the changes to Comet LGTM

@viirya
Copy link
Member Author

viirya commented May 8, 2024

Thank you @andygrove

@viirya viirya merged commit 14494d3 into apache:main May 9, 2024
40 checks passed
@viirya
Copy link
Member Author

viirya commented May 9, 2024

Merged. Thanks @andygrove for review.

himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* feat: Enable columnar shuffle by default

* Update plan stability

* Fix

* Update diff

* Add Comet memoryoverhead for Spark SQL tests

* Update plan stability

* Update diff

* Update more diff

* Update DataFusion commit

* Update diff

* Update diff

* Update diff

* Update diff

* Update diff

* Fix more tests

* Fix more

* Fix

* Fix more

* Fix more

* Fix more

* Fix more

* Fix more

* Update diff

* Fix memory leak

* Update plan stability

* Restore diff

* Update core/src/execution/datafusion/planner.rs

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Update core/src/execution/datafusion/planner.rs

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Fix style

* Use ShuffleExchangeLike instead

---------

Co-authored-by: Andy Grove <andygrove73@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable columnar shuffle by default
3 participants