-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26297][SQL] improve the doc of Distribution/Partitioning #23249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
* partition. They can also across partitions, but these partitions must be contiguous. For example, | ||
* if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest | ||
* value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest | ||
* value. | ||
*/ | ||
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only used by sort, and sort doesn't require rows of same value to be colocated in the same partition.
Actually we already use this knowledge to optimize RangePartitioning.satisfy
* Distribution here refers to inter-node partitioning of data: | ||
* The distribution describes how tuples are partitioned across physical machines in a cluster. | ||
* Knowing this property allows some operators (e.g., Aggregate) to perform partition local | ||
* operations instead of global ones. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering and maybe list an example here how operators take advantage of these two physical properties together?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intentionally remove everything about intra-partition, as we never leverage it and no partitioning provides this property. Did I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I understand that partitioning has nothing to do with intra-partition ordering at all. And it was wrong to include intra-partition ordering as part of the distribution properties. But I was thinking mentioning ordering as a side note would probably help ppl understand better how some operators work. Or maybe here's not the best place to put it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for ordering, I think people can look at OrderedDistribution
?
Test build #99775 has finished for PR 23249 at commit
|
Test build #99779 has finished for PR 23249 at commit
|
@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) | |||
* Represents a partitioning where rows are split across partitions based on some total ordering of | |||
* the expressions specified in `ordering`. When data is partitioned in this manner the following |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add "," after "this manner".
e20dba6
to
130bc95
Compare
Test build #99814 has finished for PR 23249 at commit
|
Test build #99812 has finished for PR 23249 at commit
|
retest this please |
* partitions. | ||
* [[Expression Expressions]]. Its requirement is defined as the following: | ||
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or | ||
* equal to any row in the first partition, according to the `ordering` expressions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why here we need this equality? Can we just have all the rows in the second partition must be larger than any row in the first partition? Do we need or use such equality?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that, only sort requires OrderedDistribution
, and global sort doesn't care if there are equal-rows across partitions.
Here is a definition of the requirement. When designing protocols, it's important to make the requirement as weak as possible, and make guarantees as strong as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Global sort (actually the RangePartitioner
) currently guarantees that all rows in partition p + 1
are larger than the rows in partition p
. I don't think we should relax this, besides collect limit there aren't any use cases I can think of that could work with this relaxed requirement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us keep the semantics unchanged at this moment. If needed, in the future, we can either introduce a new distribution type or change the existing types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell We need this relaxed requirement, otherwise we have to remove the optimization here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not change the semantic, I just correct the comment to represent what the current semantic is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, @cloud-fan is right about the "or equal to" part is necessary for RangePartitioning(a, b, c) satisfying OrderedDistribution(a, b).
Test build #99821 has finished for PR 23249 at commit
|
// If `ordering` is a prefix of `requiredOrdering`: | ||
// - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is | ||
// larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So | ||
// `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit satisfy
-> satisfies
* [[Expression Expressions]] will be co-located. Based on the context, this | ||
* can mean such tuples are either co-located in the same partition or they will be contiguous | ||
* within a single partition. | ||
* [[Expression Expressions]] will be co-located in the same partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is [[Expression Expressions]]
mean? Should it be [[Expression]]s
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nvm, this is actually a way to name the link. I have learned something here :)...
Test build #99912 has finished for PR 23249 at commit
|
* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows | ||
* that are in between `min` and `max` in this `ordering` will reside in this partition. | ||
* the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees: | ||
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit don't use bullets if you have only one of them
* Distribution here refers to inter-node partitioning of data: | ||
* - The distribution describes how tuples are partitioned across physical machines in a cluster. | ||
* Knowing this property allows some operators (e.g., Aggregate) to perform partition local | ||
* operations instead of global ones. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about?
Distribution here refers to inter-node partitioning of data. That is, it describes how tuples are partitioned across physical machines in a cluster. Knowing this property allows some operators (e.g., Aggregate) to perform partition local operations instead of global ones.
Test build #99941 has finished for PR 23249 at commit
|
// `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`. | ||
// | ||
// If `requiredOrdering` is a prefix of `ordering`: | ||
// - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"If a row is ... satisfies ..." => According to the RangePartitioning definition, any [a, b, c] in a previous partition must be smaller than any [a, b, c] in the following partition, which means for any [a1, b1, c1] in the previous partition, [a2, b2, c2] in the following partition, either 1) [a1, b1] is smaller than [a2, b2]; or 2) [a1, b1] is equal to [a2, b2] and c1 smaller is than c2. So RangePartitioning(a, b, c)
satisfies OrderedDistribution(a, b)
which requires any [a1, b1] from a previous partition smaller than any [a2, b2] from a following partition."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, except one comment.
Test build #100005 has finished for PR 23249 at commit
|
retest this please. |
Test build #100012 has finished for PR 23249 at commit
|
retest this please |
Test build #100021 has finished for PR 23249 at commit
|
retest this please |
Test build #100029 has finished for PR 23249 at commit
|
This is a comment-only PR and the Spark R test is a known issue, I'm merging it to master, thanks! |
## What changes were proposed in this pull request? Some documents of `Distribution/Partitioning` are stale and misleading, this PR fixes them: 1. `Distribution` never have intra-partition requirement 2. `OrderedDistribution` does not require tuples that share the same value being colocated in the same partition. 3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions. ## How was this patch tested? comment-only PR. Closes apache#23249 from cloud-fan/doc. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? Some documents of `Distribution/Partitioning` are stale and misleading, this PR fixes them: 1. `Distribution` never have intra-partition requirement 2. `OrderedDistribution` does not require tuples that share the same value being colocated in the same partition. 3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions. ## How was this patch tested? comment-only PR. Closes apache#23249 from cloud-fan/doc. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Some documents of
Distribution/Partitioning
are stale and misleading, this PR fixes them:Distribution
never have intra-partition requirementOrderedDistribution
does not require tuples that share the same value being colocated in the same partition.RangePartitioning
can provide a weaker guarantee for a prefix of itsordering
expressions.How was this patch tested?
comment-only PR.