@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
7575 def clustering : Set [Expression ] = ordering.map(_.child).toSet
7676}
7777
78+ /**
79+ * Describes how an operator's output is split across partitions. The `compatibleWith`,
80+ * `guarantees`, and `satisfies` methods describe relationships between child partitionings,
81+ * target partitionings, and [[Distribution ]]s. These relations are described more precisely in
82+ * their individual method docs, but at a high level:
83+ *
84+ * - `satisfies` is a relationship between partitionings and distributions.
85+ * - `compatibleWith` is relationships between an operator's child output partitionings.
86+ * - `guarantees` is a relationship between a child's existing output partitioning and a target
87+ * output partitioning.
88+ *
89+ * Diagrammatically:
90+ *
91+ * +--------------+
92+ * | Distribution |
93+ * +--------------+
94+ * ^
95+ * |
96+ * satisfies
97+ * |
98+ * +--------------+ +--------------+
99+ * | Child | | Target |
100+ * +----| Partitioning |----guarantees--->| Partitioning |
101+ * | +--------------+ +--------------+
102+ * | ^
103+ * | |
104+ * | compatibleWith
105+ * | |
106+ * +------------+
107+ *
108+ */
78109sealed trait Partitioning {
79110 /** Returns the number of partitions that the data is split across */
80111 val numPartitions : Int
@@ -90,9 +121,66 @@ sealed trait Partitioning {
90121 /**
91122 * Returns true iff we can say that the partitioning scheme of this [[Partitioning ]]
92123 * guarantees the same partitioning scheme described by `other`.
124+ *
125+ * Compatibility of partitionings is only checked for operators that have multiple children
126+ * and that require a specific child output [[Distribution ]], such as joins.
127+ *
128+ * Intuitively, partitionings are compatible if they route the same partitioning key to the same
129+ * partition. For instance, two hash partitionings are only compatible if they produce the same
130+ * number of output partitionings and hash records according to the same hash function and
131+ * same partitioning key schema.
132+ *
133+ * Put another way, two partitionings are compatible with each other if they satisfy all of the
134+ * same distribution guarantees.
93135 */
94- // TODO: Add an example once we have the `nullSafe` concept.
95- def guarantees (other : Partitioning ): Boolean
136+ def compatibleWith (other : Partitioning ): Boolean
137+
138+ /**
139+ * Returns true iff we can say that the partitioning scheme of this [[Partitioning ]] guarantees
140+ * the same partitioning scheme described by `other`. If a `A.guarantees(B)`, then repartitioning
141+ * the child's output according to `B` will be unnecessary. `guarantees` is used as a performance
142+ * optimization to allow the exchange planner to avoid redundant repartitionings. By default,
143+ * a partitioning only guarantees partitionings that are equal to itself (i.e. the same number
144+ * of partitions, same strategy (range or hash), etc).
145+ *
146+ * In order to enable more aggressive optimization, this strict equality check can be relaxed.
147+ * For example, say that the planner needs to repartition all of an operator's children so that
148+ * they satisfy the [[AllTuples ]] distribution. One way to do this is to repartition all children
149+ * to have the [[SinglePartition ]] partitioning. If one of the operator's children already happens
150+ * to be hash-partitioned with a single partition then we do not need to re-shuffle this child;
151+ * this repartitioning can be avoided if a single-partition [[HashPartitioning ]] `guarantees`
152+ * [[SinglePartition ]].
153+ *
154+ * The SinglePartition example given above is not particularly interesting; guarantees' real
155+ * value occurs for more advanced partitioning strategies. SPARK-7871 will introduce a notion
156+ * of null-safe partitionings, under which partitionings can specify whether rows whose
157+ * partitioning keys contain null values will be grouped into the same partition or whether they
158+ * will have an unknown / random distribution. If a partitioning does not require nulls to be
159+ * clustered then a partitioning which _does_ cluster nulls will guarantee the null clustered
160+ * partitioning. The converse is not true, however: a partitioning which clusters nulls cannot
161+ * be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a
162+ * symmetric relation.
163+ *
164+ * Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
165+ * produced by `A` could have also been produced by `B`.
166+ */
167+ def guarantees (other : Partitioning ): Boolean = this == other
168+ }
169+
170+ object Partitioning {
171+ def allCompatible (partitionings : Seq [Partitioning ]): Boolean = {
172+ // Note: this assumes transitivity
173+ partitionings.sliding(2 ).map {
174+ case Seq (a) => true
175+ case Seq (a, b) =>
176+ if (a.numPartitions != b.numPartitions) {
177+ assert(! a.compatibleWith(b) && ! b.compatibleWith(a))
178+ false
179+ } else {
180+ a.compatibleWith(b) && b.compatibleWith(a)
181+ }
182+ }.forall(_ == true )
183+ }
96184}
97185
98186case class UnknownPartitioning (numPartitions : Int ) extends Partitioning {
@@ -101,6 +189,8 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
101189 case _ => false
102190 }
103191
192+ override def compatibleWith (other : Partitioning ): Boolean = false
193+
104194 override def guarantees (other : Partitioning ): Boolean = false
105195}
106196
@@ -109,21 +199,9 @@ case object SinglePartition extends Partitioning {
109199
110200 override def satisfies (required : Distribution ): Boolean = true
111201
112- override def guarantees (other : Partitioning ): Boolean = other match {
113- case SinglePartition => true
114- case _ => false
115- }
116- }
117-
118- case object BroadcastPartitioning extends Partitioning {
119- val numPartitions = 1
202+ override def compatibleWith (other : Partitioning ): Boolean = other.numPartitions == 1
120203
121- override def satisfies (required : Distribution ): Boolean = true
122-
123- override def guarantees (other : Partitioning ): Boolean = other match {
124- case BroadcastPartitioning => true
125- case _ => false
126- }
204+ override def guarantees (other : Partitioning ): Boolean = other.numPartitions == 1
127205}
128206
129207/**
@@ -147,6 +225,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
147225 case _ => false
148226 }
149227
228+ override def compatibleWith (other : Partitioning ): Boolean = other match {
229+ case o : HashPartitioning =>
230+ this .clusteringSet == o.clusteringSet && this .numPartitions == o.numPartitions
231+ case _ => false
232+ }
233+
150234 override def guarantees (other : Partitioning ): Boolean = other match {
151235 case o : HashPartitioning =>
152236 this .clusteringSet == o.clusteringSet && this .numPartitions == o.numPartitions
@@ -185,6 +269,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
185269 case _ => false
186270 }
187271
272+ override def compatibleWith (other : Partitioning ): Boolean = other match {
273+ case o : RangePartitioning => this == o
274+ case _ => false
275+ }
276+
188277 override def guarantees (other : Partitioning ): Boolean = other match {
189278 case o : RangePartitioning => this == o
190279 case _ => false
@@ -228,6 +317,13 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
228317 override def satisfies (required : Distribution ): Boolean =
229318 partitionings.exists(_.satisfies(required))
230319
320+ /**
321+ * Returns true if any `partitioning` of this collection is compatible with
322+ * the given [[Partitioning ]].
323+ */
324+ override def compatibleWith (other : Partitioning ): Boolean =
325+ partitionings.exists(_.compatibleWith(other))
326+
231327 /**
232328 * Returns true if any `partitioning` of this collection guarantees
233329 * the given [[Partitioning ]].
0 commit comments