[SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning #42194
@@ -1500,6 +1500,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val V2_BUCKETING_SHUFFLE_ENABLED =
+    buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
+      .doc("During a storage-partitioned join, whether to allow shuffling only one side. " +
+        "When only one side is KeyGroupedPartitioning, if the conditions are met, Spark will " +
+        "only shuffle the other side. This optimization will reduce the amount of data that " +
Review comment: shall we make the algorithm smarter? If the other side is large, doing a … Let's think of an extreme case: one side reports …

Review comment (reply): I think the …
+        s"needs to be shuffled. This config requires ${V2_BUCKETING_ENABLED.key} to be enabled.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)

  val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
    .doc("The maximum number of buckets allowed.")
    .version("2.4.0")
|
@@ -4877,6 +4887,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { | |
def v2BucketingPartiallyClusteredDistributionEnabled: Boolean = | ||
getConf(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED) | ||
|
||
def v2BucketingShuffleEnabled: Boolean = | ||
getConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED) | ||
|
||
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = | ||
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) | ||
|
||
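For context, the new flag only takes effect alongside the existing v2 bucketing flag, as the doc string above notes. A minimal sketch of how one might exercise it (assuming a Spark build that includes this patch; the catalog, table, and column names here are hypothetical, not from the PR):

```scala
// Both flags are off by default; the new one requires the first.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.shuffle.enabled", "true")

// Join a table that reports KeyGroupedPartitioning (storage-partitioned)
// with one that does not. With the flags above, if the conditions are met,
// only the non-KeyGroupedPartitioning side is shuffled to match the
// partitioning reported by the other side, instead of shuffling both.
val joined = spark.table("cat.db.partitioned_facts")
  .join(spark.table("cat.db.unpartitioned_dim"), Seq("id"))
```

Whether skipping the shuffle on the key-grouped side is actually cheaper depends on relative sizes, which is the trade-off raised in the review comments above.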