Commit d5bc756

impr
1 parent 5775073 commit d5bc756

21 files changed: 82 additions & 142 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
     // when the task backlog decreased.
     if (decommissionEnabled) {
       val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
-        id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
+        id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
       client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
     } else {
       client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@ private[deploy] object DeployMessages {
   }

   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-      exitStatus: Option[Int], workerLost: Boolean)
+      exitStatus: Option[Int], hostOpt: Option[String])

   case class ApplicationRemoved(message: String)

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 3 additions & 3 deletions
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)

-      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+      case ExecutorUpdated(id, state, message, exitStatus, hostOpt) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, hostOpt)
         } else if (state == ExecutorState.DECOMMISSIONED) {
           listener.executorDecommissioned(fullId,
-            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
+            ExecutorDecommissionInfo(message.getOrElse(""), hostOpt))
         }

       case WorkerRemoved(id, host, message) =>

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener {
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

  def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+      fullId: String, message: String, exitStatus: Option[Int], hostOpt: Option[String]): Unit

  def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 4 deletions
@@ -308,7 +308,7 @@ private[deploy] class Master(
         appInfo.resetRetryCount()
       }

-      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
+      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

       if (ExecutorState.isFinished(state)) {
         // Remove this executor from the worker and app
@@ -909,9 +909,9 @@ private[deploy] class Master(
       exec.application.driver.send(ExecutorUpdated(
         exec.id, ExecutorState.DECOMMISSIONED,
         Some("worker decommissioned"), None,
-        // workerLost is being set to true here to let the driver know that the host (aka. worker)
+        // worker host is being set here to let the driver know that the host (aka. worker)
         // is also being decommissioned.
-        workerLost = true))
+        Some(worker.host)))
       exec.state = ExecutorState.DECOMMISSIONED
       exec.application.removeExecutor(exec)
     }
@@ -932,7 +932,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
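
A quick illustration of what these hunks buy the driver. This is a standalone sketch, not Spark code: it mirrors the shape of DeployMessages.ExecutorUpdated with a simplified ExecutorState and a hypothetical host name. Where the old message could only say that some worker was lost, the new one says which host, so shuffle output on that host can be unregistered precisely.

    // Standalone mirror of the DeployMessages.ExecutorUpdated change, for illustration.
    object ExecutorUpdatedSketch extends App {
      sealed trait ExecutorState
      case object LOST extends ExecutorState

      // The last field used to be `workerLost: Boolean`; it is now the lost worker's host.
      case class ExecutorUpdated(
          id: Int,
          state: ExecutorState,
          message: Option[String],
          exitStatus: Option[Int],
          hostOpt: Option[String])

      // Before: ExecutorUpdated(3, LOST, Some("worker lost"), None, workerLost = true)
      val update = ExecutorUpdated(3, LOST, Some("worker lost"), None,
        Some("worker-7.internal")) // hypothetical host name

      // The driver can now act on the concrete host rather than a bare flag.
      update.hostOpt.foreach(h => println(s"unregistering shuffle output on $h"))
    }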

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 5 deletions
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     driver match {
       case Some(endpoint) =>
         logInfo("Sending DecommissionExecutor to driver.")
-        endpoint.send(
-          DecommissionExecutor(
-            executorId,
-            ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
+        endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
       case _ =>
         logError("No registered driver to send Decommission to.")
     }
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     // Tell master we are decommissioned so it stops trying to schedule us
     if (driver.nonEmpty) {
       driver.get.askSync[Boolean](DecommissionExecutor(
-        executorId, ExecutorDecommissionInfo(msg, false)))
+        executorId, ExecutorDecommissionInfo(msg)))
     } else {
       logError("No driver to message decommissioning.")
     }

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 10 deletions
@@ -1890,16 +1890,6 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createOptional

-  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
-    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
-      .doc("Duration for which a decommissioned executor's information will be kept after its" +
-        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
-        "decommissioning even after the mapper executor has been decommissioned. This allows " +
-        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
-      .version("3.1.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefaultString("5m")
-
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 8 deletions
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
     val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
     val isHostDecommissioned = taskScheduler
       .getExecutorDecommissionState(bmAddress.executorId)
-      .exists(_.isHostDecommissioned)
+      .exists(_.hostOpt.isDefined)

     // Shuffle output of all executors on host `bmAddress.host` may be lost if:
     // - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      workerLost: Boolean): Unit = {
+      hostOpt: Option[String]): Unit = {
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    val fileLost = hostOpt.isDefined || !env.blockManager.externalShuffleServiceEnabled
     removeExecutorAndUnregisterOutputs(
       execId = execId,
       fileLost = fileLost,
-      hostToUnregisterOutputs = None,
+      hostToUnregisterOutputs = hostOpt,
       maybeEpoch = None)
   }

@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)

     case ExecutorLost(execId, reason) =>
-      val workerLost = reason match {
-        case ExecutorProcessLost(_, true, _) => true
-        case _ => false
+      val hostOpt = reason match {
+        case ExecutorProcessLost(_, host, _) => host
+        case ExecutorDecommission(host) => host
+        case _ => None
       }
-      dagScheduler.handleExecutorLost(execId, workerLost)
+      dagScheduler.handleExecutorLost(execId, hostOpt)

     case WorkerRemoved(workerId, host, message) =>
       dagScheduler.handleWorkerRemoved(workerId, host, message)
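
To make the new data flow concrete, here is a self-contained sketch of the two computations these hunks change: mapping a loss reason to an optional lost host, and deriving fileLost from it. The types are simplified stand-ins mirroring the shapes in this commit, not the real Spark classes.

    // Simplified stand-ins for the loss reasons, mirroring the shapes in this commit.
    object HostLossSketch extends App {
      sealed abstract class ExecutorLossReason(val message: String)
      case class ExecutorProcessLost(
          _message: String = "Executor Process Lost",
          hostOpt: Option[String] = None,
          causedByApp: Boolean = true) extends ExecutorLossReason(_message)
      case class ExecutorDecommission(hostOpt: Option[String] = None)
        extends ExecutorLossReason("Executor decommission.")
      case object ExecutorKilled extends ExecutorLossReason("killed")

      // The match from DAGSchedulerEventProcessLoop: both reasons that can carry a
      // lost host now surface it; everything else maps to None.
      def lostHost(reason: ExecutorLossReason): Option[String] = reason match {
        case ExecutorProcessLost(_, host, _) => host
        case ExecutorDecommission(host) => host
        case _ => None
      }

      // fileLost as computed in handleExecutorLost: shuffle files are gone if the
      // whole host is gone, or if no external shuffle service outlives the executor.
      def fileLost(hostOpt: Option[String], externalShuffleService: Boolean): Boolean =
        hostOpt.isDefined || !externalShuffleService

      val host = lostHost(ExecutorDecommission(Some("w1")))  // Some("w1")
      println(fileLost(host, externalShuffleService = true)) // true: whole host lost
    }

Note that hostOpt also flows into hostToUnregisterOutputs, which the old code always left as None, so outputs on a decommissioned host are now unregistered by host rather than executor by executor.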

core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala

Lines changed: 5 additions & 5 deletions
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
 /**
  * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param hostOpt When defined, the host (aka the `node` or `worker` in other places) is
+ *                being decommissioned too. Used to infer if the shuffle data might be
+ *                lost even if the external shuffle service is enabled.
  */
 private[spark]
-case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None)

 /**
  * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
   // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
   // is configured.
   startTime: Long,
-  isHostDecommissioned: Boolean)
+  hostOpt: Option[String] = None)
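
A minimal sketch of how the Option[String] field subsumes the old boolean. It uses a local mirror of the case class above and a hypothetical host name: hostOpt.isDefined recovers the old isHostDecommissioned, and the default None is what lets the call sites in the earlier hunks drop their explicit false.

    // Local mirror of the case class above, for illustration only.
    object DecommissionInfoSketch extends App {
      case class ExecutorDecommissionInfo(message: String, hostOpt: Option[String] = None)

      // Executor-only decommission: the default None replaces the old `false`.
      val execOnly = ExecutorDecommissionInfo("spark scale down")
      // Whole-worker decommission: the host replaces the old `true`.
      val withHost = ExecutorDecommissionInfo("worker decommissioned", Some("host-1"))

      // The old boolean is still recoverable...
      assert(execOnly.hostOpt.isEmpty && withHost.hostOpt.isDefined)
      // ...but the new field also says *which* host's shuffle output to unregister.
      withHost.hostOpt.foreach(h => println(s"unregister shuffle files on $h"))
    }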

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

Lines changed: 6 additions & 3 deletions
@@ -53,14 +53,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los

 /**
  * @param _message human readable loss reason
- * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param hostOpt defined when the host is confirmed lost too (i.e. including shuffle service)
  * @param causedByApp whether the loss of the executor is the fault of the running app.
  *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
 case class ExecutorProcessLost(
     _message: String = "Executor Process Lost",
-    workerLost: Boolean = false,
+    hostOpt: Option[String] = None,
     causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)

@@ -69,5 +69,8 @@ case class ExecutorProcessLost(
  *
  * This is used by the task scheduler to remove state associated with the executor, but
  * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ *
+ * @param hostOpt set by [[TaskSchedulerImpl]] when the host is decommissioned too
  */
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(var hostOpt: Option[String] = None)
+  extends ExecutorLossReason("Executor decommission.")
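
One API consequence worth noting: ExecutorDecommission changes from a singleton object to a case class, so matches against the old singleton pattern no longer apply; callers use a constructor pattern instead, which also binds the host. A self-contained sketch under that assumption, with a simplified base class:

    // Minimal stand-in showing the new case-class form of ExecutorDecommission.
    object DecommissionMatchSketch extends App {
      abstract class ExecutorLossReason(val message: String)
      // `var hostOpt` lets the scheduler fill the host in after construction,
      // as the new scaladoc describes for TaskSchedulerImpl.
      case class ExecutorDecommission(var hostOpt: Option[String] = None)
        extends ExecutorLossReason("Executor decommission.")

      def describe(reason: ExecutorLossReason): String = reason match {
        // Old form matched the singleton: `case ExecutorDecommission => ...`
        // New form is a constructor pattern that also extracts the host:
        case ExecutorDecommission(Some(host)) => s"decommissioned along with host $host"
        case ExecutorDecommission(None)       => "decommissioned (executor only)"
        case other                            => other.message
      }

      println(describe(ExecutorDecommission(Some("w1"))))
    }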
