
Commit 375d348

[SPARK-31197][CORE] Shutdown executor once we are done decommissioning
### What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do. This is a rebase of #28817.

### Why are the changes needed?

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished. Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shut down by the cluster manager instead of always holding the resources as long as possible.

### Does this PR introduce _any_ user-facing change?

The decommissioned executors will exit at the end of decommissioning. This is technically a user-facing change; however, decommissioning hasn't been in any releases yet.

### How was this patch tested?

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
1 parent c1d17df commit 375d348

File tree: 10 files changed (+310, -44 lines)


core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 0 additions & 2 deletions
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
-  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
-
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>
       decommissionSelf()
   }
 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 55 additions & 3 deletions
@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
-  @volatile private var decommissioned = false
   @volatile var driver: Option[RpcEndpointRef] = None
 
   // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
 
+  @volatile private var decommissioned = false
+
   override def onStart(): Unit = {
     logInfo("Registering PWR handler.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
@@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend(
     case UpdateDelegationTokens(tokenBytes) =>
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+
+    case DecommissionSelf =>
+      logInfo("Received decommission self")
+      decommissionSelf()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shutdown the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+      // overhead for executors that are not in decommissioning state as overall that will be
+      // more of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize introduction of any new locking structures in critical
+      // code paths.
+
+      val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
+        override def run(): Unit = {
+          var lastTaskRunningTime = System.nanoTime()
+          val sleep_time = 1000 // 1s
+
+          while (true) {
+            logInfo("Checking to see if we can shutdown.")
+            Thread.sleep(sleep_time)
+            if (executor == null || executor.numRunningTasks == 0) {
+              if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+                logInfo("No running tasks, checking migrations")
+                val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
+                // We can only trust the allBlocksMigrated boolean value if there were no tasks
+                // running since the start of computing it.
+                if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                  logInfo("No running tasks, all blocks migrated, stopping.")
+                  exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+                } else {
+                  logInfo("All blocks not yet migrated.")
+                }
+              } else {
+                logInfo("No running tasks, no block migration configured, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              }
+            } else {
+              logInfo(s"Blocked from shutdown by running ${executor.numRunningTasks} tasks")
+              // If there is a running task it could store blocks, so make sure we wait for a
+              // migration loop to complete after the last task is done.
+              // Note: this is only advanced if there is a running task; if there
+              // is no running task but the blocks are not done migrating this does not
+              // move forward.
+              lastTaskRunningTime = System.nanoTime()
+            }
+          }
+        }
+      }
+      shutdownThread.setDaemon(true)
+      shutdownThread.start()
+
+      logInfo("Will exit when finished decommissioning")
       // Return true since we are handling a signal
       true
     } catch {
       case e: Exception =>
-        logError(s"Error ${e} during attempt to decommission self")
+        logError("Unexpected error while decommissioning self", e)
         false
     }
   }
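
Aside (not part of the commit): the exit decision made by the shutdown thread above reduces to a small predicate. The following self-contained Scala sketch restates that logic for readers skimming the diff; every name here (CanExitSketch, canExit, MigrationStatus) is invented for illustration, while in the real code the inputs come from executor.numRunningTasks, env.conf.get(STORAGE_DECOMMISSION_ENABLED) and env.blockManager.lastMigrationInfo().

// Hypothetical, standalone restatement of the shutdown check in decommissionSelf().
object CanExitSketch {
  final case class MigrationStatus(lastMigrationTime: Long, allBlocksMigrated: Boolean)

  def canExit(
      numRunningTasks: Int,
      storageDecommissionEnabled: Boolean,
      migration: MigrationStatus,
      lastTaskRunningTime: Long): Boolean = {
    if (numRunningTasks != 0) {
      // A running task may still create blocks, so keep waiting
      // (the real code also advances lastTaskRunningTime here).
      false
    } else if (!storageDecommissionEnabled) {
      // No block migration configured: exit as soon as the tasks are drained.
      true
    } else {
      // Only trust the "all migrated" report if no task has run since it was computed.
      migration.allBlocksMigrated && migration.lastMigrationTime > lastTaskRunningTime
    }
  }

  def main(args: Array[String]): Unit = {
    // Tasks drained at t=50, migrations reported complete at t=80: safe to exit.
    println(canExit(0, true, MigrationStatus(80L, true), 50L)) // true
    // A task ran at t=90, after the migration pass at t=80: keep waiting.
    println(canExit(0, true, MigrationStatus(80L, true), 90L)) // false
  }
}

The key design point is the timestamp comparison: the migration report is only trusted if it was computed after the last observed running task.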

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 3 additions & 0 deletions
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission itself. (Can be an internal message)
+  case object DecommissionSelf extends CoarseGrainedClusterMessage
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 10 additions & 0 deletions
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case e: Exception =>
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
     }
+    // Send decommission message to the executor, this may be a duplicate since the executor
+    // could have been the one to notify us. But it's also possible the notification came from
+    // elsewhere and the executor does not yet know.
+    executorDataMap.get(executorId) match {
+      case Some(executorInfo) =>
+        executorInfo.executorEndpoint.send(DecommissionSelf)
+      case None =>
+        // Ignoring the executor since it is not registered.
+        logWarning(s"Attempted to decommission unknown executor $executorId.")
+    }
     logInfo(s"Finished decommissioning executor $executorId.")
 
     if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 8 additions & 0 deletions
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   * Returns the last migration time and a boolean denoting if all the blocks have been migrated.
+   * If there are any tasks running since that time the boolean may be incorrect.
+   */
+  private[spark] def lastMigrationInfo(): (Long, Boolean) = {
+    decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false))
+  }
+
   private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] =
     master.getReplicateInfoForRDDBlocks(blockManagerId)
 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 82 additions & 14 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.util.concurrent.ExecutorService
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete. Readable for testing.
+  @volatile private[storage] var lastRDDMigrationTime: Long = 0
+  @volatile private[storage] var lastShuffleMigrationTime: Long = 0
+  @volatile private[storage] var rddBlocksLeft: Boolean = true
+  @volatile private[storage] var shuffleBlocksLeft: Boolean = true
+
   /**
   * This runnable consumes any shuffle blocks in the queue for migration. This part of a
   * producer/consumer where the main migration loop updates the queue of blocks to be migrated
@@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner(
                 null)// class tag, we don't need for shuffle
               logDebug(s"Migrated sub block ${blockId}")
             }
-            logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+            logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
           } else {
             logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
           }
+          numMigratedShuffles.incrementAndGet()
         }
       }
       // This catch is intentionally outside of the while running block.
@@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have migrated. This is used to know when we are "done"; being done can change
+  // if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD =
+    !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
+  @volatile private var stoppedShuffle =
+    !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
 
   private val migrationPeers =
     mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run(): Unit = {
       assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedRDD && !Thread.interrupted()) {
         logInfo("Iterating on migrating from the block manager.")
+        // Validate we have peers to migrate to.
+        val peers = bm.getPeers(false)
+        // If we have no peers give up.
+        if (peers.isEmpty) {
+          stopped = true
+          stoppedRDD = true
+        }
         try {
+          val startTime = System.nanoTime()
           logDebug("Attempting to replicate all cached RDD blocks")
-          decommissionRddCacheBlocks()
+          rddBlocksLeft = decommissionRddCacheBlocks()
+          lastRDDMigrationTime = startTime
           logInfo("Attempt to replicate all cached blocks done")
           logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
          Thread.sleep(sleepInterval)
        } catch {
          case e: InterruptedException =>
-            logInfo("Interrupted during migration, will not refresh migrations.")
-            stopped = true
+            logInfo("Interrupted during RDD migration, stopping")
+            stoppedRDD = true
          case NonFatal(e) =>
-            logError("Error occurred while trying to replicate for block manager decommissioning.",
+            logError("Error occurred replicating RDD for block manager decommissioning.",
              e)
-            stopped = true
+            stoppedRDD = true
        }
      }
    }
@@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run() {
       assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
        try {
          logDebug("Attempting to replicate all shuffle blocks")
-          refreshOffloadingShuffleBlocks()
+          val startTime = System.nanoTime()
+          shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+          lastShuffleMigrationTime = startTime
          logInfo("Done starting workers to migrate shuffle blocks")
          Thread.sleep(sleepInterval)
        } catch {
          case e: InterruptedException =>
            logInfo("Interrupted during migration, will not refresh migrations.")
-            stopped = true
+            stoppedShuffle = true
          case NonFatal(e) =>
            logError("Error occurred while trying to replicate for block manager decommissioning.",
              e)
-            stopped = true
+            stoppedShuffle = true
        }
      }
    }
@@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner(
   * but rather shadows them.
   * Requires an Indexed based shuffle resolver.
   * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage.
+   * Returns true if we are not done migrating shuffle blocks.
   */
-  private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
+  private[storage] def refreshOffloadingShuffleBlocks(): Boolean = {
     // Update the queue of shuffles to be migrated
     logInfo("Offloading shuffle blocks")
     val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
@@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner(
     deadPeers.foreach { peer =>
       migrationPeers.get(peer).foreach(_.running = false)
     }
+    // If we don't have anyone to migrate to give up
+    if (migrationPeers.values.find(_.running == true).isEmpty) {
+      stoppedShuffle = true
+    }
+    // If we found any new shuffles to migrate or otherwise have not migrated everything.
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
   }
 
   /**
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner(
   /**
   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
   * Visible for testing
+   * Returns true if we have not migrated all of our RDD blocks.
   */
-  private[storage] def decommissionRddCacheBlocks(): Unit = {
+  private[storage] def decommissionRddCacheBlocks(): Boolean = {
     val replicateBlocksInfo = bm.getMigratableRDDBlocks()
+    // Refresh peers and validate we have somewhere to move blocks.
 
     if (replicateBlocksInfo.nonEmpty) {
       logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
         "for block manager decommissioning")
     } else {
       logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
-      return
+      return false
     }
 
     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner(
     if (blocksFailedReplication.nonEmpty) {
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
+      return true
     }
+    return false
   }
 
   private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   * Returns the last migration time and a boolean for whether all blocks have been migrated.
+   * The last migration time is calculated to be the minimum of the last migration of any
+   * running migration (and if there are no currently running migrations it is set to current).
+   * This provides a timestamp which, if there have been no tasks running since that time,
+   * tells us that all blocks that could be migrated have been migrated off.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      // Since we don't have anything left to migrate ever (since we don't restart once
+      // stopped), return that we're done with a validity timestamp that doesn't expire.
+      (Long.MaxValue, true)
+    } else {
+      // Choose the min of the active times. See the function description for more information.
+      val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
+        Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
+      } else if (!stoppedShuffle) {
+        lastShuffleMigrationTime
+      } else {
+        lastRDDMigrationTime
+      }
+
+      // Technically we could have blocks left if we encountered an error, but those blocks will
+      // never be migrated, so we don't care about them.
+      val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD)
+      (lastMigrationTime, blocksMigrated)
+    }
+  }
 }
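
Aside (not part of the commit): a hypothetical worked example of the timestamp contract above, showing why lastMigrationInfo() reports the older of the two migration pass times; the object name and the concrete times are invented for illustration, and the real logic lives in BlockManagerDecommissioner.lastMigrationInfo().

// Hypothetical illustration of the "take the older pass time" rule.
object MigrationTimestampSketch {
  def main(args: Array[String]): Unit = {
    val lastRDDMigrationTime = 100L      // RDD pass finished at t=100
    val lastShuffleMigrationTime = 120L  // shuffle pass finished at t=120
    val reportedTime = math.min(lastRDDMigrationTime, lastShuffleMigrationTime)

    // A task that ran at t=110 could have cached a new RDD block that the
    // t=100 RDD pass never saw, so the executor must not exit yet.
    val lastTaskRunningTime = 110L
    val safeToExit = reportedTime > lastTaskRunningTime
    println(s"reportedTime=$reportedTime safeToExit=$safeToExit") // reportedTime=100 safeToExit=false
  }
}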

core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala

Lines changed: 7 additions & 12 deletions
@@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     assert(sleepyRdd.count() === 10)
   }
 
-  test("verify a task with all workers decommissioned succeeds") {
+  test("verify a running task with all workers decommissioned succeeds") {
+    // Wait for the executors to come up
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = 2,
+      timeout = 30000) // 30s
+
     val input = sc.parallelize(1 to 10)
     // Listen for the job
     val sem = new Semaphore(0)
@@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
         sem.release()
       }
     })
-    TestUtils.waitUntilExecutorsUp(sc = sc,
-      numExecutors = 2,
-      timeout = 30000) // 30s
+
     val sleepyRdd = input.mapPartitions{ x =>
       Thread.sleep(5000) // 5s
      x
@@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
-    // Try and launch task after decommissioning, this should fail
-    val postDecommissioned = input.map(x => x)
-    val postDecomAsyncCount = postDecommissioned.countAsync()
-    val thrown = intercept[java.util.concurrent.TimeoutException]{
-      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
-    }
-    assert(postDecomAsyncCount.isCompleted === false,
-      "After exec decommission new task could not launch")
   }
 }
