
Commit 0678afe

prakharjain09 authored and holdenk committed
[SPARK-21040][CORE] Speculate tasks which are running on decommission executors
### What changes were proposed in this pull request?
This PR adds functionality to consider the tasks running on decommissioned executors, controlled by a config. In Spark-on-cloud deployments, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, with AWS Spot nodes, once we get the notification we know that the node will be gone in 120 seconds. So if a task running on a decommissioning executor may run beyond currentTime + 120 seconds, it is a candidate for speculation.

### Why are the changes needed?
Currently, when an executor is decommissioned, we stop scheduling new tasks on it, but the already running tasks keep running there. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers give different timeouts before they take away the nodes; for example, AWS Spot nodes are reclaimed about 120 seconds after notification. We can use this information in cloud environments to make better decisions about speculating the tasks already running on decommissioned executors.

### Does this PR introduce _any_ user-facing change?
Yes. This PR adds a new config, "spark.executor.decommission.killInterval", which users can set explicitly based on the cloud environment where they are running.

### How was this patch tested?
Added UT.

Closes #28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
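For context, here is a minimal sketch of how a user might opt in to this behaviour from application code. The master URL, app name, and the 120-second value (mirroring the AWS Spot notice window mentioned above) are illustrative assumptions, not part of this PR; only the new config key itself comes from the change.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup only: speculation must already be enabled for the new
// config to matter, and the 120s value mirrors the AWS Spot example above.
val conf = new SparkConf()
  .setMaster("local[4]")                                     // assumed master for this sketch
  .setAppName("decommission-aware-speculation")              // hypothetical app name
  .set("spark.speculation", "true")
  .set("spark.executor.decommission.killInterval", "120s")   // new config added by this PR

val sc = new SparkContext(conf)
// ... run jobs; tasks on decommissioning executors that cannot finish within
// 120s of the decommission signal become candidates for speculation ...
sc.stop()
```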
1 parent 7dc1d89 commit 0678afe

File tree: 3 files changed, +141 −4 lines changed


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 11 additions & 0 deletions
@@ -1843,6 +1843,17 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.killInterval")
+      .doc("Duration after which a decommissioned executor will be killed forcefully." +
+        "This config is useful for cloud environments where we know in advance when " +
+        "an executor is going to go down after decommissioning signal i.e. around 2 mins " +
+        "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
+        "used to decide what tasks running on decommission executors to speculate.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 24 additions & 4 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

@@ -102,6 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)

@@ -165,6 +167,7 @@ private[spark] class TaskSetManager(
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
   private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead

@@ -933,6 +936,7 @@ private[spark] class TaskSetManager(
 
   /** If the given task ID is in the set of running tasks, removes it. */
   def removeRunningTask(tid: Long): Unit = {
+    tidToExecutorKillTimeMapping.remove(tid)
     if (runningTasksSet.remove(tid) && parent != null) {
       parent.decreaseRunningTasks(1)
     }

@@ -1042,7 +1046,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the exectorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+          }
+        }
+        foundTasks |= speculated
       }
     } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
       val time = clock.getTimeMillis()

@@ -1100,8 +1116,12 @@ private[spark] class TaskSetManager(
 
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    // Future consideration: if an executor is decommissioned it may make sense to add the current
-    // tasks to the spec exec queue.
+    executorDecommissionKillInterval.foreach { interval =>
+      val executorKillTime = clock.getTimeMillis() + interval
+      runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+        tidToExecutorKillTimeMapping(tid) = executorKillTime
+      }
+    }
   }
 
   def recomputeLocality(): Unit = {
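To make the scheduling change above easier to follow, here is a small, self-contained sketch of the same deadline check: speculate a task early if, assuming it runs for the median duration overall, it would still be running when its decommissioned executor is killed. The object and method names are mine for illustration, not Spark's API, and the timings mirror the test below.

```scala
// Standalone illustration of the deadline-based speculation decision (not Spark code).
object DecommissionSpeculationCheck {
  def shouldSpeculateEarly(
      launchTimeMs: Long,       // when the task attempt started
      medianDurationMs: Long,   // median runtime of already-finished tasks in the stage
      executorKillTimeMs: Long  // decommission time + spark.executor.decommission.killInterval
  ): Boolean = {
    // Expected finish time if this task behaves like the median task.
    val expectedEndTimeMs = launchTimeMs + medianDurationMs
    // Speculate early only when the kill deadline arrives before the expected finish.
    executorKillTimeMs < expectedEndTimeMs
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the test timeline: median duration = 10s, executor kill time = t = 15s.
    // Task launched at t = 0s should finish by t = 10s -> no early speculation (false).
    println(shouldSpeculateEarly(launchTimeMs = 0L, medianDurationMs = 10000L,
      executorKillTimeMs = 15000L))
    // Task launched at t = 6s is expected to finish at t = 16s -> speculate early (true).
    println(shouldSpeculateEarly(launchTimeMs = 6000L, medianDurationMs = 10000L,
      executorKillTimeMs = 15000L))
  }
}
```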

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 106 additions & 0 deletions
@@ -1911,6 +1911,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
+    " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, TASK 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to
+    // executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK 2 -> 15, TASK 3 -> 15)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // TASK 2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, TASK 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now TASK 2 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that TASK 2 is re-launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+  }
+
   test("SPARK-29976 Regular speculation configs should still take effect even when a " +
     "threshold is provided") {
     val (manager, clock) = testSpeculationDurationSetup(
