Skip to content

[SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors #306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,11 @@ private[spark] class Master(
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)

val normalExit = exitStatus.exists(_ == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tricky part is in L370, when you run a standalone application, it exists as as normal, but will trigger a DisassociatedEvent to the master, the current implementation set the Executor status to KILLED no matter it is exists normally or is due to the ctrl+c.....(I didn't test it carefully...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L370?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 370

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My line 370 is " addressToWorker.get(address).foreach(removeWorker)". I'm not sure this is what you meant.

the current implementation set the Executor status to KILLED no matter it is exists normally or is due to the ctrl+c

the current implementation sets Executor status to FAILED when it exits normally and that's what I wanted to correct here, by introducing EXITED status, which could mean either normal or abnormal exit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, L367, no...I mean when the driver program exists normally or due to ctrl + c

// Only retry certain number of times so we don't go into an infinite loop.
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
} else if (!normalExit) {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo, ApplicationState.FAILED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)

// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.FAILED
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
Expand Down