[SPARK-26297][SQL] improve the doc of Distribution/Partitioning #23249

Closed
wants to merge 4 commits into from

cloud-fan
Contributor

@cloud-fan cloud-fan commented Dec 6, 2018

What changes were proposed in this pull request?

Some of the documentation for Distribution/Partitioning is stale and misleading. This PR fixes it:

  1. A Distribution never has an intra-partition requirement.
  2. OrderedDistribution does not require tuples that share the same value to be colocated in the same partition.
  3. RangePartitioning can provide a weaker guarantee for a prefix of its ordering expressions.

How was this patch tested?

comment-only PR.

@cloud-fan
Contributor Author

cloud-fan commented Dec 6, 2018

* partition. They can also span partitions, but these partitions must be contiguous. For example,
* if value `v` is the biggest value in partition 3, it can also be in partition 4 as the smallest
* value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest
* value.
*/
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
Contributor Author

This is only used by sort, and sort doesn't require rows of same value to be colocated in the same partition.

Actually we already use this knowledge to optimize RangePartitioning.satisfy
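
The point about sort can be illustrated with a small model. This is only a sketch under stated assumptions: rows are plain `Int` values, and `OrderedDistributionSketch`, `satisfiesOrdered`, and `localSortThenConcat` are hypothetical names, not Spark APIs. It shows that a value may span contiguous partitions and the concatenated result is still globally sorted.

```scala
object OrderedDistributionSketch {
  // Hypothetical helper, not a Spark API. Models the relaxed requirement:
  // for any two adjacent non-empty partitions, every row of the later one
  // is larger than or equal to every row of the earlier one.
  def satisfiesOrdered(partitions: Seq[Seq[Int]]): Boolean = {
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (prev, next) => prev.max <= next.min }
  }

  // What a global sort does in this model: sort each partition locally,
  // then read the partitions in order.
  def localSortThenConcat(partitions: Seq[Seq[Int]]): Seq[Int] =
    partitions.flatMap(_.sorted)

  // The value 5 is the largest in the first partition, fills the second,
  // and is the smallest in the third.
  val example: Seq[Seq[Int]] = Seq(Seq(3, 1, 5), Seq(5, 5), Seq(8, 5))
}
```

In this model, `satisfiesOrdered(example)` holds even though `5` appears in three partitions, and `localSortThenConcat(example)` equals `example.flatten.sorted`.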

* Distribution here refers to inter-node partitioning of data:
* The distribution describes how tuples are partitioned across physical machines in a cluster.
* Knowing this property allows some operators (e.g., Aggregate) to perform partition local
* operations instead of global ones.
*/
Contributor

Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering and maybe list an example here how operators take advantage of these two physical properties together?

Contributor Author

I intentionally removed everything about intra-partition, as we never leverage it and no partitioning provides this property. Did I miss something?

Contributor

Yes, I understand that partitioning has nothing to do with intra-partition ordering at all, and it was wrong to include intra-partition ordering as part of the distribution properties. But I was thinking that mentioning ordering as a side note would probably help people better understand how some operators work. Or maybe this isn't the best place to put it.

Contributor Author

for ordering, I think people can look at OrderedDistribution?

@SparkQA

SparkQA commented Dec 6, 2018

Test build #99775 has finished for PR 23249 at commit 24ea28a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 6, 2018

Test build #99779 has finished for PR 23249 at commit 3df1e44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
* Represents a partitioning where rows are split across partitions based on some total ordering of
* the expressions specified in `ordering`. When data is partitioned in this manner the following
Contributor

nit: add "," after "this manner".

@cloud-fan cloud-fan force-pushed the doc branch 3 times, most recently from e20dba6 to 130bc95 on December 7, 2018 07:41
@SparkQA

SparkQA commented Dec 7, 2018

Test build #99814 has finished for PR 23249 at commit 130bc95.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2018

Test build #99812 has finished for PR 23249 at commit 04be19e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

* partitions.
* [[Expression Expressions]]. Its requirement is defined as the following:
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
* equal to any row in the first partition, according to the `ordering` expressions.
Member

@viirya viirya Dec 7, 2018

Why do we need the equality here? Can we just require that all the rows in the second partition be larger than any row in the first partition? Do we need or use such equality?

Contributor Author

Note that only sort requires OrderedDistribution, and a global sort doesn't care whether equal rows span partitions.

Here is a definition of the requirement. When designing protocols, it's important to make the requirement as weak as possible, and make guarantees as strong as possible.

Contributor

Global sort (actually the RangePartitioner) currently guarantees that all rows in partition p + 1 are larger than the rows in partition p. I don't think we should relax this; besides collect limit, there aren't any use cases I can think of that could work with this relaxed requirement.

Member

Let us keep the semantics unchanged at this moment. If needed, in the future, we can either introduce a new distribution type or change the existing types.

Contributor Author

@cloud-fan cloud-fan Dec 11, 2018

@hvanhovell We need this relaxed requirement, otherwise we have to remove the optimization here

Contributor Author

I did not change the semantics; I just corrected the comment to reflect what the current semantics are.

Contributor

Yes, @cloud-fan is right that the "or equal to" part is necessary for RangePartitioning(a, b, c) to satisfy OrderedDistribution(a, b).
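
A small model may make the need for "or equal to" concrete. This is a sketch, not Spark code: rows are `(a, b, c)` integer tuples, and `PrefixBoundarySketch`, `adjacentOk`, `byAbc`, and `byAb` are hypothetical names chosen for illustration. The data is split strictly on the full key `(a, b, c)`, yet the partition boundary falls between two rows that share the same `(a, b)`.

```scala
object PrefixBoundarySketch {
  type Row = (Int, Int, Int) // columns (a, b, c)

  // Hypothetical check, not a Spark API. Assumes every partition is non-empty.
  // Compares the largest key of each partition with the smallest key of the next one.
  def adjacentOk[K](parts: Seq[Seq[Row]], key: Row => K, strict: Boolean)(
      implicit ord: Ordering[K]): Boolean =
    parts.zip(parts.drop(1)).forall { case (prev, next) =>
      val hi = prev.map(key).max
      val lo = next.map(key).min
      if (strict) ord.lt(hi, lo) else ord.lteq(hi, lo)
    }

  val byAbc: Row => Row = r => r
  val byAb: Row => (Int, Int) = r => (r._1, r._2)

  // Two partitions split on the full key (a, b, c); the boundary falls between
  // (1, 2, 1) and (1, 2, 2), which share the same (a, b).
  val parts: Seq[Seq[Row]] = Seq(
    Seq((1, 1, 1), (1, 2, 1)),
    Seq((1, 2, 2), (2, 0, 0))
  )
}
```

Here the strict check passes on `byAbc` but fails on `byAb`, while the non-strict check on `byAb` passes. Under a strict requirement, this layout could not be reused for an ordering on `(a, b)` alone.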

@SparkQA

SparkQA commented Dec 7, 2018

Test build #99821 has finished for PR 23249 at commit 130bc95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// If `ordering` is a prefix of `requiredOrdering`:
// - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is
// larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So
// `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`.
Member

nit satisfy -> satisfies

* [[Expression Expressions]] will be co-located. Based on the context, this
* can mean such tuples are either co-located in the same partition or they will be contiguous
* within a single partition.
* [[Expression Expressions]] will be co-located in the same partition.
Contributor

What does [[Expression Expressions]] mean? Should it be [[Expression]]s?

Contributor

Nvm, this is actually a way to name the link. I have learned something here :)...

@SparkQA

SparkQA commented Dec 10, 2018

Test build #99912 has finished for PR 23249 at commit adfcec4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
* that are in between `min` and `max` in this `ordering` will reside in this partition.
* the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than
Contributor

Nit: don't use bullets if you have only one of them.

* Distribution here refers to inter-node partitioning of data:
* - The distribution describes how tuples are partitioned across physical machines in a cluster.
* Knowing this property allows some operators (e.g., Aggregate) to perform partition local
* operations instead of global ones.
Member

How about?

Distribution here refers to inter-node partitioning of data. That is, it describes how tuples are partitioned across physical machines in a cluster. Knowing this property allows some operators (e.g., Aggregate) to perform partition local operations instead of global ones.
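
The remark about partition-local operations can be sketched as follows. This is a toy model under stated assumptions: rows are `(key, value)` pairs held in memory, and `PartitionLocalAggregate`, `hashPartition`, `localSum`, and `aggregate` are hypothetical names, not Spark APIs. Once equal keys are co-located, each partition can be aggregated on its own and the local results never conflict.

```scala
object PartitionLocalAggregate {
  // Hypothetical stand-in for hash partitioning: route each (key, value) row by key hash.
  def hashPartition(rows: Seq[(String, Int)], numPartitions: Int): Seq[Seq[(String, Int)]] = {
    val byPartition = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }
    (0 until numPartitions).map(i => byPartition.getOrElse(i, Seq.empty[(String, Int)]))
  }

  // Sum per key inside a single partition, without looking at any other partition.
  def localSum(partition: Seq[(String, Int)]): Map[String, Int] =
    partition.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Because equal keys are co-located, the union of the local results is the global result.
  def aggregate(rows: Seq[(String, Int)], numPartitions: Int): Map[String, Int] =
    hashPartition(rows, numPartitions).map(localSum).foldLeft(Map.empty[String, Int])(_ ++ _)
}
```

If the rows were spread arbitrarily instead, the same key could appear in several local results and a second, global merge step would be needed.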

@SparkQA

SparkQA commented Dec 11, 2018

Test build #99941 has finished for PR 23249 at commit ddb82c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
//
// If `requiredOrdering` is a prefix of `ordering`:
// - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is
Contributor

@maryannxue maryannxue Dec 11, 2018

"If a row is ... satisfies ..." => "According to the RangePartitioning definition, any [a, b, c] in a previous partition must be smaller than any [a, b, c] in the following partition, which means that for any [a1, b1, c1] in the previous partition and [a2, b2, c2] in the following partition, either 1) [a1, b1] is smaller than [a2, b2]; or 2) [a1, b1] is equal to [a2, b2] and c1 is smaller than c2. So RangePartitioning(a, b, c) satisfies OrderedDistribution(a, b), which requires any [a1, b1] from a previous partition to be smaller than or equal to any [a2, b2] from a following partition."

Contributor

@maryannxue maryannxue left a comment

LGTM, except one comment.
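
The two prefix cases discussed in the review comments above can be condensed into one check. This is a simplified model, not the code under review: orderings are lists of column names, sort direction is ignored, and `RangeSatisfiesSketch` and `satisfiesOrdered` are hypothetical names. The assumption is that a range partitioning satisfies an ordered requirement whenever one ordering is a prefix of the other, in either direction.

```scala
object RangeSatisfiesSketch {
  // Simplified model: orderings are lists of column names, all ascending.
  // True when one list is a prefix of the other, in either direction.
  def satisfiesOrdered(ordering: Seq[String], requiredOrdering: Seq[String]): Boolean = {
    val minSize = math.min(ordering.size, requiredOrdering.size)
    ordering.take(minSize) == requiredOrdering.take(minSize)
  }
}
```

In this model, `satisfiesOrdered(Seq("a", "b"), Seq("a", "b", "c"))` and `satisfiesOrdered(Seq("a", "b", "c"), Seq("a", "b"))` both hold, while `satisfiesOrdered(Seq("a", "c"), Seq("a", "b"))` does not.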

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100005 has finished for PR 23249 at commit cb94add.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Dec 12, 2018

retest this please.

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100012 has finished for PR 23249 at commit cb94add.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100021 has finished for PR 23249 at commit cb94add.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100029 has finished for PR 23249 at commit cb94add.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

This is a comment-only PR and the SparkR test failure is a known issue, so I'm merging it to master. Thanks!

@asfgit asfgit closed this in 05b68d5 Dec 13, 2018
holdenk pushed a commit to holdenk/spark that referenced this pull request Jan 5, 2019
## What changes were proposed in this pull request?

Some documents of `Distribution/Partitioning` are stale and misleading, this PR fixes them:
1. `Distribution` never have intra-partition requirement
2. `OrderedDistribution` does not require tuples that share the same value being colocated in the same partition.
3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions.

## How was this patch tested?

comment-only PR.

Closes apache#23249 from cloud-fan/doc.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
7 participants