
Commit 11e4adc

committed
@holdenk's comments around comments and naming
1 parent dfc124c commit 11e4adc

3 files changed: 13 additions & 3 deletions


core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 0 deletions

@@ -294,6 +294,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       override def run(): Unit = {
         var lastTaskRunningTime = System.nanoTime()
         val sleep_time = 1000 // 1s
+        // This config is internal and only used by unit tests to force an executor
+        // to hang around for longer when decommissioned.
         val initialSleepMillis = env.conf.getInt(
           "spark.executor.decommission.initial.sleep.millis", sleep_time)
         if (initialSleepMillis > 0) {
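As an aside, a minimal sketch of how a test might use this internal knob; the config key and its getInt default come from the hunk above, while the SparkConf setup around it is an assumption:

import org.apache.spark.SparkConf

// Hypothetical test configuration: keep a decommissioned executor alive for
// ~30 seconds so the test has a window to observe it before it exits.
// A value <= 0 skips the initial sleep entirely.
val conf = new SparkConf()
  .set("spark.executor.decommission.initial.sleep.millis", "30000")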

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 3 deletions

@@ -1847,7 +1847,13 @@ private[spark] class DAGScheduler(
         fileLost = true,
         hostToUnregisterOutputs = hostToUnregisterOutputs,
         maybeEpoch = Some(task.epoch),
-        ignoreShuffleVersion = isHostDecommissioned)
+        // shuffleFileLostEpoch is ignored when a host is decommissioned because some
+        // decommissioned executors on that host might have been removed before this fetch
+        // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
+        // proceed with unconditional removal of shuffle outputs from all executors on that
+        // host, including from those that we still haven't confirmed as lost due to heartbeat
+        // delays.
+        ignoreShuffleFileLostEpoch = isHostDecommissioned)
       }
     }

@@ -2014,7 +2020,7 @@ private[spark] class DAGScheduler(
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None,
-      ignoreShuffleVersion: Boolean = false): Unit = {
+      ignoreShuffleFileLostEpoch: Boolean = false): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")

@@ -2025,7 +2031,7 @@ private[spark] class DAGScheduler(
       clearCacheLocs()
     }
     if (fileLost) {
-      val remove = if (ignoreShuffleVersion) {
+      val remove = if (ignoreShuffleFileLostEpoch) {
         true
       } else if (!shuffleFileLostEpoch.contains(execId) ||
           shuffleFileLostEpoch(execId) < currentEpoch) {
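For context, here is a minimal standalone sketch, a toy model rather than the actual DAGScheduler internals, of the epoch gating that the renamed ignoreShuffleFileLostEpoch flag bypasses: shuffle outputs for an executor are removed at most once per epoch, unless the caller asks to skip the check, as the host-decommission path above does.

import scala.collection.mutable

// Toy model: epoch of the last shuffle-file-lost event handled per executor.
val shuffleFileLostEpoch = mutable.HashMap[String, Long]()

def shouldRemoveShuffleOutputs(
    execId: String,
    currentEpoch: Long,
    ignoreShuffleFileLostEpoch: Boolean): Boolean = {
  if (ignoreShuffleFileLostEpoch) {
    // Host decommission: remove unconditionally, even if a decommissioned
    // executor on the host already bumped the recorded epoch.
    true
  } else if (!shuffleFileLostEpoch.contains(execId) ||
      shuffleFileLostEpoch(execId) < currentEpoch) {
    // First loss event seen for this executor at this epoch: record it and remove.
    shuffleFileLostEpoch(execId) = currentEpoch
    true
  } else {
    // Already handled at this epoch; avoid removing outputs twice.
    false
  }
}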

core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala

Lines changed: 2 additions & 0 deletions

@@ -236,6 +236,8 @@ class DecommissionWorkerSuite
     val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) => {
       val executorId = SparkEnv.get.executorId
       val context = TaskContext.get()
+      // Only sleep in the first attempt to create the required window for decommissioning.
+      // Subsequent attempts don't need the delay; skipping it speeds up the test.
       if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) {
         val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
         Thread.sleep(sleepTimeSeconds * 1000L)
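A tiny Spark-free sketch of the attempt-gated delay shown above; the helper and its parameters are hypothetical, while the check and sleep durations mirror the test: only the first task attempt of the first stage attempt pays the sleep that opens the decommission window.

// Hypothetical helper mirroring the test logic above.
def maybeSleep(taskAttempt: Int, stageAttempt: Int, isDecomTarget: Boolean): Unit = {
  if (taskAttempt == 0 && stageAttempt == 0) {
    val sleepTimeSeconds = if (isDecomTarget) 10 else 1
    Thread.sleep(sleepTimeSeconds * 1000L)
  }
}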
