[SPARK-32199][SPARK-32198] Reduce job failures during decommissioning #29014

Closed
wants to merge 1 commit
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1821,10 +1821,19 @@ private[spark] class DAGScheduler(

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-            unRegisterOutputOnHostOnFetchFailure) {
-            // We had a fetch failure with the external shuffle service, so we
-            // assume all shuffle data on the node is bad.
+          val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
+          val isHostDecommissioned = taskScheduler
+            .getExecutorDecommissionInfo(bmAddress.executorId)
+            .exists(_.isHostDecommissioned)
+
+          // Shuffle output of all executors on host `bmAddress.host` may be lost if:
+          // - External shuffle service is enabled, so we assume that all shuffle data on node is
+          //   bad.
+          // - Host is decommissioned, thus all executors on that host will die.
+          val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled ||
+            isHostDecommissioned
+          val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost
+            && unRegisterOutputOnHostOnFetchFailure) {
             Some(bmAddress.host)
           } else {
             // Unregister shuffle data just for one executor (we don't have any
@@ -2339,7 +2348,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

     case ExecutorLost(execId, reason) =>
       val workerLost = reason match {
-        case ExecutorProcessLost(_, true) => true
+        case ExecutorProcessLost(_, true, _) => true
         case _ => false
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
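The DAGScheduler change above boils down to a single predicate: treat the whole host's shuffle output as lost when either the external shuffle service is enabled or the host itself is decommissioned, and only then unregister outputs at host granularity. A minimal standalone sketch of that decision, with a hypothetical helper name (`decideUnregisterScope`) and plain boolean parameters standing in for the scheduler state:

  // Hypothetical helper mirroring the DAGScheduler logic above.
  // Returns Some(host) when shuffle output for the whole host should be
  // unregistered, or None when only the failing executor's output is dropped.
  def decideUnregisterScope(
      host: String,
      externalShuffleServiceEnabled: Boolean,
      isHostDecommissioned: Boolean,
      unRegisterOutputOnHostOnFetchFailure: Boolean): Option[String] = {
    val shuffleOutputOfEntireHostLost =
      externalShuffleServiceEnabled || isHostDecommissioned
    if (shuffleOutputOfEntireHostLost && unRegisterOutputOnHostOnFetchFailure) {
      Some(host)   // wipe map outputs for every executor on this host
    } else {
      None         // fall back to unregistering just the one executor
    }
  }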
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -54,9 +54,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 /**
  * @param _message human readable loss reason
  * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param causedByApp whether the loss of the executor is the fault of the running app.
+ *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
-case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
+case class ExecutorProcessLost(
+    _message: String = "Executor Process Lost",
+    workerLost: Boolean = false,
+    causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)
 
 /**
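With the extra `causedByApp` field, a caller that knows the executor died because of decommissioning (rather than anything the application did) can say so explicitly. A hedged sketch, assuming the `ExecutorProcessLost` case class above is in scope; the message string and the println branches are illustrative only:

  // Illustrative only: a loss reason for an executor that died because its host
  // was decommissioned. The worker (and its shuffle files) are gone, and the
  // application is not at fault.
  val reason = ExecutorProcessLost(
    _message = "Executor lost due to host decommissioning",
    workerLost = true,
    causedByApp = false)

  reason match {
    case ExecutorProcessLost(_, _, false) =>
      println("loss not caused by the app; task failures need not be blamed on it")
    case _ =>
      println("treat as an application-caused loss")
  }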
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -103,6 +103,11 @@ private[spark] trait TaskScheduler {
    */
   def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
+  /**
+   * If an executor is decommissioned, return its corresponding decommission info
+   */
+  def getExecutorDecommissionInfo(executorId: String): Option[ExecutorDecommissionInfo]
+
   /**
    * Process a lost executor
    */
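A caller holding a `TaskScheduler` can now record a decommission and later ask whether, and how, a given executor was decommissioned. A small usage sketch under stated assumptions: `scheduler` is some `TaskScheduler`, executor "3" is made up, and the two-argument `ExecutorDecommissionInfo(message, isHostDecommissioned)` constructor is assumed to match the case class used elsewhere in this PR:

  // Assumed: `scheduler: TaskScheduler`, and executor "3" runs on a host that the
  // cluster manager is draining.
  scheduler.executorDecommission(
    "3", ExecutorDecommissionInfo("host is being drained", isHostDecommissioned = true))

  // Later, e.g. when the DAGScheduler sees a fetch failure from executor "3":
  val hostGone = scheduler
    .getExecutorDecommissionInfo("3")
    .exists(_.isHostDecommissioned)   // true here, so host-wide shuffle cleanup applies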
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -136,6 +136,8 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
+  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
@@ -939,12 +941,43 @@ private[spark] class TaskSchedulerImpl(

   override def executorDecommission(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+    synchronized {
+      // Don't bother noting decommissioning for executors that we don't know about
+      if (executorIdToHost.contains(executorId)) {
+        // The scheduler can get multiple decommission updates from multiple sources,
+        // and some of those can have isHostDecommissioned false. We merge them such that
+        // if we heard isHostDecommissioned ever true, then we keep that one since it is
+        // most likely coming from the cluster manager and thus authoritative
+        val oldDecomInfo = executorsPendingDecommission.get(executorId)
+        if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+          executorsPendingDecommission(executorId) = decommissionInfo
+        }
+      }
+    }
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
 
-  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
+  override def getExecutorDecommissionInfo(executorId: String)
+    : Option[ExecutorDecommissionInfo] = synchronized {
+      executorsPendingDecommission.get(executorId)
+  }
+
+  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
+    val reason = givenReason match {
+      // Handle executor process loss due to decommissioning
+      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
+        val executorDecommissionInfo = getExecutorDecommissionInfo(executorId)
+        ExecutorProcessLost(
+          message,
+          // Also mark the worker lost if we know that the host was decommissioned
+          origWorkerLost || executorDecommissionInfo.exists(_.isHostDecommissioned),
+          // Executor loss is certainly not caused by app if we knew that this executor is being
+          // decommissioned
+          causedByApp = executorDecommissionInfo.isEmpty && origCausedByApp)
+      case e => e
+    }
 
     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1033,6 +1066,8 @@
       }
     }
 
+    executorsPendingDecommission -= executorId
+
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
       rootPool.executorLost(executorId, host, reason)
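Two behaviours in the TaskSchedulerImpl hunks are worth calling out: repeated decommission notifications are merged so that an `isHostDecommissioned = true` signal is never downgraded by a later executor-only notice, and the recorded entry is dropped again once the executor is fully removed. A self-contained sketch of just the merge rule; the local case class is a stand-in for Spark's, and `noteDecommission` is an illustrative name, not the PR's code verbatim:

  import scala.collection.mutable.HashMap

  // Stand-in for Spark's case class, just so the sketch is self-contained.
  case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

  val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]

  def noteDecommission(executorId: String, info: ExecutorDecommissionInfo): Unit = {
    val oldDecomInfo = executorsPendingDecommission.get(executorId)
    // Never downgrade: once a source reported isHostDecommissioned = true, keep it.
    if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
      executorsPendingDecommission(executorId) = info
    }
  }

  noteDecommission("7", ExecutorDecommissionInfo("cluster manager: host draining", isHostDecommissioned = true))
  noteDecommission("7", ExecutorDecommissionInfo("executor-only notice", isHostDecommissioned = false))
  assert(executorsPendingDecommission("7").isHostDecommissioned)  // still true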
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -985,6 +985,7 @@ private[spark] class TaskSetManager(
     val exitCausedByApp: Boolean = reason match {
       case exited: ExecutorExited => exited.exitCausedByApp
       case ExecutorKilled => false
+      case ExecutorProcessLost(_, _, false) => false
       case _ => true
     }
     handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
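The added `ExecutorProcessLost(_, _, false)` case means an executor lost during decommissioning is classified like `ExecutorKilled`: the resulting `ExecutorLostFailure` carries `exitCausedByApp = false`, so, as far as the surrounding code suggests, the task's failure should not be counted against `spark.task.maxFailures`. A compact sketch of the classification, using simplified stand-ins for Spark's loss-reason hierarchy:

  // Simplified stand-ins for Spark's loss reasons, to keep the sketch
  // self-contained; Spark's real classes carry more fields.
  sealed trait ExecutorLossReason
  case object ExecutorKilled extends ExecutorLossReason
  case class ExecutorExited(exitCausedByApp: Boolean) extends ExecutorLossReason
  case class ExecutorProcessLost(message: String, workerLost: Boolean, causedByApp: Boolean)
    extends ExecutorLossReason

  def exitCausedByApp(reason: ExecutorLossReason): Boolean = reason match {
    case exited: ExecutorExited => exited.exitCausedByApp
    case ExecutorKilled => false
    case ExecutorProcessLost(_, _, false) => false // e.g. loss due to decommissioning
    case _ => true
  }

  // A decommission-driven loss is not blamed on the application.
  assert(!exitCausedByApp(ExecutorProcessLost("host decommissioned", workerLost = true, causedByApp = false)))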