[SPARK-26297][SQL] improve the doc of Distribution/Partitioning #23249

Closed
wants to merge 4 commits into from

@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType}

 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines. Distribution can be used to refer to two distinct physical
- * properties:
- * - Inter-node partitioning of data: In this case the distribution describes how tuples are
- * partitioned across physical machines in a cluster. Knowing this property allows some
- * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- * - Intra-partition ordering of data: In this case the distribution describes guarantees made
- * about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data. That is, it describes how tuples
+ * are partitioned across physical machines in a cluster. Knowing this property allows some
+ * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
  */
Contributor

Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering, and maybe list an example here of how operators take advantage of these two physical properties together?

Contributor Author

I intentionally removed everything about intra-partition ordering, as we never leverage it and no partitioning provides this property. Did I miss something?

Contributor

Yes, I understand that partitioning has nothing to do with intra-partition ordering at all, and it was wrong to include intra-partition ordering as part of the distribution properties. But I was thinking that mentioning ordering as a side note would probably help people better understand how some operators work. Or maybe this is not the best place to put it.

Contributor Author

For ordering, I think people can look at OrderedDistribution?
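
The doc comment's claim that knowing the distribution lets an operator such as Aggregate work partition-locally can be illustrated with a small sketch. This is plain Scala, not Spark code: `ClusteredAggregationSketch`, `clusterByKey`, `localSum`, and `aggregate` are hypothetical names, and rows are modeled as `(key, value)` pairs purely for illustration.

```scala
object ClusteredAggregationSketch {
  // Hypothetical stand-in for a row: (key, value).
  type Row = (String, Int)

  // Assign rows to partitions by hashing the key, so rows with equal keys are co-located.
  def clusterByKey(rows: Seq[Row], numPartitions: Int): Seq[Seq[Row]] = {
    val grouped = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }
    (0 until numPartitions).map(p => grouped.getOrElse(p, Seq.empty[Row]))
  }

  // Sum values per key, looking at a single partition only.
  def localSum(partition: Seq[Row]): Map[String, Int] =
    partition.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }

  // No key spans two partitions, so merging the local results never
  // has to combine two partial sums for the same key.
  def aggregate(rows: Seq[Row], numPartitions: Int): Map[String, Int] =
    clusterByKey(rows, numPartitions)
      .map(localSum)
      .foldLeft(Map.empty[String, Int])(_ ++ _)
}
```

If the rows were not clustered by key, the final merge would have to add partial sums together, which is the "global" operation the doc comment refers to.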

sealed trait Distribution {
/**
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution {

 /**
  * Represents data where tuples that share the same values for the `clustering`
- * [[Expression Expressions]] will be co-located. Based on the context, this
- * can mean such tuples are either co-located in the same partition or they will be contiguous
- * within a single partition.
+ * [[Expression Expressions]] will be co-located in the same partition.
Contributor

What does [[Expression Expressions]] mean? Should it be [[Expression]]s?

Contributor

Nvm, this is actually a way to name the link. I have learned something here :)...

*/
case class ClusteredDistribution(
clustering: Seq[Expression],
@@ -118,10 +114,12 @@ case class HashClusteredDistribution(

 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]]. This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
+ * equal to any row in the first partition, according to the `ordering` expressions.
Member

@viirya Dec 7, 2018

Why do we need this equality here? Can we just require that all the rows in the second partition be larger than any row in the first partition? Do we need or use such equality?

Contributor Author

Note that only sort requires OrderedDistribution, and global sort doesn't care whether equal rows appear across partitions.

Here is a definition of the requirement. When designing protocols, it's important to make the requirement as weak as possible, and make guarantees as strong as possible.

Contributor

Global sort (actually the RangePartitioner) currently guarantees that all rows in partition p + 1 are larger than the rows in partition p. I don't think we should relax this; besides collect limit, there aren't any use cases I can think of that could work with this relaxed requirement.

Member

Let us keep the semantics unchanged at this moment. If needed, in the future, we can either introduce a new distribution type or change the existing types.

Contributor Author

@cloud-fan Dec 11, 2018

@hvanhovell We need this relaxed requirement, otherwise we have to remove the optimization here

Contributor Author

I did not change the semantics; I just corrected the comment to reflect what the current semantics are.

Contributor

Yes, @cloud-fan is right that the "or equal to" part is necessary for RangePartitioning(a, b, c) to satisfy OrderedDistribution(a, b).
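
A small worked example may make the "or equal to" point concrete. The sketch below is plain Scala rather than Spark code; `OrderedRequirementSketch` and its helpers are hypothetical names, and rows are modeled as `Seq[Int]` columns `(a, b, c)` compared lexicographically. It assumes two partitions split strictly by range on `(a, b, c)` and then checks them against the first two columns only.

```scala
object OrderedRequirementSketch {
  // Hypothetical stand-in for a row with integer columns (a, b, c).
  type Row = Seq[Int]

  // Lexicographic comparison on the first `width` columns of two rows.
  def compare(x: Row, y: Row, width: Int): Int =
    x.take(width).zip(y.take(width))
      .map { case (l, r) => Integer.compare(l, r) }
      .find(_ != 0)
      .getOrElse(0)

  // Checks `ok` on the comparison of every (previous row, next row) pair
  // taken from each pair of adjacent partitions.
  private def adjacentHolds(parts: Seq[Seq[Row]], width: Int)(ok: Int => Boolean): Boolean =
    parts.zip(parts.drop(1)).forall { case (prev, next) =>
      prev.forall(p => next.forall(n => ok(compare(p, n, width))))
    }

  // Strict guarantee, as worded for RangePartitioning: previous < next.
  def strictlyOrdered(parts: Seq[Seq[Row]], width: Int): Boolean =
    adjacentHolds(parts, width)(_ < 0)

  // Relaxed requirement, as worded for OrderedDistribution: previous <= next.
  def weaklyOrdered(parts: Seq[Seq[Row]], width: Int): Boolean =
    adjacentHolds(parts, width)(_ <= 0)

  // Two partitions that are strictly ordered on (a, b, c).
  val parts: Seq[Seq[Row]] = Seq(
    Seq(Seq(1, 1, 1), Seq(1, 2, 1)),
    Seq(Seq(1, 2, 2), Seq(2, 1, 1))
  )
}
```

Here `(1, 2, 1)` and `(1, 2, 2)` sit on opposite sides of the partition boundary. They differ on `(a, b, c)` but are equal on `(a, b)`, so the strict check fails at width 2 while the relaxed check still holds.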

+ *
+ * In other words, this distribution requires the rows to be ordered across partitions, but not
+ * necessarily within a partition.
  */
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
Contributor Author

This is only used by sort, and sort doesn't require rows of the same value to be colocated in the same partition.

Actually, we already use this knowledge to optimize RangePartitioning.satisfies
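
To see why a sort can tolerate equal rows on both sides of a partition boundary, here is a minimal sketch in plain Scala (not Spark code; `GlobalSortSketch`, `globalSort`, and `isSorted` are hypothetical names). It assumes the partitions already meet the relaxed cross-partition requirement, sorts each one locally, and reads them back in partition order.

```scala
object GlobalSortSketch {
  // Sort each partition locally, then concatenate partitions in order.
  def globalSort(parts: Seq[Seq[Int]]): Seq[Int] =
    parts.flatMap(_.sorted)

  // True when every element is <= its successor.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // The value 3 appears in both partitions; every row of the second
  // partition is still >= every row of the first.
  val parts: Seq[Seq[Int]] = Seq(Seq(3, 1, 2), Seq(3, 5, 4))
}
```

The concatenated output is fully sorted even though the two `3`s were never co-located, which is the property the comment above relies on.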

require(
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

 /**
  * Represents a partitioning where rows are split across partitions based on some total ordering of
- * the expressions specified in `ordering`. When data is partitioned in this manner the following
- * two conditions are guaranteed to hold:
- * - All row where the expressions in `ordering` evaluate to the same values will be in the same
- * partition.
- * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
- * that are in between `min` and `max` in this `ordering` will reside in this partition.
+ * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
+ * Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row
+ * in the first partition, according to the `ordering` expressions.
+ *
+ * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
+ * there is no overlap between partitions.
  *
  * This class extends expression primarily so that transformations over expression will descend
  * into its child.
@@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   super.satisfies0(required) || {
     required match {
       case OrderedDistribution(requiredOrdering) =>
+        // If `ordering` is a prefix of `requiredOrdering`:
+        // Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
+        // RangePartitioning definition, any [a, b] in a previous partition must be smaller
+        // than any [a, b] in the following partition. This also means any [a, b, c] in a
+        // previous partition must be smaller than any [a, b, c] in the following partition.
+        // Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
+        //
+        // If `requiredOrdering` is a prefix of `ordering`:
+        // Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
+        // RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
+        // than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous
+        // partition which is larger than a [a2, b2] from the following partition, then there
+        // must be a [a1, b1, c1] larger than [a2, b2, c2], which violates RangePartitioning
+        // definition. So it's guaranteed that any [a, b] in a previous partition must not be
+        // greater than (i.e. must be smaller than or equal to) any [a, b] in the following
+        // partition. Thus `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
         val minSize = Seq(requiredOrdering.size, ordering.size).min
         requiredOrdering.take(minSize) == ordering.take(minSize)
       case ClusteredDistribution(requiredClustering, _) =>
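
The prefix check at the end of the `OrderedDistribution` case can be exercised in isolation. The sketch below is plain Scala, not the Spark implementation: `PrefixCheckSketch` and `orderingCompatible` are hypothetical names, and column names are modeled as plain strings instead of `SortOrder` expressions, so sort direction and null ordering are ignored here.

```scala
object PrefixCheckSketch {
  // Same shape as the check in the diff: compare the two orderings
  // over the length of the shorter one.
  def orderingCompatible(ordering: Seq[String], requiredOrdering: Seq[String]): Boolean = {
    val minSize = math.min(requiredOrdering.size, ordering.size)
    requiredOrdering.take(minSize) == ordering.take(minSize)
  }
}
```

Under these assumptions, `orderingCompatible(Seq("a", "b"), Seq("a", "b", "c"))` and `orderingCompatible(Seq("a", "b", "c"), Seq("a", "b"))` both hold, matching the two cases in the code comment, while `orderingCompatible(Seq("a", "c"), Seq("a", "b"))` does not because neither ordering is a prefix of the other.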