[SPARK-31197][CORE] Shutdown executor once we are done decommissioning #29211
@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( | |
|
||
private[this] val stopping = new AtomicBoolean(false) | ||
var executor: Executor = null | ||
@volatile private var decommissioned = false | ||
@volatile var driver: Option[RpcEndpointRef] = None | ||
|
||
// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need | ||
|
@@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend( | |
*/ | ||
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] | ||
|
||
@volatile private var decommissioned = false | ||
Review comment: nit: In the spirit of minimizing the code diff, is there a strong reason why this decommissioned flag was moved down from line 67?
Author reply: I didn't consider it strongly enough related to the variables it was grouped with, so I moved it to stand alone.
||
|
||
override def onStart(): Unit = { | ||
logInfo("Registering PWR handler.") | ||
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + | ||
|
@@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend( | |
case UpdateDelegationTokens(tokenBytes) => | ||
logInfo(s"Received tokens of ${tokenBytes.length} bytes") | ||
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) | ||
|
||
case DecommissionSelf => | ||
logInfo("Received decommission self") | ||
decommissionSelf() | ||
} | ||
|
||
override def onDisconnected(remoteAddress: RpcAddress): Unit = { | ||
|
@@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend( | |
if (executor != null) { | ||
executor.decommission() | ||
} | ||
logInfo("Done decommissioning self.") | ||
// Shutdown the executor once all tasks are gone & any configured migrations completed. | ||
// Detecting migrations completion doesn't need to be perfect and we want to minimize the | ||
// overhead for executors that are not in decommissioning state as overall that will be | ||
// more of the executors. For example, this will not catch a block which is already in | ||
// the process of being put from a remote executor before migration starts. This trade-off | ||
// is viewed as acceptable to minimize introduction of any new locking structures in critical | ||
// code paths. | ||
|
||
val shutdownThread = new Thread("wait-for-blocks-to-migrate") { | ||
override def run(): Unit = { | ||
var lastTaskRunningTime = System.nanoTime() | ||
val sleep_time = 1000 // 1s | ||
|
||
while (true) { | ||
logInfo("Checking to see if we can shutdown.") | ||
Thread.sleep(sleep_time) | ||
if (executor == null || executor.numRunningTasks == 0) { | ||
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { | ||
logInfo("No running tasks, checking migrations") | ||
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() | ||
// We can only trust allBlocksMigrated boolean value if there were no tasks running | ||
// since the start of computing it. | ||
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { | ||
logInfo("No running tasks, all blocks migrated, stopping.") | ||
exitExecutor(0, "Finished decommissioning", notifyDriver = true) | ||
} else { | ||
logInfo("All blocks not yet migrated.") | ||
} | ||
} else { | ||
logInfo("No running tasks, no block migration configured, stopping.") | ||
exitExecutor(0, "Finished decommissioning", notifyDriver = true) | ||
Review comment: I'm wondering whether we need to wait for tasks to finish if storage decommissioning is disabled. For a shuffle map task, or a result task with an indirect result, their outputs are still based on blocks. As a result, we'd spend time waiting for them to finish but get nothing good in return.
Author reply: Unless it's an action. If someone just wants things to exit immediately they should not enable decommissioning.
||
} | ||
} else { | ||
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks") | ||
Review comment: I think you missed an s prefix here ... the string interpolation isn't happening.
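For reference, a minimal sketch of the corrected log line, assuming the intent is to interpolate the running-task count (the camel-cased numRunningTasks used earlier in this method also looks like the intended spelling):

```scala
// Assumed fix: add the s prefix so ${...} is interpolated, matching the numRunningTasks name.
logInfo(s"Blocked from shutdown by running ${executor.numRunningTasks} tasks")
```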
||
// If there is a running task it could store blocks, so make sure we wait for a | ||
// migration loop to complete after the last task is done. | ||
// Note: this is only advanced if there is a running task, if there | ||
// is no running task but the blocks are not done migrating this does not | ||
// move forward. | ||
lastTaskRunningTime = System.nanoTime() | ||
} | ||
} | ||
} | ||
} | ||
shutdownThread.setDaemon(true) | ||
shutdownThread.start() | ||
|
||
logInfo("Will exit when finished decommissioning") | ||
// Return true since we are handling a signal | ||
true | ||
} catch { | ||
case e: Exception => | ||
logError(s"Error ${e} during attempt to decommission self") | ||
logError("Unexpected error while decommissioning self", e) | ||
false | ||
} | ||
} | ||
|
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp | |
case e: Exception => | ||
logError(s"Unexpected error during decommissioning ${e.toString}", e) | ||
} | ||
// Send decommission message to the executor, this may be a duplicate since the executor | ||
Review comment: So we need to prevent duplicate …
||
// could have been the one to notify us. But it's also possible the notification came from | ||
// elsewhere and the executor does not yet know. | ||
executorDataMap.get(executorId) match { | ||
case Some(executorInfo) => | ||
executorInfo.executorEndpoint.send(DecommissionSelf) | ||
case None => | ||
// Ignoring the executor since it is not registered. | ||
logWarning(s"Attempted to decommission unknown executor $executorId.") | ||
} | ||
logInfo(s"Finished decommissioning executor $executorId.") | ||
|
||
if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { | ||
|
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager( | |
} | ||
} | ||
|
||
/* | ||
* Returns the last migration time and a boolean denoting if all the blocks have been migrated. | ||
* If there are any tasks running since that time the boolean may be incorrect. | ||
*/ | ||
private[spark] def lastMigrationInfo(): (Long, Boolean) = { | ||
Review comment: What do you think of directly exposing "BlocksMigrated" and "BlocksToMigrate" instead of a single boolean? I think it would provide more debugging value than a single flag, i.e. instead of returning "BlocksMigrated >= BlocksToMigrate", return the two counters directly. Or maybe this is a lot more work because you want to track this for both shuffle as well as persisted blocks.
Author reply: I'd really rather not. I think the level above us should only be concerned with whether the blocks are done migrating or not. If we expose that up a level, the logic is going to get even more complicated across class boundaries.
Review comment: Cool! I am always anal about logging to a fault :-P
(A hypothetical sketch of the counter-exposing alternative appears after this method.)
||
decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) | ||
} | ||
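For illustration, a hypothetical sketch of the counter-exposing shape the reviewer describes. The case class and its field names are assumptions for discussion only; the PR keeps the (Long, Boolean) tuple:

```scala
// Hypothetical alternative (not part of this PR): expose the raw counters and
// derive the boolean at the call site, which makes progress easier to log and debug.
case class MigrationProgress(
    lastMigrationTime: Long,
    blocksMigrated: Int,
    blocksToMigrate: Int) {
  def allBlocksMigrated: Boolean = blocksMigrated >= blocksToMigrate
}
```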
|
||
private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = | ||
master.getReplicateInfoForRDDBlocks(blockManagerId) | ||
|
||
|
@@ -18,6 +18,7 @@ | |||||
package org.apache.spark.storage | ||||||
|
||||||
import java.util.concurrent.ExecutorService | ||||||
import java.util.concurrent.atomic.AtomicInteger | ||||||
|
||||||
import scala.collection.JavaConverters._ | ||||||
import scala.collection.mutable | ||||||
|
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner( | |||||
private val maxReplicationFailuresForDecommission = | ||||||
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) | ||||||
|
||||||
// Used for tracking if our migrations are complete. Readable for testing | ||||||
@volatile private[storage] var lastRDDMigrationTime: Long = 0 | ||||||
@volatile private[storage] var lastShuffleMigrationTime: Long = 0 | ||||||
@volatile private[storage] var rddBlocksLeft: Boolean = true | ||||||
@volatile private[storage] var shuffleBlocksLeft: Boolean = true | ||||||
|
||||||
/** | ||||||
* This runnable consumes any shuffle blocks in the queue for migration. This part of a | ||||||
* producer/consumer where the main migration loop updates the queue of blocks to be migrated | ||||||
|
@@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner( | |||||
null)// class tag, we don't need for shuffle | ||||||
logDebug(s"Migrated sub block ${blockId}") | ||||||
} | ||||||
logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") | ||||||
logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") | ||||||
} else { | ||||||
logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") | ||||||
} | ||||||
numMigratedShuffles.incrementAndGet() | ||||||
} | ||||||
} | ||||||
// This catch is intentionally outside of the while running block. | ||||||
|
@@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner( | |||||
// Shuffles which are either in queue for migrations or migrated | ||||||
private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() | ||||||
|
||||||
// Shuffles which have migrated. This used to know when we are "done", being done can change | ||||||
// if a new shuffle file is created by a running task. | ||||||
private val numMigratedShuffles = new AtomicInteger(0) | ||||||
|
||||||
// Shuffles which are queued for migration & number of retries so far. | ||||||
// Visible in storage for testing. | ||||||
private[storage] val shufflesToMigrate = | ||||||
new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() | ||||||
|
||||||
// Set if we encounter an error attempting to migrate and stop. | ||||||
@volatile private var stopped = false | ||||||
@volatile private var stoppedRDD = | ||||||
!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) | ||||||
@volatile private var stoppedShuffle = | ||||||
!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) | ||||||
|
||||||
private val migrationPeers = | ||||||
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() | ||||||
|
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner( | |||||
|
||||||
override def run(): Unit = { | ||||||
assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) | ||||||
while (!stopped && !Thread.interrupted()) { | ||||||
while (!stopped && !stoppedRDD && !Thread.interrupted()) { | ||||||
|
||||||
logInfo("Iterating on migrating from the block manager.") | ||||||
// Validate we have peers to migrate to. | ||||||
val peers = bm.getPeers(false) | ||||||
// If we have no peers give up. | ||||||
if (peers.isEmpty) { | ||||||
stopped = true | ||||||
stoppedRDD = true | ||||||
} | ||||||
try { | ||||||
val startTime = System.nanoTime() | ||||||
logDebug("Attempting to replicate all cached RDD blocks") | ||||||
decommissionRddCacheBlocks() | ||||||
rddBlocksLeft = decommissionRddCacheBlocks() | ||||||
lastRDDMigrationTime = startTime | ||||||
logInfo("Attempt to replicate all cached blocks done") | ||||||
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") | ||||||
Thread.sleep(sleepInterval) | ||||||
} catch { | ||||||
case e: InterruptedException => | ||||||
logInfo("Interrupted during migration, will not refresh migrations.") | ||||||
stopped = true | ||||||
logInfo("Interrupted during RDD migration, stopping") | ||||||
Review comment: Thanks for changing this to 'stopping'. "will not refresh migrations" was not clear. :-)
||||||
stoppedRDD = true | ||||||
case NonFatal(e) => | ||||||
logError("Error occurred while trying to replicate for block manager decommissioning.", | ||||||
logError("Error occurred replicating RDD for block manager decommissioning.", | ||||||
e) | ||||||
stopped = true | ||||||
stoppedRDD = true | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner( | |||||
|
||||||
override def run() { | ||||||
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) | ||||||
while (!stopped && !Thread.interrupted()) { | ||||||
while (!stopped && !stoppedShuffle && !Thread.interrupted()) { | ||||||
try { | ||||||
logDebug("Attempting to replicate all shuffle blocks") | ||||||
refreshOffloadingShuffleBlocks() | ||||||
val startTime = System.nanoTime() | ||||||
shuffleBlocksLeft = refreshOffloadingShuffleBlocks() | ||||||
lastShuffleMigrationTime = startTime | ||||||
logInfo("Done starting workers to migrate shuffle blocks") | ||||||
Thread.sleep(sleepInterval) | ||||||
} catch { | ||||||
case e: InterruptedException => | ||||||
logInfo("Interrupted during migration, will not refresh migrations.") | ||||||
stopped = true | ||||||
stoppedShuffle = true | ||||||
case NonFatal(e) => | ||||||
logError("Error occurred while trying to replicate for block manager decommissioning.", | ||||||
e) | ||||||
stopped = true | ||||||
stoppedShuffle = true | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner( | |||||
* but rather shadows them. | ||||||
* Requires an Indexed based shuffle resolver. | ||||||
* Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. | ||||||
* Returns true if we are not done migrating shuffle blocks. | ||||||
*/ | ||||||
private[storage] def refreshOffloadingShuffleBlocks(): Unit = { | ||||||
private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { | ||||||
// Update the queue of shuffles to be migrated | ||||||
logInfo("Offloading shuffle blocks") | ||||||
val localShuffles = bm.migratableResolver.getStoredShuffles().toSet | ||||||
|
@@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner( | |||||
deadPeers.foreach { peer => | ||||||
migrationPeers.get(peer).foreach(_.running = false) | ||||||
} | ||||||
// If we don't have anyone to migrate to give up | ||||||
if (migrationPeers.values.find(_.running == true).isEmpty) { | ||||||
stoppedShuffle = true | ||||||
} | ||||||
// If we found any new shuffles to migrate or otherwise have not migrated everything. | ||||||
newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get() | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner( | |||||
/** | ||||||
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers | ||||||
* Visible for testing | ||||||
* Returns true if we have not migrated all of our RDD blocks. | ||||||
*/ | ||||||
private[storage] def decommissionRddCacheBlocks(): Unit = { | ||||||
private[storage] def decommissionRddCacheBlocks(): Boolean = { | ||||||
val replicateBlocksInfo = bm.getMigratableRDDBlocks() | ||||||
// Refresh peers and validate we have somewhere to move blocks. | ||||||
|
||||||
if (replicateBlocksInfo.nonEmpty) { | ||||||
logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " + | ||||||
"for block manager decommissioning") | ||||||
} else { | ||||||
logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") | ||||||
return | ||||||
return false | ||||||
} | ||||||
|
||||||
// TODO: We can sort these blocks based on some policy (LRU/blockSize etc) | ||||||
|
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner( | |||||
if (blocksFailedReplication.nonEmpty) { | ||||||
logWarning("Blocks failed replication in cache decommissioning " + | ||||||
s"process: ${blocksFailedReplication.mkString(",")}") | ||||||
return true | ||||||
} | ||||||
return false | ||||||
} | ||||||
|
||||||
private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { | ||||||
|
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner( | |||||
} | ||||||
logInfo("Stopped storage decommissioner") | ||||||
} | ||||||
|
||||||
/* | ||||||
Review comment: Thanks for the doc!
||||||
* Returns the last migration time and a boolean for if all blocks have been migrated. | ||||||
* The last migration time is calculated to be the minimum of the last migration of any | ||||||
* running migration (and if there are now current running migrations it is set to current). | ||||||
* This provides a timeStamp which, if there have been no tasks running since that time | ||||||
* we can know that all potential blocks that can be have been migrated off. | ||||||
*/ | ||||||
private[storage] def lastMigrationInfo(): (Long, Boolean) = { | ||||||
if (stopped || (stoppedRDD && stoppedShuffle)) { | ||||||
// Since we don't have anything left to migrate ever (since we don't restart once | ||||||
// stopped), return that we're done with a validity timestamp that doesn't expire. | ||||||
(Long.MaxValue, true) | ||||||
} else { | ||||||
// Chose the min of the active times. See the function description for more information. | ||||||
val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) { | ||||||
Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) | ||||||
Review comment: This must be … As I understand, in spark/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (line 305 in 484f8e2) …
Review comment: I was thinking a bit more on this part. The intention of … My first thought was to improve this by expressing this intention directly with a simple flag in the … Then I went a bit further and checked spark/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (line 267 in 484f8e2). Still, I'd like to have a stopping condition in the while loop, so please consider using the flag instead of the times.
Review comment: @holdenk @agrawaldevesh what do you think?
Review comment: I agree with you @attilapiros, on both the counts: … IMHO, this is great feedback and will improve this PR. Thank you.
Author reply: So it only increases if there is a task running. We don't want it to shut down or consider the blocks migrated if there is a task migrated.
Review comment: PR against your branch
Review comment: I confess that even I am finding the logic in lastMigrationInfo a bit hard to follow, and as far as I can see this logic is the key to getting the shutdown thread to exit cleanly. If the information returned by lastMigrationInfo is wrong, the shutdown thread may exit prematurely or never, both of which should be avoided, particularly because the shutdown thread has no timeout on the amount of time it can hang around. This complexity is intrinsic. The logic is indeed complex, but I think we can add some more documentation here, perhaps explaining the intent of the code with a couple of examples. Even better would be to add a unit test for this. It should be easy to unit test just lastMigrationInfo, i.e. verify that the function behaves as expected by stubbing out the other parts. Such a test would also provide the necessary guardrails as we change this function in the future; I expect it to change as we production-harden the cache/shuffle block migration in the near term.
Author reply: Yeah, I'm down to unit test this function; it makes sense to me and it's fairly standalone. I'll work on the documentation over the weekend.
Review comment: holdenk#7 :)
Author reply: So the approach in that PR re-opens the race condition we're preventing here. I would really rather not do that. I'd also like us to make progress here though, so we can temporarily accept the race condition, file a JIRA, and revisit it later as a stand-alone item if y'all are not comfortable with any of the ways to avoid the race.
(A sketch of such a unit test appears at the end of this file's diff.)
||||||
} else if (!stoppedShuffle) { | ||||||
lastShuffleMigrationTime | ||||||
} else { | ||||||
lastRDDMigrationTime | ||||||
} | ||||||
|
||||||
// Technically we could have blocks left if we encountered an error, but those blocks will | ||||||
// never be migrated, so we don't care about them. | ||||||
val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) | ||||||
(lastMigrationTime, blocksMigrated) | ||||||
} | ||||||
} | ||||||
} |
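Along the lines of the reviewer's suggestion above, here is a rough sketch of such a unit test. It assumes the constructor is BlockManagerDecommissioner(conf, bm), that constructing it does not start the migration threads, and that the suite lives in the org.apache.spark.storage package so the private[storage] test hooks from this diff are reachable; the suite name and setup are illustrative, not code from this PR.

```scala
package org.apache.spark.storage

import org.mockito.Mockito.mock

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config

class BlockManagerDecommissionerSuite extends SparkFunSuite {

  test("lastMigrationInfo is only 'done' once both RDD and shuffle blocks are migrated") {
    val conf = new SparkConf()
      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
    val bm = mock(classOf[BlockManager])
    val decommissioner = new BlockManagerDecommissioner(conf, bm)

    // Nothing has migrated yet: both "blocks left" flags default to true.
    assert(decommissioner.lastMigrationInfo() === ((0L, false)))

    // Simulate one RDD migration pass finishing with nothing left: still not done,
    // because shuffle blocks remain.
    val rddTime = System.nanoTime()
    decommissioner.lastRDDMigrationTime = rddTime
    decommissioner.rddBlocksLeft = false
    assert(decommissioner.lastMigrationInfo()._2 === false)

    // Simulate the shuffle side finishing as well: now we are done, and the reported
    // time is the older (min) of the two migration times.
    val shuffleTime = System.nanoTime()
    decommissioner.lastShuffleMigrationTime = shuffleTime
    decommissioner.shuffleBlocksLeft = false
    assert(decommissioner.lastMigrationInfo() === ((math.min(rddTime, shuffleTime), true)))
  }
}
```

The expected values follow directly from the lastMigrationInfo logic in this diff: the reported time is the minimum of the two migration times while both migrations are active, and the boolean only flips once neither RDD nor shuffle blocks are left.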
@@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { | |
assert(sleepyRdd.count() === 10) | ||
} | ||
|
||
test("verify a task with all workers decommissioned succeeds") { | ||
test("verify a running task with all workers decommissioned succeeds") { | ||
// Wait for the executors to come up | ||
TestUtils.waitUntilExecutorsUp(sc = sc, | ||
numExecutors = 2, | ||
timeout = 30000) // 30s | ||
|
||
val input = sc.parallelize(1 to 10) | ||
// Listen for the job | ||
val sem = new Semaphore(0) | ||
|
@@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { | |
sem.release() | ||
} | ||
}) | ||
TestUtils.waitUntilExecutorsUp(sc = sc, | ||
numExecutors = 2, | ||
timeout = 30000) // 30s | ||
|
||
val sleepyRdd = input.mapPartitions{ x => | ||
Thread.sleep(5000) // 5s | ||
x | ||
|
@@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { | |
execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) | ||
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) | ||
assert(asyncCountResult === 10) | ||
// Try and launch task after decommissioning, this should fail | ||
val postDecommissioned = input.map(x => x) | ||
val postDecomAsyncCount = postDecommissioned.countAsync() | ||
val thrown = intercept[java.util.concurrent.TimeoutException]{ | ||
val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) | ||
} | ||
assert(postDecomAsyncCount.isCompleted === false, | ||
"After exec decommission new task could not launch") | ||
Review comment (on removed lines 79 to 86): This is removed because we relaunch it now, so it would succeed.
||
} | ||
} |
Review comment: nit: Should this be DecommissionWorker? That sounds more like a command to me, whereas WorkerDecommissioned sounds like a state.
Author reply: I don't have strong preferences (although this doesn't have "ed" at the end, so I don't think it's implying a state).
Review comment: Most other command messages start with a verb, while the 'state updated' ones are in past tense. So I think we should rename this to DecommissionWorker to match that.
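To make the convention the reviewers are describing concrete, here is an illustrative sketch; the trait and message definitions below are stand-ins, not code from this PR or from Spark:

```scala
// Illustrative only: commands read as imperative verbs, state notifications as past tense.
sealed trait ClusterMessage                                               // stand-in for the real message trait
case object DecommissionWorker extends ClusterMessage                     // a command: "please decommission"
case class WorkerDecommissioned(workerId: String) extends ClusterMessage  // a state update: "this already happened"
```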