
Commit 34ac45a

tejasapatil authored and zsxwing committed
[SPARK-16230][CORE] CoarseGrainedExecutorBackend to self kill if there is an exception while creating an Executor
## What changes were proposed in this pull request?

With the fix from SPARK-13112, I see that `LaunchTask` is always processed after `RegisteredExecutor` is done, so it gets a chance to do all retries to start up an executor. There is still a problem: if `Executor` creation itself fails with an exception, the failure goes unnoticed, and the executor is killed only later when it tries to process `LaunchTask` because `executor` is null: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L88

So the logs do not show that there was a problem during `Executor` creation and that this is why the executor was killed. This PR explicitly catches the exception during `Executor` creation, logs a proper message, and then exits the JVM. I have also changed the `exitExecutor` method to accept a `reason`, so that backends can use that reason to do things like logging to a DB and get an aggregate of such exits at the cluster level.

## How was this patch tested?

I am relying on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #14202 from tejasapatil/exit_executor_failure.

(cherry picked from commit b2f24f9)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
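
As an aside on the `reason` parameter mentioned above: below is a minimal, self-contained sketch of how a backend could forward that reason to an external sink (e.g. a DB) before exiting. It does not extend the real package-private `CoarseGrainedExecutorBackend`; `ExecutorBackendLike`, `ExitReporter`, and `ReportingBackend` are hypothetical names used only for illustration of the `exitExecutor(code, reason, throwable)` hook.

```scala
// Illustrative sketch only: mirrors the new exitExecutor(code, reason, throwable)
// signature and shows how a custom backend could record the reason externally
// before terminating. All names here are hypothetical, not part of Spark.
trait ExecutorBackendLike {
  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
    // Log the reason (and stack trace, if any) so the cause is visible in the executor log.
    Console.err.println(s"ERROR: $reason")
    if (throwable != null) {
      throwable.printStackTrace()
    }
    sys.exit(code)
  }
}

// Hypothetical sink that aggregates executor exits at the cluster level (e.g. a DB).
trait ExitReporter {
  def report(executorId: String, code: Int, reason: String): Unit
}

class ReportingBackend(executorId: String, reporter: ExitReporter) extends ExecutorBackendLike {
  override protected def exitExecutor(code: Int, reason: String, throwable: Throwable): Unit = {
    reporter.report(executorId, code, reason) // record the reason before the JVM exits
    super.exitExecutor(code, reason, throwable)
  }
}
```

The point is simply that, once the reason is available as an argument, the exit path can be decorated by a subclass without having to parse executor logs after the fact.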
1 parent e833c90 · commit 34ac45a

File tree

1 file changed (+20, -12 lines)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 20 additions & 12 deletions
```diff
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        logError(s"Cannot register with driver: $driverUrl", e)
-        exitExecutor(1)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
     }(ThreadUtils.sameThread)
   }
 
@@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      try {
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      } catch {
+        case NonFatal(e) =>
+          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+      }
 
     case RegisterExecutorFailed(message) =>
-      logError("Slave registration failed: " + message)
-      exitExecutor(1)
+      exitExecutor(1, "Slave registration failed: " + message)
 
     case LaunchTask(data) =>
       if (executor == null) {
-        logError("Received LaunchTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
-        logError("Received KillTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received KillTask command but executor was null")
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      exitExecutor(1)
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int): Unit = System.exit(code)
+  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+    if (throwable != null) {
+      logError(reason, throwable)
+    } else {
+      logError(reason)
+    }
+    System.exit(code)
+  }
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
```
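
For context on the `scala.util.control.NonFatal` import added above: `NonFatal` matches ordinary exceptions while letting fatal errors such as `OutOfMemoryError` propagate, so the handler exits deliberately only for failures it can sensibly report. Below is a small standalone sketch of the same catch-log-exit pattern; `ExecutorInitSketch`, `initOrExit`, and `createExecutor` are hypothetical names (the last standing in for `new Executor(...)`), not part of the patch.

```scala
import scala.util.control.NonFatal

// Standalone sketch of the try/NonFatal pattern used in the RegisteredExecutor handler.
object ExecutorInitSketch {
  def initOrExit(createExecutor: () => AnyRef): AnyRef = {
    try {
      createExecutor()
    } catch {
      case NonFatal(e) =>
        // Surface the real cause in the log instead of failing later with
        // "Received LaunchTask command but executor was null".
        Console.err.println("Unable to create executor due to " + e.getMessage)
        sys.exit(1)
    }
  }
}
```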
