[SPARK-28356][SQL] Do not reduce the number of partitions for repartition in adaptive execution #25121


Closed
carsonwang wants to merge 4 commits into apache:master from carsonwang:AE_repartition

Conversation

carsonwang
Contributor

What changes were proposed in this pull request?

Adaptive execution reduces the number of post-shuffle partitions at runtime, even for shuffles introduced by repartition. However, users who call repartition most likely want the number of partitions they asked for, even with adaptive execution enabled. This PR adds an internal config to control this; by default, adaptive execution will not change the number of post-shuffle partitions for a repartition.
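For illustration, a minimal sketch of the intended user-facing behavior (the config key shown is the standard adaptive-execution switch; the snippet itself is an assumption, not code from this PR):

// Sketch: with adaptive execution enabled, an explicit repartition(10)
// should still produce exactly 10 output partitions, even if the data is
// small enough that adaptive execution could coalesce them further.
spark.conf.set("spark.sql.adaptive.enabled", "true")

val df = spark.range(0, 1000).repartition(10)
df.collect()
println(df.rdd.getNumPartitions)  // expected: 10, not a coalesced value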

How was this patch tested?

New tests added.

@carsonwang
Contributor Author

cc @cloud-fan , @gczsjdy , @justinuang

@SparkQA

SparkQA commented Jul 12, 2019

Test build #107555 has finished for PR 25121 at commit 19e86bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -312,6 +312,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val REDUCE_POST_SHUFFLE_PARTITIONS_FOR_REPARTITION =
Contributor

I probably won't add this config. If users call Dataset#repartition, we have to respect it and not change it.

Contributor Author

I initially didn't add this config. Later I found that with that change, the test("Union two datasets with different pre-shuffle partition number") wouldn't test what we want to test, because it was written using repartition. I can probably rewrite that test without using repartition.

Contributor

I think we can remove that test. Excluding repartition, the number of pre-shuffle partitions is always the same (200 by default).

We do need to have a test for repartition though.
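For context, a sketch of what a dedicated repartition test could look like (the config keys, helpers such as withSQLConf and checkAnswer, and the exact assertions are illustrative assumptions, not necessarily the code added in this PR):

test("do not reduce the number of post-shuffle partitions for repartition") {
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
    val df = spark.range(10).repartition(7).toDF()
    // A user-specified repartition should keep exactly 7 partitions, even
    // though adaptive execution would otherwise coalesce such a small shuffle.
    assert(df.rdd.getNumPartitions == 7)
    checkAnswer(df, spark.range(10).toDF().collect())
  }
}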

@@ -43,7 +43,8 @@ import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordCo
*/
case class ShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan) extends Exchange {
child: SparkPlan,
supportAdaptive: Boolean = true) extends Exchange {
Contributor

if we remove the config, I'd probably call this canChangeNumPartition


+1
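For context, the intent of the flag, sketched with the suggested name (the planner snippet below is an assumption about where it would be set, not the exact diff): the strategy that plans a user-requested repartition marks its exchange as one whose partition count must not change.

// Sketch, e.g. in SparkStrategies: a shuffling Repartition requested by the
// user keeps its exact partition count under adaptive execution.
case logical.Repartition(numPartitions, shuffle, child) if shuffle =>
  ShuffleExchangeExec(
    RoundRobinPartitioning(numPartitions),
    planLater(child),
    canChangeNumPartition = false) :: Nil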

@@ -140,6 +140,8 @@ case class ShuffleQueryStageExec(
case _ =>
}
}

def supportAdaptive: Boolean = plan.supportAdaptive
@gczsjdy Jul 12, 2019

nit: exchangeSupportAdaptive? Or canChangeNumPartition? QueryStage is already a part of adaptive execution.

@carsonwang
Contributor Author

@cloud-fan @gczsjdy, updated based on the comments. Thanks!

@SparkQA

SparkQA commented Jul 12, 2019

Test build #107613 has finished for PR 25121 at commit 3a2717e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


checkAnswer(resultDf,
Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
Seq((0), (1), (2)).map(i => Row(i)))
Contributor

this is just Seq(0, 1, 2), right?

Contributor Author

True. Good catch.
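With that simplification, the assertion reads (sketch):

checkAnswer(resultDf, Seq(0, 1, 2).map(i => Row(i)))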

case stage: ShuffleQueryStageExec => stage
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
}
if (!shuffleStages.forall(_.canChangeNumPartition)) {
Contributor

add a comment to explain it?

Contributor Author

Sure
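A sketch of what that comment could look like around the check (assuming the rule is ReduceNumShufflePartitions; the wording and surrounding code are illustrative):

// If not every shuffle stage allows changing the number of partitions
// (e.g. a shuffle introduced by a user-specified repartition), skip this
// rule so the user-requested partition count is respected.
if (!shuffleStages.forall(_.canChangeNumPartition)) {
  plan
} else {
  // ... estimate partition sizes and install coalesced shuffle readers
}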

@@ -140,6 +140,8 @@ case class ShuffleQueryStageExec(
case _ =>
}
}

def canChangeNumPartition: Boolean = plan.canChangeNumPartition
Contributor

do we need this? It only saves typing 4 characters....

Contributor Author

Removed it.
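Presumably call sites then use the slightly longer form directly, e.g. (assumption): shuffleStages.forall(_.plan.canChangeNumPartition).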

@gczsjdy

gczsjdy commented Jul 15, 2019

LGTM

@SparkQA

SparkQA commented Jul 15, 2019

Test build #107696 has finished for PR 25121 at commit ba1bda8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan closed this in d1a1376 on Jul 16, 2019
vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…tion in adaptive execution

Closes apache#25121 from carsonwang/AE_repartition.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
if (validMetrics.nonEmpty) {
Contributor

@carsonwang is it safe to remove distinctNumPreShufflePartitions.length == 1 from here? I think @hvanhovell's comment (https://github.com/apache/spark/pull/24978/files#r299396944) about Union still applies here. I ran into an issue with my plan:

Union
:- Project [id_key#236, true AS row_type#249, link#232]
:  +- Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246) AND (id_key#236 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246))
:     +- Window [min(id_key#236) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246]
:        +- ShuffleQueryStage 5
:           +- Exchange SinglePartition, true
:              +- *(7) Project [id_key#236, link#232]
:                 +- *(7) SortMergeJoin [link#237], [link#232], Inner, (id_key#236 > id_key#230)
:                    :- *(5) Sort [link#237 ASC NULLS FIRST], false, 0
:                    :  +- CoalescedShuffleReader [0]
:                    :     +- ShuffleQueryStage 0
:                    :        +- Exchange hashpartitioning(link#237, 5), true
:                    :           +- *(1) Project [col1#224 AS id_key#236, col2#225 AS link#237]
:                    :              +- *(1) LocalTableScan [col1#224, col2#225]
:                    +- *(6) Sort [link#232 ASC NULLS FIRST], false, 0
:                       +- CoalescedShuffleReader [0]
:                          +- ShuffleQueryStage 1
:                             +- Exchange hashpartitioning(link#232, 5), true
:                                +- *(2) Project [id_key#230, link#232]
:                                   +- *(2) Filter (isnotnull(link#232) AND isnotnull(id_key#230))
:                                      +- *(2) Scan RecursiveReference iter[id_key#230,row_type#231,link#232]
+- Project [id_key#240, new AS new#256, link#241]
   +- SortMergeJoin [id_key#238], [id_key#240], Inner
      :- Sort [id_key#238 ASC NULLS FIRST], false, 0
      :  +- ShuffleQueryStage 4
      :     +- Exchange hashpartitioning(id_key#238, 5), true
      :        +- *(4) Project [id_key#238]
      :           +- *(4) Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247) AND (id_key#238 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247))
      :              +- Window [min(id_key#238) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247]
      :                 +- CoalescedShuffleReader [0]
      :                    +- ShuffleQueryStage 2
      :                       +- Exchange SinglePartition, true
      :                          +- LocalTableScan <empty>, [id_key#238]
      +- Sort [id_key#240 ASC NULLS FIRST], false, 0
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(id_key#240, 5), true
               +- *(3) Project [col1#228 AS id_key#240, col2#229 AS link#241]
                  +- *(3) LocalTableScan [col1#228, col2#229]

where ShuffleQueryStage 5 conflicts with ShuffleQueryStage 4 and ShuffleQueryStage 3.

Contributor

Ah, SinglePartition is an exception. So it's still possible to hit distinctNumPreShufflePartitions.length > 1 here. Let's add back this check @carsonwang @maryannxue

thanks for reporting it!
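A minimal sketch of restoring that guard (the actual fix is in the follow-up referenced below and may differ in detail):

val distinctNumPreShufflePartitions =
  validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
// Only coalesce when every shuffle has the same number of pre-shuffle
// partitions; a SinglePartition exchange under a Union can violate this.
if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
  // ... compute the coalesced partition specs
} else {
  plan
}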

Contributor

I've opened a small follow-up PR here: #25479. Please let me know if this requires a new ticket.

j-baker pushed a commit to palantir/spark that referenced this pull request Jan 25, 2020
…tion in adaptive execution

Closes apache#25121 from carsonwang/AE_repartition.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>