@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
22
22
23
23
/**
24
24
* Specifies how tuples that share common expressions will be distributed when a query is executed
25
- * in parallel on many machines. Distribution can be used to refer to two distinct physical
26
- * properties:
27
- * - Inter-node partitioning of data: In this case the distribution describes how tuples are
28
- * partitioned across physical machines in a cluster. Knowing this property allows some
29
- * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
30
- * - Intra-partition ordering of data: In this case the distribution describes guarantees made
31
- * about how tuples are distributed within a single partition.
25
+ * in parallel on many machines.
26
+ *
27
+ * Distribution here refers to inter-node partitioning of data:
28
+ * - The distribution describes how tuples are partitioned across physical machines in a cluster.
29
+ * Knowing this property allows some operators (e.g., Aggregate) to perform partition local
30
+ * operations instead of global ones.
32
31
*/
33
32
sealed trait Distribution {
34
33
/**
@@ -70,9 +69,7 @@ case object AllTuples extends Distribution {
70
69
71
70
/**
72
71
* Represents data where tuples that share the same values for the `clustering`
73
- * [[Expression Expressions ]] will be co-located. Based on the context, this
74
- * can mean such tuples are either co-located in the same partition or they will be contiguous
75
- * within a single partition.
72
+ * [[Expression Expressions ]] will be co-located in the same partition.
76
73
*/
77
74
case class ClusteredDistribution (
78
75
clustering : Seq [Expression ],
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(
118
115
119
116
/**
120
117
* Represents data where tuples have been ordered according to the `ordering`
121
- * [[Expression Expressions ]]. This is a strictly stronger guarantee than
122
- * [[ClusteredDistribution ]] as an ordering will ensure that tuples that share the
123
- * same value for the ordering expressions are contiguous and will never be split across
124
- * partitions.
118
+ * [[Expression Expressions ]]. Its requirement is defined as the following:
119
+ * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
120
+ * equal to any row in the first partition, according to the `ordering` expressions.
121
+ *
122
+ * In other words, this distribution requires the rows to be ordered across partitions, but not
123
+ * necessarily within a partition.
125
124
*/
126
125
case class OrderedDistribution (ordering : Seq [SortOrder ]) extends Distribution {
127
126
require(
@@ -241,12 +240,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
241
240
242
241
/**
243
242
* Represents a partitioning where rows are split across partitions based on some total ordering of
244
- * the expressions specified in `ordering`. When data is partitioned in this manner the following
245
- * two conditions are guaranteed to hold:
246
- * - All row where the expressions in `ordering` evaluate to the same values will be in the same
247
- * partition.
248
- * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
249
- * that are in between `min` and `max` in this `ordering` will reside in this partition .
243
+ * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
244
+ * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than
245
+ * any row in the first partition, according to the `ordering` expressions.
246
+ *
247
+ * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
248
+ * there is no overlap between partitions .
250
249
*
251
250
* This class extends expression primarily so that transformations over expression will descend
252
251
* into its child.
@@ -262,6 +261,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
262
261
super .satisfies0(required) || {
263
262
required match {
264
263
case OrderedDistribution (requiredOrdering) =>
264
+ // If `ordering` is a prefix of `requiredOrdering`:
265
+ // - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is
266
+ // larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So
267
+ // `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`.
268
+ //
269
+ // If `requiredOrdering` is a prefix of `ordering`:
270
+ // - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is
271
+ // larger than another row w.r.t. [a, b, c], this row will not be smaller w.r.t.
272
+ // [a. b]. So `RangePartitioning(a, b, c)` satisfy `OrderedDistribution(a, b)`.
265
273
val minSize = Seq (requiredOrdering.size, ordering.size).min
266
274
requiredOrdering.take(minSize) == ordering.take(minSize)
267
275
case ClusteredDistribution (requiredClustering, _) =>
0 commit comments