[SPARK-30873][CORE][YARN] Handling Node Decommissioning for Yarn cluster manager in Spark #27636
```diff
@@ -1873,4 +1873,67 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
+    ConfigBuilder("spark.graceful.decommission.enable")
+      .doc("Whether to enable graceful handling of node decommissioning.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
+    ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
+      .doc("Threshold for the number of fetch failures that are ignored because they were " +
+        "caused by node decommissioning. This is configurable per the needs of the user " +
+        "and the type of cloud. If this is set to a large value and nodes are " +
+        "decommissioned continuously, the stage will never abort and will keep retrying " +
+        "in an unbounded manner.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
+      .doc("Percentage of time to expiry after which executors are killed " +
+        "(if enabled) on the node. Value ranges between (0-100).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be in the range [0, 100).")
+      .createWithDefault(50) // Pulled out of thin air.
```

Contributor: I don't know what this means. Percentage of what time to expiry? We get a notification that the node is being decommissioned, and I assume this is how long to let the executor run before killing it, but it doesn't say what this is a percentage of.

```diff
+
+  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.shuffledata.leasetimePct")
+      .doc("Percentage of time to expiry after which shuffle data is cleaned up " +
+        "(if enabled) on the node. Value ranges between (0-100). This value is always " +
+        "greater than or equal to the executor leaseTime (it is set to be equal if " +
+        "incorrectly configured). Near 0% would mean generated data is marked as lost " +
+        "too early. Too close to 100% and shuffle data may not get cleared proactively, " +
+        "leading to tasks going into fetchFail scenarios.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be in the range [0, 100).")
+      .createWithDefault(90) // Pulled out of thin air.
```

Contributor: Similar here: we should describe what this is a percentage of.

```diff
+
+  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
+    ConfigBuilder("spark.graceful.decommission.min.termination.time")
+      .doc("Minimum time to termination below which node decommissioning is performed " +
+        "immediately. If the time to decommission is less than the configured time " +
+        "(spark.graceful.decommission.min.termination.time), then executor decommissioning " +
+        "and shuffle data clean up take place immediately: first the executor is " +
+        "decommissioned, then the shuffle data is cleaned up.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("60s")
+
+  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
+    ConfigBuilder("spark.graceful.decommission.node.timeout")
+      .doc("Interval in seconds after which the node is decommissioned. In case of AWS " +
+        "spot loss the time is approximately 110s, and in case of GCP preemptible VMs it " +
+        "is around 30s; this config can be changed according to the node type in the " +
+        "public cloud. This is applied if the decommission timeout is not sent by the " +
+        "Resource Manager.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("110s")
 }
```
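For reference, a minimal sketch of how a job could opt in to these settings, assuming the config names and defaults from the diff above; the values are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative only: enable the proposed graceful-decommission handling and
// restate the defaults of the thresholds introduced in this PR.
val conf = new SparkConf()
  .set("spark.graceful.decommission.enable", "true")
  .set("spark.graceful.decommission.fetchfailed.ignore.threshold", "8")
  .set("spark.graceful.decommission.executor.leasetimePct", "50")
  .set("spark.graceful.decommission.shuffledata.leasetimePct", "90")
  .set("spark.graceful.decommission.min.termination.time", "60s")
  .set("spark.graceful.decommission.node.timeout", "110s")
```

Reading the two percentages as fractions of the node's decommission timeout (the interpretation the reviewers above are asking to have spelled out in the docs), a 110s timeout would kill executors on the node after roughly 55s and stop relying on its shuffle data after roughly 99s.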
```diff
@@ -222,6 +222,13 @@ private[spark] class DAGScheduler(
   private val maxFailureNumTasksCheck = sc.getConf
     .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
 
+  /**
+   * Threshold for the number of times a fetch failure is ignored
+   * due to decommissioning of nodes.
+   */
+  private val maxIgnoredFailedStageAttempts = sc.getConf
+    .get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -289,6 +296,13 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(WorkerRemoved(workerId, host, message))
   }
 
+  /**
+   * Called by the DecommissionTracker when a node is decommissioned.
+   */
+  def nodeDecommissioned(host: String): Unit = {
+    eventProcessLoop.post(NodeDecommissioned(host))
+  }
+
   /**
    * Called by TaskScheduler implementation when a host is added.
    */
@@ -1617,10 +1631,27 @@ private[spark] class DAGScheduler(
           s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
           s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
       } else {
+        // Gracefully handle the stage abort caused by fetch failures on
+        // decommissioned nodes.
+        if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) {
+          // Ignore stage attempts due to fetch failures only once per attempt
+          // caused by a node decommissioning event.
+          if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) {
+            failedStage.ignoredDecommissionFailedStage += 1
+            DecommissionTracker.incrFetchFailIgnoreCnt()
+
+            logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" +
+              s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" +
+              s""""totalIgnoredAttempts":"${failedStage.ignoredDecommissionFailedStage}",""" +
+              s""""node":"$bmAddress"}""")
+          }
+        }
         failedStage.failedAttemptIds.add(task.stageAttemptId)
-        val shouldAbortStage =
-          failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-          disallowStageRetryForTest
+        val shouldAbortStage = failedStage.failedAttemptIds.size >=
+          (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) ||
+          disallowStageRetryForTest ||
+          failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts
 
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is
```

Contributor: I think the comment should specifically say this is for decommission tracking.

Contributor (author): Done.
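To make the retry accounting above concrete, here is a standalone sketch of the abort condition (ignoring `disallowStageRetryForTest`), assuming Spark's default of 4 consecutive stage attempts and the new ignore threshold's default of 8. Each ignored decommission-related failure raises the effective attempt budget by one, until the ignore threshold itself is exceeded:

```scala
// Standalone sketch of the abort condition above; names mirror the diff,
// the defaults match the configs, and the inputs below are illustrative.
def shouldAbort(
    failedAttempts: Int,              // failedStage.failedAttemptIds.size
    ignoredDecommissionFailures: Int, // failedStage.ignoredDecommissionFailedStage
    maxConsecutiveStageAttempts: Int = 4,
    maxIgnoredFailedStageAttempts: Int = 8): Boolean = {
  failedAttempts >= (maxConsecutiveStageAttempts + ignoredDecommissionFailures) ||
    ignoredDecommissionFailures > maxIgnoredFailedStageAttempts
}

// 5 failed attempts, 2 of them caused by decommissioned nodes:
// 5 >= (4 + 2) is false and 2 > 8 is false, so the stage is retried.
assert(!shouldAbort(failedAttempts = 5, ignoredDecommissionFailures = 2))

// 12 failed attempts, 9 of them decommission-related: 9 > 8, so the stage
// aborts even though 12 >= (4 + 9) does not hold.
assert(shouldAbort(failedAttempts = 12, ignoredDecommissionFailures = 9))
```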
```diff
@@ -1661,6 +1692,10 @@ private[spark] class DAGScheduler(
       }
 
       if (shouldAbortStage) {
+        if (failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts
+            && DecommissionTracker.isDecommissionEnabled(sc.getConf)) {
+          DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true)
+        }
         val abortMessage = if (disallowStageRetryForTest) {
           "Fetch failure will not retry stage due to testing config"
         } else {
@@ -1823,9 +1858,10 @@ private[spark] class DAGScheduler(
         failedStage.failedAttemptIds.add(task.stageAttemptId)
         // TODO Refactor the failure handling logic to combine similar code with that of
         // FetchFailed.
-        val shouldAbortStage =
-          failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-          disallowStageRetryForTest
+        val shouldAbortStage = failedStage.failedAttemptIds.size >=
+          (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) ||
+          disallowStageRetryForTest ||
+          failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts
 
         if (shouldAbortStage) {
           val abortMessage = if (disallowStageRetryForTest) {
@@ -1980,6 +2016,18 @@ private[spark] class DAGScheduler(
     clearCacheLocs()
   }
 
+  /**
+   * Remove the shuffle data mapping when a node is decommissioned.
+   *
+   * @param host host of the node that is decommissioned
+   */
+  private[scheduler] def handleNodeDecommissioned(host: String): Unit = {
+    logInfo(s"Marking shuffle files lost on the decommissioning host $host")
+    mapOutputTracker.removeOutputsOnHost(host)
+    clearCacheLocs()
+  }
+
   private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
@@ -2281,6 +2329,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case WorkerRemoved(workerId, host, message) =>
       dagScheduler.handleWorkerRemoved(workerId, host, message)
 
+    case NodeDecommissioned(host) =>
+      dagScheduler.handleNodeDecommissioned(host)
+
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
```
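Tying the hunks above together, here is a self-contained toy model of the control flow a decommission event triggers. It is not Spark code, and the DecommissionTracker's real API is not part of this diff; the names merely mirror the PR: the tracker calls nodeDecommissioned(host), the event loop dispatches NodeDecommissioned(host), and handleNodeDecommissioned drops the host's map outputs so stages recompute them instead of repeatedly hitting fetch failures against a dead node.

```scala
import scala.collection.mutable

// Toy stand-ins for the DAGScheduler event and scheduler in this diff.
sealed trait ToyEvent
final case class ToyNodeDecommissioned(host: String) extends ToyEvent

class ToyDagScheduler {
  // Stand-in for the MapOutputTracker's host -> shuffle outputs mapping.
  private val outputsByHost = mutable.Map(
    "host-a" -> Set("shuffle_0_map_0", "shuffle_0_map_1"),
    "host-b" -> Set("shuffle_0_map_2"))

  // Mirrors DAGScheduler.nodeDecommissioned: it only posts an event.
  def nodeDecommissioned(host: String): Unit = post(ToyNodeDecommissioned(host))

  // Mirrors the event-loop dispatch plus handleNodeDecommissioned.
  private def post(event: ToyEvent): Unit = event match {
    case ToyNodeDecommissioned(host) =>
      println(s"Marking shuffle files lost on the decommissioning host $host")
      outputsByHost.remove(host) // stands in for mapOutputTracker.removeOutputsOnHost
  }

  def remainingOutputs: Set[String] = outputsByHost.values.flatten.toSet
}

object ToyDecommissionFlow extends App {
  val scheduler = new ToyDagScheduler
  scheduler.nodeDecommissioned("host-a")
  // Only host-b's map output survives; host-a's outputs will be recomputed.
  assert(scheduler.remainingOutputs == Set("shuffle_0_map_2"))
}
```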
Would it make sense to handle this with `spark.worker.decommission.enabled` or not? I'm not sure; just for discussion.

Is this specific to YARN, or has it been tied in with the previous work for all cluster managers?

Also, all the configs need to be documented in configuration.md.