Skip to content

Commit ec2ac34

Browse files
author
Sital Kedia
committed
[SPARK-20163] Kill all running tasks in a stage in case of fetch failure
1 parent c622a87 commit ec2ac34

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ private[spark] class TaskSetManager(
110110
// task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie
111111
// state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
112112
// state in order to continue to track and account for the running tasks.
113-
// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
114113
private[scheduler] var isZombie = false
115114

116115
// Set of pending tasks for each executor. These collections are actually
@@ -768,6 +767,19 @@ private[spark] class TaskSetManager(
768767
s" executor ${info.executorId}): ${reason.toErrorString}"
769768
val failureException: Option[Throwable] = reason match {
770769
case fetchFailed: FetchFailed =>
770+
if (!isZombie) {
771+
for (i <- 0 until numTasks if i != index) {
772+
// Only for the first occurance of the fetch failure, kill all running
773+
// tasks in the task set
774+
for (attemptInfo <- taskAttempts(i) if attemptInfo.running) {
775+
sched.backend.killTask(
776+
attemptInfo.taskId,
777+
attemptInfo.executorId,
778+
interruptThread = true,
779+
reason = "another attempt succeeded")
780+
}
781+
}
782+
}
771783
logWarning(failureReason)
772784
if (!successful(index)) {
773785
successful(index) = true

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,53 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
417417
}
418418
}
419419

420+
test("Running tasks should be killed after first fetch failure") {
421+
val rescheduleDelay = 300L
422+
val conf = new SparkConf().
423+
set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
424+
// don't wait to jump locality levels in this test
425+
set("spark.locality.wait", "0")
426+
427+
val killedTasks = new ArrayBuffer[Long]
428+
sc = new SparkContext("local", "test", conf)
429+
// two executors on same host, one on different.
430+
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
431+
("exec1.1", "host1"), ("exec2", "host2"))
432+
sched.initialize(new FakeSchedulerBackend() {
433+
override def killTask(
434+
taskId: Long,
435+
executorId: String,
436+
interruptThread: Boolean,
437+
reason: String): Unit = {
438+
killedTasks += taskId
439+
}
440+
})
441+
// affinity to exec1 on host1 - which we will fail.
442+
val taskSet = FakeTask.createTaskSet(4)
443+
val clock = new ManualClock
444+
clock.advance(1)
445+
val manager = new TaskSetManager(sched, taskSet, 4, None, clock)
446+
447+
val offerResult1 = manager.resourceOffer("exec1", "host1", ANY)
448+
assert(offerResult1.isDefined, "Expect resource offer to return a task")
449+
450+
assert(offerResult1.get.index === 0)
451+
assert(offerResult1.get.executorId === "exec1")
452+
453+
val offerResult2 = manager.resourceOffer("exec2", "host2", ANY)
454+
assert(offerResult2.isDefined, "Expect resource offer to return a task")
455+
456+
assert(offerResult2.get.index === 1)
457+
assert(offerResult2.get.executorId === "exec2")
458+
// At this point, we have 2 tasks running and 2 pending. First fetch failure should
459+
// abort all the pending tasks but the running tasks should not be aborted.
460+
assert(killedTasks.isEmpty)
461+
manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED,
462+
FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored"))
463+
assert(killedTasks.size === 1)
464+
assert(killedTasks(0) === offerResult2.get.taskId)
465+
}
466+
420467
test("executors should be blacklisted after task failure, in spite of locality preferences") {
421468
val rescheduleDelay = 300L
422469
val conf = new SparkConf().
@@ -1107,6 +1154,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
11071154
set(config.BLACKLIST_ENABLED, true)
11081155
sc = new SparkContext("local", "test", conf)
11091156
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
1157+
sched.initialize(new FakeSchedulerBackend() {
1158+
override def killTask(
1159+
taskId: Long,
1160+
executorId: String,
1161+
interruptThread: Boolean,
1162+
reason: String): Unit = {}
1163+
})
11101164
val taskSet = FakeTask.createTaskSet(4)
11111165
val tsm = new TaskSetManager(sched, taskSet, 4)
11121166
// we need a spy so we can attach our mock blacklist

0 commit comments

Comments
 (0)