[SPARK-31197][CORE] Shutdown executor once we are done decommissioning #29211

Closed
wants to merge 20 commits

Commits
54c6298
Shutdown executor once we are done decommissioning
holdenk Jul 21, 2020
8c10f9a
Whitespace cleanup
holdenk Jul 23, 2020
27a5f49
Code review feedback, various cleanups
holdenk Jul 24, 2020
5d4b52a
Just use new thread and set daemon since runInNewThread does a join w…
holdenk Jul 24, 2020
cdc3336
Update the text since 10 seconds is quick but not as fast as we can
holdenk Jul 24, 2020
a17c442
rename WorkerDecommission message to DecommissionWorker. (CR feedback)
holdenk Jul 24, 2020
2bbd5d1
Revert "rename WorkerDecommission message to DecommissionWorker. (CR …
holdenk Jul 24, 2020
0160299
Fix it so we don't block the decom message
holdenk Jul 27, 2020
284e0cd
Was thinking a little bit, if we move the sleep up a bit we can unify…
holdenk Jul 27, 2020
7a157ff
Fix
holdenk Jul 28, 2020
640b0ef
Intentionally don't start the thread to verify the executor is being s…
holdenk Jul 28, 2020
7e7821b
Revert "Intentionally don't start the tread to verify the executor is…
holdenk Jul 28, 2020
dcb01fc
fix thread start
holdenk Jul 29, 2020
060a443
Use tryAcquire so we don't block forever (CR feedback)
holdenk Jul 29, 2020
20d3cc0
Since we shut down & then re-launch the executors now as decommission…
holdenk Jul 30, 2020
22eb4c6
The executor can also have exited completely and then there is no entry
holdenk Jul 30, 2020
e139b9f
Try and write some comments
holdenk Jul 31, 2020
5c0a544
Add unit tests for how we handle timestamp tracking of migration time…
holdenk Aug 3, 2020
679fbe7
Code review feedback, mostly cleanup the unit tests
holdenk Aug 4, 2020
e81c3fc
CR feedback, replace some magic values with option in the tests to im…
holdenk Aug 4, 2020
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>

nit: Should this be DecommissionWorker? That sounds more like a command to me.

Whereas WorkerDecommissioned sounds like a state.

Contributor Author

I don't have strong preferences (although this doesn't end in "ed", so I don't think it's implying a state).

Most other command messages start with a verb, while the 'state updated' ones are in past tense. So I think we should rename this to DecommissionWorker to match that.

decommissionSelf()
}

@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +79,8 @@
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

@volatile private var decommissioned = false

nit: In the spirit of minimizing the code diff, is there a strong reason why this decommissioned flag was moved down from line 67?

Contributor Author

I didn't consider it strongly enough related to the variables it was grouped with, so I moved it to stand alone.


override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
@@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend(
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

case DecommissionSelf =>
logInfo("Received decommission self")
decommissionSelf()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -277,12 +282,59 @@
if (executor != null) {
executor.decommission()
}
logInfo("Done decommissioning self.")
// Shutdown the executor once all tasks are gone & any configured migrations completed.
// Detecting migrations completion doesn't need to be perfect and we want to minimize the
// overhead for executors that are not in decommissioning state as overall that will be
// more of the executors. For example, this will not catch a block which is already in
// the process of being put from a remote executor before migration starts. This trade-off
// is viewed as acceptable to minimize introduction of any new locking structures in critical
// code paths.

val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
override def run(): Unit = {
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s

while (true) {
logInfo("Checking to see if we can shutdown.")
Thread.sleep(sleep_time)
if (executor == null || executor.numRunningTasks == 0) {
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
logInfo("No running tasks, checking migrations")
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
} else {
logInfo("All blocks not yet migrated.")
}
} else {
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Member

I'm wondering if we need to wait for tasks to finish if storage decommission is disabled. I mean, for a shuffle map task and a result task with an indirect result, their outputs are still based on blocks. As a result, we'd spend time waiting for them to finish but get nothing good in return.

Contributor Author

Unless it's an action. If someone just wants things to exit immediately, they should not enable decommissioning.

}
} else {
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")

I think you missed an s" here ... the string interpolation isn't happening.

// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
// Note: this is only advanced if there is a running task, if there
// is no running task but the blocks are not done migrating this does not
// move forward.
lastTaskRunningTime = System.nanoTime()
}
}
}
}
shutdownThread.setDaemon(true)
shutdownThread.start()

logInfo("Will exit when finished decommissioning")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
logError("Unexpected error while decommissioning self", e)
false
}
}
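The exit decision made by the wait-for-blocks-to-migrate thread above can be restated as a pure function, which makes it easier to reason about and to unit test, and which lines up with a later review suggestion to keep the thread's stopping condition consistent with the migration-status computation. The sketch below is illustrative only; it is not part of this PR's diff, and the object and parameter names (DecommissionExitDecision, shouldExit, and so on) are hypothetical.

object DecommissionExitDecision {
  /**
   * Mirrors the polling loop's logic: never exit while tasks run; with no block
   * migration configured, exit as soon as tasks are done; otherwise exit only when
   * the decommissioner reports all blocks migrated by a pass that finished after
   * the last task was seen running.
   */
  def shouldExit(
      runningTasks: Int,
      storageDecommissionEnabled: Boolean,
      allBlocksMigrated: Boolean,
      lastMigrationTimeNs: Long,
      lastTaskRunningTimeNs: Long): Boolean = {
    if (runningTasks > 0) {
      false // a running task may still produce new blocks
    } else if (!storageDecommissionEnabled) {
      true  // nothing to migrate, exit as soon as tasks are done
    } else {
      // only trust the migration result if a full pass completed after the last task
      allBlocksMigrated && lastMigrationTimeNs > lastTaskRunningTimeNs
    }
  }

  def main(args: Array[String]): Unit = {
    // A migration pass that finished before the last task ran is not trusted.
    println(shouldExit(runningTasks = 0, storageDecommissionEnabled = true,
      allBlocksMigrated = true, lastMigrationTimeNs = 100L, lastTaskRunningTimeNs = 200L)) // false
    // A pass that completed after the last running task allows shutdown.
    println(shouldExit(runningTasks = 0, storageDecommissionEnabled = true,
      allBlocksMigrated = true, lastMigrationTimeNs = 300L, lastTaskRunningTimeNs = 200L)) // true
  }
}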
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {

// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage

// Used to ask an executor to decommission itself. (Can be an internal message)
case object DecommissionSelf extends CoarseGrainedClusterMessage
}
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
// Send decommission message to the executor, this may be a duplicate since the executor
Member

So we need to prevent duplicate DecommissionSelf at the executor side? For now, I don't see us handling duplicate DecommissionSelf messages, and that may create duplicate threads as a result.

// could have been the one to notify us. But it's also possible the notification came from
// elsewhere and the executor does not yet know.
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(DecommissionSelf)
case None =>
// Ignoring the executor since it is not registered.
logWarning(s"Attempted to decommission unknown executor $executorId.")
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
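The review comment above asks whether a duplicate DecommissionSelf message could spawn a second shutdown thread on the executor. One way to make the handler idempotent is an atomic guard so that only the first request starts the wait thread. The following standalone sketch illustrates that pattern under that assumption; it is not code from this PR, and the names (DecommissionIdempotenceSketch, decommissionStarted) are hypothetical.

import java.util.concurrent.atomic.AtomicBoolean

object DecommissionIdempotenceSketch {
  private val decommissionStarted = new AtomicBoolean(false)

  def decommissionSelf(): Boolean = {
    // Only the first caller wins the compare-and-set; later duplicates are no-ops.
    if (!decommissionStarted.compareAndSet(false, true)) {
      println("Already decommissioning; ignoring duplicate request.")
      return true
    }
    val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
      override def run(): Unit = {
        // The real thread polls running tasks and migration state before exiting.
        println("Waiting for running tasks and block migrations to finish...")
      }
    }
    shutdownThread.setDaemon(true)
    shutdownThread.start()
    true
  }

  def main(args: Array[String]): Unit = {
    decommissionSelf() // starts the wait thread
    decommissionSelf() // ignored: no second thread is created
  }
}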
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
}
}

/*
* Returns the last migration time and a boolean denoting if all the blocks have been migrated.
* If there are any tasks running since that time the boolean may be incorrect.
*/
private[spark] def lastMigrationInfo(): (Long, Boolean) = {

What do you think of directly exposing "BlocksMigrated" and "BlocksToMigrate" instead of a single boolean? I think it will provide more debugging value than a single flag.

i.e., instead of returning "BlocksMigrated >= BlocksToMigrate", return the two counters directly.

Or maybe this is a lot more work because you want to track this for both shuffle as well as persisted blocks.

Contributor Author

I'd really rather not. I think the level above us should only be concerned with whether the blocks are done migrating or not. If we expose that up a level, the logic is going to get even more complicated across class boundaries.

Cool! I am always anal about logging, to a fault :-P

decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false))
}

private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] =
master.getReplicateInfoForRDDBlocks(blockManagerId)

@@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
private val maxReplicationFailuresForDecommission =
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

// Used for tracking if our migrations are complete. Readable for testing
@volatile private[storage] var lastRDDMigrationTime: Long = 0
@volatile private[storage] var lastShuffleMigrationTime: Long = 0
@volatile private[storage] var rddBlocksLeft: Boolean = true
@volatile private[storage] var shuffleBlocksLeft: Boolean = true

/**
* This runnable consumes any shuffle blocks in the queue for migration. This part of a
* producer/consumer where the main migration loop updates the queue of blocks to be migrated
@@ -91,10 +98,11 @@
null)// class tag, we don't need for shuffle
logDebug(s"Migrated sub block ${blockId}")
}
logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
} else {
logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
}
numMigratedShuffles.incrementAndGet()
}
}
// This catch is intentionally outside of the while running block.
@@ -115,12 +123,21 @@
// Shuffles which are either in queue for migrations or migrated
private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()

// Shuffles which have migrated. This is used to know when we are "done"; being done can change
// if a new shuffle file is created by a running task.
private val numMigratedShuffles = new AtomicInteger(0)

// Shuffles which are queued for migration & number of retries so far.
// Visible in storage for testing.
private[storage] val shufflesToMigrate =
new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()

// Set if we encounter an error attempting to migrate and stop.
@volatile private var stopped = false
@volatile private var stoppedRDD =
!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
@volatile private var stoppedShuffle =
!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)

private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner(

override def run(): Unit = {
assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
while (!stopped && !Thread.interrupted()) {
while (!stopped && !stoppedRDD && !Thread.interrupted()) {
logInfo("Iterating on migrating from the block manager.")
// Validate we have peers to migrate to.
val peers = bm.getPeers(false)
// If we have no peers give up.
if (peers.isEmpty) {
stopped = true
stoppedRDD = true
}
try {
val startTime = System.nanoTime()
logDebug("Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
rddBlocksLeft = decommissionRddCacheBlocks()
lastRDDMigrationTime = startTime
logInfo("Attempt to replicate all cached blocks done")
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
logInfo("Interrupted during migration, will not refresh migrations.")
stopped = true
logInfo("Interrupted during RDD migration, stopping")

Thanks for changing this to 'stopping'. "will not refresh migrations" was not clear. :-)

stoppedRDD = true
case NonFatal(e) =>
logError("Error occurred while trying to replicate for block manager decommissioning.",
logError("Error occurred replicating RDD for block manager decommissioning.",
e)
stopped = true
stoppedRDD = true
}
}
}
@@ -162,20 +188,22 @@

override def run() {
assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
while (!stopped && !Thread.interrupted()) {
while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
try {
logDebug("Attempting to replicate all shuffle blocks")
refreshOffloadingShuffleBlocks()
val startTime = System.nanoTime()
shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
lastShuffleMigrationTime = startTime
logInfo("Done starting workers to migrate shuffle blocks")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
logInfo("Interrupted during migration, will not refresh migrations.")
stopped = true
stoppedShuffle = true
case NonFatal(e) =>
logError("Error occurred while trying to replicate for block manager decommissioning.",
e)
stopped = true
stoppedShuffle = true
}
}
}
@@ -191,8 +219,9 @@
* but rather shadows them.
* Requires an Indexed based shuffle resolver.
* Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage.
* Returns true if we are not done migrating shuffle blocks.
*/
private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
private[storage] def refreshOffloadingShuffleBlocks(): Boolean = {
// Update the queue of shuffles to be migrated
logInfo("Offloading shuffle blocks")
val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
@@ -215,6 +244,12 @@
deadPeers.foreach { peer =>
migrationPeers.get(peer).foreach(_.running = false)
}
// If we don't have anyone to migrate to give up
if (migrationPeers.values.find(_.running == true).isEmpty) {
stoppedShuffle = true
}
// If we found any new shuffles to migrate or otherwise have not migrated everything.
newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get()
}

/**
@@ -231,16 +266,18 @@
/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
* Returns true if we have not migrated all of our RDD blocks.
*/
private[storage] def decommissionRddCacheBlocks(): Unit = {
private[storage] def decommissionRddCacheBlocks(): Boolean = {
val replicateBlocksInfo = bm.getMigratableRDDBlocks()
// Refresh peers and validate we have somewhere to move blocks.

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
"for block manager decommissioning")
} else {
logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
return
return false
}

// TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@
if (blocksFailedReplication.nonEmpty) {
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
return true
}
return false
}

private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
}
logInfo("Stopped storage decommissioner")
}

/*

Thanks for the doc!

* Returns the last migration time and a boolean for whether all blocks have been migrated.
* The last migration time is calculated as the minimum of the last migration time of any
* running migration (and if there are no currently running migrations it is set to a value
* that does not expire). This provides a timestamp such that, if no tasks have been running
* since that time, we know that all blocks that could be migrated have been migrated off.
*/
private[storage] def lastMigrationInfo(): (Long, Boolean) = {
if (stopped || (stoppedRDD && stoppedShuffle)) {
// Since we don't have anything left to migrate ever (since we don't restart once
// stopped), return that we're done with a validity timestamp that doesn't expire.
(Long.MaxValue, true)
} else {
// Choose the min of the active times. See the function description for more information.
val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
Contributor

This must be Math.max.

As I understand it, in CoarseGrainedExecutorBackend the lastTaskRunningTime increases by about 1000 ms in every iteration; let's assume exactly 1000 ms. So if the RDD migration finished at 500 ms (counting from 0 in this example) but shuffle files to be migrated are still left and will only be finished in the next round (let's assume at 1500 ms), then we never shut down the executor: in the current round blocksMigrated is false, and in all the following rounds migrationTime will be less than lastTaskRunningTime, so this condition will never be satisfied:

if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {

Contributor

I was thinking a bit more on this part.

The intention of lastRDDMigrationTime, lastShuffleMigrationTime and lastTaskRunningTime is to notify the driver only once about the exit of this executor.

My first thought was to improve this by expressing that intention directly with a simple flag in the shutdownThread (like exitTriggered, which could be used as a stopping condition instead of the while (true)), removing all three time variables, and changing lastMigrationInfo to return just a boolean, renamed for its new role (for example to isFinished or simply finished).

Then I went a bit further and checked the exitExecutor method, and I think even the flag is not necessarily needed.

Still, I like having a stopping condition in the while loop, so please consider using the flag instead of the times.

Contributor

@holdenk @agrawaldevesh what do you think?

I agree with you @attilapiros, on both counts:

  • Converting this into a level trigger vs. an edge trigger: i.e., the flag goes on and remains on when it is time to exit.
  • Having consistency between the termination condition of the thread above and the computation of the boolean variable in this method. A helper method would help with this and also improve longer-term maintainability.

IMHO, this is great feedback and will improve this PR. Thank you.

Contributor Author

So it only increases if there is a task running. We don't want it to shut down or consider the blocks migrated while a task is running.
Unfortunately this flag can come and go depending on the tasks running on the executor, which is why it's structured the way it is.

Contributor

PR against your branch

@agrawaldevesh: what is your opinion/idea?

I confess that even I am finding the logic in lastMigrationInfo a bit hard to follow. And as far as I can see, this logic is the key to getting the shutdown thread to exit cleanly. If the information returned by lastMigrationInfo is wrong, the shutdown thread may exit prematurely or never, both of which should be avoided, particularly because the shutdown thread has no timeout on the amount of time it can hang around.

This complexity is intrinsic. The logic is indeed complex, but I think we can add some more documentation here, perhaps explaining the intent of the code with a couple of examples. Even better would be to add a unit test for this. I think it should be easy to unit test just lastMigrationInfo, testing that the function behaves as expected while stubbing out other parts. This test would also provide the necessary guardrails as we change this function in the future. I expect this function to change as we production-harden the cache/shuffle block migration in the near term.

Contributor Author

Yeah, I'm down to unit test this function; it makes sense to me and it's fairly standalone.

I’ll work on the documentation over the weekend.

Contributor Author

So the approach in that PR re-opens the race condition we're preventing here. I would really rather not do that.

I’d also like us to make progress here though, so we can temporarily accept the race condition and file a JIRA and revisit it later as a stand-alone item if y’all are not comfortable with any of the ways to avoid the race.

} else if (!stoppedShuffle) {
lastShuffleMigrationTime
} else {
lastRDDMigrationTime
}

// Technically we could have blocks left if we encountered an error, but those blocks will
// never be migrated, so we don't care about them.
val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD)
(lastMigrationTime, blocksMigrated)
}
}
}
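The thread above debates whether the aggregate timestamp should be the minimum or the maximum of the two migration times. The author's stated intent is that the executor should only trust "all blocks migrated" once both loops have completed a pass after the last task ran, which is what taking the minimum enforces. The standalone sketch below, with hypothetical timestamps and names, only illustrates that intent; it does not settle the question raised by the reviewer.

object LastMigrationTimeSketch {
  // Mirrors the structure of lastMigrationInfo() above: stopped loops do not hold back
  // the reported time, and when both loops run the earlier (minimum) pass is reported.
  def lastMigrationTime(
      stoppedRDD: Boolean,
      stoppedShuffle: Boolean,
      lastRDDMigrationTime: Long,
      lastShuffleMigrationTime: Long): Long = {
    if (stoppedRDD && stoppedShuffle) {
      Long.MaxValue
    } else if (!stoppedRDD && !stoppedShuffle) {
      math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
    } else if (!stoppedShuffle) {
      lastShuffleMigrationTime
    } else {
      lastRDDMigrationTime
    }
  }

  def main(args: Array[String]): Unit = {
    val lastTaskRunningTime = 1000L
    // The RDD loop refreshed at t=1500 but the shuffle loop last finished at t=800, so it
    // may have missed blocks from the task that ran at t=1000: the reported time (800) is
    // not after 1000 and the executor keeps waiting.
    println(lastMigrationTime(false, false, 1500L, 800L) > lastTaskRunningTime)  // false
    // After both loops complete another pass (t=2500 and t=2600) with no new tasks, the
    // reported time is after the last task and shutdown can proceed.
    println(lastMigrationTime(false, false, 2500L, 2600L) > lastTaskRunningTime) // true
  }
}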
@@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
assert(sleepyRdd.count() === 10)
}

test("verify a task with all workers decommissioned succeeds") {
test("verify a running task with all workers decommissioned succeeds") {
// Wait for the executors to come up
TestUtils.waitUntilExecutorsUp(sc = sc,
numExecutors = 2,
timeout = 30000) // 30s

val input = sc.parallelize(1 to 10)
// Listen for the job
val sem = new Semaphore(0)
@@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
sem.release()
}
})
TestUtils.waitUntilExecutorsUp(sc = sc,
numExecutors = 2,
timeout = 30000) // 30s

val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(5000) // 5s
x
@@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
// Try and launch task after decommissioning, this should fail
val postDecommissioned = input.map(x => x)
val postDecomAsyncCount = postDecommissioned.countAsync()
val thrown = intercept[java.util.concurrent.TimeoutException]{
val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
}
assert(postDecomAsyncCount.isCompleted === false,
"After exec decommission new task could not launch")
Comment on lines -79 to -86
Contributor Author

This is removed because we now shut down and re-launch the executors after decommissioning, so the post-decommission task would succeed.

}
}