File tree (3 files changed, +19 −4):
  main/scala/org/apache/spark
  test/scala/org/apache/spark/scheduler

@@ -2438,6 +2438,14 @@ package object config {
     .stringConf
     .createWithDefaultString("PWR")
 
+  private[spark] val EXECUTOR_DECOMMISSION_IGNORE_SHUFFLE_DATA_MIGRATED =
+    ConfigBuilder("spark.executor.decommission.ignoreShuffleDataMigrated")
+      .doc("Whether to ignore shuffle data that has already been migrated when the " +
+        "executor starts the decommissioning process.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
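For anyone who wants to try the flag, here is a minimal sketch of opting back into the old behavior from application code. Only the config key comes from this patch; the app name and local master are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup: only the config key below is defined by this patch.
// Setting it to false restores the pre-patch behavior, where a
// decommissioned executor always counts as a possible shuffle-output loss.
val conf = new SparkConf()
  .setAppName("decommission-demo") // hypothetical app name
  .setMaster("local[2]")           // hypothetical master
  .set("spark.executor.decommission.ignoreShuffleDataMigrated", "false")

val sc = new SparkContext(conf)
```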
@@ -1180,9 +1180,15 @@ private[spark] class TaskSetManager(
     // could serve the shuffle outputs or the executor loss is caused by decommission (which
     // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
     // data from this dead executor so we would need to rerun these tasks on other executors.
-    val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
-      !sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
-      (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
+    val maybeShuffleMapOutputLoss =
+      if (conf.get(config.EXECUTOR_DECOMMISSION_IGNORE_SHUFFLE_DATA_MIGRATED)) {
+        isShuffleMapTasks && !sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
+          !env.blockManager.externalShuffleServiceEnabled
+      } else {
+        isShuffleMapTasks && !sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
+          (reason.isInstanceOf[ExecutorDecommission] ||
+            !env.blockManager.externalShuffleServiceEnabled)
+      }
     if (maybeShuffleMapOutputLoss && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = info.index
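To make the branch easier to read, here is a standalone sketch of the same decision as a pure function. It is not the patch's code, and every parameter name is made up for clarity:

```scala
// Illustrative sketch of the decision above; these parameter names do not
// exist in TaskSetManager.
def maybeShuffleMapOutputLoss(
    isShuffleMapTasks: Boolean,
    hasReliableStorage: Boolean,        // supportsReliableStorage()
    hasExternalShuffleService: Boolean, // externalShuffleServiceEnabled
    lostByDecommission: Boolean,        // reason.isInstanceOf[ExecutorDecommission]
    ignoreMigratedShuffleData: Boolean  // the new config, default true
  ): Boolean = {
  if (ignoreMigratedShuffleData) {
    // New default: a decommissioned executor is assumed to have migrated its
    // shuffle blocks, so decommission alone no longer forces a rerun.
    isShuffleMapTasks && !hasReliableStorage && !hasExternalShuffleService
  } else {
    // Pre-patch behavior: decommission always counts as possible output loss.
    isShuffleMapTasks && !hasReliableStorage &&
      (lostByDecommission || !hasExternalShuffleService)
  }
}
```

The only input whose handling changes is an ExecutorDecommission loss while the external shuffle service is enabled: with the flag at its default of true, that case no longer reruns finished shuffle map tasks, on the assumption that the decommissioning executor migrated its shuffle blocks before going away.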
@@ -861,7 +861,8 @@ class TaskSetManagerSuite
   }
 
   test("SPARK-32653: Decommissioned host should not be used to calculate locality levels") {
-    sc = new SparkContext("local", "test")
+    val conf = new SparkConf().set(config.EXECUTOR_DECOMMISSION_IGNORE_SHUFFLE_DATA_MIGRATED, false)
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc)
     val backend = mock(classOf[SchedulerBackend])
     doNothing().when(backend).reviveOffers()
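The test flips the flag through Spark's typed config entry rather than the raw string key. As a rough sketch of the equivalence (the typed setter is private[spark], so this only compiles inside Spark's own packages):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// The typed entry and the raw string key name the same setting.
val conf = new SparkConf()
  .set(config.EXECUTOR_DECOMMISSION_IGNORE_SHUFFLE_DATA_MIGRATED, false)
assert(conf.get("spark.executor.decommission.ignoreShuffleDataMigrated") == "false")
```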