@@ -208,8 +208,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
208208 makeOffers(executorId)
209209
210210 case AddNodeToDecommission (hostname, terminationTimeMs, nodeDecommissionReason) =>
211- decommissionTracker.foreach(
212- _.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason))
211+ decommissionTracker.foreach { tracker =>
212+ tracker.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason)
213+ val nodeDecommissionState = tracker.getDecommissionedNodeState(hostname)
214+ if (nodeDecommissionState.isDefined &&
215+ ! nodeDecommissionState.contains(NodeDecommissionState .TERMINATED )) {
216+ val exe = scheduler.getExecutorsAliveOnHost(hostname)
217+ exe match {
218+ case Some (set) =>
219+ for (e <- set) {
220+ moveCachedRddFromDecommissionExecutor(e)
221+ }
222+ case None => logInfo(" There is active no executor available" +
223+ " in decommission node for moving the cached RDD" )
224+ }
225+ }
226+ }
213227
214228 case RemoveNodeToDecommission (hostname) =>
215229 decommissionTracker.foreach(_.removeNodeToDecommission(hostname))
@@ -463,25 +477,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
463477 logError(s " Unexpected error during decommissioning ${e.toString}" , e)
464478 }
465479 logInfo(s " Finished decommissioning executor $executorId. " )
466-
467- if (conf.get(STORAGE_DECOMMISSION_ENABLED )) {
468- try {
469- logInfo(" Starting decommissioning block manager corresponding to " +
470- s " executor $executorId. " )
471- scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq (executorId))
472- } catch {
473- case e : Exception =>
474- logError(" Unexpected error during block manager " +
475- s " decommissioning for executor $executorId: ${e.toString}" , e)
476- }
477- logInfo(s " Acknowledged decommissioning block manager corresponding to $executorId. " )
478- }
480+ moveCachedRddFromDecommissionExecutor(executorId)
479481 } else {
480482 logInfo(s " Skipping decommissioning of executor $executorId. " )
481483 }
482484 shouldDisable
483485 }
484486
487+ /**
488+ * Move cached Rdd from Decommission executors
489+ */
490+ private def moveCachedRddFromDecommissionExecutor (executorId : String ): Unit = {
491+ if (conf.get(STORAGE_DECOMMISSION_ENABLED )) {
492+ try {
493+ logInfo(" Starting decommissioning block manager corresponding to " +
494+ s " executor $executorId. " )
495+ scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq (executorId))
496+ } catch {
497+ case e : Exception =>
498+ logError(" Unexpected error during block manager " +
499+ s " decommissioning for executor $executorId: ${e.toString}" , e)
500+ }
501+ logInfo(s " Acknowledged decommissioning block manager corresponding to $executorId. " )
502+ }
503+ }
504+
485505 /**
486506 * Stop making resource offers for the given executor. The executor is marked as lost with
487507 * the loss reason still pending.
0 commit comments