
Commit dca8380

Authored and committed by Marcelo Vanzin
[SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
The issue here is that when Spark is downscaling the application and deletes a few pod requests that are no longer needed, it may race with the K8S scheduler, which may be bringing up those executors at the same time. The executors may then have enough time to connect back to the driver and register, only to be deleted soon after. This wastes resources and causes misleading entries in the driver log.

The change (ab)uses the blacklisting mechanism to treat the deleted excess pods as blacklisted, so that if they try to connect back, the driver will deny the registration.

It also changes the executor registration slightly, since even with the above change there were misleading logs. That was because the executor registration message was an RPC that always succeeded (barring network issues), so the executor would always try to send an unregistration message to the driver, which would then log several messages about not knowing anything about the executor. The change makes the registration RPC succeed or fail directly, instead of using a separate failure message that led to this issue.

Note the last change required some changes in a standalone test suite related to dynamic allocation, since it relied on the driver not throwing exceptions when a duplicate executor registration happened.

Tested with existing unit tests, and with a live cluster with dynamic allocation on.

Closes #26586 from vanzin/SPARK-29950.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
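
As a rough illustration of the mechanism described above, here is a minimal, self-contained Scala sketch (hypothetical names, not the actual Spark classes): deleted pod requests are remembered until a later snapshot from the API server confirms they are gone, and in the meantime any registration attempt from those executor IDs is denied.

object DeletedExecutorTrackingSketch {
  @volatile private var deletedExecutorIds = Set.empty[Long]

  // Allocator side: remember the excess pod requests that were just deleted.
  def markDeleted(ids: Seq[Long]): Unit =
    deletedExecutorIds = deletedExecutorIds ++ ids

  // Scheduler backend side: the registration path consults this before accepting.
  def isDeleted(executorId: String): Boolean =
    deletedExecutorIds.contains(executorId.toLong)

  // Allocator side: once a snapshot no longer lists a pod, stop tracking it.
  def prune(executorsStillInSnapshot: Set[Long]): Unit =
    deletedExecutorIds = deletedExecutorIds.filter(executorsStillInSnapshot.contains)

  def main(args: Array[String]): Unit = {
    markDeleted(Seq(3L, 4L))
    assert(isDeleted("3") && isDeleted("4"))       // registration would be rejected
    prune(executorsStillInSnapshot = Set(1L, 2L))  // API server no longer lists 3 and 4
    assert(!isDeleted("3") && !isDeleted("4"))
  }
}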
1 parent 6e5b4bf commit dca8380

File tree: 9 files changed, +105 −43 lines changed


core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 6 deletions

@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     resourcesFileOpt: Option[String])
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

+  import CoarseGrainedExecutorBackend._
+
   private implicit val formats = DefaultFormats

   private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
         extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
-      // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) =>
-        // Always receive `true`. Just ignore it
+      case Success(_) =>
+        self.send(RegisteredExecutor)
       case Failure(e) =>
         exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
         exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }

-    case RegisterExecutorFailed(message) =>
-      exitExecutor(1, "Slave registration failed: " + message)
-
     case LaunchTask(data) =>
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend(

 private[spark] object CoarseGrainedExecutorBackend extends Logging {

+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
   case class Arguments(
       driverUrl: String,
       executorId: String,

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 0 additions & 7 deletions

@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage

-  sealed trait RegisterExecutorResponse
-
-  case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
-
-  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
-    with RegisterExecutorResponse
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 13 additions & 6 deletions

@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
           attributes, resources) =>
         if (executorDataMap.contains(executorId)) {
-          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
-          context.reply(true)
-        } else if (scheduler.nodeBlacklist.contains(hostname)) {
+          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
+        } else if (scheduler.nodeBlacklist.contains(hostname) ||
+            isBlacklisted(executorId, hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node (because it
           // already started allocating those resources before we informed it of our blacklist,
           // or if it ignored our blacklist), then we reject that executor immediately.
           logInfo(s"Rejecting $executorId as it has been blacklisted.")
-          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
-          context.reply(true)
+          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
           }
         }
-        executorRef.send(RegisteredExecutor)
         // Note: some tests expect the reply to come after we put the executor in the map
         context.reply(true)
         listenerBus.post(
@@ -776,6 +774,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()

+  /**
+   * Checks whether the executor is blacklisted. This is called when the executor tries to
+   * register with the scheduler, and will deny registration if this method returns true.
+   *
+   * This is in addition to the blacklist kept by the task scheduler, so custom implementations
+   * don't need to check there.
+   */
+  protected def isBlacklisted(executorId: String, hostname: String): Boolean = false
+
   // SPARK-27112: We need to ensure that there is ordering of lock acquisition
   // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
   // the deadlock issue exposed in SPARK-27112
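
The driver-side context.sendFailure(...) above surfaces on the executor side as a failed future returned by the registration ask, which is why the separate RegisterExecutorFailed message is no longer needed. A small, self-contained sketch of that pattern using plain Scala promises (hypothetical names, not Spark's RPC layer):

import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object RegistrationReplySketch {
  // Stand-in for the driver handling RegisterExecutor: it either fulfils the
  // pending ask (like context.reply(true)) or fails it (like context.sendFailure(...)).
  def register(accept: Boolean): Future[Boolean] = {
    val context = Promise[Boolean]()
    if (accept) context.success(true)
    else context.failure(new IllegalStateException("Executor is blacklisted: 3"))
    context.future
  }

  def main(args: Array[String]): Unit = {
    // Executor side: react to the outcome of the ask itself instead of waiting
    // for a separate failure message from the driver.
    register(accept = false).onComplete {
      case Success(_) => println("registered; tell ourselves to start the executor")
      case Failure(e) => println(s"cannot register with driver: ${e.getMessage}; exiting")
    }
    Thread.sleep(100) // let the callback run before this toy program exits
  }
}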

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

Lines changed: 41 additions & 24 deletions

@@ -26,15 +26,15 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._

 import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}

 /**
  * End-to-end tests for dynamic allocation in standalone mode.
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
       assert(apps.head.getExecutorLimit === Int.MaxValue)
     }
     val beforeList = getApplications().head.executors.keys.toSet
-    assert(killExecutorsOnHost(sc, "localhost").equals(true))
-
     syncExecutors(sc)
-    val afterList = getApplications().head.executors.keys.toSet
+
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost("localhost")
+      case _ => fail("expected coarse grained scheduler")
+    }

     eventually(timeout(10.seconds), interval(100.millis)) {
+      val afterList = getApplications().head.executors.keys.toSet
       assert(beforeList.intersect(afterList).size == 0)
     }
   }
@@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite
     val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
     try {
       scheduler.start()
-      scheduler.driverEndpoint.ask[Boolean](message)
-      eventually(timeout(10.seconds), interval(100.millis)) {
-        verify(endpointRef).send(RegisterExecutorFailed(any()))
+      val e = intercept[SparkException] {
+        scheduler.driverEndpoint.askSync[Boolean](message)
       }
+      assert(e.getCause().isInstanceOf[IllegalStateException])
+      assert(scheduler.getExecutorIds().isEmpty)
     } finally {
       scheduler.stop()
     }
@@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite
       .setMaster(masterRpcEnv.address.toSparkURL)
       .setAppName("test")
      .set(config.EXECUTOR_MEMORY.key, "256m")
+      // Because we're faking executor launches in the Worker, set the config so that the driver
+      // will not timeout anything related to executors.
+      .set(config.Network.NETWORK_TIMEOUT.key, "2h")
+      .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
+      .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
   }

   /** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite
   private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
     (0 until numWorkers).map { i =>
       val rpcEnv = workerRpcEnvs(i)
-      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
-        Worker.ENDPOINT_NAME, null, conf, securityManager)
+      val worker = new TestWorker(rpcEnv, cores, memory)
       rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
       worker
     }
@@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite
     }
   }

-  /** Kill the executors on a given host. */
-  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
-    syncExecutors(sc)
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutorsOnHost(host)
-      case _ => fail("expected coarse grained scheduler")
-    }
-  }
-
   /**
    * Return a list of executor IDs belonging to this application.
   *
@@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite
    * we submit a request to kill them. This must be called before each kill request.
    */
   private def syncExecutors(sc: SparkContext): Unit = {
-    val driverExecutors = sc.env.blockManager.master.getStorageStatus
-      .map(_.blockManagerId.executorId)
-      .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val driverExecutors = backend.getExecutorIds()
     val masterExecutors = getExecutorIds(sc)
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
@@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
         Map.empty)
-      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
       backend.driverEndpoint.send(LaunchedExecutor(id))
     }
   }

+  /**
+   * Worker implementation that does not actually launch any executors, but reports them as
+   * running so the Master keeps track of them. This requires that `syncExecutors` be used
+   * to make sure the Master instance and the SparkContext under test agree about what
+   * executors are running.
+   */
+  private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
+    extends Worker(
+      rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
+      null, conf, securityManager) {
+
+    override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)
+
+    private def testReceive: PartialFunction[Any, Unit] = synchronized {
+      case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+        self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
+    }
+
+  }
+
 }

core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala

Lines changed: 1 addition & 0 deletions

@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 1)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
       .setMaster(
         "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
       .setAppName("test")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 18 additions & 0 deletions

@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(

   private var lastSnapshot = ExecutorPodsSnapshot(Nil)

+  // Executors that have been deleted by this allocator but not yet detected as deleted in
+  // a snapshot from the API server. This is used to deny registration from these executors
+  // if they happen to come up before the deletion takes effect.
+  @volatile private var deletedExecutorIds = Set.empty[Long]
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator(
     }
   }

+  def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
+
   private def onNewSnapshots(
       applicationId: String,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator(
       }
       .map { case (id, _) => id }

+    // Make a local, non-volatile copy of the reference since it's used multiple times. This
+    // is the only method that modifies the list, so this is safe.
+    var _deletedExecutorIds = deletedExecutorIds
+
     if (snapshots.nonEmpty) {
       logDebug(s"Pod allocation status: $currentRunningCount running, " +
         s"${currentPendingExecutors.size} pending, " +
         s"${newlyCreatedExecutors.size} unacknowledged.")
+
+      val existingExecs = lastSnapshot.executorPods.keySet
+      _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }

     val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator(

     if (toDelete.nonEmpty) {
       logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+      _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
       Utils.tryLogNonFatalError {
         kubernetesClient
           .pods()
@@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
       }
     }

+    deletedExecutorIds = _deletedExecutorIds
+
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
     // update method when not needed.
     hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
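
The comments in the hunk above describe a read-once / mutate-locally / publish-once pattern for the volatile set. A minimal sketch of that pattern in isolation (hypothetical object, assuming a single writer as in the allocator):

object VolatilePublishSketch {
  @volatile private var deletedExecutorIds = Set.empty[Long]

  // Single-writer assumption: only one method (running under the allocator's lock)
  // ever updates the field, so it can safely be read once, modified locally, and
  // written back once at the end.
  def onNewSnapshot(toDelete: Seq[Long], stillExistingPods: Set[Long]): Unit = {
    var _deletedExecutorIds = deletedExecutorIds                     // one volatile read
    _deletedExecutorIds = _deletedExecutorIds.filter(stillExistingPods.contains)
    _deletedExecutorIds = _deletedExecutorIds ++ toDelete
    deletedExecutorIds = _deletedExecutorIds                         // one volatile write
  }

  def isDeleted(executorId: String): Boolean =
    deletedExecutorIds.contains(executorId.toLong)                   // lock-free read path
}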

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 4 additions & 0 deletions

@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }

+  override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
+    podAllocator.isDeleted(executorId)
+  }
+
   private class KubernetesDriverEndpoint extends DriverEndpoint {

     override def onDisconnected(rpcAddress: RpcAddress): Unit = {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala

Lines changed: 9 additions & 0 deletions

@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
     currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
     snapshotsBuffer += currentSnapshot
   }
+
+  def removeDeletedExecutors(): Unit = {
+    val nonDeleted = currentSnapshot.executorPods.filter {
+      case (_, PodDeleted(_)) => false
+      case _ => true
+    }
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    snapshotsBuffer += currentSnapshot
+  }
 }

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 11 additions & 0 deletions

@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(podOperations).delete()
+    assert(podsAllocatorUnderTest.isDeleted("3"))
+    assert(podsAllocatorUnderTest.isDeleted("4"))
+
+    // Update the snapshot to not contain the deleted executors, make sure the
+    // allocator cleans up internal state.
+    snapshotsStore.updatePod(deletedExecutor(3))
+    snapshotsStore.updatePod(deletedExecutor(4))
+    snapshotsStore.removeDeletedExecutors()
+    snapshotsStore.notifySubscribers()
+    assert(!podsAllocatorUnderTest.isDeleted("3"))
+    assert(!podsAllocatorUnderTest.isDeleted("4"))
   }

   private def executorPodAnswer(): Answer[SparkPod] =
