perf: Add option to replace SortMergeJoin with ShuffledHashJoin #1007
Conversation
CometFilter [ca_address_sk,ca_state]
CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state]
InputAdapter
BroadcastExchange #7
This is a regression that I am looking into (falling back to Spark for BroadcastHashJoin).
Here is a teaser for the performance improvement. This is for TPC-H q11 (SF=100) with broadcast joins disabled (I am looking into a regression with those). I ran the query 5 times each with the rule enabled vs. disabled.

[benchmark charts: Rule Off / Rule On]
There is a small danger in enabling this without a good estimate of the build-side size. ShuffledHashJoin has limits on how much data it can process efficiently: if the build-side hash table cannot spill, a large enough build side will cause OOMs, and if it can spill, SMJ can frequently deliver better performance. We might even see this when we scale the benchmark from SF1 to, say, SF10.
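To make the trade-off concrete, here is a minimal Python sketch (purely illustrative; Comet's actual operators are implemented in Scala/Rust, and the real engines add partitioning, spilling, and code generation). The point it shows: a shuffled hash join must hold the entire build side in memory, while a sort-merge join streams two sorted inputs and only buffers the current key group.

```python
from collections import defaultdict

def shuffled_hash_join(build, probe):
    """Hash join: the whole build side is materialized in a hash table,
    so memory use is proportional to the build side. Without spilling,
    an oversized build side is what causes the OOM risk discussed above."""
    table = defaultdict(list)
    for key, value in build:              # O(|build|) memory
        table[key].append(value)
    return [(k, bv, pv) for k, pv in probe for bv in table[k]]

def sort_merge_join(left, right):
    """Sort-merge join: both sides are sorted, then merged with two
    cursors; only the group of rows sharing the current key is buffered."""
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # find the end of the matching key group on the right
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # emit the cross product for this key group
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], r[1]) for r in right[j:j_end])
                i += 1
            j = j_end
    return out
```

Both functions produce the same join result; they differ only in where the memory cost lands, which is exactly why a good build-side size estimate matters before preferring the hash variant.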
conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
  .doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved performance.")
  .booleanConf
  .createWithDefault(true)
I think the default value should be false for stability. Spark decides to use SMJ for reasons that include data statistics. If Spark thinks SHJ may not work, I think we had better follow its choice unless users explicitly ask otherwise.
The other accelerators (Spark RAPIDS and Gluten) default this to true. Perhaps we should benchmark at large scale factors first and see if we run into any issues?
I guess it is okay for benchmark datasets like TPC-DS or TPC-H. The cases I worry about are production workloads, but those may be more of an internal concern. For OSS, maybe enabling it by default is okay.
At a minimum, we should expand the description here to mention the risk.
> Perhaps we should benchmark at large scale factors first and see if we run into any issues?

Agreed. (Also, when I wrote SF1 and SF10 I meant 1 TB and 10 TB, which are really SF 1000 and SF 10000.)
For this PR, I disabled the feature by default. I created the following PR to enable it by default and update the tests. I will add documentation as part of this PR.
Here is a new follow-on issue for enabling it by default:
Codecov Report

Attention: Patch coverage is

@@             Coverage Diff             @@
##              main    #1007      +/-   ##
=============================================
+ Coverage    34.30%   54.91%   +20.61%
+ Complexity     887      858       -29
=============================================
  Files          112      110        -2
  Lines        43429    10913    -32516
  Branches      9623     2110     -7513
=============================================
- Hits         14897     5993     -8904
+ Misses       25473     3845    -21628
+ Partials      3059     1075     -1984

View full report in Codecov by Sentry.
I will add documentation to this PR today, explaining the pros and cons of this feature in our tuning guide.
kube/Dockerfile (outdated)

@@ -65,4 +65,4 @@ ENV SCALA_VERSION=2.12
 USER root

 # note the use of a wildcard in the file name so that this works with both snapshot and final release versions
-COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-0.2.0*.jar $SPARK_HOME/jars
+COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-*.jar $SPARK_HOME/jars
Unrelated, but I ran into this hard-coded version number during testing.
@viirya @parthchandra This is now ready for review. The new option is disabled by default and I added a section to the tuning guide explaining why users may want to enable this new option.
lgtm.
I have run into a deadlock when running TPC-DS benchmarks with this feature, so I am moving to draft while I investigate. It is possibly related to the memory pool issues that we are also working on in other PRs.
After upmerging, I no longer see the deadlock, but instead get an error if I have insufficient memory allocated, which is an improvement.
However, when I increase memory, I see queries fail due to #1019.
I have now marked the feature as experimental and explained in the tuning guide that there is no spill to disk, so this could result in OOM.
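For reference, a usage sketch of how the experimental option could be switched on (the `replaceSortMergeJoin` and `memoryFraction` keys appear in this PR's config table; the plugin class and the idea of raising memory headroom are assumptions on my part, not prescribed by this PR):

```shell
# Enable Comet's experimental SMJ -> SHJ replacement (off by default).
# Since the build-side hash table does not spill to disk, leave extra
# executor memory headroom to reduce the OOM risk noted above.
spark-shell \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.exec.replaceSortMergeJoin=true \
  --conf spark.comet.exec.memoryFraction=0.7
```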
val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
    .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
      "for improved performance. See tuning guide for more information regarding stability of " +
      "this feature.")
Can we add a link to the tuning guide?
@@ -50,6 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. See tuning guide for more information regarding stability of this feature. | false |
Can we add a link to the tuning guide? 🙏
Which issue does this PR close?
Closes #1006
Rationale for this change
Improved performance
What changes are included in this PR?
How are these changes tested?
I manually ran TPC-H and saw improved performance. I will post benchmarks once I have run more tests.