Conversation

@brad-kaiser commented Aug 24, 2017

What changes were proposed in this pull request?

Add an option to recover cached data when using dynamic allocation. Currently, when dynamic allocation decommissions an idle executor, any cached RDD blocks on that executor are lost. Saving that data has the potential for a real performance boost for shared instances of Spark.

If there are any problems saving cached data, this change falls back to regular dynamic allocation behavior and drops the blocks.
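
For illustration, a job would opt in alongside the existing dynamic allocation settings, roughly like the sketch below (the recovery key name here is just a placeholder, not necessarily the key added by this patch):

// Sketch only -- "spark.dynamicAllocation.recoverCachedData" is a placeholder name.
val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30min") // existing setting
  .set("spark.dynamicAllocation.recoverCachedData", "true")          // placeholder key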

How was this patch tested?

There are unit tests in RecoverCacheShutdownSuite and integration tests in RecoverCachedDataSuite. There are also benchmark results available here: https://docs.google.com/document/d/1E6_rhAAJB8Ww0n52-LYcFTO1zhJBWgfIXzNjLi29730/edit?usp=sharing

@brad-kaiser changed the title from "[SPARK-21097][CORE][WIP] Add option to recover cached data during dynamic a…" to "[SPARK-21097][CORE][WIP] Add option to recover cached data" on Aug 24, 2017
@holdenk (Contributor) commented Aug 24, 2017

Welcome to the Apache Spark project @brad-kaiser, GitHub tells me this is your first time contributing code :)

While this is a bit outside of my area of review, as we've discussed I've been doing some similar WIP work in this area along with some others, so I'm hoping we can all find a way to collaborate.

In the meantime I can maybe help out with Jenkins: ok to test.

@brad-kaiser changed the title from "[SPARK-21097][CORE][WIP] Add option to recover cached data" to "[SPARK-21097][CORE] Add option to recover cached data" on Sep 1, 2017
@brad-kaiser (Author)

Hi @andrewor14 @vanzin @tdas, would you mind taking a look at this change? Thanks!

@vanzin (Contributor) commented Sep 5, 2017

It might take a while for me to find time to review this, but in the meantime, others who might be interested: @cloud-fan @JoshRosen

@brad-kaiser (Author)

Thanks vanzin.

@erikerlandson (Contributor)

I have been thinking about a different but related use case: supporting the ability to operate in dynamic allocation mode without requiring a separate shuffle service. The motivation is to make it more frictionless for a Spark driver to operate using D.A. on a container platform (such as k8s), where standing up the shuffle service adds an extra step that may require cluster admin intervention.

Some of this logic seems to overlap - putting executors into a "draining" state, and attempting to reduce data loss from executor scale-down so that the application doesn't start to thrash.

@brad-kaiser (Author)

Hi @erikerlandson ,

Thanks for taking a look. Yeah, I definitely think the logic here is similar to your use case. I am specifically only replicating blocks cached in memory, but there's no reason you couldn't also do blocks cached to disk. If you think you can use it for Spark on Kubernetes, that would be awesome.

@vanzin (Contributor) left a review:

Left a first round of comments, but I need at least another go to fully understand this.

Contributor:

Please add a config constant in core/src/main/scala/org/apache/spark/internal/config/package.scala.

Author:

added

Contributor:

nit: math.max is more Scala-y.

Author:

fixed

Contributor:

nit: indentation is off.

Author:

fixed

Contributor:

The name of this class is very cryptic. After a lot of mental gymnastics it sounds like an action, which is a bad name for a class.

Perhaps CacheRecoveryManager?

Also, it seems like the constructor should be private too? (class RecoverCacheShutdown private (...))
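
Roughly this shape (a sketch; the real class takes more collaborators than shown here):

import org.apache.spark.SparkConf

// Private constructor plus a companion factory, so callers go through the factory.
class CacheRecoveryManager private (conf: SparkConf) {
  // replication and shutdown logic elided
}

object CacheRecoveryManager {
  def apply(conf: SparkConf): CacheRecoveryManager = new CacheRecoveryManager(conf)
}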

Author:

I like that better, renamed.

Author:

I use the constructor in the unit tests.

Contributor:

Could you import the types instead? Also the code after the => would look better in the next line, given that's how the other cases do it.

Author:

There was an issue with scala.util.Success getting shadowed by the success object in TaskEndReason.scala, but that's fixed.

Contributor:

blacklist might be a better name, but this is fine too.

Author:

Left it as excluding.

Contributor:

It's weird for an RPC (even if in this case it's actually a local call) to return a Future. Why not pass the context to replicateOneBlock and have it reply with the correct value when it's done?

That would simplify the code for the sender of this message, too.
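
Something along these lines, as it would look inside Spark (a sketch; the message and helper names are illustrative, not the ones in this patch):

import scala.concurrent.Future
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.spark.util.ThreadUtils

// Illustrative message name.
case class ReplicateOneBlock(execId: String)

class SketchEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Stand-in for the real replication call.
  private def replicateOneBlock(execId: String): Future[Boolean] = Future.successful(true)

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ReplicateOneBlock(execId) =>
      // Reply when the async work finishes instead of returning a Future to the sender.
      replicateOneBlock(execId).onComplete {
        case scala.util.Success(replicated) => context.reply(replicated)
        case scala.util.Failure(e) => context.sendFailure(e)
      }(ThreadUtils.sameThread)
  }
}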

Author:

Fixed.

Contributor:

exclude should at least be consistent with the name of the corresponding argument in ReplicateBlock.

Author:

fixed

Contributor:

Adding this comment makes the whole comment block a little bit confusing. (As in, which one is correct?)

If you want to document the excluding parameter you can create a scaladoc comment instead. Mentioning "graceful shutdown" here doesn't really add a lot of information.

Author:

Removed both comments because the old one isn't accurate anymore.

Contributor:

If you look at other blocks above, your indentation is off.

Author:

fixed

@brad-kaiser (Author)

Thanks for taking a look @vanzin. I'll address these points.

@vanzin (Contributor) commented Sep 25, 2017

ok to test

@SparkQA commented Sep 26, 2017

Test build #82160 has finished for PR 19041 at commit d174761.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class GetSizeOfBlocks(blocks: Map[String, Set[RDDBlockId]]) extends ToBlockManagerMaster

@SparkQA commented Oct 2, 2017

Test build #82389 has finished for PR 19041 at commit 59d593f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brad-kaiser (Author)

Hi @vanzin, I have addressed all of your comments. If there's anything else I can do please let me know. Thanks for your help.

@SparkQA commented Oct 3, 2017

Test build #82427 has finished for PR 19041 at commit 95d5eb5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 4, 2017

Test build #82450 has finished for PR 19041 at commit a3af4bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 5, 2017

Test build #82478 has finished for PR 19041 at commit 985874d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) left a review:

Left a bunch of comments - most of them stylistic. Still have to go through tests and another look at the block manager changes...

import org.apache.spark.util.ThreadUtils

/**
* Responsible for asynchronously replicating all of an executors cached blocks, and then shutting
Contributor:

executor's

Author:

fixed

conf: SparkConf)
extends Logging {

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
Contributor:

Same thing as before with the weird name. "cache-recovery-manager-pool" is a much better name.

Author:

fixed

*
* @return
*/
def stop(): java.util.List[Runnable] = {
Contributor:

You never use the return value, why not Unit?

Author:

fixed

*
* @param execIds the executors to check
*/
private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach {
Contributor:

: Unit =

Author:

this method is gone

}

/**
* Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them,
Contributor:

The method doesn't return the list, so its short description should more accurately reflect what it actually does.

Author:

This method is gone now.


override def markPendingToRemove(executorIds: Seq[String]): Unit = synchronized {
logDebug(s"marking $executorIds pending to remove")
executorIds.foreach(id => executorsPendingToRemove.put(id, true))
Contributor:

.foreach { id => ... }

Author:

fixed

case GetStorageStatus =>
context.reply(storageStatus)

case GetCachedBlocks(executorId) => context.reply(getCachedBlocks(executorId))
Contributor:

Code in next line to follow pattern in other cases.

Author:

this is removed

info <- blockManagerInfo.get(blockManagerId)
replicaSet <- blockLocations.asScala.get(blockId)
replicas = replicaSet.toSeq
maxReps = replicaSet.size + 2
Contributor:

Why + 2? Good to add comment explaining.

Author:

commented

response.getOrElse(Future.successful(false)).foreach(context.reply)
}

private def getCachedBlocks(executorId: String): collection.Set[BlockId] = {
Contributor:

just Set[BlockId]?

Author:

removed

cachedBlocks.getOrElse(Set.empty)
}

private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = for {
Contributor:

Put the body in { } - looks kinda odd without it.

Author:

removed

@brad-kaiser (Author)

Thanks @vanzin I will work on these comments.

@SparkQA commented Dec 5, 2017

Test build #84489 has finished for PR 19041 at commit fc84aa7.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReplicateLatestRDDBlock(executorId: String, excludingExecs: Seq[String])
  • case class RemoveBlockFromExecutor(executorId: String, blockId: RDDBlockId)

@vanzin (Contributor) commented Dec 6, 2017

Can you fix the javadoc issue?

[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/core/target/java/org/apache/spark/CacheRecoveryManager.java:80: error: invalid use of @return
[error]    * @return
[error]      ^

@vanzin (Contributor) left a review:

I did a first pass through the tests and noticed you're using Thread.sleep heavily. That's a very common source of test flakiness when the machines running tests are loaded, so please rework the code to be more resilient to timing issues.
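
One common way to do that is to poll with ScalaTest's Eventually instead of sleeping for a fixed amount, e.g. this sketch (the assertion and names are illustrative, assuming an active SparkContext sc inside the suite):

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Instead of Thread.sleep(...) followed by a one-shot assertion, retry until
// the condition holds or the timeout expires.
eventually(timeout(30.seconds), interval(100.milliseconds)) {
  assert(sc.getExecutorMemoryStatus.size === expectedExecutors)
}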

Contributor:

This is very minor, but because you don't have the full picture here, it could happen that some of the blocks in the "chosen" executors cannot be replicated (e.g. because a block is too large for any of the remaining executors to hold), and you could potentially replicate blocks from these "loser" executors in that case.

Ok not to handle this, but it'd be nice to add a comment (for a future enhancement, maybe).

Author:

added comment

Contributor:

Conversely, you might decide not to even try to replicate some executors because all of their data can't be replicated, when in fact some of the blocks on the executor could be replicated (maybe even the most useful ones).

Given the limitations & complications of this method, I'm wondering whether it's even worth it to do this filtering?

Author:

That's a good point. I was really concerned with not hurting the stability of the cluster. So in the case where we are running out of cluster memory, I thought it would be safer to fail fast and give up on replication.

You're right, it would probably be more useful to recover as many complete RDDs as possible, and this method is complicated. I can test taking out this check and see what happens.

Contributor:

I find this more readable:

canBeRecovered.foreach { execId =>
  replicateUntilDone(execId, startKillTimer(execId))
}

Author:

That's much better! Fixed.

Contributor:

nit: can go in previous import.

Author:

fixed

Contributor:

nit: TrySuccess (more verbose but less cryptic).

Author:

fixed

Contributor:

Call this only if the timer was successfully stopped? (Same above.)

Author:

good idea, fixed.

Contributor:

Same.

Author:

fixed

Contributor:

Use config constants where possible.

Author:

fixed

Contributor:

Same comment.

Author:

fixed

Contributor:

Same comment.

Author:

fixed

Contributor:

Same comment.

Author:

fixed

@brad-kaiser (Author)

Thanks @vanzin I fixed the javadoc bug and I will address these issues.

I spent some time investigating an issue that turned out to be SPARK-22618. In the process I rewrote a lot of CacheRecoveryManager, so it is much simpler. Now I just remove blocks from the dying executor and don't have to keep track of block state in CacheRecoveryManager itself.

@vanzin (Contributor) commented Mar 8, 2018

@brad-kaiser have you had time to look at Imran's feedback? Your patch also has conflicts now...

@brad-kaiser (Author)

Hi @squito , thank you for your feedback! I have not been able to work on this PR lately, but I will get back to it soon. @vanzin I will also address the rest of your feedback and fix those merge conflicts. Thanks!

@squito (Contributor) commented Mar 16, 2018

Thanks @brad-kaiser -- I want to reiterate my comment from Feb 2nd; I think that is really the most important part to address before getting into the details of the current implementation:

Thought some more about the race between RemoveBlock getting sent back from the executor vs. when the CacheRecoveryManager tries to replicate the next block -- actually, why is there the back-and-forth with the driver for every block? Why isn't there just one message from the CacheRecoveryManager to the executor, saying "Drain all RDD blocks", and then one message from the executor back to the driver when it's done?
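
In other words, something like the following pair of messages (hypothetical names, just to illustrate the single round trip; this is not code from the patch):

// Driver -> executor: one message asking it to drain all of its cached RDD blocks.
case class DrainAllRDDBlocks(excludeExecutors: Seq[String])
// Executor -> driver: one message back once draining has finished.
case class AllRDDBlocksDrained(executorId: String)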

@brad-kaiser (Author)

Hi @squito,

The back-and-forth communication between CacheRecoveryManager and the BlockManagerMasterEndpoint is so that we always have an up-to-date view of which executors are undergoing cache recovery, so we don't replicate blocks to those executors. If you look at recoverLatestBlock, we include the contents of the recoveringExecutors cache.

We could conceivably move that cache into the BlockManagerMasterEndpoint, but I think that would end up being messier. I wanted to keep all the cache recovery code localized and not clutter up BlockManagerMasterEndpoint. CacheRecoveryManager and BlockManagerMasterEndpoint will also be local to the same process, so RPC calls between them should be cheap, especially compared to the time it takes to copy blocks around.

I will look into the race between removing the block and replicating the next block.

Thanks

@brad-kaiser (Author)

@squito

I updated BlockManagerMasterEndpoint.recoverLatestRDDBlock so that we proactively remove the block from blockManagerInfo when we ask the slave to remove the block. Thanks for finding this!

@SparkQA commented Mar 21, 2018

Test build #88471 has finished for PR 19041 at commit 668dd82.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 23, 2018

Test build #88550 has finished for PR 19041 at commit c79b68f.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 26, 2018

Test build #88588 has finished for PR 19041 at commit ca985c7.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@brad-kaiser (Author)

@squito

I've added a line in ExecutorAllocationManager.validateSettings to ensure that the cached executor timeout is set if cache recovery is enabled. I imagine most people would want to set the regular executor timeout to be a little lower than the cached executor timeout when using cache recovery.
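
A sketch of the kind of check described (the condition name and message wording here are illustrative, not the exact code):

// Inside ExecutorAllocationManager.validateSettings -- sketch only.
if (cacheRecoveryEnabled && !conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
  throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be set " +
    "when cache recovery is enabled")
}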

@SparkQA commented Mar 26, 2018

Test build #88603 has finished for PR 19041 at commit c4e9a80.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 27, 2018

Test build #88634 has finished for PR 19041 at commit e68ad5d.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 27, 2018

Test build #88635 has finished for PR 19041 at commit faf2b10.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 28, 2018

Test build #88673 has finished for PR 19041 at commit 9e8e68c.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 29, 2018

Test build #88716 has finished for PR 19041 at commit c8f7ad0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReplicateBlock(
  • case class RecoverLatestRDDBlock(executorId: String, excludingExecs: Seq[String])

@brad-kaiser (Author)

Hey @vanzin, @squito,

I think I've addressed all of your comments. If I missed something or you have more comments, just let me know.

Thanks
Brad

@squito (Contributor) left a review:

paging back in still ... will finish review tomorrow

Contributor:

Rather than the latest RDD, it would actually make more sense to take advantage of the LRU already in the MemoryStore: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L91

but maybe that is not easy to expose.

But I think that also means that the receiving end will put the replicated block at the back of that LinkedHashMap, even though it really hasn't been accessed at all.
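
For reference, the access-ordered LinkedHashMap that MemoryStore uses behaves like this in isolation (illustrative snippet, not Spark code):

import java.util.LinkedHashMap

// accessOrder = true means iteration order runs from least- to most-recently used,
// which is what MemoryStore's eviction relies on.
val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
entries.put("rdd_0_0", 100L)
entries.put("rdd_1_0", 200L)
entries.get("rdd_0_0") // touching an entry moves it to the most-recently-used end
// Iteration now yields rdd_1_0 first, i.e. it would be the first eviction candidate.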

Contributor:

Shouldn't you immediately kill those executors which don't pass checkMem? Are they getting killed somewhere else?

Contributor:

return type in doc is wrong

Contributor:

so the reason you do this one block at a time is because you update recoveringExecutors between each call, right? To help avoid replicating to another executor which will start draining its blocks after this starts replicating the first block, but before it starts replicating later blocks?

if so, that explanation should go in a comment.

Contributor:

I find recoveringExecutors pretty confusing: it sounds like executors that are recovering from some problem but are going to be OK -- not executors that are about to die, which we are recovering data from. How about drainingExecutors? (Though I have a feeling this name may have been discussed in earlier rounds of comments and this is what we settled on ... if so, that's fine.)

…llocation

[SPARK-21097][CORE] cleaned up logging and tests

[SPARK-21097][CORE] added config constant and fixed some formatting issues

[SPARK-21097][CORE] renamed RecoverCacheShutdown to CacheRecoveryManager and made some small fixes

[SPARK-21097][CORE] cleaned up some style issues

[SPARK-21097][CORE] simplified memory checking code

[SPARK-21097][CORE] removed executorsToKill map I added to CoarseGrainedSchedulerBackend

[SPARK-21097][CORE] updated replicateOneBlock to avoid weird nested Future.

[SPARK-21097][CORE] Fix unit tests broken by change to ExecutorAllocationClient.killExecutors.
Remove printlns from RecoverCachedDataSuite

[SPARK-21097][CORE] Simplify CacheRecoveryManager and style cleanup

[SPARK-21097][CORE] fixed javadoc and addressed styling issue

[SPARK-21097][CORE] style fixes

[SPARK-21097][CORE] merge ReplicateLatestRDDBlock and RemoveBlockFromExecutor into RecoverLatestRDDBlock

[SPARK-21097][CORE] make .startCacheRecovery return a future. Remove Thread.sleeps from CacheRecoveryManagementSuite

[SPARK-21097][CORE] rename RecoverCachedDataSuite to CacheRecoveryIntegrationSuite

[SPARK-21097][CORE] fix bug w recoveringExecutors in CacheRecoveryManager
Rename RecoverCachedDataSuite to CacheRecoveryIntegrationSuite
Fix style issues in CacheRecoveryIntegrationSuite

[SPARK-21097][CORE] Don't make CacheRecoveryManager final, fix some comments and return types

[SPARK-21097][CORE] fix indentation and comment

[SPARK-21097][CORE] fix test names

[SPARK-21097][CORE] move cleanup code in test to finally block

[SPARK-21097][CORE] give test more time to finish and simplify FakeBMM

[SPARK-21097][CORE] address race condition in BlockManagerMasterEndpoint.recoverLatestRDDBlock by manually removing block from BlockManagerInfo before we ask the slave to remove it.

[SPARK-21097][CORE] refactor CacheRecoveryManager.startCacheRecovery to return a more sensible Future that returns whether the executors were killed becuase they were done recovering or because we timed out.

[SPARK-21097][CORE] Only start one shuffle service for all tests. Add comment explaining test. Fix some spacing issues.

[SPARK-21097] update ExecutorAllocationManager.validateSettings to prevent users from enabling cache recovery without setting spark.dynamicAllocation.cachedExecutorIdleTimeout. Update comments to be more clear.

[SPARK-21097][CORE] make CacheRecoveryManager.checkMem asynchronous

[SPARK-21097][CORE] fix comment and cleanup

[SPARK-21097][CORE] do not replicate disk only blocks
@squito (Contributor) left a review:

I'm still disturbed by the +2 replicas, but lemme post my review so far as I look into it

Contributor:

I know there is a mix in the existing code, but we prefer to use asserts rather than "should" matchers now -- there have been some cases where a small syntax mistake makes should matchers do nothing, though the code still compiles and looks OK. I also think it's less familiar to most people.
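
For example (result here is just an illustrative value), the preference is roughly:

// Preferred: plain ScalaTest assert (with === for a useful failure message).
assert(result.size === 3)
// Discouraged in new tests: matcher syntax, where a small mistake can compile
// yet end up checking nothing.
result.size should be (3)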

Contributor:

This isn't actually doing what the test name says, right? The eventually above this will wait until the executors have been killed, not just until they have started moving their cached blocks elsewhere. And why would rdd2 only get created on one exec? I think dynamic allocation will try to spin up 4 executors again; the tasks are just so short they happen to finish on one.

I don't think you're going to be able to test exactly what you want at this level of the API, though -- this probably isn't doing anything more than the "cached data is replicated before dynamic de-allocation" test.

Contributor:

Though there are 4 tasks, they could run on fewer executors before dynamic allocation spins them all up. This could become a flaky test. I think you should change the waitUntilExecutorsUp to be 4.

Contributor:

I think you can probably just use this one test, I don't see much value in splitting it into more tests, and you can even rename this one to "cached data is replicated before dynamic de-allocation".

Contributor:

Since you're not actually removing executors here immediately with the new cache recovery path, you should update the doc to describe that too.

Also we only mention the return type if we've got something interesting to say about it, so you can just skip mentioning it entirely.

Contributor:

I don't see any meaningful difference between this and "replicate blocks until empty and then kill executor". This isn't actually checking the timer is canceled

Contributor:

I don't totally understand what you're simulating here with FakeBMM. Which exec is each block stored on? If I understand correctly, you're not trying to tie each block to a particular exec, and are just saying that whichever exec we get next, the next block is on it. If so, I'd at least add a comment indicating that. Though I actually think this would be easier to follow if you tied blocks to specific executors; otherwise, when there is a failure it's hard to follow exactly what is going on.

Contributor:

I didn't understand the meaning of this variable at all on my first read, I had assumed it was blockId -> replicationCount, and was really confused by that. Perhaps renaming to executorToDrainedBlockCount?

Contributor:

to go along with my comment about never killing executors that fail the check for having enough memory, if you add these checks it will fail:

      assert(results.size === 3)
      execs.foreach { exec =>
        verify(eam, times(1)).killExecutors(Seq(exec))
      } 

also to go along with my comment below about tying specific blocks to specific executors in FakeBMM, it would let you make more specific checks here, eg. bmme.replicated.keySet === Set("1", "2")

Contributor:

you've got the same block 4 times -- while that is possible with replication, probably not what you're trying to test here. Easiest to fix with

val blocks = (1 to 4).map(RDDBlockId(_, 1))

might as well do that in other places too

firstBlock <- if (blocks.isEmpty) None else Some(blocks.maxBy(_.rddId))
replicaSet <- blockLocations.asScala.get(firstBlock)
// Add 2 to force this block to be replicated to one new executor.
maxReps = replicaSet.size + 2
Contributor:

I figured out why you need +2 instead of +1. The existing code wants you to explicitly remove the id of the blockManager you're replicating from in replicaSet. See:

val candidateBMId = blockLocations(i)
blockManagerInfo.get(candidateBMId).foreach { bm =>
  val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
  val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)

While the existing code is confusing, I definitely don't like using +2 here as a workaround, as it makes things even more confusing. I'd at least update the comments on BlockManager.replicate() etc., or maybe just change its behavior and update the call sites.

@squito (Contributor) commented Apr 4, 2018

Thanks for the updates @brad-kaiser. I think I understand and don't have any major concerns. It doesn't seem easy to use the LRU from MemoryStore, so we can set that aside for now.

BTW, as you push updates, I'd prefer that you just add new commits on top (even merges to master), as that makes it easier for reviewers to see incremental changes.

@SparkQA commented Apr 4, 2018

Test build #88899 has finished for PR 19041 at commit 03ed8a2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReplicateBlock(
  • case class RecoverLatestRDDBlock(executorId: String, excludingExecs: Seq[String])

@brad-kaiser (Author)

That's great, thanks @squito. I will start addressing these comments now.

val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
val replicationFuture = replicateUntilDone(execId)

Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen {
@squito (Contributor) commented Apr 5, 2018:

I don't think this will do what you want. Try this code (if you do it in the Scala REPL, be sure to use -Yrepl-class-based: https://stackoverflow.com/a/23111645/1442961):

import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val pool = Executors.newSingleThreadExecutor()
implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(pool)

def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
  val p = Promise[T]()
  val runnable = new Runnable {
    def run(): Unit = { println("time's up"); p.success(value) }
  }
  scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
  p.future
}

def printStuff(x: Int): String = {
  (0 until x).foreach{ i => println(i); Thread.sleep(1000)}
  "done"
} 

// The *value* of the future is correct here, but you'll notice the timer keeps going, we still see "time's up".
// Not so bad in this case ...
Future.firstCompletedOf(Seq(Future(printStuff(2)), returnAfterTimeout("timeout", 5)))

// The final *value* of the future is correct again here -- we get the timeout.  But (a) you'll see that printStuff keeps
// running anyway and (b) the future isn't actually ready until printStuff completes, even though it should be ready
// as soon as the timer is up
Future.firstCompletedOf(Seq(Future(printStuff(10)), returnAfterTimeout("timeout", 1)))

// Slightly better, the TimeoutException is thrown as soon as the timer is up, but printStuff keeps running anyway
Await.result(Future(printStuff(10)), 1 second)

So I'd change this to use ThreadUtils.awaitResult and also you need your timer to set some condition which the recovery thread is checking, so it knows to stop trying to replicate more blocks.
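
A rough sketch of that shape, with an explicit flag that the replication loop checks (the names and the per-block call are illustrative, not the patch's actual code):

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

val timedOut = new AtomicBoolean(false) // flipped by the kill timer when time is up

// Stand-in for the patch's per-block replication; "true" means more blocks remain.
def replicateOneBlock(execId: String)(implicit ec: ExecutionContext): Future[Boolean] =
  Future.successful(false)

def replicateUntilDoneOrTimedOut(execId: String)(implicit ec: ExecutionContext): Future[Boolean] = {
  def loop(): Future[Boolean] =
    if (timedOut.get()) Future.successful(false) // stop promptly once the timer fires
    else replicateOneBlock(execId).flatMap(more => if (more) loop() else Future.successful(true))
  loop()
}

// The caller can then block with ThreadUtils.awaitResult(replicateUntilDoneOrTimedOut(execId), timeout)
// instead of racing a timeout future against the replication future.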

Author:

Ah, so my original thought was that whatever future finished first, the replication future or the timeout future, the other would complete harmlessly in the background. I didn't realize that the future returned by firstCompletedOf wouldn't complete until both were done. I will fix this. Thanks

@squito (Contributor) commented Apr 5, 2018

Hey @brad-kaiser, let me temper what I said in my previous comments a bit -- I understand what you're doing here now and I think it makes sense; I don't see any serious design issues. But this is adding something new to a pretty core area of Spark, so expect some more time on reviews etc. I also think you should probably go through the SPIP process -- though this isn't huge, I think it's better to increase visibility a bit: https://spark.apache.org/improvement-proposals.html

Anyway, I still think this is looking good, but I want to set expectations on what is left to do here.

@brad-kaiser (Author)

OK @squito, thanks for the heads up, I will start on the SPIP process.

@AmplabJenkins

Can one of the admins verify this patch?

@vanzin (Contributor) commented Feb 7, 2019

I'm closing this for now since it seems to have lost steam. Updating the branch will re-open the PR.

@vanzin closed this Feb 7, 2019