package org.apache.spark.storage

import java.util.concurrent.ExecutorService
+import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -41,6 +42,10 @@ private[storage] class BlockManagerDecommissioner(
  private val maxReplicationFailuresForDecommission =
    conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

+  // This is only valid if there are no tasks running since lastMigrationTime
+  @volatile private[storage] var lastMigrationTime: Long = 0
+  @volatile private[storage] var allBlocksMigrated = false
+
  /**
   * This runnable consumes any shuffle blocks in the queue for migration. This part of a
   * producer/consumer where the main migration loop updates the queue of blocks to be migrated
@@ -90,7 +95,8 @@ private[storage] class BlockManagerDecommissioner(
              null)// class tag, we don't need for shuffle
            logDebug(s"Migrated sub block ${blockId}")
          }
-          logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+          logInfo(s"Migrated ${shuffleBlockInfo}")
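+          // Record that this shuffle has finished migrating; the main loop compares this count
+          // with migratingShuffles.size to decide whether migration is done.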
+          numMigratedShuffles.incrementAndGet()
        }
      }
      // This catch is intentionally outside of the while running block.
@@ -111,6 +117,12 @@ private[storage] class BlockManagerDecommissioner(
  // Shuffles which are either in queue for migrations or migrated
  private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()

+  // Shuffles which have migrated. This is used to know when we are "done"; being done can
+  // change if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
  // Shuffles which are queued for migration
  private[storage] val shufflesToMigrate =
    new java.util.concurrent.ConcurrentLinkedQueue[ShuffleBlockInfo]()
@@ -123,6 +135,7 @@ private[storage] class BlockManagerDecommissioner(
  private lazy val blockMigrationExecutor =
    ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission")

+
  private val blockMigrationRunnable = new Runnable {
    val sleepInterval = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)

@@ -133,21 +146,47 @@ private[storage] class BlockManagerDecommissioner(
          s"${config.STORAGE_RDD_DECOMMISSION_ENABLED.key}\n" +
          s"${config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED.key}")
        stopped = true
+        allBlocksMigrated = true
      }
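+      // Tracks whether the most recent pass left any blocks (shuffle or RDD) still to migrate.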
+      var blocksLeft = false
      while (!stopped && !Thread.interrupted()) {
        logInfo("Iterating on migrating from the block manager.")
        try {
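+          // Snapshot the time before this pass so the check below can tell whether blocks
+          // changed under us while we were migrating.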
+          val startMigrationTime = System.nanoTime()
          // If enabled we migrate shuffle blocks first as they are more expensive.
          if (conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
            logDebug("Attempting to replicate all shuffle blocks")
-            offloadShuffleBlocks()
-            logInfo("Done starting workers to migrate shuffle blocks")
+            blocksLeft = offloadShuffleBlocks()
+            logInfo(s"Done starting workers to migrate shuffle blocks (blocks left: ${blocksLeft})")
          }
          if (conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED)) {
            logDebug("Attempting to replicate all cached RDD blocks")
-            decommissionRddCacheBlocks()
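+            // Combine with the shuffle result: if either pass left blocks behind we are not done.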
+            val cacheBlocksLeft = decommissionRddCacheBlocks()
+            blocksLeft = blocksLeft || cacheBlocksLeft
            logInfo("Attempt to replicate all cached blocks done")
          }
+
+          // Only update the migration info if the blocks have not changed under us.
+          if (lastMigrationTime < startMigrationTime) {
+            lastMigrationTime = startMigrationTime
+            allBlocksMigrated = !blocksLeft
+            logInfo(s"Updating migration info to ${startMigrationTime}, ${allBlocksMigrated}")
+          } else {
+            logInfo(s"Blocks changed under us (last migration time is ${lastMigrationTime})")
+            allBlocksMigrated = false
+          }
+
+          // Stop if we don't have any migrations configured.
+          if (!conf.get(config.STORAGE_RDD_DECOMMISSION_ENABLED) &&
+              !conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED)) {
+            logWarning("Decommissioning, but no migration configured. Set one or both of:\n" +
183+ " spark.storage.decommission.shuffle_blocks\n " +
184+ " spark.storage.decommission.rdd_blocks" )
185+ lastMigrationTime = System .nanoTime()
186+ allBlocksMigrated = true
187+ stopped = true
188+ }
189+
151190 logInfo(s " Waiting for ${sleepInterval} before refreshing migrations. " )
152191 Thread .sleep(sleepInterval)
153192 } catch {
@@ -172,8 +211,9 @@ private[storage] class BlockManagerDecommissioner(
   * but rather shadows them.
   * Requires an index-based shuffle resolver.
   * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage.
+   * Returns true if we are not done migrating shuffle blocks.
   */
-  private[storage] def offloadShuffleBlocks(): Unit = {
+  private[storage] def offloadShuffleBlocks(): Boolean = {
    // Update the queue of shuffles to be migrated
    logInfo("Offloading shuffle blocks")
    val localShuffles = bm.migratableResolver.getStoredShuffles()
@@ -197,6 +237,8 @@ private[storage] class BlockManagerDecommissioner(
    deadPeers.foreach { peer =>
      migrationPeers.get(peer).foreach(_.running = false)
    }
+    // Return true if we found any new shuffles to migrate or have not yet migrated everything.
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
  }

  /**
@@ -213,16 +255,17 @@ private[storage] class BlockManagerDecommissioner(
  /**
   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
   * Visible for testing
+   * Returns true if we have not migrated all of our RDD blocks.
   */
-  private[storage] def decommissionRddCacheBlocks(): Unit = {
+  private[storage] def decommissionRddCacheBlocks(): Boolean = {
    val replicateBlocksInfo = bm.getMigratableRDDBlocks()

    if (replicateBlocksInfo.nonEmpty) {
      logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
        "for block manager decommissioning")
    } else {
      logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
-      return
+      return false
    }

    // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -234,7 +277,9 @@ private[storage] class BlockManagerDecommissioner(
    if (blocksFailedReplication.nonEmpty) {
      logWarning("Blocks failed replication in cache decommissioning " +
        s"process: ${blocksFailedReplication.mkString(",")}")
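+      // Some cached blocks could not be replicated, so report that migration is incomplete.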
+      return true
    }
+    return false
  }

  private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -277,4 +322,16 @@ private[storage] class BlockManagerDecommissioner(
    logInfo("Stopping block migration thread")
    blockMigrationExecutor.shutdownNow()
  }
+
+  /*
+   * Returns the last migration time and a boolean denoting whether all blocks have been migrated.
+   * If any tasks have been running since that time the boolean may be incorrect.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
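+    // Once the migration thread has stopped, report the current time and treat everything as migrated.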
+    if (stopped) {
+      (System.nanoTime(), true)
+    } else {
+      (lastMigrationTime, allBlocksMigrated)
+    }
+  }
}