Skip to content

[WIP][SPARK-20628][CORE] Blacklist nodes when they transition to DECOMMISSIONING state in YARN #19267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/HostState.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.hadoop.yarn.api.records.NodeState

private[spark] object HostState extends Enumeration {

  type HostState = Value

  // Mirrors the node lifecycle states reported by YARN (o.a.h.yarn.api.records.NodeState).
  val New, Running, Unhealthy, Decommissioning, Decommissioned, Lost, Rebooted = Value

  /**
   * Resolves a YARN node-state string (upper-case, e.g. "DECOMMISSIONING") to the
   * HostState whose upper-cased name matches it, or None when no value matches.
   */
  def fromYarnState(state: String): Option[HostState] =
    values.find(_.toString.toUpperCase == state)

  /**
   * Maps a HostState back to the name of the corresponding YARN NodeState, if any.
   */
  def toYarnState(state: HostState): Option[String] = {
    val yarnName = state.toString.toUpperCase
    NodeState.values.collectFirst { case ns if ns.name == yarnName => ns.name }
  }
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ package object config {
ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
.booleanConf
.createWithDefault(false)

// Enables blacklisting of nodes that the cluster manager reports as decommissioning
// (e.g. the YARN DECOMMISSIONING state); read by BlacklistTracker. Off by default.
private[spark] val BLACKLIST_DECOMMISSIONING_ENABLED =
ConfigBuilder("spark.blacklist.decommissioning.enabled")
.booleanConf
.createWithDefault(false)

// How long (ms) a node blacklisted due to decommissioning stays blacklisted. Optional:
// BlacklistTracker.getBlacklistDecommissioningTimeout falls back to a 1h default when unset.
private[spark] val BLACKLIST_DECOMMISSIONING_TIMEOUT_CONF =
ConfigBuilder("spark.blacklist.decommissioning.timeout")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
// End blacklist confs

private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
Expand Down
172 changes: 137 additions & 35 deletions core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ private[scheduler] class BlacklistTracker (
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
val BLACKLIST_DECOMMISSIONING_TIMEOUT_MILLIS =
BlacklistTracker.getBlacklistDecommissioningTimeout(conf)
private val TASK_BLACKLISTING_ENABLED = BlacklistTracker.isTaskExecutionBlacklistingEnabled(conf)
private val DECOMMISSIONING_BLACKLISTING_ENABLED =
BlacklistTracker.isDecommissioningBlacklistingEnabled(conf)
private val BLACKLIST_FETCH_FAILURE_ENABLED =
BlacklistTracker.isFetchFailureBlacklistingEnabled(conf)

/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
Expand Down Expand Up @@ -89,13 +95,13 @@ private[scheduler] class BlacklistTracker (
* successive blacklisted executors on one node. Nonetheless, it will not grow too large because
* there cannot be many blacklisted executors on one node, before we stop requesting more
* executors on that node, and we clean up the list of blacklisted executors once an executor has
* been blacklisted for BLACKLIST_TIMEOUT_MILLIS.
* been blacklisted for its configured blacklisting timeout.
*/
val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()

/**
* Un-blacklists executors and nodes that have been blacklisted for at least
* BLACKLIST_TIMEOUT_MILLIS
* Un-blacklists executors and nodes that have been blacklisted for at least its configured
* blacklisting timeout
*/
def applyBlacklistTimeout(): Unit = {
val now = clock.getTimeMillis()
Expand All @@ -118,16 +124,9 @@ private[scheduler] class BlacklistTracker (
}
}
val nodesToUnblacklist = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
if (nodesToUnblacklist.nonEmpty) {
// Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
s"has timed out")
nodesToUnblacklist.foreach { node =>
nodeIdToBlacklistExpiryTime.remove(node)
listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
.map(node => (node, BlacklistTimedOut, Some(now)))
// Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
removeNodesFromBlacklist(nodesToUnblacklist)
updateNextExpiryTime()
}
}
Expand Down Expand Up @@ -190,14 +189,8 @@ private[scheduler] class BlacklistTracker (
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS

if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (!nodeIdToBlacklistExpiryTime.contains(host)) {
logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")

nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
if (addNodeToBlacklist(host, FetchFailure(host), now)) {
killExecutorsOnBlacklistedNode(host)
updateNextExpiryTime()
}
} else if (!executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor $exec due to fetch failure")
Expand Down Expand Up @@ -249,21 +242,93 @@ private[scheduler] class BlacklistTracker (
// node, and potentially put the entire node into a blacklist as well.
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
blacklistedExecsOnNode += exec
// If the node is already in the blacklist, we avoid adding it again with a later expiry
// time.
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
!nodeIdToBlacklistExpiryTime.contains(node)) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(node)
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) {
val blacklistSucceeded = addNodeToBlacklist(node,
ExecutorFailures(Set(blacklistedExecsOnNode.toList: _*)), now)
if (blacklistSucceeded) {
killExecutorsOnBlacklistedNode(node)
}
}
}
}
}

/**
 * Adds a node to the blacklist, with an expiry time that depends on the blacklisting
 * reason. A node that is already blacklisted is left untouched, so a repeated request
 * never extends its expiry time.
 *
 * @param node Node to be blacklisted
 * @param reason Reason for blacklisting the node
 * @param time Start time (ms) from which the blacklist expiry time is computed;
 *             defaults to the current clock time
 * @return true if the node was newly added to the blacklist, false otherwise
 */
def addNodeToBlacklist(node: String, reason: NodeBlacklistReason,
    time: Long = clock.getTimeMillis()): Boolean = {
  // If the node is already in the blacklist, we avoid adding it again with a later expiry time.
  if (isNodeBlacklisted(node)) {
    false
  } else {
    // Each reason is honored only while its feature flag is enabled; otherwise None.
    val blacklistExpiryTimeOpt = reason match {
      case NodeDecommissioning if DECOMMISSIONING_BLACKLISTING_ENABLED =>
        // Log the timeout duration rather than the absolute expiry timestamp: the previous
        // message printed the epoch expiry time but labelled it a "timeout ... ms".
        logInfo(s"Blacklisting node $node for $BLACKLIST_DECOMMISSIONING_TIMEOUT_MILLIS ms " +
          s"because ${reason.message}")
        Some(time + BLACKLIST_DECOMMISSIONING_TIMEOUT_MILLIS)

      case ExecutorFailures(blacklistedExecutors) if TASK_BLACKLISTING_ENABLED =>
        logInfo(s"Blacklisting node $node for $BLACKLIST_TIMEOUT_MILLIS ms because it " +
          s"has ${blacklistedExecutors.size} executors blacklisted: ${blacklistedExecutors}")
        Some(time + BLACKLIST_TIMEOUT_MILLIS)

      case FetchFailure(host) if BLACKLIST_FETCH_FAILURE_ENABLED =>
        logInfo(s"Blacklisting node $host due to fetch failure of external shuffle service")
        Some(time + BLACKLIST_TIMEOUT_MILLIS)

      case _ => None
    }

    // Record the node, notify listeners, and refresh the next expiry checkpoint.
    blacklistExpiryTimeOpt.fold(false) { blacklistExpiryTime =>
      blacklistNodeHelper(node, blacklistExpiryTime)
      listenerBus.post(SparkListenerNodeBlacklisted(time, node, reason))
      updateNextExpiryTime()
      true
    }
  }
}

// Records `node` as blacklisted until `blacklistExpiryTimeout` (an absolute timestamp in ms)
// and refreshes the snapshot of blacklisted nodes kept in _nodeBlacklist.
private def blacklistNodeHelper(node: String, blacklistExpiryTimeout: Long): Unit = {
nodeIdToBlacklistExpiryTime.put(node, blacklistExpiryTimeout)
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}

// Drops the given nodes from the expiry map and refreshes the snapshot of blacklisted
// nodes kept in _nodeBlacklist.
private def unblacklistNodesHelper(nodes: Iterable[String]): Unit = {
nodeIdToBlacklistExpiryTime --= nodes
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}

/**
 * Removes nodes from the blacklist when the stated unblacklist reason applies.
 *
 * @param nodesToRemove List of nodes to unblacklist, with their reason for unblacklisting
 *                      and an optional time to be passed to Spark Listener indicating the
 *                      time of unblacklist.
 */
def removeNodesFromBlacklist(nodesToRemove: Iterable[(String, NodeUnblacklistReason,
    Option[Long])]): Unit = {
  if (nodesToRemove.nonEmpty) {
    // Blacklist expiry always applies; a node returning to Running only matters while
    // decommissioning blacklisting is enabled. Skip nodes that are not blacklisted.
    val blacklistNodesToRemove = nodesToRemove.filter { case (node, reason, _) =>
      (reason == BlacklistTimedOut ||
        (reason == NodeRunning && DECOMMISSIONING_BLACKLISTING_ENABLED)) &&
        isNodeBlacklisted(node)
    }
    unblacklistNodesHelper(blacklistNodesToRemove.map(_._1))
    blacklistNodesToRemove.foreach { case (node, reason, timeOpt) =>
      // Destructure the tuple so the log prints the hostname; the previous code
      // interpolated the whole (node, reason, time) tuple into the message.
      logInfo(s"Removing node $node from blacklist because ${reason.message}")
      listenerBus.post(SparkListenerNodeUnblacklisted(
        timeOpt.getOrElse(clock.getTimeMillis()), node, reason))
    }
  }
}

/**
 * Returns true if `executorId` currently has an entry in the executor blacklist.
 */
def isExecutorBlacklisted(executorId: String): Boolean = {
executorIdToBlacklistStatus.contains(executorId)
}
Expand Down Expand Up @@ -373,15 +438,39 @@ private[scheduler] class BlacklistTracker (
private[scheduler] object BlacklistTracker extends Logging {

private val DEFAULT_TIMEOUT = "1h"
private val DEFAULT_DECOMMISSIONING_TIMEOUT = "1h"

/**
 * Returns true if any blacklisting mechanism is enabled: fetch failure blacklisting,
 * decommissioning blacklisting, or task execution blacklisting.
 */
def isBlacklistEnabled(conf: SparkConf): Boolean = {
isFetchFailureBlacklistingEnabled(conf) || isDecommissioningBlacklistingEnabled(conf) ||
isTaskExecutionBlacklistingEnabled(conf)
}

/**
 * Returns true if blacklisting on shuffle fetch failures is enabled
 * (spark.blacklist.application.fetchFailure.enabled).
 */
def isFetchFailureBlacklistingEnabled(conf: SparkConf): Boolean = {
conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
}

/**
 * Returns true if decommissioning blacklisting is enabled
 * (spark.blacklist.decommissioning.enabled).
 */
def isDecommissioningBlacklistingEnabled(conf: SparkConf): Boolean = {
conf.get(config.BLACKLIST_DECOMMISSIONING_ENABLED)
}

/**
* Returns true if the task execution blacklist is enabled, based on checking the configuration
* in the following order:
* 1. Is it specifically enabled or disabled?
* 2. Is it enabled via the legacy timeout conf?
* 3. Default is off
*/
def isBlacklistEnabled(conf: SparkConf): Boolean = {
def isTaskExecutionBlacklistingEnabled(conf: SparkConf): Boolean = {
conf.get(config.BLACKLIST_ENABLED) match {
case Some(enabled) =>
enabled
Expand Down Expand Up @@ -409,6 +498,11 @@ private[scheduler] object BlacklistTracker extends Logging {
}
}

/**
 * Decommissioning blacklist timeout in milliseconds: the configured
 * spark.blacklist.decommissioning.timeout, or the 1h default when unset.
 */
def getBlacklistDecommissioningTimeout(conf: SparkConf): Long = {
conf.get(config.BLACKLIST_DECOMMISSIONING_TIMEOUT_CONF)
.getOrElse(Utils.timeStringAsMs(DEFAULT_DECOMMISSIONING_TIMEOUT))
}

/**
* Verify that blacklist configurations are consistent; if not, throw an exception. Should only
* be called if blacklisting is enabled.
Expand Down Expand Up @@ -449,6 +543,12 @@ private[scheduler] object BlacklistTracker extends Logging {
}
}

val blacklistDecommissioningTimeout = getBlacklistDecommissioningTimeout(conf)
if (blacklistDecommissioningTimeout <= 0) {
mustBePos(config.BLACKLIST_DECOMMISSIONING_TIMEOUT_CONF.key,
blacklistDecommissioningTimeout.toString)
}

val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)

Expand All @@ -458,7 +558,9 @@ private[scheduler] object BlacklistTracker extends Logging {
s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
s"Spark will not be robust to one bad node. Decrease " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}, " +
s"${config.BLACKLIST_DECOMMISSIONING_ENABLED.key} " +
s"and ${config.BLACKLIST_FETCH_FAILURE_ENABLED.key}")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
 * Represents an explanation for a Node being blacklisted for task scheduling.
 * Sealed so that matches over reasons are checked for exhaustiveness.
 */
// NOTE(review): @DeveloperApi on a private[spark] type is contradictory — the annotation
// marks public-facing API; consider dropping one of the two.
@DeveloperApi
private[spark] sealed trait NodeBlacklistReason extends Serializable {
def message: String
}

// Node blacklisted because too many of its executors were blacklisted
// (threshold checked by BlacklistTracker against MAX_FAILED_EXEC_PER_NODE).
@DeveloperApi
private[spark] case class ExecutorFailures(blacklistedExecutors: Set[String])
extends NodeBlacklistReason {
override def message: String = "Maximum number of executor failures allowed on Node exceeded."
}

// Node blacklisted because the cluster manager reported it as decommissioning.
@DeveloperApi
private[spark] case object NodeDecommissioning extends NodeBlacklistReason {
override def message: String = "Node is being decommissioned by Cluster Manager."
}

// Node blacklisted after a shuffle fetch failure attributed to `host`
// (used when the external shuffle service is enabled).
@DeveloperApi
private[spark] case class FetchFailure(host: String) extends NodeBlacklistReason {
override def message: String = s"Fetch failure for host $host"
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
 * Represents an explanation for a Node being unblacklisted for task scheduling.
 * Sealed so that matches over reasons are checked for exhaustiveness.
 */
// NOTE(review): @DeveloperApi on a private[spark] type is contradictory — the annotation
// marks public-facing API; consider dropping one of the two.
@DeveloperApi
private[spark] sealed trait NodeUnblacklistReason extends Serializable {
def message: String
}

// Unblacklist because the node's blacklist expiry time has passed. Declared as a
// `case object` for consistency with the blacklist reasons (e.g. NodeDecommissioning).
@DeveloperApi
private[spark] case object BlacklistTimedOut extends NodeUnblacklistReason {
  // Grammar fix for the user-visible log text: "has reached" -> "has been reached".
  override def message: String = "Blacklist timeout has been reached."
}

// Unblacklist because the cluster manager reports the node back in the Running state.
// Declared as a `case object` for consistency with the blacklist reasons
// (e.g. NodeDecommissioning).
@DeveloperApi
private[spark] case object NodeRunning extends NodeUnblacklistReason {
  override def message: String = "Node is active and back to Running state."
}
Loading