Commit 806630a

Reduce job failures during decommissioning
This PR reduces the prospect of job loss during decommissioning. It fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure: we know that the decommissioned executor will be dying soon, so its death is clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: the tricky part is deciding when to clear the shuffle state. Ideally you want to clear it just before the shuffle service on the node dies (or the executor dies, when there is no external shuffle service) -- too soon leads to some wastage, too late leads to fetch failures. The approach here is to do the clearing when the very first fetch failure is observed from the decommissioned block manager, without waiting for other blocks to also signal a failure.

(The changes for these two features share some common code, hence both are packed into the same commit.)

Added a new unit test `DecommissionWorkerSuite` to test the new behavior.

TODO:
- Should I add a feature flag to guard these two behaviors?
- Should this be two separate PRs or two commits in the same PR?
1 parent d315ebf commit 806630a
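
The two behaviors described above reduce to two small boolean decisions, both visible in the diffs below. The following is a minimal standalone sketch of those decisions, suitable for a Scala REPL or worksheet; `DecomInfo` is a simplified stand-in for `ExecutorDecommissionInfo` (only `isHostDecommissioned` mirrors the real type), and the function names are illustrative, not the actual Spark code paths.

    // Simplified stand-in for ExecutorDecommissionInfo; only isHostDecommissioned
    // mirrors the field the diffs below rely on.
    case class DecomInfo(isHostDecommissioned: Boolean)

    // (a) An executor loss is not blamed on the app if the scheduler already knew
    //     the executor was being decommissioned.
    def lossCausedByApp(decomInfo: Option[DecomInfo], causedByAppSoFar: Boolean): Boolean =
      decomInfo.isEmpty && causedByAppSoFar

    // (b) On a fetch failure, clear shuffle state for the whole host if the external
    //     shuffle service serves the node's data, or if the host itself is going away.
    def shuffleOutputOfEntireHostLost(
        externalShuffleServiceEnabled: Boolean,
        decomInfo: Option[DecomInfo]): Boolean =
      externalShuffleServiceEnabled || decomInfo.exists(_.isHostDecommissioned)

    // Example: a fetch failure from an executor on a decommissioned host.
    val info = Some(DecomInfo(isHostDecommissioned = true))
    assert(!lossCausedByApp(info, causedByAppSoFar = true))
    assert(shuffleOutputOfEntireHostLost(externalShuffleServiceEnabled = false, info))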

File tree: 9 files changed (+535, -7 lines)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 14 additions & 5 deletions

@@ -1821,10 +1821,19 @@ private[spark] class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-            unRegisterOutputOnHostOnFetchFailure) {
-            // We had a fetch failure with the external shuffle service, so we
-            // assume all shuffle data on the node is bad.
+          val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
+          val isHostDecommissioned = taskScheduler
+            .getExecutorDecommissionInfo(bmAddress.executorId)
+            .exists(_.isHostDecommissioned)
+
+          // Shuffle output of all executors on host `bmAddress.host` may be lost if:
+          // - External shuffle service is enabled, so we assume that all shuffle data on node is
+          //   bad.
+          // - Host is decommissioned, thus all executors on that host will die.
+          val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled ||
+            isHostDecommissioned
+          val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost
+              && unRegisterOutputOnHostOnFetchFailure) {
            Some(bmAddress.host)
          } else {
            // Unregister shuffle data just for one executor (we don't have any

@@ -2339,7 +2348,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
 
    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
-        case ExecutorProcessLost(_, true) => true
+        case ExecutorProcessLost(_, true, _) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)
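
For readers less familiar with this code path: the `Option` computed above is what widens or narrows the cleanup. A hedged sketch of how such an `Option` is typically consumed; the function names here are illustrative placeholders, not the DAGScheduler's actual helpers.

    // Sketch only: Some(host) widens the unregistration to every executor on the
    // host; None keeps it scoped to the single executor that produced the failure.
    def unregisterShuffleOutputs(
        hostToUnregisterOutputs: Option[String],
        execId: String,
        unregisterOnHost: String => Unit,      // e.g. drop all map outputs on the host
        unregisterOnExecutor: String => Unit   // e.g. drop only this executor's outputs
      ): Unit = hostToUnregisterOutputs match {
      case Some(host) => unregisterOnHost(host)
      case None => unregisterOnExecutor(execId)
    }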

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
Lines changed: 6 additions & 1 deletion

@@ -54,9 +54,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 /**
  * @param _message human readable loss reason
  * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param causedByApp whether the loss of the executor is the fault of the running app.
+ *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
-case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
+case class ExecutorProcessLost(
+    _message: String = "Executor Process Lost",
+    workerLost: Boolean = false,
+    causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)
 
 /**
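
Since `ExecutorProcessLost` is a case class, widening it to three fields changes the arity of its constructor pattern, which is why the matches in `DAGSchedulerEventProcessLoop` and `TaskSetManager` in this commit each gain an extra slot. A small self-contained REPL sketch; `ExecutorLossReason` is stubbed locally so the snippet compiles on its own.

    class ExecutorLossReason(val message: String) extends Serializable  // local stub

    case class ExecutorProcessLost(
        _message: String = "Executor Process Lost",
        workerLost: Boolean = false,
        causedByApp: Boolean = true)
      extends ExecutorLossReason(_message)

    // A decommission-driven loss can now be reported as "not the app's fault".
    val reason: ExecutorLossReason =
      ExecutorProcessLost("Worker decommissioned", workerLost = true, causedByApp = false)

    // Existing two-slot patterns must grow a wildcard for the new field.
    val workerLost = reason match {
      case ExecutorProcessLost(_, true, _) => true
      case _ => false
    }
    assert(workerLost)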

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
Lines changed: 5 additions & 0 deletions

@@ -103,6 +103,11 @@ private[spark] trait TaskScheduler {
    */
   def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
+  /**
+   * If an executor is decommissioned, return its corresponding decommission info
+   */
+  def getExecutorDecommissionInfo(executorId: String): Option[ExecutorDecommissionInfo]
+
   /**
    * Process a lost executor
    */
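
Because the trait gains an abstract method, every `TaskScheduler` implementation (including test doubles) now has to provide `getExecutorDecommissionInfo`. The caller-side pattern is already visible in the DAGScheduler hunk above: fold the `Option` into a `Boolean` with `exists`, so both "unknown executor" and "known but not host-decommissioned" come out false. Sketch only; `taskScheduler` and `execId` stand for whatever is in scope at the call site.

    // Unknown executor => None => false; decommissioned executor on a live host =>
    // Some(info) with isHostDecommissioned = false => false; host going away => true.
    val isHostDecommissioned: Boolean =
      taskScheduler.getExecutorDecommissionInfo(execId).exists(_.isHostDecommissioned)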

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
Lines changed: 36 additions & 1 deletion

@@ -136,6 +136,8 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
+  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }

@@ -939,12 +941,43 @@ private[spark] class TaskSchedulerImpl(
 
   override def executorDecommission(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+    synchronized {
+      // Don't bother noting decommissioning for executors that we don't know about
+      if (executorIdToHost.contains(executorId)) {
+        // The scheduler can get multiple decommission updates from multiple sources,
+        // and some of those can have isHostDecommissioned false. We merge them such that
+        // if we heard isHostDecommissioned ever true, then we keep that one since it is
+        // most likely coming from the cluster manager and thus authoritative
+        val oldDecomInfo = executorsPendingDecommission.get(executorId)
+        if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+          executorsPendingDecommission(executorId) = decommissionInfo
+        }
+      }
+    }
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
 
-  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
+  override def getExecutorDecommissionInfo(executorId: String)
+    : Option[ExecutorDecommissionInfo] = synchronized {
+      executorsPendingDecommission.get(executorId)
+    }
+
+  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
+    val reason = givenReason match {
+      // Handle executor process loss due to decommissioning
+      case ExecutorProcessLost(message, workerLost, causedByApp) =>
+        val executorDecommissionInfo = getExecutorDecommissionInfo(executorId)
+        ExecutorProcessLost(
+          message,
+          // Also mark the worker lost if we know that the host was decommissioned
+          workerLost || executorDecommissionInfo.exists(_.isHostDecommissioned),
+          // Executor loss is certainly not caused by app if we knew that this executor is being
+          // decommissioned
+          causedByApp = executorDecommissionInfo.isEmpty && causedByApp)
+      case e => e
+    }
 
     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {

@@ -1033,6 +1066,8 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
+    executorsPendingDecommission -= executorId
+
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
       rootPool.executorLost(executorId, host, reason)
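
The merge rule in `executorDecommission` above can be read as a small pure function: keep any previously recorded `isHostDecommissioned = true` (most likely from the cluster manager, hence authoritative), otherwise overwrite with the newer update. A sketch with a stand-in type, plus the three interesting cases.

    case class DecomInfo(isHostDecommissioned: Boolean)  // stand-in for ExecutorDecommissionInfo

    def merge(old: Option[DecomInfo], update: DecomInfo): DecomInfo = {
      // Never downgrade a host-level decommission; otherwise the latest update wins.
      if (old.exists(_.isHostDecommissioned)) old.get else update
    }

    assert(merge(None, DecomInfo(false)) == DecomInfo(false))                 // first notification
    assert(merge(Some(DecomInfo(false)), DecomInfo(true)) == DecomInfo(true)) // upgrade to host-level
    assert(merge(Some(DecomInfo(true)), DecomInfo(false)) == DecomInfo(true)) // host-level is sticky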

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Lines changed: 1 addition & 0 deletions

@@ -985,6 +985,7 @@ private[spark] class TaskSetManager(
     val exitCausedByApp: Boolean = reason match {
       case exited: ExecutorExited => exited.exitCausedByApp
       case ExecutorKilled => false
+      case ExecutorProcessLost(_, _, false) => false
       case _ => true
     }
     handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
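
In isolation, the effect of the added branch is that an `ExecutorProcessLost` carrying `causedByApp = false` is handled like `ExecutorKilled`: the resulting `ExecutorLostFailure` is built with `exitCausedByApp = false`, so the task failure is not attributed to the application. A self-contained sketch with local stand-in types, not the Spark classes.

    sealed trait Reason  // local stand-in hierarchy
    case object ExecutorKilled extends Reason
    case class ExecutorProcessLost(
        message: String,
        workerLost: Boolean,
        causedByApp: Boolean) extends Reason

    def exitCausedByApp(reason: Reason): Boolean = reason match {
      case ExecutorKilled => false
      case ExecutorProcessLost(_, _, false) => false  // the new branch
      case _ => true
    }

    // A task that dies because its executor was decommissioned is not the app's fault.
    assert(!exitCausedByApp(ExecutorProcessLost("decommissioned", workerLost = true, causedByApp = false)))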
