
Commit 5e36ed3

Make Spark's dynamic allocation use decommissioning
1 parent 8897d74 commit 5e36ed3

5 files changed: +126 -56 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 10 additions & 0 deletions
@@ -81,6 +81,16 @@ private[spark] trait ExecutorAllocationClient {
       countFailures: Boolean,
       force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * The default implementation delegates to kill; a scheduler must override
+   * this if it supports graceful decommissioning.
+   *
+   * @param executorIds identifiers of executors to decommission
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  def decommissionExecutors(executorIds: Seq[String]): Seq[String]
+
   /**
    * Request that the cluster manager kill every executor on the specified host.
    *

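The trait above only declares the contract; this file provides no body. As a minimal, self-contained sketch of that contract (the MiniAllocationClient trait and demo object below are invented for illustration, not part of Spark), a client that lacks graceful decommissioning could delegate to its kill path and return whichever ids were acknowledged, as the doc comment describes:

// Hypothetical stand-in for the ExecutorAllocationClient contract; not Spark code.
trait MiniAllocationClient {
  // Returns the ids the cluster manager acknowledged for removal.
  def killExecutors(ids: Seq[String]): Seq[String]

  // Mirrors decommissionExecutors: without graceful support, fall back to kill.
  def decommissionExecutors(ids: Seq[String]): Seq[String] = killExecutors(ids)
}

object MiniAllocationClientDemo extends App {
  val client = new MiniAllocationClient {
    // Pretend the cluster manager acknowledges every request.
    override def killExecutors(ids: Seq[String]): Seq[String] = ids
  }
  println(client.decommissionExecutors(Seq("1", "2"))) // List(1, 2)
}
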
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 12 additions & 3 deletions
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -204,7 +205,11 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking is enabled, or if worker decommissioning
+      // is enabled together with storage shuffle decommissioning, we have *experimental*
+      // support for decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) && conf.get(STORAGE_SHUFFLE_DECOMMISSION_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
       } else if (!testing) {
         throw new SparkException("Dynamic allocation of executors requires the external " +
@@ -550,8 +555,12 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        client.decommissionExecutors(executorIdsToBeRemoved)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+          countFailures = false, force = false)
+      }
     }
 
     // [SPARK-21834] killExecutors api reduces the target number of executors.

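To make the new validation above easier to follow, here is a standalone sketch of the gating rule with plain booleans standing in for the Spark config reads (AllocConf and experimentalDynAllocAllowed are illustrative names, not Spark APIs):

object DecommissionGating {
  // Each field stands in for one config read, named after the constant it mirrors.
  final case class AllocConf(
      shuffleServiceEnabled: Boolean,             // SHUFFLE_SERVICE_ENABLED
      shuffleTrackingEnabled: Boolean,            // DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED
      workerDecommissionEnabled: Boolean,         // WORKER_DECOMMISSION_ENABLED
      storageShuffleDecommissionEnabled: Boolean) // STORAGE_SHUFFLE_DECOMMISSION_ENABLED

  // Dynamic allocation without the external shuffle service is permitted
  // (experimentally) when tracking is on, or when both decommission flags are on.
  def experimentalDynAllocAllowed(c: AllocConf): Boolean =
    !c.shuffleServiceEnabled &&
      (c.shuffleTrackingEnabled ||
        (c.workerDecommissionEnabled && c.storageShuffleDecommissionEnabled))
}
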
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 64 additions & 46 deletions
@@ -416,54 +416,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   /**
-   * Mark a given executor as decommissioned and stop making resource offers for it.
+   * Mark the given executors as decommissioned and stop making resource offers for them.
    */
   private def decommissionExecutor(executorId: String): Boolean = {
-    val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
-      // Only bother decommissioning executors which are alive.
-      if (isExecutorActive(executorId)) {
-        executorsPendingDecommission += executorId
-        true
-      } else {
-        false
-      }
-    }
-
-    if (shouldDisable) {
-      logInfo(s"Starting decommissioning executor $executorId.")
-      try {
-        scheduler.executorDecommission(executorId)
-      } catch {
-        case e: Exception =>
-          logError(s"Unexpected error during decommissioning ${e.toString}", e)
-      }
-      // Send the decommission message to the executor (it could have originated on the
-      // executor, but not necessarily).
-      executorDataMap.get(executorId) match {
-        case Some(executorInfo) =>
-          executorInfo.executorEndpoint.send(DecommissionSelf)
-        case None =>
-          // Ignoring the executor since it is not registered.
-          logWarning(s"Attempted to decommission unknown executor $executorId.")
-      }
-      logInfo(s"Finished decommissioning executor $executorId.")
-
-      if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-        try {
-          logInfo("Starting decommissioning block manager corresponding to " +
-            s"executor $executorId.")
-          scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-        } catch {
-          case e: Exception =>
-            logError("Unexpected error during block manager " +
-              s"decommissioning for executor $executorId: ${e.toString}", e)
-        }
-        logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-      }
-    } else {
-      logInfo(s"Skipping decommissioning of executor $executorId.")
-    }
-    shouldDisable
+    decommissionExecutors(List(executorId)).nonEmpty
   }
 
   /**
@@ -497,6 +453,68 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorIds identifiers of executors to decommission
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(executorIds: Seq[String]): Seq[String] = {
+    val executorsToDecommission = executorIds.filter { executorId =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    executorsToDecommission.filter { executorId =>
+      doDecommission(executorId)
+    }
+  }
+
+  private def doDecommission(executorId: String): Boolean = {
+    logInfo(s"Starting decommissioning executor $executorId.")
+    try {
+      scheduler.executorDecommission(executorId)
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send the decommission message to the executor (it could have originated on the
+    // executor, but not necessarily).
+    executorDataMap.get(executorId) match {
+      case Some(executorInfo) =>
+        executorInfo.executorEndpoint.send(DecommissionSelf)
+      case None =>
+        // Ignoring the executor since it is not registered.
+        logWarning(s"Attempted to decommission unknown executor $executorId.")
+        return false
+    }
+    logInfo(s"Finished decommissioning executor $executorId.")
+
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo("Starting decommissioning block manager corresponding to " +
+          s"executor $executorId.")
+        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
+            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
+      }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
+    }
+    true
+  }
+
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()

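The override above splits the work into two phases: a synchronized pass that marks live executors as pending, then a side-effecting pass that keeps only the ids whose decommission succeeded. A standalone sketch of that shape, with all names invented for illustration:

import scala.collection.mutable

// Illustrative stand-in for the scheduler backend's bookkeeping; not Spark code.
class DecommissionSketch(liveExecutors: Set[String]) {
  private val pendingDecommission = mutable.Set.empty[String]

  // `act` performs the per-executor side effects and reports success.
  def decommissionExecutors(ids: Seq[String], act: String => Boolean): Seq[String] = {
    // Phase 1: atomically record which live executors will be decommissioned.
    val marked = ids.filter { id =>
      synchronized {
        if (liveExecutors.contains(id)) { pendingDecommission += id; true } else false
      }
    }
    // Phase 2: run the side effects; only acknowledged ids survive the filter.
    marked.filter(act)
  }
}

Called as new DecommissionSketch(Set("1", "2")).decommissionExecutors(Seq("1", "3"), _ => true), this returns List("1"): executor "3" is not live, so it is never marked or acted on, mirroring how the backend reports only acknowledged ids back to the allocation manager.
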
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 21 additions & 4 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId}
 import org.apache.spark.util.Clock
 
 /**
@@ -135,6 +135,7 @@ private[spark] class ExecutorMonitor(
 
   /**
    * Mark the given executors as pending to be removed. Should only be called in the EAM thread.
+   * This covers both kills and decommissions.
    */
   def executorsKilled(ids: Seq[String]): Unit = {
     ids.foreach { id =>
@@ -298,6 +299,7 @@
     //
     // This means that an executor may be marked as having shuffle data, and thus prevented
     // from being removed, even though the data may not be used.
+    // TODO: Only track used files (SPARK-31974)
     if (shuffleTrackingEnabled && event.reason == Success) {
       stageToShuffleID.get(event.stageId).foreach { shuffleId =>
         exec.addShuffle(shuffleId)
@@ -333,11 +335,26 @@
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check whether this is a shuffle file or an RDD block to pick the correct update codepath.
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+      /**
+       * The executor monitor keeps track of the locations of cache and shuffle blocks, which
+       * can be used to decide which executor(s) Spark should shut down first. Since shuffle
+       * blocks can now be migrated, this wires up tracking for them as well. We only do this
+       * for data blocks, as index and other blocks do not necessarily mean the entire block
+       * has been committed.
+       */
+      event.blockUpdatedInfo.blockId match {
+        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+        case _ => // For now we only update on data blocks.
+      }
+      return
+    } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      return
+    }
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]

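The dispatch added to onBlockUpdated hinges on the block id's type: only completed .data blocks mark an executor as holding a shuffle's output. A stripped-down sketch of the same pattern match, using a toy ADT rather than Spark's org.apache.spark.storage classes:

// Toy block-id hierarchy for illustration only; field shapes mirror Spark's ids.
sealed trait MiniBlockId
case class MiniRDDBlockId(rddId: Int, splitIndex: Int) extends MiniBlockId
case class MiniShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends MiniBlockId
case class MiniShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends MiniBlockId

object ShuffleTrackingSketch {
  // An index block alone does not prove the shuffle data was fully committed,
  // so only data blocks yield a shuffle id to track.
  def shuffleIdToTrack(blockId: MiniBlockId): Option[Int] = blockId match {
    case MiniShuffleDataBlockId(shuffleId, _, _) => Some(shuffleId)
    case _ => None
  }
}
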
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala

Lines changed: 19 additions & 3 deletions
@@ -44,6 +44,14 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
   }
 
   test("basic functionality") {
+    basicTest(decommissioning = false)
+  }
+
+  test("basic decommissioning") {
+    basicTest(decommissioning = true)
+  }
+
+  def basicTest(decommissioning: Boolean): Unit = {
     // Test that adding batch processing time info to allocation manager
     // causes executors to be requested and killed accordingly
 
@@ -83,12 +91,20 @@
       Map.empty)}
   }
 
-  /** Verify that a particular executor was killed */
+  /** Verify that the expected executors were killed */
   def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
     if (expectedKilledExec.nonEmpty) {
-      verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+      if (decommissioning) {
+        verify(allocationClient, times(1)).decommissionExecutors(meq(expectedKilledExec.toSeq))
+      } else {
+        verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+      }
     } else {
-      verify(allocationClient, never).killExecutor(null)
+      if (decommissioning) {
+        verify(allocationClient, never).decommissionExecutors(null)
+      } else {
+        verify(allocationClient, never).killExecutor(null)
+      }
     }
   }

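One subtle point in the updated helper: the kill path verifies a single id while the decommission path verifies the Seq-based API, relying on Option.toSeq to widen the expectation. A tiny plain-Scala demonstration of that conversion (the executor id is hypothetical; no mocks involved):

object OptionToSeqDemo extends App {
  val expectedKilledExec: Option[String] = Some("exec-1") // hypothetical id
  println(expectedKilledExec.toSeq)   // List(exec-1)
  println(Option.empty[String].toSeq) // List()
}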