
Commit 609e5ff

kanzhang authored and aarondav committed
[SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors
There seem to be two issues.

1. When the job is done, the driver asks the executor to shut down. However, this clean exit was assigned the FAILED executor state by the Worker. I introduced an EXITED executor state for executors that exit voluntarily (covering both normal and abnormal exits, depending on the exit code).

2. When the Master is notified that an executor has exited, it launches another one to replace it, regardless of the reason the executor exited. When the reason was that the job had finished, the unnecessary replacement was subsequently killed when the App disassociated. This launching and killing of unnecessary executors shows up in the logs and is confusing to users. I added a check for the executor's exit status and avoid launching (and subsequently killing) unnecessary replacements when executors exit cleanly.

One could ask the scheduler to tell the Master the job is done, so that the Master wouldn't launch the replacement executor. However, there is a race condition between the App telling the Master the job is done and the Worker telling the Master an executor has exited; there is no guarantee the former happens before the latter. Instead, I chose to check the exit code when the executor exits. If the exit code is 0, I assume the executor was asked to shut down by the driver, and the Master will not launch a replacement. Due to the race condition, it could also happen (although it did not on my local cluster) that the Master detects the App disassociation event before the executor exits on its own. In that case, the executor is rightfully killed and labeled KILLED, while the App state shows FINISHED.

Author: Kan Zhang <kzhang@apache.org>

Closes #306 from kanzhang/SPARK-1118 and squashes the following commits:

cb0cc86 [Kan Zhang] [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors

(cherry picked from commit ca5d9d4)
Signed-off-by: Aaron Davidson <aaron@databricks.com>
1 parent 868cf42 commit 609e5ff
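
For illustration, here is a minimal sketch of the two-part flow the commit message describes, using simplified stand-in types rather than the actual Spark deploy classes, messages, or actors:

// Illustrative sketch only; StateChange and the function names below are
// stand-ins, not the real org.apache.spark.deploy messages or APIs.
object ExecutorExitFlowSketch {
  case class StateChange(state: String, exitStatus: Option[Int])

  // Worker side: the runner no longer hard-codes FAILED; it reports EXITED
  // together with the process exit code.
  def onProcessExit(exitCode: Int): StateChange =
    StateChange("EXITED", Some(exitCode))

  // Master side: only abnormal exits (nonzero or missing exit code) trigger
  // a replacement executor; a clean, driver-requested shutdown does not.
  def shouldRelaunch(change: StateChange): Boolean =
    !change.exitStatus.exists(_ == 0)

  def main(args: Array[String]): Unit = {
    println(shouldRelaunch(onProcessExit(0)))  // false: clean exit, no replacement
    println(shouldRelaunch(onProcessExit(1)))  // true: abnormal exit, schedule a replacement
  }
}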

File tree

3 files changed, +8 -8 lines changed

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 2 additions & 2 deletions
@@ -19,9 +19,9 @@ package org.apache.spark.deploy

 private[spark] object ExecutorState extends Enumeration {

-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

   type ExecutorState = Value

-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
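
As a quick, hypothetical REPL-style check of this change (not part of the commit), the new EXITED value is treated as a terminal state by isFinished:

// Hypothetical snippet; ExecutorState is private[spark], so this would only
// compile inside Spark's own org.apache.spark packages.
import org.apache.spark.deploy.ExecutorState

assert(ExecutorState.isFinished(ExecutorState.EXITED))   // true after this change
assert(!ExecutorState.isFinished(ExecutorState.RUNNING)) // RUNNING is still not terminal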

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 2 deletions
@@ -303,10 +303,11 @@ private[spark] class Master(
       appInfo.removeExecutor(exec)
       exec.worker.removeExecutor(exec)

+      val normalExit = exitStatus.exists(_ == 0)
       // Only retry certain number of times so we don't go into an infinite loop.
-      if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+      if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
         schedule()
-      } else {
+      } else if (!normalExit) {
         logError("Application %s with ID %s failed %d times, removing it".format(
           appInfo.desc.name, appInfo.id, appInfo.retryCount))
         removeApplication(appInfo, ApplicationState.FAILED)
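
A hedged sketch of how the restructured condition behaves in the three possible cases (the branch labels and maxRetries below are simplified stand-ins for the real Master logic and ApplicationState.MAX_NUM_RETRY):

// Simplified stand-in for the Master's branch logic above; returns a label
// describing which branch would run.
def onExecutorExit(normalExit: Boolean, retryCount: Int, maxRetries: Int): String =
  if (!normalExit && retryCount < maxRetries) "schedule a replacement executor"
  else if (!normalExit) "remove the application as FAILED"
  else "do nothing: the executor exited cleanly"

onExecutorExit(normalExit = true,  retryCount = 0,  maxRetries = 10) // clean exit, no relaunch
onExecutorExit(normalExit = false, retryCount = 3,  maxRetries = 10) // abnormal exit, relaunch
onExecutorExit(normalExit = false, retryCount = 10, maxRetries = 10) // retries exhausted, app removed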

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 4 deletions
@@ -143,11 +143,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)

-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
