@@ -21,52 +21,27 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.MapOutputStatistics
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
 
 /**
  * A coordinator used to determines how we shuffle data between stages generated by Spark SQL.
  * Right now, the work of this coordinator is to determine the number of post-shuffle partitions
  * for a stage that needs to fetch shuffle data from one or multiple stages.
  *
- * A coordinator is constructed with three parameters, `numExchanges`,
- * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- * - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
- * registered to this coordinator. So, when we start to do any actual work, we have a way to
- * make sure that we have got expected number of [[ShuffleExchangeExec]]s.
+ * A coordinator is constructed with two parameters, `targetPostShuffleInputSize`,
+ * and `minNumPostShufflePartitions`.
  * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
  * input data size. With this parameter, we can estimate the number of post-shuffle partitions.
  * This parameter is configured through
  * `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
- * - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
- * will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
- * partitions.
- *
- * The workflow of this coordinator is described as follows:
- * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
- * if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
- * This happens in the `doPrepare` method.
- * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
- * coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
- * [[ShuffledRowRDD]].
- * If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
- * will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
- * - If this coordinator has not made the decision on how to shuffle data, it will ask those
- * registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
- * size statistics of pre-shuffle partitions, this coordinator will determine the number of
- * post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
- * to a single post-shuffle partition whenever necessary.
- * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- * [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
- * coordinator can lookup the corresponding [[RDD]].
+ * - `minNumPostShufflePartitions` is used to make sure that there are at least
+ * `minNumPostShufflePartitions` post-shuffle partitions.
  *
  * The strategy used to determine the number of post-shuffle partitions is described as follows.
  * To determine the number of post-shuffle partitions, we have a target input size for a
- * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
- * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
- * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
- * greater than the target size.
+ * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do
+ * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single
+ * post-shuffle partition until adding another pre-shuffle partition would cause the size of a
+ * post-shuffle partition to be greater than the target size.
  *
  * For example, we have two stages with the following pre-shuffle partition size statistics:
  * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB]
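The packing strategy described in the Scaladoc of this hunk can be illustrated with a minimal sketch. This is not the code from the commit: the object name `CoalesceSketch`, the method name `estimateStartIndices`, and its signature are hypothetical, `minNumPostShufflePartitions` is ignored, and sizes are plain numbers standing in for MB. It assumes every stage reports the same number of pre-shuffle partitions and that the size of a pre-shuffle partition index is the sum of its sizes across stages.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the packing pass; not the coordinator's actual code.
object CoalesceSketch {

  /**
   * Returns the start index of each post-shuffle partition.
   *
   * @param sizesByStage one array of pre-shuffle partition sizes per stage
   * @param targetSize   target input size of a post-shuffle partition
   */
  def estimateStartIndices(sizesByStage: Seq[Array[Long]], targetSize: Long): Array[Int] = {
    require(sizesByStage.nonEmpty, "at least one stage is required")
    val numPartitions = sizesByStage.head.length
    require(sizesByStage.forall(_.length == numPartitions),
      "all stages must have the same number of pre-shuffle partitions")

    val startIndices = ArrayBuffer.empty[Int]
    var currentSize = 0L
    var i = 0
    while (i < numPartitions) {
      // Combined size of pre-shuffle partition i across all stages.
      val nextSize = sizesByStage.map(_(i)).sum
      if (i == 0) {
        // The first post-shuffle partition always starts at index 0.
        startIndices += 0
        currentSize = nextSize
      } else if (currentSize + nextSize > targetSize) {
        // Adding partition i would exceed the target: start a new partition.
        startIndices += i
        currentSize = nextSize
      } else {
        // Partition i still fits: pack it into the current partition.
        currentSize += nextSize
      }
      i += 1
    }
    startIndices.toArray
  }
}
```

Applied to the stage 1 statistics alone, with an assumed target of 128, `CoalesceSketch.estimateStartIndices(Seq(Array(100L, 20L, 100L, 10L, 30L)), 128L)` yields the start indices `0, 2, 4`: indices 0 and 1 are packed together (120), as are indices 2 and 3 (110), and index 4 forms its own partition.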