Commit b2ebadf

ericl authored and yhuai committed
[SPARK-20358][CORE] Executors failing stage on interrupted exception thrown by cancelled tasks
## What changes were proposed in this pull request?

This was a regression introduced by my earlier PR here: #17531

It turns out NonFatal() does not in fact catch InterruptedException.

## How was this patch tested?

Extended cancellation unit test coverage. The first test fails before this patch.

cc JoshRosen mridulm

Author: Eric Liang <ekl@databricks.com>

Closes #17659 from ericl/spark-20358.
1 parent c5a31d1 commit b2ebadf
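
The fix hinges on a standard-library detail: scala.util.control.NonFatal deliberately classifies InterruptedException (along with VirtualMachineError, ThreadDeath, LinkageError, and ControlThrowable) as fatal, so a `case NonFatal(_)` arm never matches it. A minimal standalone sketch of that behavior (the demo object is illustrative, not Spark code):

```scala
import scala.util.control.NonFatal

object NonFatalDemo extends App {
  // NonFatal classifies InterruptedException as fatal, so this prints false.
  println(NonFatal(new InterruptedException("task cancelled")))

  // Ordinary exceptions are non-fatal, so this prints true.
  println(NonFatal(new RuntimeException("boom")))

  // In a pattern match, `case NonFatal(_)` therefore does not match an
  // InterruptedException; the exception falls through to later cases.
  val outcome =
    try {
      throw new InterruptedException()
    } catch {
      case NonFatal(_) => "caught by NonFatal"
      case _: InterruptedException => "fell through to the explicit case"
    }
  println(outcome) // fell through to the explicit case
}
```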

2 files changed: +19 -10 lines changed


core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion
@@ -432,7 +432,8 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
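
The new arm relies on a Scala alternative pattern with a shared guard: either an InterruptedException or any non-fatal throwable is treated as a kill when a kill reason has been recorded. A standalone sketch of the same construct (hypothetical names, not the Spark code above):

```scala
import scala.util.control.NonFatal

object PatternDemo extends App {
  // Either an InterruptedException or any non-fatal throwable counts as a
  // killed task, but only when the guard (a kill reason was set) holds.
  def classify(t: Throwable, killReasonSet: Boolean): String = t match {
    case _: InterruptedException | NonFatal(_) if killReasonSet => "killed task"
    case _ => "genuine failure"
  }

  println(classify(new InterruptedException(), killReasonSet = true))   // killed task
  println(classify(new RuntimeException("boom"), killReasonSet = true)) // killed task
  println(classify(new InterruptedException(), killReasonSet = false))  // genuine failure
}
```

Note that an extractor like NonFatal(_) may appear in an alternative pattern only because it binds no variables; the guard applies to the whole case arm.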

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 17 additions & 9 deletions
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }

-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

     SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // first attempt will hang
      if (!SparkContextSuite.isTaskStarted) {
         SparkContextSuite.isTaskStarted = true
-        try {
-          Thread.sleep(9999999)
-        } catch {
-          case t: Throwable =>
-            // SPARK-20217 should not fail stage if task throws non-interrupted exception
-            throw new RuntimeException("killed")
-        }
+        blockFn
       }
       // second attempt succeeds immediately
     }
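
The refactor above turns one hard-coded test into a small test factory: testCancellingTasks takes a description plus a by-name block and registers a "Killing tasks ..." test per blocking behavior, so both the interrupted-exception and runtime-exception cancellation paths share the same harness. A self-contained sketch of the same idiom (illustrative suite and names, assuming ScalaTest 3.1+):

```scala
import org.scalatest.funsuite.AnyFunSuite

// A def that takes a description and a by-name block, and registers a
// ScalaTest test around it; each call site becomes a separate test case.
class BlockingBehaviorSuite extends AnyFunSuite {
  def testWithBlock(desc: String)(body: => Unit): Unit = test(s"Killing tasks $desc") {
    body // the by-name block is evaluated only when this test actually runs
  }

  testWithBlock("that finish quickly") {
    assert(1 + 1 == 2)
  }

  testWithBlock("that compute a value") {
    assert("spark".length == 5)
  }
}
```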
