
Commit c88adbb

aarondav authored and pwendell committed
SPARK-1772 Stop catching Throwable, let Executors die
The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772), in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown.

This patch also continues the fight in the never-ending war against `case t: Throwable =>` by catching only Exceptions in many places, and by adding a wrapper for Threads and Runnables to make sure any uncaught exceptions are at least printed to the logs.

It also turns out that the IndestructibleActorSystem is unlikely to actually work, given testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)): the uncaughtExceptionHandler is not called from the places we expected it would be. [SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this issue, but refactoring our Actor Systems to ensure that exceptions are handled properly is a much bigger change, outside the scope of this PR.

Author: Aaron Davidson <aaron@databricks.com>

Closes #715 from aarondav/throwable and squashes the following commits:

f9b9bfe [Aaron Davidson] Remove other redundant 'throw e'
e937a0a [Aaron Davidson] Address Prashant and Matei's comments
1867867 [Aaron Davidson] [RFC] SPARK-1772 Stop catching Throwable, let Executors die

(cherry picked from commit 3af1f38)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
1 parent 19ccf20 commit c88adbb

19 files changed (+127 / -140 lines)
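Most of the `override def run()` sites below are rewritten as `override def run(): Unit = Utils.logUncaughtExceptions { ... }`. The Utils.scala change itself is not part of this excerpt; a minimal sketch of such a wrapper, assuming a by-name block and plain stderr output in place of Spark's Logging trait, might look like:

// Sketch only: illustrates the wrapper used by the run() methods below.
// The real helper lives in org.apache.spark.util.Utils and uses logError.
object UncaughtLogging {
  def logUncaughtExceptions[T](body: => T): T = {
    try {
      body
    } catch {
      case t: Throwable =>
        // Log, then re-throw so the thread's uncaught-exception handler still fires.
        System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}")
        t.printStackTrace()
        throw t
    }
  }
}

With a wrapper like this in place, a Runnable or Thread body that throws is at least recorded in the logs before the exception propagates to the thread's uncaught-exception handler.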

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 6 additions & 5 deletions

@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning() {
+  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           }
         }
       } catch {
-        case t: Throwable => logError("Error in cleaning thread", t)
+        case e: Exception => logError("Error in cleaning thread", e)
       }
     }
   }
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.rddCleaned(rddId))
       logInfo("Cleaned RDD " + rddId)
     } catch {
-      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+      case e: Exception => logError("Error cleaning RDD " + rddId, e)
     }
   }
 
@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.shuffleCleaned(shuffleId))
       logInfo("Cleaned shuffle " + shuffleId)
     } catch {
-      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
     }
   }
 
@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.broadcastCleaned(broadcastId))
      logInfo("Cleaned broadcast " + broadcastId)
     } catch {
-      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
+      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
   }
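The narrowing above from `case t: Throwable` to `case e: Exception` is what lets fatal errors escape. A toy illustration of the pattern (not from the patch; the names are hypothetical):

// A cleaning-style loop that swallows ordinary exceptions but lets Errors
// (e.g. OutOfMemoryError) propagate to the thread's uncaught-exception handler.
def cleaningLoop(stopped: () => Boolean, doWork: () => Unit): Unit = {
  while (!stopped()) {
    try {
      doWork()
    } catch {
      case e: Exception =>
        System.err.println("Error in cleaning thread: " + e) // recoverable; keep looping
      // No Throwable case: an OutOfMemoryError now terminates the loop and the thread.
    }
  }
}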

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 6 deletions

@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
         } catch {
           // TODO: Enumerate the exact reasons why it can fail
           // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
         val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 
@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 2 deletions

@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
         this.interrupt()
       }
 
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           SparkEnv.set(env)
           val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -282,7 +282,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 0 additions & 1 deletion

@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        case e: Throwable => throw e
       }
     }
   }

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion

@@ -157,7 +157,7 @@ object Client {
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion

@@ -103,7 +103,7 @@ object SparkHadoopUtil {
         .newInstance()
         .asInstanceOf[SparkHadoopUtil]
     } catch {
-      case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      case e: Exception => throw new SparkException("Unable to load YARN support", e)
     }
   } else {
     new SparkHadoopUtil

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 4 additions & 4 deletions

@@ -70,7 +70,7 @@ class HistoryServer(
   * TODO: Add a mechanism to update manually.
   */
  private val logCheckingThread = new Thread {
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
      while (!stopped) {
        val now = System.currentTimeMillis
        if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
        numCompletedApplications = logInfos.size
 
      } catch {
-        case t: Throwable => logError("Exception in checking for event log updates", t)
+        case e: Exception => logError("Exception in checking for event log updates", e)
      }
    } else {
      logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
        dir.getModificationTime
      }
    } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+      case e: Exception =>
+        logError("Exception in accessing modification time of %s".format(dir.getPath), e)
      -1L
    }
  }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions

@@ -684,8 +684,8 @@ private[spark] class Master(
       webUi.attachSparkUI(ui)
       return true
     } catch {
-      case t: Throwable =>
-        logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+      case e: Exception =>
+        logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
     }
   } else {
     logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 1 addition & 1 deletion

@@ -31,7 +31,7 @@ object DriverWrapper {
       case workerUrl :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+          Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
         // Delegate to supplied main class

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a
     // SparkEnv / Executor before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
+      conf, new SecurityManager(conf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 26 deletions

@@ -74,28 +74,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -259,6 +238,11 @@ private[spark] class Executor(
         }
 
         case t: Throwable => {
+          // Attempt to exit cleanly by informing the driver of our failure.
+          // If anything goes wrong (or this was a fatal exception), we will delegate to
+          // the default uncaught exception handler, which will terminate the Executor.
+          logError("Exception in task ID " + taskId, t)
+
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
@@ -268,10 +252,11 @@ private[spark] class Executor(
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + taskId, t)
+          // Don't forcibly exit unless the exception was inherently fatal, to avoid
+          // stopping other tasks unnecessarily.
+          if (Utils.isFatalError(t)) {
+            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          }
         }
       } finally {
         // TODO: Unregister shuffle memory only for ResultTask
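The new failure path above only escalates when `Utils.isFatalError(t)` holds. That helper's Utils.scala change is not shown in this excerpt; a plausible sketch, leaning on scala.util.control.NonFatal, is:

import scala.util.control.{ControlThrowable, NonFatal}

// Sketch only: treat a Throwable as fatal unless it is "non-fatal" in the
// scala.util.control.NonFatal sense, or an interruption/control-flow throwable.
object FatalErrors {
  def isFatalError(e: Throwable): Boolean = e match {
    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
      false
    case _ =>
      true
  }
}

Under a definition like this, an OutOfMemoryError counts as fatal and is handed to ExecutorUncaughtExceptionHandler, while an ordinary task exception is only reported to the driver via statusUpdate.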
core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}
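Note the two-level fallback in the handler above: the normal path goes through System.exit, which runs shutdown hooks and carries a distinguishable exit code (ExecutorExitCode.OOM vs UNCAUGHT_EXCEPTION), while the inner catch falls back to Runtime.getRuntime.halt, which terminates the JVM immediately if logging or exiting itself fails (for example with a second OutOfMemoryError), so the Executor cannot linger in a half-broken state.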

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 2 deletions

@@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging {
         applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
       )
     } catch {
-      case t: Throwable =>
-        logError("Exception in parsing logging info from directory %s".format(logDir), t)
+      case e: Exception =>
+        logError("Exception in parsing logging info from directory %s".format(logDir), e)
         EventLoggingInfo.empty
     }
   }

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 4 additions & 4 deletions

@@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] => directResult
@@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
-          case ex: Throwable =>
+          case ex: Exception =>
            taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
        }
      }
@@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           if (serializedData != null && serializedData.limit() > 0) {
             reason = serializer.get().deserialize[TaskEndReason](
@@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
            val loader = Utils.getContextOrSparkClassLoader
            logError(
              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
-          case ex: Throwable => {}
+          case ex: Exception => {}
        }
        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
      }

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 3 additions & 3 deletions

@@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
        logDebug("Shutdown hook called")
        DiskBlockManager.this.stop()
      }
@@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
      try {
        if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
      } catch {
-        case t: Throwable =>
-          logError("Exception while deleting local spark dir: " + localDir, t)
+        case e: Exception =>
+          logError("Exception while deleting local spark dir: " + localDir, e)
      }
    }
  }

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala

Lines changed: 3 additions & 4 deletions

@@ -25,7 +25,6 @@ import tachyon.client.TachyonFile
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.ShuffleSender
 import org.apache.spark.util.Utils
 
 
@@ -137,16 +136,16 @@ private[spark] class TachyonBlockManager(
   private def addShutdownHook() {
     tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
        logDebug("Shutdown hook called")
        tachyonDirs.foreach { tachyonDir =>
          try {
            if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
              Utils.deleteRecursively(tachyonDir, client)
            }
          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting tachyon spark dir: " + tachyonDir, t)
+            case e: Exception =>
+              logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
          }
        }
      }
