SPY-350 Don't let Master remove RUNNING Applications #21

Status: Closed (3 commits)

ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy

 private[spark] object ExecutorState extends Enumeration {

-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

   type ExecutorState = Value

-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
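
For reference, a minimal self-contained sketch (not part of this PR) of how the extended enumeration behaves: EXITED joins KILLED, FAILED, and LOST as a terminal state that makes the Master clean up the executor, while LAUNCHING, LOADING, and RUNNING remain non-terminal. The names mirror the diff; the demo object is illustrative only.

// Standalone sketch; names mirror the enumeration above, the demo object is illustrative only.
object ExecutorStateSketch extends Enumeration {
  type ExecutorState = Value
  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

  def isFinished(state: ExecutorState): Boolean =
    Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}

object ExecutorStateSketchDemo extends App {
  import ExecutorStateSketch._
  // RUNNING is not terminal, so the Master keeps the executor registered.
  assert(!isFinished(RUNNING))
  // EXITED is terminal: the Master removes the executor and (per the Master change below)
  // inspects the exit status to decide whether the exit counts against the retry budget.
  assert(isFinished(EXITED))
}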

ApplicationInfo.scala
@@ -90,11 +90,13 @@ private[spark] class ApplicationInfo(

   def retryCount = _retryCount

-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }

+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()
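
To make the counter semantics concrete, here is a small standalone sketch (hypothetical class name, not the actual ApplicationInfo) of how incrementRetryCount() and resetRetryCount() are meant to interact: abnormal executor exits bump the count toward the removal threshold, and an executor reaching RUNNING resets it so a long-lived application does not accumulate stale failures.

// Minimal stand-in for ApplicationInfo's retry bookkeeping (illustrative only).
class RetryCounterSketch {
  private var _retryCount = 0

  def retryCount: Int = _retryCount

  // Mirrors incrementRetryCount(): bump the count and return the new value.
  def incrementRetryCount(): Int = {
    _retryCount += 1
    _retryCount
  }

  // Mirrors resetRetryCount(): invoked when an executor reports RUNNING.
  def resetRetryCount(): Unit = _retryCount = 0
}

object RetryCounterSketchDemo extends App {
  val app = new RetryCounterSketch
  app.incrementRetryCount()      // an executor exited abnormally
  app.incrementRetryCount()      // and a replacement did too
  app.resetRetryCount()          // but the next replacement reached RUNNING
  assert(app.retryCount == 0)    // the application is no longer near removal
}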

core/src/main/scala/org/apache/spark/deploy/master/Master.scala (16 additions, 9 deletions)
@@ -264,27 +264,34 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)

+            val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                  s"${appInfo.retryCount} times; removing it")
+                val execs = idToApp(appId).executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {

[Review comment on the line above]
Is there ever a transient state in which executors are "RUNNING" even though they will immediately fail? If so, the application could be immortal (not such a big problem for us, as the result is the same: restart sparky).

[Author reply]
Immortal RUNNING is definitely my biggest concern with this patch. I'll take another look today and see if I can find anything that I missed. Should also get some more eyes on it with the PR against the Apache repo.

+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }

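The review thread above asks whether a short-lived RUNNING report could make an application effectively immortal. Separating the new decision logic from the actor plumbing makes the behaviour easier to see; the sketch below is a distilled, standalone approximation (the MAX_NUM_RETRY value is assumed for illustration), not the actual Master code: a clean (exit code 0) executor exit is never counted as a failure, abnormal exits are retried up to the limit, and even past the limit the application survives as long as some executor still reports RUNNING.

// Distilled approximation of the Master's handling of a finished executor
// (standalone sketch; names mirror the diff, MAX_NUM_RETRY value is assumed).
object MasterDecisionSketch {
  val MAX_NUM_RETRY = 10

  sealed trait Action
  case object Reschedule extends Action        // launch a replacement executor
  case object RemoveApplication extends Action // give up on the application
  case object KeepApplication extends Action   // do nothing further for now

  def onExecutorFinished(exitStatus: Option[Int],
                         incrementRetryCount: () => Int,
                         anyExecutorStillRunning: Boolean): Action = {
    val normalExit = exitStatus == Some(0)
    if (normalExit) {
      KeepApplication                  // clean exit: not counted, retry count untouched
    } else if (incrementRetryCount() < MAX_NUM_RETRY) {
      Reschedule                       // abnormal exit, still within the retry budget
    } else if (anyExecutorStillRunning) {
      KeepApplication                  // budget exhausted, but something still reports RUNNING
    } else {
      RemoveApplication                // budget exhausted and nothing is running
    }
  }
}

object MasterDecisionSketchDemo extends App {
  import MasterDecisionSketch._
  var retries = 0
  val bump = () => { retries += 1; retries }

  // The reviewer's concern: if each replacement executor briefly reports RUNNING before dying,
  // the Master resets the retry count every time, so the application is never removed.
  for (_ <- 1 to 100) {
    retries = 0                                        // reset triggered by the RUNNING report
    onExecutorFinished(Some(1), bump, anyExecutorStillRunning = false)
  }
  assert(retries < MAX_NUM_RETRY)                      // the budget is never exhausted
}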

ExecutorRunner.scala
@@ -139,11 +139,12 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)

-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED

[Review comment on the line above]
The state needs a check here, because ExecutorExitCode defines several other (nonzero) exit codes, and those should still be reported as ExecutorState.FAILED. For example: if (exitCode != 0) state = ExecutorState.FAILED else state = ExecutorState.EXITED.

       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
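
A sketch of the check the review comment above is asking for (not part of this PR): classify the exit code before reporting it, so the special nonzero codes from ExecutorExitCode keep mapping to FAILED and only a clean exit becomes EXITED. The enumeration is redeclared locally to keep the sketch self-contained.

// Sketch of the reviewer's suggested classification (illustrative only).
object ExitCodeClassificationSketch {
  object ExecutorState extends Enumeration {
    val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
  }

  // Any nonzero code (including the ExecutorExitCode constants) is a failure;
  // only a zero exit, i.e. a shutdown requested by the driver, is a plain EXITED.
  def stateFor(exitCode: Int): ExecutorState.Value =
    if (exitCode != 0) ExecutorState.FAILED else ExecutorState.EXITED

  def main(args: Array[String]): Unit = {
    assert(stateFor(0) == ExecutorState.EXITED)
    assert(stateFor(1) == ExecutorState.FAILED)
  }
}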

Worker.scala
@@ -216,7 +216,7 @@ private[spark] class Worker(
       // ApplicationDescription to be more explicit about this.
       val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
       val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-        self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+        self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.LOADING)
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_
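
Taken together with the ExecutorRunner change above, this means the Worker now registers a new executor as LOADING, and the RUNNING report is only sent once the executor process has actually been started, just before waitFor(). A rough sketch of the state sequences the Master observes after this PR (simplified; the real updates are ExecutorStateChanged messages carrying a message and exit status):

// Simplified view of the per-executor state sequences under this PR (illustrative only).
object ExecutorLifecycleSketch extends App {
  // Worker registers the runner as LOADING; ExecutorRunner reports RUNNING once the
  // process starts, and EXITED when it terminates (the exit code says whether it was clean).
  val cleanShutdown   = Seq("LOADING", "RUNNING", "EXITED(code 0)")
  val crashedExecutor = Seq("LOADING", "RUNNING", "EXITED(code != 0)")

  // Only the RUNNING update resets the application's retry count in the Master,
  // so executors that never reach RUNNING cannot keep the retry budget from running out.
  Seq(cleanShutdown, crashedExecutor).foreach(s => println(s.mkString(" -> ")))
}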