[SPARK-32217] Plumb whether a worker would also be decommissioned along with executor #29032

Closed
wants to merge 1 commit
File: StandaloneAppClient.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.util.{RpcUtils, ThreadUtils}

/**
@@ -181,7 +182,8 @@ private[spark] class StandaloneAppClient(
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId, message.getOrElse(""))
listener.executorDecommissioned(fullId,
ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
}

case WorkerRemoved(id, host, message) =>
File: StandaloneAppClientListener.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy.client

import org.apache.spark.scheduler.ExecutorDecommissionInfo

/**
* Callbacks invoked by deploy client when various events happen. There are currently five events:
* connecting to the cluster, disconnecting, being given an executor, having an executor removed
@@ -39,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

def executorDecommissioned(fullId: String, message: String): Unit
def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
File: Master.scala
@@ -908,7 +908,10 @@ private[deploy] class Master(
logInfo("Telling app of decommission executors")
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None, workerLost = false))
Some("worker decommissioned"), None,
// workerLost is being set to true here to let the driver know that the host (aka. worker)
// is also being decommissioned.
workerLost = true))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
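Read together with the StandaloneAppClient change above, the standalone flow this PR plumbs is: the Master reports each executor on a decommissioning worker as DECOMMISSIONED with workerLost = true, and the app client folds that flag into the new ExecutorDecommissionInfo handed to its listener. A minimal sketch of that mapping, with Spark's message types stubbed out so it compiles on its own (the stub definitions are illustrative, not the real DeployMessages):

```scala
// Sketch only: ExecutorUpdated and ExecutorDecommissionInfo are stubbed so the example is
// self-contained; the field names mirror the patch.
object WorkerDecommissionFlowSketch {
  case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
  case class ExecutorUpdated(
      id: Int,
      state: String,
      message: Option[String],
      exitStatus: Option[Int],
      workerLost: Boolean)

  // Master side: mark the executor DECOMMISSIONED and flag that its host is going away too.
  def reportFromMaster(execId: Int): ExecutorUpdated =
    ExecutorUpdated(execId, "DECOMMISSIONED", Some("worker decommissioned"), None, workerLost = true)

  // App client side: turn workerLost into isHostDecommissioned for the listener.
  def toDecommissionInfo(update: ExecutorUpdated): ExecutorDecommissionInfo =
    ExecutorDecommissionInfo(update.message.getOrElse(""), isHostDecommissioned = update.workerLost)

  def main(args: Array[String]): Unit = {
    // Prints: ExecutorDecommissionInfo(worker decommissioned,true)
    println(toDecommissionInfo(reportFromMaster(7)))
  }
}
```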
File: CoarseGrainedExecutorBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -166,11 +166,15 @@ private[spark] class CoarseGrainedExecutorBackend(
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
logError("Asked to launch a task while decommissioned.")
val msg = "Asked to launch a task while decommissioned."
logError(msg)
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId))
endpoint.send(
DecommissionExecutor(
executorId,
ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
case _ =>
logError("No registered driver to send Decommission to.")
}
@@ -259,12 +263,14 @@ private[spark] class CoarseGrainedExecutorBackend(
}

private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
val msg = "Decommissioning self w/sync"
logInfo(msg)
try {
decommissioned = true
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
driver.get.askSync[Boolean](DecommissionExecutor(
executorId, ExecutorDecommissionInfo(msg, false)))
} else {
logError("No driver to message decommissioning.")
}
File: ExecutorDecommissionInfo.scala (new file)
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

/**
* Provides more detail when an executor is being decommissioned.
* @param message Human readable reason for why the decommissioning is happening.
* @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
* being decommissioned too. Used to infer if the shuffle data might
* be lost even if the external shuffle service is enabled.
*/
private[spark]
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
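As the scaladoc above notes, isHostDecommissioned exists so downstream code can judge whether shuffle data is at risk. A hypothetical sketch of that kind of check; shuffleDataMayBeLost and the externalShuffleService flag are illustrative, not Spark APIs, and the case class is re-declared so the snippet runs outside Spark:

```scala
// Illustrative only: mirrors the case class added by this PR.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

object ShuffleDataRiskSketch {
  // With an external shuffle service, shuffle files can outlive the executor, but only while the
  // host (worker) itself stays up.
  def shuffleDataMayBeLost(
      info: ExecutorDecommissionInfo,
      externalShuffleService: Boolean): Boolean =
    info.isHostDecommissioned || !externalShuffleService

  def main(args: Array[String]): Unit = {
    val execOnly = ExecutorDecommissionInfo("executor decommissioned", isHostDecommissioned = false)
    val wholeHost = ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true)
    println(shuffleDataMayBeLost(execOnly, externalShuffleService = true))  // false
    println(shuffleDataMayBeLost(wholeHost, externalShuffleService = true)) // true
  }
}
```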
File: TaskScheduler.scala
@@ -101,7 +101,7 @@ private[spark] trait TaskScheduler {
/**
* Process a decommissioning executor.
*/
def executorDecommission(executorId: String): Unit
def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

/**
* Process a lost executor
File: TaskSchedulerImpl.scala
@@ -912,7 +912,8 @@ private[spark] class TaskSchedulerImpl(
}
}

override def executorDecommission(executorId: String): Unit = {
override def executorDecommission(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
rootPool.executorDecommission(executorId)
backend.reviveOffers()
}
File: CoarseGrainedClusterMessages.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.SerializableBuffer

@@ -94,7 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage

case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
extends CoarseGrainedClusterMessage

case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage
File: CoarseGrainedSchedulerBackend.scala
@@ -191,9 +191,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)

case DecommissionExecutor(executorId) =>
logError(s"Received decommission executor message ${executorId}.")
decommissionExecutor(executorId)
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
decommissionExecutor(executorId, decommissionInfo)

case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
@@ -272,9 +272,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeWorker(workerId, host, message)
context.reply(true)

case DecommissionExecutor(executorId) =>
logError(s"Received decommission executor message ${executorId}.")
decommissionExecutor(executorId)
case DecommissionExecutor(executorId, decommissionInfo) =>
logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
decommissionExecutor(executorId, decommissionInfo)
context.reply(true)

case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -422,7 +422,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Mark a given executor as decommissioned and stop making resource offers for it.
*/
private def decommissionExecutor(executorId: String): Boolean = {
private def decommissionExecutor(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
@@ -436,7 +437,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (shouldDisable) {
logInfo(s"Starting decommissioning executor $executorId.")
try {
scheduler.executorDecommission(executorId)
scheduler.executorDecommission(executorId, decommissionInfo)
} catch {
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
@@ -590,10 +591,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Called by subclasses when notified of a decommissioning executor.
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
private[spark] def decommissionExecutor(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
if (driverEndpoint != null) {
logInfo("Propagating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
}
}

File: StandaloneSchedulerBackend.scala
@@ -174,10 +174,10 @@ private[spark] class StandaloneSchedulerBackend(
removeExecutor(fullId.split("/")(1), reason)
}

override def executorDecommissioned(fullId: String, message: String) {
override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
logInfo("Asked to decommission executor")
decommissionExecutor(fullId.split("/")(1))
logInfo("Executor %s decommissioned: %s".format(fullId, message))
decommissionExecutor(fullId.split("/")(1), decommissionInfo)
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
}

override def workerRemoved(workerId: String, host: String, message: String): Unit = {
File: AppClientSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client

import java.io.Closeable
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

import scala.concurrent.duration._

@@ -32,6 +32,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, Master}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.util.Utils

/**
@@ -126,7 +127,10 @@ class AppClientSuite
// Decommissioning is async.
eventually(timeout(1.seconds), interval(10.millis)) {
// We only record decommissioning for the executor we've requested
assert(ci.listener.execDecommissionedList.size === 1)
assert(ci.listener.execDecommissionedMap.size === 1)
val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
s"$executorId should have been decommissioned along with its worker")
}

// Send request to kill executor, verify request was made
@@ -215,7 +219,7 @@ class AppClientSuite
val deadReasonList = new ConcurrentLinkedQueue[String]()
val execAddedList = new ConcurrentLinkedQueue[String]()
val execRemovedList = new ConcurrentLinkedQueue[String]()
val execDecommissionedList = new ConcurrentLinkedQueue[String]()
val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]()

def connected(id: String): Unit = {
connectedIdList.add(id)
@@ -245,8 +249,9 @@ class AppClientSuite
execRemovedList.add(id)
}

def executorDecommissioned(id: String, message: String): Unit = {
execDecommissionedList.add(id)
def executorDecommissioned(id: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
val previousDecommissionInfo = execDecommissionedMap.putIfAbsent(id, decommissionInfo)
assert(previousDecommissionInfo === null, s"Expected no previous decommission info for $id")
}

def workerRemoved(workerId: String, host: String, message: String): Unit = {}
File: DAGSchedulerSuite.scala
@@ -169,10 +169,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorDecommission(executorId: String) = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
override def executorDecommission(
executorId: String,
decommissionInfo: ExecutorDecommissionInfo): Unit = {}
}

/**
@@ -715,10 +717,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
override def executorDecommission(
executorId: String,
decommissionInfo: ExecutorDecommissionInfo): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
File: ExternalClusterManagerSuite.scala
@@ -90,7 +90,6 @@ private class DummyTaskScheduler extends TaskScheduler {
override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorDecommission(executorId: String): Unit = {}
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -99,4 +98,7 @@ private class DummyTaskScheduler extends TaskScheduler {
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorMetrics: Map[(Int, Int), ExecutorMetrics]): Boolean = true
override def executorDecommission(
executorId: String,
decommissionInfo: ExecutorDecommissionInfo): Unit = {}
}
File: WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte

val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
sc.getExecutorIds().tail.foreach { id =>
sched.decommissionExecutor(id)
sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
assert(rdd3.sortByKey().collect().length === 100)
}
}
File: WorkerDecommissionSuite.scala
@@ -73,7 +73,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
execs.foreach(execId => sched.decommissionExecutor(execId))
execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
// Try and launch task after decommissioning, this should fail
File: BlockManagerDecommissionIntegrationSuite.scala
@@ -146,7 +146,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS

val execToDecommission = execs.head
logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))

// Wait for job to finish.
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)