[SPARK-28356][SQL] Do not reduce the number of partitions for repartition in adaptive execution #25121
Conversation
cc @cloud-fan, @gczsjdy, @justinuang
Test build #107555 has finished for PR 25121 at commit
@@ -312,6 +312,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val REDUCE_POST_SHUFFLE_PARTITIONS_FOR_REPARTITION =
I probably won't add this config. If users call Dataset#repartition, we have to respect it and not change it.
I initially didn't add this config. Later I found that with that change, the test("Union two datasets with different pre-shuffle partition number") no longer tests what we want, because it was written using repartition. Probably I can rewrite that test without using repartition.
I think we can remove that test. Excluding repartition, the pre-shuffle num partitions are always the same (200 by default).
We do need to have a test for repartition though.
@@ -43,7 +43,8 @@ import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordCo
  */
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
-    child: SparkPlan) extends Exchange {
+    child: SparkPlan,
+    supportAdaptive: Boolean = true) extends Exchange {
If we remove the config, I'd probably call this canChangeNumPartition.
+1
@@ -140,6 +140,8 @@ case class ShuffleQueryStageExec(
       case _ =>
     }
   }
+
+  def supportAdaptive: Boolean = plan.supportAdaptive
nit: exchangeSupportAdaptive? Or canChangeNumPartition? QueryStage is already a part of adaptive execution.
@cloud-fan @gczsjdy, updated based on the comments. Thanks!
Test build #107613 has finished for PR 25121 at commit
     checkAnswer(resultDf,
-      Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
+      Seq((0), (1), (2)).map(i => Row(i)))
this is just Seq(0, 1, 2), right?
True. Good catch.
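An aside on why the reviewer's simplification is safe: in Scala, parentheses around a single expression are a no-op, so (0) is just the Int 0, not a one-element tuple. A minimal, self-contained check:

```scala
// Parentheses around a single expression are a no-op in Scala,
// so (0) is simply the Int 0 -- not a tuple.
val verbose = Seq((0), (1), (2))
val simple  = Seq(0, 1, 2)

// Both expressions build the same Seq[Int].
assert(verbose == simple)

// A genuine one-element wrapper needs Tuple1 written explicitly.
assert(Tuple1(0) != 0)
```

This is why the test's expected answer can be written as Seq(0, 1, 2) directly.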
       case stage: ShuffleQueryStageExec => stage
       case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
     }
+    if (!shuffleStages.forall(_.canChangeNumPartition)) {
add a comment to explain it?
Sure
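The guard under discussion can be sketched in plain Scala (a simplified model — the Stage case class and names below are illustrative, not the actual Spark classes): coalescing is applied only if every shuffle stage allows its partition count to change, so one repartition-introduced shuffle opts the whole query out.

```scala
// Simplified model of the forall guard: skip coalescing if any
// shuffle stage must keep its user-requested partition count.
// `Stage` is illustrative, not Spark's ShuffleQueryStageExec.
case class Stage(name: String, canChangeNumPartition: Boolean)

def shouldCoalesce(shuffleStages: Seq[Stage]): Boolean =
  // A stage produced by an explicit repartition(n) reports
  // canChangeNumPartition = false, which vetoes coalescing.
  shuffleStages.forall(_.canChangeNumPartition)

val auto = Stage("join-shuffle", canChangeNumPartition = true)
val user = Stage("repartition(10)", canChangeNumPartition = false)

assert(shouldCoalesce(Seq(auto, auto)))  // all automatic: may coalesce
assert(!shouldCoalesce(Seq(auto, user))) // user repartition: keep counts
```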
@@ -140,6 +140,8 @@ case class ShuffleQueryStageExec(
       case _ =>
     }
   }
+
+  def canChangeNumPartition: Boolean = plan.canChangeNumPartition
do we need this? It only saves typing 4 characters....
Removed it.
LGTM
Test build #107696 has finished for PR 25121 at commit
thanks, merging to master!
…tion in adaptive execution

## What changes were proposed in this pull request?
Adaptive execution reduces the number of post-shuffle partitions at runtime, even for shuffles caused by repartition. However, the user likely wants the requested number of partitions when they call repartition, even under adaptive execution. This PR adds an internal config to control this; by default, adaptive execution will not change the number of post-shuffle partitions for repartition.

## How was this patch tested?
New tests added.

Closes apache#25121 from carsonwang/AE_repartition.
Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
       validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

-    if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
+    if (validMetrics.nonEmpty) {
@carsonwang is it safe to remove distinctNumPreShufflePartitions.length == 1 from here? I think @hvanhovell's comment (https://github.com/apache/spark/pull/24978/files#r299396944) still applies here about Union. I ran into an issue with my plan:
Union
:- Project [id_key#236, true AS row_type#249, link#232]
: +- Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246) AND (id_key#236 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246))
: +- Window [min(id_key#236) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246]
: +- ShuffleQueryStage 5
: +- Exchange SinglePartition, true
: +- *(7) Project [id_key#236, link#232]
: +- *(7) SortMergeJoin [link#237], [link#232], Inner, (id_key#236 > id_key#230)
: :- *(5) Sort [link#237 ASC NULLS FIRST], false, 0
: : +- CoalescedShuffleReader [0]
: : +- ShuffleQueryStage 0
: : +- Exchange hashpartitioning(link#237, 5), true
: : +- *(1) Project [col1#224 AS id_key#236, col2#225 AS link#237]
: : +- *(1) LocalTableScan [col1#224, col2#225]
: +- *(6) Sort [link#232 ASC NULLS FIRST], false, 0
: +- CoalescedShuffleReader [0]
: +- ShuffleQueryStage 1
: +- Exchange hashpartitioning(link#232, 5), true
: +- *(2) Project [id_key#230, link#232]
: +- *(2) Filter (isnotnull(link#232) AND isnotnull(id_key#230))
: +- *(2) Scan RecursiveReference iter[id_key#230,row_type#231,link#232]
+- Project [id_key#240, new AS new#256, link#241]
+- SortMergeJoin [id_key#238], [id_key#240], Inner
:- Sort [id_key#238 ASC NULLS FIRST], false, 0
: +- ShuffleQueryStage 4
: +- Exchange hashpartitioning(id_key#238, 5), true
: +- *(4) Project [id_key#238]
: +- *(4) Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247) AND (id_key#238 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247))
: +- Window [min(id_key#238) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247]
: +- CoalescedShuffleReader [0]
: +- ShuffleQueryStage 2
: +- Exchange SinglePartition, true
: +- LocalTableScan <empty>, [id_key#238]
+- Sort [id_key#240 ASC NULLS FIRST], false, 0
+- ShuffleQueryStage 3
+- Exchange hashpartitioning(id_key#240, 5), true
+- *(3) Project [col1#228 AS id_key#240, col2#229 AS link#241]
+- *(3) LocalTableScan [col1#228, col2#229]
where ShuffleQueryStage 5 conflicts with ShuffleQueryStage 4 and ShuffleQueryStage 3.
Ah, SinglePartition is an exception, so it's still possible to hit distinctNumPreShufflePartitions.length > 1 here. Let's add back this check @carsonwang @maryannxue
thanks for reporting it!
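The restored check can be modeled in a few lines of plain Scala (a sketch with illustrative data, not the actual ReduceNumShufflePartitions code): coalescing shuffle readers is only valid when every participating shuffle reports the same pre-shuffle partition count, and a SinglePartition exchange in a Union can break that assumption.

```scala
// Sketch: per-partition byte sizes reported by each shuffle's
// map output statistics. With a SinglePartition exchange in the
// mix, the partition counts can disagree.
val bytesByPartitionId: Seq[Array[Long]] = Seq(
  Array.fill(5)(100L), // Exchange hashpartitioning(..., 5)
  Array.fill(5)(100L), // Exchange hashpartitioning(..., 5)
  Array(500L)          // Exchange SinglePartition: one partition
)

val distinctNumPreShufflePartitions =
  bytesByPartitionId.map(_.length).distinct

// Coalescing only makes sense when all shuffles agree on the
// pre-shuffle partition count, hence the restored length == 1 check.
val safeToCoalesce =
  bytesByPartitionId.nonEmpty && distinctNumPreShufflePartitions.length == 1

assert(distinctNumPreShufflePartitions.sorted == Seq(1, 5))
assert(!safeToCoalesce)
```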
I've opened a small follow-up PR here: #25479. Please let me know if this requires a new ticket.
## What changes were proposed in this pull request?
Adaptive execution reduces the number of post-shuffle partitions at runtime, even for shuffles caused by repartition. However, the user likely wants the requested number of partitions when they call repartition, even under adaptive execution. This PR adds an internal config to control this; by default, adaptive execution will not change the number of post-shuffle partitions for repartition.

## How was this patch tested?
New tests added.