
Commit 9565c40

Fix our exit-and-cleanup thread for better debugging next time. Clean up the locks we use in decommissioning and clarify some more bits.
Parent: 9903a96

File tree: 3 files changed, +51 -44 lines


core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 30 additions & 26 deletions
@@ -285,39 +285,43 @@ private[spark] class CoarseGrainedExecutorBackend(
     // is viewed as acceptable to minimize introduction of any new locking structures in critical
     // code paths.
 
-    val shutdownThread = new Thread() {
-      var lastTaskRunningTime = System.nanoTime()
-      val sleep_time = 1000 // 1s
-
-      while (true) {
-        logInfo("Checking to see if we can shutdown.")
-        if (executor == null || executor.numRunningTasks == 0) {
-          if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-            logInfo("No running tasks, checking migrations")
-            val allBlocksMigrated = env.blockManager.lastMigrationInfo()
-            // We can only trust allBlocksMigrated boolean value if there were no tasks running
-            // since the start of computing it.
-            if (allBlocksMigrated._2 &&
-              (allBlocksMigrated._1 > lastTaskRunningTime)) {
-              logInfo("No running tasks, all blocks migrated, stopping.")
-              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+    val shutdownExec = ThreadUtils.newDaemonSingleThreadExecutor("wait for decommissioning")
+    val shutdownRunnable = new Runnable() {
+      override def run(): Unit = {
+        var lastTaskRunningTime = System.nanoTime()
+        val sleep_time = 1000 // 1s
+
+        while (true) {
+          logInfo("Checking to see if we can shutdown.")
+          if (executor == null || executor.numRunningTasks == 0) {
+            if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+              logInfo("No running tasks, checking migrations")
+              val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+              // We can only trust allBlocksMigrated boolean value if there were no tasks running
+              // since the start of computing it.
+              if (allBlocksMigrated._2 &&
+                (allBlocksMigrated._1 > lastTaskRunningTime)) {
+                logInfo("No running tasks, all blocks migrated, stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+              } else {
+                logInfo("All blocks not yet migrated.")
+              }
             } else {
-              logInfo("All blocks not yet migrated.")
+              logInfo("No running tasks, no block migration configured, stopping.")
+              exitExecutor(0, "Finished decommissioning", notifyDriver = true)
             }
+            Thread.sleep(sleep_time)
           } else {
-            logInfo("No running tasks, no block migration configured, stopping.")
-            exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+            logInfo("Blocked from shutdown by running task")
+            // If there is a running task it could store blocks, so make sure we wait for a
+            // migration loop to complete after the last task is done.
+            Thread.sleep(sleep_time)
+            lastTaskRunningTime = System.nanoTime()
           }
-          Thread.sleep(sleep_time)
-        } else {
-          logInfo("Blocked from shutdown by running task")
-          // If there is a running task it could store blocks, so make sure we wait for a
-          // migration loop to complete after the last task is done.
-          Thread.sleep(sleep_time)
-          lastTaskRunningTime = System.nanoTime()
         }
       }
     }
+    shutdownExec.submit(shutdownRunnable)
     logInfo("Will exit when finished decommissioning")
     // Return true since we are handling a signal
     true
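
The interesting part of this hunk is why a Runnable plus a daemon executor replaces the bare new Thread(): in the removed code the polling loop sits directly in the body of the anonymous Thread subclass, so it runs as part of the instance initializer on whichever thread constructs it, and no start() call appears in the lines shown. Submitting a Runnable to ThreadUtils.newDaemonSingleThreadExecutor runs the loop on its own named daemon thread and lets the signal handler return right away. Below is a minimal standalone sketch of that pattern using plain java.util.concurrent rather than Spark's ThreadUtils; runningTasks and allBlocksMigratedSince are hypothetical stand-ins for the executor and block-manager state.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object DecommissionWatcherSketch {
  // Hypothetical stand-ins for executor.numRunningTasks and the migration check.
  @volatile var runningTasks: Int = 0
  def allBlocksMigratedSince(timeNs: Long): Boolean = true

  def main(args: Array[String]): Unit = {
    // Single daemon thread with a recognizable name, roughly what
    // ThreadUtils.newDaemonSingleThreadExecutor("wait for decommissioning") provides.
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "wait-for-decommissioning")
        t.setDaemon(true)
        t
      }
    }
    val shutdownExec = Executors.newSingleThreadExecutor(factory)

    val shutdownRunnable = new Runnable {
      override def run(): Unit = {
        var lastTaskRunningTime = System.nanoTime()
        while (true) {
          if (runningTasks == 0 && allBlocksMigratedSince(lastTaskRunningTime)) {
            println("No running tasks, all blocks migrated, stopping.")
            return
          }
          // A running task may still be storing blocks, so wait at least one more
          // poll cycle after the last task finishes before trusting the migration flag.
          Thread.sleep(1000)
          if (runningTasks > 0) lastTaskRunningTime = System.nanoTime()
        }
      }
    }

    // submit() returns immediately; the loop runs on the executor's own thread.
    shutdownExec.submit(shutdownRunnable)
    shutdownExec.shutdown()
    shutdownExec.awaitTermination(10, TimeUnit.SECONDS)
  }
}

The named thread also makes the watcher easy to spot in a thread dump, which fits the "better debugging next time" wording in the commit message.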

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 16 additions & 16 deletions
@@ -461,16 +461,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   override def decommissionExecutors(executorIds: Seq[String],
       adjustTargetNumExecutors: Boolean): Seq[String] = {
 
-    withLock {
+    CoarseGrainedSchedulerBackend.this.synchronized {
       val executorsToDecommission = executorIds.filter{executorId =>
-        CoarseGrainedSchedulerBackend.this.synchronized {
-          // Only bother decommissioning executors which are alive.
-          if (isExecutorActive(executorId)) {
-            executorsPendingDecommission += executorId
-            true
-          } else {
-            false
-          }
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
         }
       }
 
@@ -513,13 +511,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
     // Send decommission message to the executor (it could have originated on the executor
    // but not necessarily.
-    executorDataMap.get(executorId) match {
-      case Some(executorInfo) =>
-        executorInfo.executorEndpoint.send(DecommissionSelf)
-      case None =>
-        // Ignoring the executor since it is not registered.
-        logWarning(s"Attempted to decommission unknown executor $executorId.")
-        return false
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor $executorId.")
+          return false
+      }
     }
     logInfo(s"Finished decommissioning executor $executorId.")
 
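
Both hunks converge on a single monitor: decommissionExecutors now synchronizes once on the backend object instead of taking withLock and then a nested synchronized inside the filter, and the executorDataMap lookup in the second hunk happens under that same monitor so the lookup and the DecommissionSelf send stay consistent with registration changes. Below is a rough sketch of that single-monitor shape; it is illustrative only, not the Spark classes, and the activeExecutors and executorsPendingDecommission sets are hypothetical stand-ins for the scheduler state.

import scala.collection.mutable

class DecommissionTracker {
  // Hypothetical shared state standing in for executorDataMap / executorsPendingDecommission.
  private val activeExecutors = mutable.HashSet("exec-1", "exec-2")
  private val executorsPendingDecommission = mutable.HashSet[String]()

  // One monitor (this) guards both the "is it alive?" check and the bookkeeping update,
  // instead of taking a second lock inside the filter.
  def decommissionExecutors(executorIds: Seq[String]): Seq[String] = this.synchronized {
    executorIds.filter { executorId =>
      if (activeExecutors.contains(executorId)) {
        executorsPendingDecommission += executorId
        true
      } else {
        false
      }
    }
  }

  // Mirrors the second hunk: the lookup and the message send happen under the same
  // monitor, so the executor cannot be unregistered between the two steps.
  def sendDecommissionMessage(executorId: String): Boolean = this.synchronized {
    if (activeExecutors.contains(executorId)) {
      println(s"Sending DecommissionSelf to $executorId")
      true
    } else {
      println(s"Attempted to decommission unknown executor $executorId.")
      false
    }
  }
}

object DecommissionTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new DecommissionTracker
    println(tracker.decommissionExecutors(Seq("exec-1", "exec-9"))) // List(exec-1)
    println(tracker.sendDecommissionMessage("exec-1"))              // true
  }
}

Keeping every access under one monitor sidesteps the lock-ordering questions that nesting two locks raises, which is presumably the "clean up the locks" part of the commit message.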

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 5 additions & 2 deletions
@@ -43,9 +43,12 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Decommission block managers corresponding to given set of executors */
+  /** Decommission block managers corresponding to given set of executors
+   * Non-blocking.
+   */
   def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
-    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
+    driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
+    logInfo(s"Decommissioning block managers on ${executorIds}")
  }
 
   /** Get Replication Info for all the RDD blocks stored in given blockManagerId */
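
The updated scaladoc calls decommissionBlockManagers non-blocking because ask returns a future that the method never awaits; switching to ask[Boolean] only changes the expected reply type, and the caller still returns immediately. Here is a small sketch of the same fire-and-forget shape using plain Scala futures instead of Spark's RPC layer; askDriver is a hypothetical stand-in for driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)).

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object NonBlockingAskSketch {
  // Hypothetical stand-in for the driver endpoint ask.
  def askDriver(executorIds: Seq[String]): Future[Boolean] = Future {
    Thread.sleep(100) // pretend the driver is doing the bookkeeping
    true
  }

  // Non-blocking, as the new scaladoc says: kick off the ask and return right away.
  def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
    askDriver(executorIds)
    println(s"Decommissioning block managers on ${executorIds}")
  }

  def main(args: Array[String]): Unit = {
    decommissionBlockManagers(Seq("exec-1", "exec-2"))
    // A blocking variant, for contrast, would wait on the reply:
    val ok = Await.result(askDriver(Seq("exec-3")), 5.seconds)
    println(s"Blocking variant returned: $ok")
  }
}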
