-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26297][SQL] improve the doc of Distribution/Partitioning #23249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType} | |
|
||
/** | ||
* Specifies how tuples that share common expressions will be distributed when a query is executed | ||
* in parallel on many machines. Distribution can be used to refer to two distinct physical | ||
* properties: | ||
* - Inter-node partitioning of data: In this case the distribution describes how tuples are | ||
* partitioned across physical machines in a cluster. Knowing this property allows some | ||
* operators (e.g., Aggregate) to perform partition local operations instead of global ones. | ||
* - Intra-partition ordering of data: In this case the distribution describes guarantees made | ||
* about how tuples are distributed within a single partition. | ||
* in parallel on many machines. | ||
* | ||
* Distribution here refers to inter-node partitioning of data. That is, it describes how tuples | ||
* are partitioned across physical machines in a cluster. Knowing this property allows some | ||
* operators (e.g., Aggregate) to perform partition local operations instead of global ones. | ||
*/ | ||
sealed trait Distribution { | ||
/** | ||
|
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution { | |
|
||
/** | ||
* Represents data where tuples that share the same values for the `clustering` | ||
* [[Expression Expressions]] will be co-located. Based on the context, this | ||
* can mean such tuples are either co-located in the same partition or they will be contiguous | ||
* within a single partition. | ||
* [[Expression Expressions]] will be co-located in the same partition. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm, this is actually a way to name the link. I have learned something here :)... |
||
*/ | ||
case class ClusteredDistribution( | ||
clustering: Seq[Expression], | ||
|
@@ -118,10 +114,12 @@ case class HashClusteredDistribution( | |
|
||
/** | ||
* Represents data where tuples have been ordered according to the `ordering` | ||
* [[Expression Expressions]]. This is a strictly stronger guarantee than | ||
* [[ClusteredDistribution]] as an ordering will ensure that tuples that share the | ||
* same value for the ordering expressions are contiguous and will never be split across | ||
* partitions. | ||
* [[Expression Expressions]]. Its requirement is defined as the following: | ||
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or | ||
* equal to any row in the first partition, according to the `ordering` expressions. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why here we need this equality? Can we just have all the rows in the second partition must be larger than any row in the first partition? Do we need or use such equality? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that, only sort requires Here is a definition of the requirement. When designing protocols, it's important to make the requirement as weak as possible, and make guarantees as strong as possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Global sort (actually the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us keep the semantics unchanged at this moment. If needed, in the future, we can either introduce a new distribution type or change the existing types. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hvanhovell We need this relaxed requirement, otherwise we have to remove the optimization here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not change the semantic, I just correct the comment to represent what the current semantic is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, @cloud-fan is right about the "or equal to" part is necessary for RangePartitioning(a, b, c) satisfying OrderedDistribution(a, b). |
||
* | ||
* In other words, this distribution requires the rows to be ordered across partitions, but not | ||
* necessarily within a partition. | ||
*/ | ||
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only used by sort, and sort doesn't require rows of same value to be colocated in the same partition. Actually we already use this knowledge to optimize |
||
require( | ||
|
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) | |
|
||
/** | ||
* Represents a partitioning where rows are split across partitions based on some total ordering of | ||
* the expressions specified in `ordering`. When data is partitioned in this manner the following | ||
* two conditions are guaranteed to hold: | ||
* - All row where the expressions in `ordering` evaluate to the same values will be in the same | ||
* partition. | ||
* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows | ||
* that are in between `min` and `max` in this `ordering` will reside in this partition. | ||
* the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees: | ||
* Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row | ||
* in the first partition, according to the `ordering` expressions. | ||
* | ||
* This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as | ||
* there is no overlap between partitions. | ||
* | ||
* This class extends expression primarily so that transformations over expression will descend | ||
* into its child. | ||
|
@@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) | |
super.satisfies0(required) || { | ||
required match { | ||
case OrderedDistribution(requiredOrdering) => | ||
// If `ordering` is a prefix of `requiredOrdering`: | ||
// Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the | ||
// RangePartitioning definition, any [a, b] in a previous partition must be smaller | ||
// than any [a, b] in the following partition. This also means any [a, b, c] in a | ||
// previous partition must be smaller than any [a, b, c] in the following partition. | ||
// Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`. | ||
// | ||
// If `requiredOrdering` is a prefix of `ordering`: | ||
// Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the | ||
// RangePartitioning definition, any [a, b, c] in a previous partition must be smaller | ||
// than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous | ||
// partition which is larger than a [a2, b2] from the following partition, then there | ||
// must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning | ||
// definition. So it's guaranteed that, any [a, b] in a previous partition must not be | ||
// greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus | ||
// `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. | ||
val minSize = Seq(requiredOrdering.size, ordering.size).min | ||
requiredOrdering.take(minSize) == ordering.take(minSize) | ||
case ClusteredDistribution(requiredClustering, _) => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering and maybe list an example here how operators take advantage of these two physical properties together?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally remove everything about intra-partition, as we never leverage it and no partitioning provides this property. Did I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I understand that partitioning has nothing to do with intra-partition ordering at all. And it was wrong to include intra-partition ordering as part of the distribution properties. But I was thinking mentioning ordering as a side note would probably help ppl understand better how some operators work. Or maybe here's not the best place to put it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for ordering, I think people can look at
OrderedDistribution
?