Commit 9bb0293

Track decommissioning executors in the core dynamic scheduler so we don't scale down too low, and update the streaming ExecutorAllocationManager to also delegate to decommission.
Fix up executor add for resource profiles.
1 parent 7691d2d commit 9bb0293

7 files changed: 184 additions, 40 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 20 additions & 1 deletion
@@ -87,9 +87,28 @@ private[spark] trait ExecutorAllocationClient {
    * if it supports graceful decommissioning.
    *
    * @param executorIds identifiers of executors to decommission
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
-  def decommissionExecutors(executorIds: Seq[String]): Seq[String]
+  def decommissionExecutors(
+    executorIds: Seq[String],
+    adjustTargetNumExecutors: Boolean): Seq[String]
+
+
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorIds identifiers of executors to decommission
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def decommissionExecutor(executorId: String): Boolean = {
+    val decommissionedExecutors = decommissionExecutors(Seq(executorId),
+      adjustTargetNumExecutors = true)
+    decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
+  }
 
   /**
    * Request that the cluster manager kill every executor on the specified host.
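
The snippet below is a standalone sketch (not part of the commit) of the delegation pattern this trait change introduces: a batch decommissionExecutors method plus a single-executor convenience wrapper that succeeds only when the batch call acknowledges the requested id. The trait name and the in-memory client are hypothetical stand-ins for ExecutorAllocationClient and a real scheduler backend.

// Standalone sketch of the batch-plus-convenience-wrapper pattern.
trait AllocationClientSketch {
  def decommissionExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean): Seq[String]

  // Single-executor wrapper with a default implementation, mirroring the diff:
  // it succeeds only when the batch call acknowledges exactly the requested id.
  def decommissionExecutor(executorId: String): Boolean = {
    val decommissioned = decommissionExecutors(Seq(executorId), adjustTargetNumExecutors = true)
    decommissioned.nonEmpty && decommissioned.head == executorId
  }
}

object AllocationClientSketchDemo extends App {
  val client = new AllocationClientSketch {
    private val alive = scala.collection.mutable.Set("exec-1", "exec-2")
    // Acknowledge only executors that are still alive, like the backend's isExecutorActive check.
    def decommissionExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Seq[String] =
      ids.filter(alive.remove)
  }
  println(client.decommissionExecutor("exec-1")) // true: acknowledged
  println(client.decommissionExecutor("exec-9")) // false: unknown executor, nothing acknowledged
}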

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 9 additions & 3 deletions
@@ -529,7 +529,9 @@ private[spark] class ExecutorAllocationManager(
       // get the running total as we remove or initialize it to the count - pendingRemoval
       val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
         (executorMonitor.executorCountWithResourceProfile(rpId) -
-          executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+          executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
+          executorMonitor.pendingDecommissioningPerResourceProfileId(rpId)
+        ))
       if (newExecutorTotal - 1 < minNumExecutors) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
           s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
@@ -556,7 +558,7 @@
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
       if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
-        client.decommissionExecutors(executorIdsToBeRemoved)
+        client.decommissionExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false)
       } else {
         client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
           countFailures = false, force = false)
@@ -572,7 +574,11 @@
 
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        executorMonitor.executorsDecommissioned(executorsRemoved)
+      } else {
+        executorMonitor.executorsKilled(executorsRemoved)
+      }
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
       executorsRemoved
     } else {
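
A standalone arithmetic sketch (not Spark code) of why the running total now subtracts executors pending decommission as well as those pending removal before the minimum-executor check; all counts below are made up for illustration.

object MinExecutorFloorSketch extends App {
  val minNumExecutors = 1
  val executorCount = 5          // executors known for this resource profile
  val pendingRemoval = 1         // already being killed
  val pendingDecommissioning = 3 // already being decommissioned

  // Without counting decommissions the manager would think 4 executors remain and keep removing;
  // counting them shows only 1 effective executor is left, so removal must stop at the floor.
  val newExecutorTotal = executorCount - pendingRemoval - pendingDecommissioning
  if (newExecutorTotal - 1 < minNumExecutors) {
    println(s"Not removing: only $newExecutorTotal executor(s) effectively left (min $minNumExecutors)")
  } else {
    println(s"Safe to remove one more, $newExecutorTotal effectively left")
  }
}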

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 40 additions & 29 deletions
@@ -415,13 +415,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     scheduler.workerRemoved(workerId, host, message)
   }
 
-  /**
-   * Mark given executors as decommissioned and stop making resource offers for it.
-   */
-  private def decommissionExecutor(executorId: String): Boolean = {
-    (! decommissionExecutors(List(executorId)).isEmpty)
-  }
-
   /**
    * Stop making resource offers for the given executor. The executor is marked as lost with
    * the loss reason still pending.
@@ -457,30 +450,58 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Request that the cluster manager decommission the specified executors.
    *
    * @param executorIds identifiers of executors to decommission
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
-  override def decommissionExecutors(executorIds: Seq[String]): Seq[String] = {
-    val executorsToDecommission = executorIds.filter{executorId =>
-      CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
-          true
-        } else {
-          false
+  override def decommissionExecutors(executorIds: Seq[String],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    withLock {
+      val executorsToDecommission = executorIds.filter{executorId =>
+        CoarseGrainedSchedulerBackend.this.synchronized {
+          // Only bother decommissioning executors which are alive.
+          if (isExecutorActive(executorId)) {
+            executorsPendingDecommission += executorId
+            true
+          } else {
+            false
+          }
         }
       }
-    }
 
-    executorsToDecommission.filter{executorId =>
-      doDecommission(executorId)
+      // If we don't want to replace the executors we are decommissioning
+      if (adjustTargetNumExecutors) {
+        executorsToDecommission.foreach { exec =>
+          val rpId = executorDataMap(exec).resourceProfileId
+          val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+          if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+            // Assume that we are killing an executor that was started by default and
+            // not through the request api
+            requestedTotalExecutorsPerResourceProfile(rp) = 0
+          } else {
+            val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+            requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+          }
+        }
+        doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+      }
+
+      val decommissioned = executorsToDecommission.filter{executorId =>
+        doDecommission(executorId)
+      }
+      decommissioned
     }
   }
 
   private def doDecommission(executorId: String): Boolean = {
     logInfo(s"Starting decommissioning executor $executorId.")
     try {
       scheduler.executorDecommission(executorId)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId))
+      }
     } catch {
       case e: Exception =>
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
@@ -609,16 +630,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(RemoveWorker(workerId, host, message))
   }
 
-  /**
-   * Called by subclasses when notified of a decommissioning executor.
-   */
-  private[spark] def decommissionExecutor(executorId: String): Unit = {
-    if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId))
-    }
-  }
-
   def sufficientResourcesRegistered(): Boolean = true
 
   override def isReady(): Boolean = {
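
A standalone sketch (not Spark code) of the target-adjustment bookkeeping added to decommissionExecutors: when adjustTargetNumExecutors is true, the requested total for the executor's resource profile is decremented, never below zero, so the cluster manager does not immediately replace the decommissioned executor. The profile ids, counts, and the getOrElse simplification here are hypothetical.

object TargetAdjustmentSketch extends App {
  import scala.collection.mutable

  // Requested executor totals keyed by a made-up resource profile id.
  val requestedTotalPerProfile = mutable.Map[Int, Int](0 -> 3, 1 -> 2)
  val executorToProfile = Map("exec-1" -> 0, "exec-2" -> 1, "exec-3" -> 1)

  def decommission(execs: Seq[String], adjustTargetNumExecutors: Boolean): Unit = {
    if (adjustTargetNumExecutors) {
      execs.foreach { exec =>
        val rpId = executorToProfile(exec)
        val current = requestedTotalPerProfile.getOrElse(rpId, 0)
        // Lower the target so the cluster manager does not spin up a replacement.
        requestedTotalPerProfile(rpId) = math.max(current - 1, 0)
      }
      // A real backend would now push the new targets to the cluster manager,
      // analogous to doRequestTotalExecutors(requestedTotalPerProfile.toMap).
    }
  }

  decommission(Seq("exec-2", "exec-3"), adjustTargetNumExecutors = true)
  println(requestedTotalPerProfile) // Map(0 -> 3, 1 -> 0): profile 1's target dropped to zero
  decommission(Seq("exec-1"), adjustTargetNumExecutors = false)
  println(requestedTotalPerProfile) // unchanged: the caller already lowered the target elsewhere
}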

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 32 additions & 2 deletions
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
 
     var newNextTimeout = Long.MaxValue
     timedOutExecs = executors.asScala
-      .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
+      .filter { case (_, exec) =>
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.pendingDecommissioning}
       .filter { case (_, exec) =>
         val deadline = exec.timeoutAt
         if (deadline > now) {
@@ -150,6 +151,19 @@
     nextTimeout.set(Long.MinValue)
   }
 
+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.pendingDecommissioning = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
   def executorCount: Int = executors.size()
 
   def executorCountWithResourceProfile(id: Int): Int = {
@@ -172,6 +186,16 @@
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
   }
 
+  def pendingDecommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.pendingDecommissioning
+  }
+
+  def pendingDecommissioningPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) =>
+      v.resourceProfileId == id && v.pendingDecommissioning
+    }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -328,7 +352,7 @@
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
@@ -431,6 +455,11 @@
     executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet
   }
 
+  // Visible for testing
+  def executorsDecommissioning(): Set[String] = {
+    executors.asScala.filter { case (_, exec) => exec.pendingDecommissioning }.keys.toSet
+  }
+
   /**
    * This method should be used when updating executor state. It guards against a race condition in
    * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
@@ -483,6 +512,7 @@
     @volatile var timedOut: Boolean = false
 
     var pendingRemoval: Boolean = false
+    var pendingDecommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false
 
     private var idleStart: Long = -1
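
A standalone sketch (not Spark code) of the new ExecutorMonitor bookkeeping: executors flagged pendingDecommissioning are excluded from idle-timeout candidates and counted per resource profile, mirroring pendingRemoval. The Tracker case class is a hypothetical stand-in for the monitor's internal per-executor state.

object MonitorSketch extends App {
  // Simplified per-executor state; the real monitor tracks much more.
  final case class Tracker(
      resourceProfileId: Int,
      var pendingRemoval: Boolean = false,
      var pendingDecommissioning: Boolean = false,
      var hasActiveShuffle: Boolean = false)

  val executors = scala.collection.mutable.Map(
    "exec-1" -> Tracker(0),
    "exec-2" -> Tracker(0, pendingDecommissioning = true),
    "exec-3" -> Tracker(1, pendingRemoval = true))

  // Count executors of a given profile that are already being decommissioned.
  def pendingDecommissioningPerResourceProfileId(id: Int): Int =
    executors.count { case (_, t) => t.resourceProfileId == id && t.pendingDecommissioning }

  // Only executors not already leaving (by kill or decommission) can time out as idle.
  def timeoutCandidates: Set[String] = executors.collect {
    case (id, t) if !t.pendingRemoval && !t.hasActiveShuffle && !t.pendingDecommissioning => id
  }.toSet

  println(pendingDecommissioningPerResourceProfileId(0)) // 1
  println(timeoutCandidates)                             // Set(exec-1)
}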

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 70 additions & 1 deletion
@@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests}
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1101,6 +1102,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
   }
 
+  test("mock polling loop remove with decommissioning") {
+    val clock = new ManualClock(2020L)
+    val manager = createManager(createConf(1, 20, 1, true), clock = clock)
+
+    // Remove idle executors on timeout
+    onExecutorAddedDefaultProfile(manager, "executor-1")
+    onExecutorAddedDefaultProfile(manager, "executor-2")
+    onExecutorAddedDefaultProfile(manager, "executor-3")
+    assert(executorsDecommissioning(manager).isEmpty)
+    assert(executorsPendingToRemove(manager).isEmpty)
+
+    // idle threshold not reached yet
+    clock.advance(executorIdleTimeout * 1000 / 2)
+    schedule(manager)
+    assert(manager.executorMonitor.timedOutExecutors().isEmpty)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(executorsDecommissioning(manager).isEmpty)
+
+    // idle threshold exceeded
+    clock.advance(executorIdleTimeout * 1000)
+    assert(manager.executorMonitor.timedOutExecutors().size === 3)
+    schedule(manager)
+    assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining)
+    assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining)
+
+    // Mark a subset as busy - only idle executors should be removed
+    onExecutorAddedDefaultProfile(manager, "executor-4")
+    onExecutorAddedDefaultProfile(manager, "executor-5")
+    onExecutorAddedDefaultProfile(manager, "executor-6")
+    onExecutorAddedDefaultProfile(manager, "executor-7")
+    assert(manager.executorMonitor.executorCount === 7)
+    assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed
+    assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning
+    onExecutorBusy(manager, "executor-4")
+    onExecutorBusy(manager, "executor-5")
+    onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones)
+
+    // after scheduling, the previously timed out executor should be removed, since
+    // there are new active ones.
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 3)
+
+    // advance the clock so that idle executors should time out and move to the pending list
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(executorsPendingToRemove(manager).size === 0)
+    assert(executorsDecommissioning(manager).size === 4)
+    assert(!executorsDecommissioning(manager).contains("executor-4"))
+    assert(!executorsDecommissioning(manager).contains("executor-5"))
+    assert(!executorsDecommissioning(manager).contains("executor-6"))
+
+    // Busy executors are now idle and should be removed
+    onExecutorIdle(manager, "executor-4")
+    onExecutorIdle(manager, "executor-5")
+    onExecutorIdle(manager, "executor-6")
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 4)
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining)
+  }
+
   test("listeners trigger add executors correctly") {
     val manager = createManager(createConf(1, 20, 1))
     assert(addTime(manager) === NOT_SET)
@@ -1419,7 +1482,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   private def createConf(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
-      initialExecutors: Int = 1): SparkConf = {
+      initialExecutors: Int = 1,
+      decommissioningEnabled: Boolean = false): SparkConf = {
     val sparkConf = new SparkConf()
       .set(config.DYN_ALLOCATION_ENABLED, true)
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
@@ -1435,6 +1499,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 10000L)
+      .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled)
     sparkConf
   }
 
@@ -1501,6 +1566,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = {
     manager.executorMonitor.executorsPendingToRemove()
   }
+
+  private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = {
+    manager.executorMonitor.executorsDecommissioning()
+  }
 }
 
 /**

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala

Lines changed: 6 additions & 1 deletion
@@ -23,6 +23,7 @@ import scala.util.Random
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
@@ -133,7 +134,11 @@ private[streaming] class ExecutorAllocationManager(
     logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
     if (removableExecIds.nonEmpty) {
       val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
-      client.killExecutor(execIdToRemove)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        client.decommissionExecutor(execIdToRemove)
+      } else {
+        client.killExecutor(execIdToRemove)
+      }
       logInfo(s"Requested to kill executor $execIdToRemove")
     } else {
       logInfo(s"No non-receiver executors to kill")
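
A standalone sketch (not Spark code) of the config gate the streaming allocation manager now applies: with worker decommissioning enabled it asks the client to decommission the chosen executor, otherwise it falls back to the old kill path. The Client trait is a hypothetical stand-in for ExecutorAllocationClient.

object StreamingRemovalSketch extends App {
  trait Client {
    def decommissionExecutor(id: String): Boolean
    def killExecutor(id: String): Boolean
  }

  // Mirrors the new branch: graceful decommission when enabled, otherwise the legacy kill.
  def removeExecutor(client: Client, execId: String, decommissionEnabled: Boolean): Unit = {
    if (decommissionEnabled) client.decommissionExecutor(execId)
    else client.killExecutor(execId)
  }

  val logging = new Client {
    def decommissionExecutor(id: String): Boolean = { println(s"decommission $id"); true }
    def killExecutor(id: String): Boolean = { println(s"kill $id"); true }
  }
  removeExecutor(logging, "executor-3", decommissionEnabled = true)  // graceful path
  removeExecutor(logging, "executor-3", decommissionEnabled = false) // legacy kill path
}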
