perf: Add option to replace SortMergeJoin with ShuffledHashJoin #1007

Open · andygrove wants to merge 19 commits into main

Conversation

@andygrove (Member) commented Oct 9, 2024:

Which issue does this PR close?

Closes #1006

Rationale for this change

Improved performance

What changes are included in this PR?

  • Add new config option to replace SMJ with SHJ
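
For illustration, a minimal sketch of how a user could opt in once this lands; the config key spark.comet.exec.replaceSortMergeJoin matches the entry added to the configuration table in this PR, while the SparkSession setup itself is only an example.

import org.apache.spark.sql.SparkSession

// Example only: opt in to the SortMergeJoin -> ShuffledHashJoin rewrite added by this PR.
// The key matches the documented entry spark.comet.exec.replaceSortMergeJoin (default: false).
val spark = SparkSession.builder()
  .appName("comet-replace-smj-example")
  .config("spark.comet.exec.replaceSortMergeJoin", "true")
  .getOrCreate()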

How are these changes tested?

I manually ran TPC-H and saw improved performance. I will post benchmarks once I have run more tests.

@andygrove andygrove marked this pull request as draft October 9, 2024 18:01
CometFilter [ca_address_sk,ca_state]
CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state]
InputAdapter
BroadcastExchange #7
@andygrove (Member Author) commented Oct 9, 2024:

This is a regression that I am looking into (falling back to Spark for BroadcastHashJoin)

@andygrove (Member Author) commented Oct 9, 2024:

Here is a teaser for the performance improvement. This is TPC-H q11 (SF=100) with broadcast joins disabled (I am looking into a regression with those). I ran the query five times with the rule enabled and five times with it disabled.

Rule Off

79.87537693977356,
77.76734256744385,
75.35734295845032,
75.44863200187683,
72.88174152374268

Rule On

39.33945274353027,
36.159271240234375,
35.83299708366394,
35.638232707977295,
35.67777371406555

@parthchandra (Contributor) commented:

There is a small danger in enabling this without a good estimate of the size of the build side. ShuffledHashJoin has limits on how much data it can process efficiently: if the build-side hash table cannot spill, a large enough build side will cause OOMs, and if it can spill, then SMJ can frequently lead to better performance. We might even see this when we scale the benchmark from SF1 to, say, SF10.
Is there a way for us to get cardinality and row size for the build side somehow?
Still worth adding this option, though.
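
One possible sketch, purely illustrative and not part of this PR: Spark physical plans keep a link back to the logical plan, whose statistics expose sizeInBytes (and sometimes rowCount), so a rewrite rule could gate the replacement on an estimated build-side size. The helper name and the 512 MB threshold below are invented for the example.

import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: use logical-plan statistics, when available, to decide whether a
// candidate build side looks small enough for a hash join. The threshold is illustrative.
def buildSideLooksSmall(
    plan: SparkPlan,
    maxBuildBytes: BigInt = BigInt(512L) * 1024 * 1024): Boolean = {
  plan.logicalLink.map(_.stats) match {
    case Some(stats) => stats.sizeInBytes <= maxBuildBytes
    case None        => false // no statistics: be conservative and keep SortMergeJoin
  }
}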

@parthchandra (Contributor) commented:

> if there is spilling, then SMJ can frequently lead to better performance

I have seen this happen with Spark on some TPC-DS queries at SF10.

conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
  .doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved performance.")
  .booleanConf
  .createWithDefault(true)
A Member commented:

I think we should have a default value of false for stability. Spark decides to use SMJ for several reasons, including data statistics. If Spark thinks SHJ may not work, I think we had better follow its choice unless users explicitly ask for the replacement.

@andygrove (Member Author) commented:

The other accelerators (Spark RAPIDS and Gluten) default this to true. Perhaps we should benchmark at large scale factors before and see if we run into any issues?

A Member replied:

I guess it is okay for benchmark datasets like TPC-DS or TPC-H. The cases I worry about are production ones, but those might be more of an internal concern.

For OSS, maybe enabling it by default is okay.

@viirya (Member) commented Oct 9, 2024:

At least, we should add some more descriptions here to mention the risk.

A Contributor replied:

> Perhaps we should benchmark at large scale factors before and see if we run into any issues?

Agreed. (Also, when I wrote SF1 and SF10 I meant 1 TB and 10 TB, which are really SF 1000 and SF 10000.)

@andygrove (Member Author) commented:

For this PR, I disabled the feature by default. I created the following PR to enable it by default and update the tests. I will add documentation as part of this PR.

#1008
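
For reference, the net effect on the config entry should look roughly like this once the default is flipped to false; this is reconstructed from the snippets quoted elsewhere in this thread and from the documented default, so the exact code may differ slightly.

val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
    .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
      "for improved performance. See tuning guide for more information regarding stability of " +
      "this feature.")
    .booleanConf
    .createWithDefault(false)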

@andygrove (Member Author) commented:

Here is a new follow-on issue for enabling this by default:

#1011

@andygrove (Member Author) commented:

Current benchmarks:

[chart: tpch_allqueries]

Speedup of using HashJoin instead of SortMergeJoin:

[chart: tpch_queries_speedup]

@codecov-commenter commented Oct 9, 2024:

Codecov Report

Attention: Patch coverage is 18.18182% with 27 lines in your changes missing coverage. Please review.

Project coverage is 54.91%. Comparing base (591f45a) to head (8f5d440).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...ain/scala/org/apache/comet/rules/RewriteJoin.scala | 0.00% | 24 Missing ⚠️ |
| ...org/apache/comet/CometSparkSessionExtensions.scala | 40.00% | 2 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1007       +/-   ##
=============================================
+ Coverage     34.30%   54.91%   +20.61%     
+ Complexity      887      858       -29     
=============================================
  Files           112      110        -2     
  Lines         43429    10913    -32516     
  Branches       9623     2110     -7513     
=============================================
- Hits          14897     5993     -8904     
+ Misses        25473     3845    -21628     
+ Partials       3059     1075     -1984     


@andygrove andygrove marked this pull request as ready for review October 10, 2024 03:06
@andygrove (Member Author) commented:

I will add documentation to this PR today, explaining pros/cons of this feature in our tuning guide.

kube/Dockerfile Outdated
@@ -65,4 +65,4 @@ ENV SCALA_VERSION=2.12
USER root

# note the use of a wildcard in the file name so that this works with both snapshot and final release versions
-COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-0.2.0*.jar $SPARK_HOME/jars
+COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-*.jar $SPARK_HOME/jars
@andygrove (Member Author) commented:

Unrelated, but I ran into this hard-coded version number during testing.

@andygrove (Member Author) commented:

@viirya @parthchandra This is now ready for review. The new option is disabled by default and I added a section to the tuning guide explaining why users may want to enable this new option.

@parthchandra (Contributor) left a comment:

lgtm.

@andygrove (Member Author) commented:

I have run into a deadlock when running TPC-DS benchmarks with this feature, so I am moving to draft while I investigate. It is possibly related to the memory pool issues that we are also working on in other PRs.

@andygrove (Member Author) commented Oct 15, 2024:

After upmerging, I no longer see the deadlock, but instead get an error if I have insufficient memory allocated, which is an improvement.

org.apache.comet.CometNativeException (External error: Internal error: 

Partition is still not able to allocate enough memory for the array builders after spilling..

However, when I increase memory, I see queries fail due to #1019.
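
For anyone reproducing this, the knob documented in this PR's configuration table for giving native execution a larger share of Comet's memory overhead is spark.comet.exec.memoryFraction; the value below is only an illustration (the documented default is 0.7), and the sketch assumes the setting is applied on the session before running the query.

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: allow native execution to use a larger fraction of
// Comet's memory overhead; larger hash-join build sides need more memory.
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.comet.exec.memoryFraction", "0.8")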

@andygrove andygrove marked this pull request as ready for review October 18, 2024 20:55
@andygrove (Member Author) commented:

I have now marked the feature as experimental and explained in the tuning guide that there is no spill to disk so this could result in OOM.

@andygrove (Member Author) commented:

Fresh benchmarks after upmerging.

[chart: tpch_allqueries]

@andygrove (Member Author) commented:

TPC-DS excluding q97 (OOM with ShuffledHashJoin).

[chart: tpcds_allqueries]

val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
    .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
      "for improved performance. See tuning guide for more information regarding stability of " +

Review comment:

Can we add a link to the tuning guide?

@@ -50,6 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. See tuning guide for more information regarding stability of this feature. | false |

Review comment:

Can we add a link to the tuning guide? 🙏

Successfully merging this pull request may close this issue: Add option to replace SortMergeJoin with ShuffleHashJoin (#1006)

5 participants