Skip to content

Commit 1cb444d

Browse files
committed
[SPARK-3469] Call all TaskCompletionListeners even if some fail.
Note that this also changes the fault semantics of TaskCompletionListener. Previously, failures in TaskCompletionListeners would result in the task being reported as failed. With this change, tasks won't be reported as failed simply because the execution of a TaskCompletionListener fails.
1 parent 25b5b86 commit 1cb444d

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class TaskContext(
7777
/**
7878
* Add a listener in the form of a Scala closure to be executed on task completion.
7979
* This will be called in all situations - success, failure, or cancellation.
80+
* Exceptions in callbacks are however not thrown back upstream, i.e. tasks won't be marked as
81+
* failed even if completion callbacks fail to execute.
8082
*
8183
* An example use is for HadoopRDD to register a callback to close the input stream.
8284
*/
@@ -104,7 +106,14 @@ class TaskContext(
104106
private[spark] def markTaskCompleted(): Unit = {
105107
completed = true
106108
// Process complete callbacks in the reverse order of registration
107-
onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
109+
onCompleteCallbacks.reverse.foreach { listener =>
110+
try {
111+
listener.onTaskCompletion(this)
112+
} catch {
113+
case e: Throwable =>
114+
logError("Error in TaskCompletionListener", e)
115+
}
116+
}
108117
}
109118

110119
/** Marks the task for interruption, i.e. cancellation. */

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
2626

2727
class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
2828

29-
test("Calls executeOnCompleteCallbacks after failure") {
29+
test("calls TaskCompletionListener after failure") {
3030
TaskContextSuite.completed = false
3131
sc = new SparkContext("local", "test")
3232
val rdd = new RDD[String](sc, List()) {
@@ -45,10 +45,21 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
4545
}
4646
assert(TaskContextSuite.completed === true)
4747
}
48+
49+
test("all TaskCompletionListeners should be called even if some fail") {
50+
val context = new TaskContext(0, 0, 0)
51+
context.addTaskCompletionListener(_ => throw new Exception("blah"))
52+
context.addTaskCompletionListener(_ => TaskContextSuite.callbackInvoked = true)
53+
context.addTaskCompletionListener(_ => throw new Exception("blah"))
54+
context.markTaskCompleted()
55+
assert(TaskContextSuite.callbackInvoked === true)
56+
}
4857
}
4958

5059
private object TaskContextSuite {
5160
@volatile var completed = false
61+
62+
var callbackInvoked = false
5263
}
5364

5465
private case class StubPartition(index: Int) extends Partition

0 commit comments

Comments
 (0)