Skip to content

Commit ebd8408

Browse files
committed
@Ngone51's comments
1 parent aca23bc commit ebd8408

File tree

4 files changed

+10
-8
lines changed

4 files changed

+10
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boole
3434
*/
3535
case class ExecutorDecommissionState(
3636
message: String,
37-
// Timestamp in milliseconds when decommissioning was triggered
38-
tsMillis: Long,
37+
// Timestamp the decommissioning commenced in millis since epoch of the driver's clock
38+
startTime: Long,
3939
isHostDecommissioned: Boolean)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ private[spark] class TaskSchedulerImpl(
930930
if (!oldDecomState.exists(_.isHostDecommissioned)) {
931931
executorsPendingDecommission(executorId) = ExecutorDecommissionState(
932932
decommissionInfo.message,
933-
oldDecomState.map(_.tsMillis).getOrElse(clock.getTimeMillis()),
933+
oldDecomState.map(_.startTime).getOrElse(clock.getTimeMillis()),
934934
decommissionInfo.isHostDecommissioned)
935935
}
936936
}

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,11 +1056,13 @@ private[spark] class TaskSetManager(
10561056
val taskInfo = taskInfos(tid)
10571057
val decomState = sched.getExecutorDecommissionState(taskInfo.executorId)
10581058
if (decomState.nonEmpty) {
1059-
// Check whether this task will finish before the exectorKillTime assuming
1060-
// it will take medianDuration overall. If this task cannot finish within
1061-
// executorKillInterval, then this task is a candidate for speculation
1059+
// Check if this task might finish after this executor is decommissioned.
1060+
// We estimate the task's finish time by using the median task duration.
1061+
// Whereas the time when the executor might be decommissioned is estimated using the
1062+
// config executorDecommissionKillInterval. If the task is going to finish after
1063+
// decommissioning, then we will eagerly speculate the task.
10621064
val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
1063-
val executorDecomTime = decomState.get.tsMillis + executorDecommissionKillInterval.get
1065+
val executorDecomTime = decomState.get.startTime + executorDecommissionKillInterval.get
10641066
val canExceedDeadline = executorDecomTime < taskEndTimeBasedOnMedianDuration
10651067
if (canExceedDeadline) {
10661068
speculated = checkAndSubmitSpeculatableTask(tid, time, 0)

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1977,7 +1977,7 @@ class TaskSetManagerSuite
19771977
// (TASK 2 -> 15, TASK 3 -> 15)
19781978
sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom",
19791979
isHostDecommissioned = false))
1980-
assert(sched.getExecutorDecommissionState("exec2").map(_.tsMillis) ===
1980+
assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) ===
19811981
Some(clock.getTimeMillis()))
19821982

19831983
assert(manager.checkSpeculatableTasks(0))

0 commit comments

Comments (0)