[SPARK-38237][SQL][SS] Allow ClusteredDistribution to require full clustering keys (#35574)
Changes from all commits:

```diff
@@ -72,9 +72,14 @@ case object AllTuples extends Distribution {
 /**
  * Represents data where tuples that share the same values for the `clustering`
  * [[Expression Expressions]] will be co-located in the same partition.
+ *
+ * @param requireAllClusterKeys When true, `Partitioning` which satisfies this distribution,
+ *                              must match all `clustering` expressions in the same ordering.
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
+    requireAllClusterKeys: Boolean = SQLConf.get.getConf(
+      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
     requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
```
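As a quick illustration of the new parameter (a sketch assuming a Spark build that already contains this change, e.g. in `spark-shell`; the attribute names are made up for the example), the flag can be set explicitly at construction or left to default to the new `SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` entry:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.types.IntegerType

// Illustrative clustering keys.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// Explicit opt-in to the strict "all keys, same order" behavior.
val strict = ClusteredDistribution(a :: b :: Nil, requireAllClusterKeys = true)

// With no argument, the flag defaults to the value of
// SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION at construction time,
// so the behavior can be flipped globally without touching call sites.
val default = ClusteredDistribution(a :: b :: Nil)
```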
```diff
@@ -88,6 +93,19 @@ case class ClusteredDistribution(
         s"the actual number of partitions is $numPartitions.")
     HashPartitioning(clustering, numPartitions)
   }
+
+  /**
+   * Checks if `expressions` match all `clustering` expressions in the same ordering.
+   *
+   * `Partitioning` should call this to check its expressions when `requireAllClusterKeys`
+   * is set to true.
+   */
+  def areAllClusterKeysMatched(expressions: Seq[Expression]): Boolean = {
+    expressions.length == clustering.length &&
+      expressions.zip(clustering).forall {
+        case (l, r) => l.semanticEquals(r)
+      }
+  }
 }
```

Review discussion on this method:

**Reviewer:** For aggregate or window, I'm not sure whether we have any reordering mechanism similar to join. If not, this could be very limited? For instance, if users have …

**Author:** This goes back to the same discussion here: #35574 (comment). I am more inclined to require the same ordering, but if the quorum of folks here thinks we should relax it, then I am also fine. cc @cloud-fan

**Reviewer:** Ah, I forgot this was discussed already (and I participated in the thread too... 😓). I'm fine with the more strict ordering to start with.
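To make the "same keys, same order" rule concrete, here is a minimal self-contained sketch of the matching logic, with plain `String`s standing in for Catalyst `Expression`s and `==` standing in for `semanticEquals`:

```scala
// Minimal, self-contained model of areAllClusterKeysMatched: Strings stand in
// for Catalyst Expressions, `==` stands in for semanticEquals.
object AreAllClusterKeysMatchedSketch {
  def areAllClusterKeysMatched(expressions: Seq[String], clustering: Seq[String]): Boolean =
    expressions.length == clustering.length &&
      expressions.zip(clustering).forall { case (l, r) => l == r }

  def main(args: Array[String]): Unit = {
    val clustering = Seq("a", "b")
    println(areAllClusterKeysMatched(Seq("a", "b"), clustering)) // true: same keys, same order
    println(areAllClusterKeysMatched(Seq("b", "a"), clustering)) // false: same keys, wrong order
    println(areAllClusterKeysMatched(Seq("a"), clustering))      // false: a subset is not enough
  }
}
```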
```diff
@@ -261,8 +279,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, _) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+          if (requireAllClusterKeys) {
+            // Checks `HashPartitioning` is partitioned on exactly same clustering keys of
+            // `ClusteredDistribution`.
+            c.areAllClusterKeysMatched(expressions)
+          } else {
+            expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+          }
         case _ => false
       }
     }
```
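The effect of the two branches on `HashPartitioning.satisfies` can be sketched as follows (same assumptions as the earlier snippet: a Spark build with this change, illustrative attributes `a` and `b`):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val relaxed = ClusteredDistribution(a :: b :: Nil, requireAllClusterKeys = false)
val strict = ClusteredDistribution(a :: b :: Nil, requireAllClusterKeys = true)

HashPartitioning(a :: Nil, 10).satisfies(relaxed)      // true: a subset of the keys is enough
HashPartitioning(a :: Nil, 10).satisfies(strict)       // false: all keys are required
HashPartitioning(b :: a :: Nil, 10).satisfies(strict)  // false: same keys, wrong order
HashPartitioning(a :: b :: Nil, 10).satisfies(strict)  // true: all keys, same order
```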
```diff
@@ -322,8 +346,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
           // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
           val minSize = Seq(requiredOrdering.size, ordering.size).min
           requiredOrdering.take(minSize) == ordering.take(minSize)
-        case ClusteredDistribution(requiredClustering, _) =>
-          ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
+        case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+          val expressions = ordering.map(_.child)
+          if (requireAllClusterKeys) {
+            // Checks `RangePartitioning` is partitioned on exactly same clustering keys of
+            // `ClusteredDistribution`.
+            c.areAllClusterKeysMatched(expressions)
+          } else {
+            expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+          }
         case _ => false
       }
     }
```

Review discussion on this hunk:

**Reviewer:** This is less strict than the previous …

**Author:** Yes, it's less strict. Previously we didn't allow …
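One subtlety worth noting: the check runs on `ordering.map(_.child)`, so sort directions are ignored. A tiny self-contained model (a simplified stand-in for `SortOrder`, `String`s for expressions) shows that an ascending/descending mix still matches the clustering keys:

```scala
// Simplified stand-in for Catalyst's SortOrder: only the child expression
// (modeled as a String) and the direction.
final case class SortOrderModel(child: String, ascending: Boolean = true)

object RangeCheckSketch {
  def areAllClusterKeysMatched(expressions: Seq[String], clustering: Seq[String]): Boolean =
    expressions.length == clustering.length &&
      expressions.zip(clustering).forall { case (l, r) => l == r }

  def main(args: Array[String]): Unit = {
    val clustering = Seq("a", "b")
    // `a ASC, b DESC`: directions differ, but the children line up in order.
    val ordering = Seq(SortOrderModel("a"), SortOrderModel("b", ascending = false))
    val expressions = ordering.map(_.child)
    println(areAllClusterKeysMatched(expressions, clustering)) // true
  }
}
```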
```diff
@@ -524,10 +555,7 @@ case class HashShuffleSpec(
     // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
     // the join keys.
     if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
-      partitioning.expressions.length == distribution.clustering.length &&
-        partitioning.expressions.zip(distribution.clustering).forall {
-          case (l, r) => l.semanticEquals(r)
-        }
+      distribution.areAllClusterKeysMatched(partitioning.expressions)
     } else {
       true
     }
```
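This change is a pure refactor: the inlined length/zip/`semanticEquals` comparison is replaced by the shared helper introduced above. A self-contained model of the guarded branch (same `String`-for-`Expression` convention as the earlier sketches) behaves identically before and after:

```scala
// Stand-in for the distribution carrying the shared helper from this PR.
final case class DistributionModel(clustering: Seq[String]) {
  def areAllClusterKeysMatched(expressions: Seq[String]): Boolean =
    expressions.length == clustering.length &&
      expressions.zip(clustering).forall { case (l, r) => l == r }
}

object CoPartitionSketch {
  // Models the guarded branch above: when the co-partition conf is on, the
  // partitioning must use all join keys in the same order; otherwise anything goes.
  def compatible(
      partitioningExprs: Seq[String],
      distribution: DistributionModel,
      requireAllClusterKeysForCoPartition: Boolean): Boolean =
    if (requireAllClusterKeysForCoPartition) {
      distribution.areAllClusterKeysMatched(partitioningExprs)
    } else {
      true
    }

  def main(args: Array[String]): Unit = {
    val dist = DistributionModel(Seq("a", "b"))
    println(compatible(Seq("a", "b"), dist, requireAllClusterKeysForCoPartition = true)) // true
    println(compatible(Seq("b", "a"), dist, requireAllClusterKeysForCoPartition = true)) // false
  }
}
```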
Review discussion on the parameter position:

**Reviewer:** nit: it's less breaking to put the new parameter at the end, so that some caller-side code can remain unchanged.

**Author:** @cloud-fan - I agree on the point of keeping caller-side code unchanged. It just feels more coherent for others reading the code when `clustering` and `requireAllClusterKeys` sit together; this was raised in #35574 (comment) by @HeartSaVioR as well. I am curious: would adding the field in the middle here break external libraries that depend on Spark? Otherwise, reviewers have already paid the time to review this PR, so I am not sure how important it is to change the caller side back. I just want to understand more here, and I am open to changing it back.

**Reviewer:** OK, maybe it's not a big deal.
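To illustrate the compatibility concern being discussed, here is a self-contained sketch with simplified stand-in types (not the real classes): adding a defaulted parameter in the middle of a case class breaks positional call sites, while named arguments keep working either way.

```scala
// Simplified stand-ins for the before/after constructor shapes.
final case class DistributionBefore(
    clustering: Seq[String],
    requiredNumPartitions: Option[Int] = None)

final case class DistributionAfter(
    clustering: Seq[String],
    requireAllClusterKeys: Boolean = false,
    requiredNumPartitions: Option[Int] = None)

object ParamPositionSketch {
  def main(args: Array[String]): Unit = {
    // A positional caller written against the old shape:
    //   DistributionBefore(Seq("a"), Some(10))
    // would no longer compile against the new shape, because Some(10) is not
    // a Boolean. Callers using named arguments are unaffected:
    println(DistributionAfter(Seq("a"), requiredNumPartitions = Some(10)))
  }
}
```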