[SPARK-21097][CORE] Add option to recover cached data #19041
Conversation
|
Welcome to the Apache Spark project @brad-kaiser, GitHub tells me this is your first time contributing code :) While this is a bit outside of my area of review, as we've discussed I've been doing some similar WIP work in this area along with some others, so I'm hoping we can all find a way to collaborate. In the meantime I can maybe help out with Jenkins: ok to test. |
566e93e to
0ab870c
Compare
|
Hi @andrewor14 @vanzin @tdas , would you mind taking a look at this change? Thanks! |
0ab870c to
e187c6f
Compare
|
It might take some time for me to find time to review this, but in the meantime others who might be interested: @cloud-fan @JoshRosen |
|
Thanks vanzin. |
|
I have been thinking about a different but related use case: supporting the ability to operate in dynamic allocation mode without requiring a separate shuffle service. The motivation is to make it more frictionless for a Spark driver to operate using D.A. in a container platform (such as k8s), where standing up the shuffle service adds an extra step that may require cluster admin intervention. Some of this logic seems to overlap - putting executors into a "draining" state, and attempting to reduce data loss from executor scale-down so that the application doesn't start to thrash. |
|
Hi @erikerlandson , Thanks for taking a look. Yeah I definitely think the logic here is similar to your use case. I am specifically only replicating blocks cached in memory, but there's no reason you couldn't also do blocks cached to disk. If you think you can use it for spark on kubernetes that would be awesome. |
vanzin
left a comment
Left a first round of comments, but I need at least another go to fully understand this.
Please add a config constant in core/src/main/scala/org/apache/spark/internal/config/package.scala.
added
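For readers following along, entries in that file are defined with ConfigBuilder. A minimal sketch of what such a constant could look like is below; the key name, doc text, and default are illustrative assumptions, not necessarily what this PR ends up using.

```scala
// Illustrative sketch of a config constant in
// core/src/main/scala/org/apache/spark/internal/config/package.scala.
// The key name and default are assumptions, not necessarily the PR's final values.
private[spark] val DYN_ALLOCATION_RECOVER_CACHED_DATA =
  ConfigBuilder("spark.dynamicAllocation.recoverCachedData")
    .doc("Whether to replicate cached RDD blocks off an executor before dynamic " +
      "allocation removes it.")
    .booleanConf
    .createWithDefault(false)
```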
nit: math.max is more Scala-y.
fixed
nit: indentation is off.
fixed
The name of this class is very cryptic. After a lot of mental gymnastics it sounds like an action, which is a bad name for a class.
Perhaps CacheRecoveryManager?
Also seems like the constructor should be private too? (class RecoverCacheShutdown private (...))
I like that better, renamed.
I use the constructor in the unit tests.
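For context, the shape being suggested (a private constructor with a factory in the companion object) would look roughly like the sketch below. The parameter list is invented for illustration, and as noted in the reply the PR ultimately keeps the constructor reachable for unit tests.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef

// Rough sketch only; parameters are invented for illustration.
private[spark] class CacheRecoveryManager private (
    blockManagerMasterEndpoint: RpcEndpointRef,
    conf: SparkConf) extends Logging {
  // ... replication and shutdown logic ...
}

private[spark] object CacheRecoveryManager {
  def apply(blockManagerMasterEndpoint: RpcEndpointRef, conf: SparkConf): CacheRecoveryManager =
    new CacheRecoveryManager(blockManagerMasterEndpoint, conf)
}
```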
Could you import the types instead? Also the code after the => would look better in the next line, given that's how the other cases do it.
There was an issue with scala.util.Success getting shadowed by the success object in TaskEndReason.scala, but that's fixed.
blacklist might be a better name, but this is fine too.
Left it as excluding.
It's weird for an RPC (even if in this case it's actually a local call) to return a Future. Why not pass the context to replicateOneBlock and have it reply with the correct value when it's done?
That would simplify the code for the sender of this message, too.
Fixed.
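The pattern being suggested -- hand the RpcCallContext to the worker method and reply when the asynchronous work completes -- looks roughly like the sketch below. The method doReplication and the parameters are placeholders, not the PR's actual code.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.spark.rpc.RpcCallContext
import org.apache.spark.storage.{BlockId, BlockManagerId}

// Placeholder for whatever actually performs the replication in the PR.
def doReplication(blockId: BlockId, excluding: Seq[BlockManagerId]): Future[Boolean] =
  Future.successful(true)

// Instead of returning a Future from the RPC handler, reply on the context once done.
def replicateOneBlock(
    blockId: BlockId,
    excluding: Seq[BlockManagerId],
    context: RpcCallContext)(implicit ec: ExecutionContext): Unit = {
  doReplication(blockId, excluding).onComplete {
    case Success(replicated) => context.reply(replicated)
    case Failure(e) => context.sendFailure(e)
  }
}
```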
exclude should at least be consistent with the name of the corresponding argument in ReplicateBlock.
fixed
Adding this comment makes the whole comment block a little bit confusing. (As in, which one is correct?)
If you want to document the excluding parameter you can create a scaladoc comment instead. Mentioning "graceful shutdown" here doesn't really add a lot of information.
Removed both comments because the old one isn't accurate anymore.
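A scaladoc comment along the lines vanzin suggests could look like this; the wording is illustrative, not taken from the patch.

```scala
/**
 * Replicate a block to other executors.
 *
 * @param excluding block manager ids that should not receive a replica, e.g. executors
 *                  that are themselves being drained as part of cache recovery.
 */
```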
docs/configuration.md
Outdated
If you look at other blocks above, your indentation is off.
fixed
|
Thanks for taking a look @vanzin. I'll address these points. |
ce32a8c to
7ee8d5a
Compare
|
ok to test |
|
Test build #82160 has finished for PR 19041 at commit
|
|
Test build #82389 has finished for PR 19041 at commit
|
59d593f to
95d5eb5
Compare
|
Hi @vanzin, I have addressed all of your comments. If there's anything else I can do please let me know. Thanks for your help. |
|
Test build #82427 has finished for PR 19041 at commit
|
|
Test build #82450 has finished for PR 19041 at commit
|
a3af4bd to
985874d
Compare
|
Test build #82478 has finished for PR 19041 at commit
|
vanzin
left a comment
Left a bunch of comments - most of them stylistic. Still have to go through tests and another look at the block manager changes...
import org.apache.spark.util.ThreadUtils

/**
 * Responsible for asynchronously replicating all of an executors cached blocks, and then shutting
executor's
fixed
    conf: SparkConf)
  extends Logging {

  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("recover-cache-shutdown-pool")
Same thing as before with the weird name. "cache-recovery-manager-pool" is a much better name.
fixed
 *
 * @return
 */
def stop(): java.util.List[Runnable] = {
You never use the return value, why not Unit?
fixed
 *
 * @param execIds the executors to check
 */
private def checkForReplicableBlocks(execIds: Seq[String]) = state.getBlocks(execIds).foreach {
: Unit =
this method is gone
}

/**
 * Get list of cached blocks from BlockManagerMaster. If there are cached blocks, replicate them,
The method doesn't return the list, so its short description should more accurately reflect what it actually does.
This method is gone now.
override def markPendingToRemove(executorIds: Seq[String]): Unit = synchronized {
  logDebug(s"marking $executorIds pending to remove")
  executorIds.foreach(id => executorsPendingToRemove.put(id, true))
.foreach { id => ... }
fixed
case GetStorageStatus =>
  context.reply(storageStatus)

case GetCachedBlocks(executorId) => context.reply(getCachedBlocks(executorId))
Code in next line to follow pattern in other cases.
this is removed
info <- blockManagerInfo.get(blockManagerId)
replicaSet <- blockLocations.asScala.get(blockId)
replicas = replicaSet.toSeq
maxReps = replicaSet.size + 2
Why + 2? Good to add comment explaining.
commented
response.getOrElse(Future.successful(false)).foreach(context.reply)
}

private def getCachedBlocks(executorId: String): collection.Set[BlockId] = {
just Set[BlockId]?
removed
cachedBlocks.getOrElse(Set.empty)
}

private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = for {
Put the body in { } - looks kinda odd without it.
removed
|
Thanks @vanzin I will work on these comments. |
985874d to
fc84aa7
Compare
|
Test build #84489 has finished for PR 19041 at commit
|
|
Can you fix the javadoc issue? |
vanzin
left a comment
I did a first pass through the tests and noticed you're using Thread.sleep heavily. That's a very common source of test flakiness when the machines running tests are loaded, so please rework the code to be more resilient to timing issues.
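A common way to avoid fixed sleeps in these tests is ScalaTest's Eventually, which polls a condition until a timeout expires. The sketch below is a generic illustration; the assertion body and variable names are placeholders, not the PR's actual test code.

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll until the condition holds instead of sleeping for a fixed interval.
eventually(timeout(30.seconds), interval(100.millis)) {
  // replicatedBlockCount and expectedCount are placeholders for the test's real state.
  assert(replicatedBlockCount == expectedCount)
}
```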
This is very minor, but because you don't have the full picture here, it could happen that some of the blocks in the "chosen" executors cannot be replicated (e.g. because a block is too large for any of the remaining executors to hold), and you could potentially replicate blocks from these "loser" executors in that case.
Ok not to handle this, but it'd be nice to add a comment (for a future enhancement, maybe).
added comment
conversely, you might decide to not even try to replicate some executors, because all of their data can't be replicated, but in fact some of the blocks on the executor could be replicated (maybe even the most useful ones).
given the limitations & complications of this method, I'm wondering whether it's even worth it to do this filtering?
That's a good point. I was really concerned with not hurting the stability of the cluster, so in the case where we are running out of cluster memory, I thought it would be safer to fail fast and give up on replication.
You're right, it would probably be more useful to get as many complete RDDs recovered as possible, and this method is complicated. I can test taking out this check and see what happens.
I find this more readable:
canBeRecovered.foreach { execId =>
  replicateUntilDone(execId, startKillTimer(execId))
}
that's much better! fixed.
nit: can go in previous import.
fixed
nit: TrySuccess (more verbose but less cryptic).
fixed
Call this only if the timer was successfully stopped? (Same above.)
good idea, fixed.
Same.
fixed
Use config constants where possible.
fixed
Same comment.
fixed
Same comment.
fixed
Same comment.
fixed
|
Thanks @vanzin, I fixed the javadoc bug and I will address these issues. I spent some time investigating an issue that turned out to be SPARK-22618. In the process I rewrote a lot of CacheRecoveryManager so it is much simpler. Now I just remove blocks from the dying executor and I don't have to keep track of block state in the CacheRecoveryManager itself. |
|
@brad-kaiser have you had time to look at Imran's feedback? Your patch also has conflicts now... |
|
Thanks @brad-kaiser -- want to re-iterate my comment from Feb 2nd, I think that is really the most important part to address before getting into the details of the current implementation:
|
|
Hi @squito, the back and forth communication between CacheRecoveryManager and the BlockManagerMasterEndpoint is so that we always have an up-to-date view of which executors are undergoing cache recovery, and we don't replicate blocks to those executors. If you look at recoverLatestBlock, we include the contents of the recoveringExecutors cache. We could conceivably move that cache into the block manager master endpoint, but I think that would end up being messier. I wanted to keep all the cache recovery code localized and not clutter up BlockManagerMasterEndpoint. CacheRecoveryManager and BlockManagerMasterEndpoint will also be local to the same process, so RPC calls between them should be cheap, especially compared to the time it will take to copy blocks around. I will look into the race between removing the block and replicating the next block. Thanks |
|
I updated BlockManagerMasterEndpoint.recoverLatestRDDBlock so that we proactively remove the block from blockManagerInfo when we ask the slave to remove the block. Thanks for finding this! |
|
Test build #88471 has finished for PR 19041 at commit
|
|
Test build #88550 has finished for PR 19041 at commit
|
|
Test build #88588 has finished for PR 19041 at commit
|
|
I've added a line in ExecutorAllocationManager.validateSettings to ensure that the cached executor timeout is set if cache recovery is enabled. I imagine most people would want to set the regular executor timeout to be a little lower than the cached executor timeout when using cache recovery. |
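The kind of guard being described might look roughly like the sketch below inside ExecutorAllocationManager.validateSettings. The config constant names are assumptions based on the discussion, not verified against the patch.

```scala
// Hypothetical sketch; the constant names are assumptions.
if (conf.get(DYN_ALLOCATION_RECOVER_CACHED_DATA) &&
    !conf.contains(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) {
  throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be set " +
    "when cache recovery is enabled.")
}
```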
|
Test build #88603 has finished for PR 19041 at commit
|
|
Test build #88634 has finished for PR 19041 at commit
|
|
Test build #88635 has finished for PR 19041 at commit
|
|
Test build #88673 has finished for PR 19041 at commit
|
9e8e68c to
c8f7ad0
Compare
|
Test build #88716 has finished for PR 19041 at commit
|
squito
left a comment
paging back in still ... will finish review tomorrow
rather than the latest RDD, it would actually make more sense to take advantage of the LRU already in the MemoryStore: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L91
but maybe that is not easy to expose.
But I think that also means that the receiving end will put the replicated block at the back of that LinkedHashMap, even though it really hasn't been accessed at all.
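For context, the MemoryStore linked above keeps its entries in a java.util.LinkedHashMap constructed with accessOrder = true, which is why a freshly inserted (replicated) block lands at the most-recently-used end on the receiver. A tiny standalone illustration of that behavior, not Spark code:

```scala
import java.util.LinkedHashMap

// accessOrder = true: both puts and gets move a key to the tail (MRU) of the map,
// which is the end that gets evicted last.
val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
entries.put("rdd_1_0", 100L)
entries.put("rdd_2_0", 200L)
entries.get("rdd_1_0")
println(entries.keySet())  // prints [rdd_2_0, rdd_1_0]: rdd_1_0 is now most recently used
```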
shouldn't you immediately kill those executors which don't pass checkMem? are they getting killed somewhere else?
return type in doc is wrong
so the reason you do this one block at a time is because you update recoveringExecutors between each call, right? To help avoid replicating to another executor which will start draining its blocks after this starts replicating the first block, but before it starts replicating later blocks?
if so, that explanation should go in a comment.
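If that is the intent, the per-block drain loop being described is roughly of the shape below. recoverLatestBlock and recoveringExecutors stand in for the PR's real members; the stubs exist only to make the sketch self-contained.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Placeholders standing in for the PR's real members.
def recoverLatestBlock(execId: String, excluding: Set[String])
    (implicit ec: ExecutionContext): Future[Boolean] = Future.successful(false)
def recoveringExecutors: Set[String] = Set.empty

// Replicate one block, then re-read which executors are draining before choosing
// a destination for the next block, so a newly draining executor is never picked.
def replicateUntilDone(execId: String)(implicit ec: ExecutionContext): Future[Boolean] = {
  recoverLatestBlock(execId, excluding = recoveringExecutors).flatMap { movedOne =>
    if (movedOne) replicateUntilDone(execId) else Future.successful(true)
  }
}
```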
I find recoveringExecutors pretty confusing, I think it's executors that are recovering from some problem, but are going to be OK -- not executors that are about to die, which we are recovering data from. how about drainingExecutors? (though I have a feeling this name may have been discussed in earlier rounds of comments and this is what we settled on ... if so, that's fine.)
…llocation
[SPARK-21097][CORE] cleaned up logging and tests
[SPARK-21097][CORE] added config constant and fixed some formatting issues
[SPARK-21097][CORE] renamed RecoverCacheShutdown to CacheRecoveryManager and made some small fixes
[SPARK-21097][CORE] cleaned up some style issues
[SPARK-21097][CORE] simplified memory checking code
[SPARK-21097][CORE] removed executorsToKill map I added to CoarseGrainedSchedulerBackend
[SPARK-21097][CORE] updated replicateOneBlock to avoid weird nested Future.
[SPARK-21097][CORE] Fix unit tests broken by change to ExecutorAllocationClient.killExecutors. Remove printlns from RecoverCachedDataSuite
[SPARK-21097][CORE] Simplify CacheRecoveryManager and style cleanup
[SPARK-21097][CORE] fixed javadoc and addressed styling issue
[SPARK-21097][CORE] style fixes
[SPARK-21097][CORE] merge ReplicateLatestRDDBlock and RemoveBlockFromExecutor into RecoverLatestRDDBlock
[SPARK-21097][CORE] make .startCacheRecovery return a future. Remove Thread.sleeps from CacheRecoveryManagementSuite
[SPARK-21097][CORE] rename RecoverCachedDataSuite to CacheRecoveryIntegrationSuite
[SPARK-21097][CORE] fix bug w recoveringExecutors in CacheRecoveryManager. Rename RecoverCachedDataSuite to CacheRecoveryIntegrationSuite. Fix style issues in CacheRecoveryIntegrationSuite
[SPARK-21097][CORE] Don't make CacheRecoveryManager final, fix some comments and return types
[SPARK-21097][CORE] fix indentation and comment
[SPARK-21097][CORE] fix test names
[SPARK-21097][CORE] move cleanup code in test to finally block
[SPARK-21097][CORE] give test more time to finish and simplify FakeBMM
[SPARK-21097][CORE] address race condition in BlockManagerMasterEndpoint.recoverLatestRDDBlock by manually removing block from BlockManagerInfo before we ask the slave to remove it.
[SPARK-21097][CORE] refactor CacheRecoveryManager.startCacheRecovery to return a more sensible Future that returns whether the executors were killed because they were done recovering or because we timed out.
[SPARK-21097][CORE] Only start one shuffle service for all tests. Add comment explaining test. Fix some spacing issues.
[SPARK-21097] update ExecutorAllocationManager.validateSettings to prevent users from enabling cache recovery without setting spark.dynamicAllocation.cachedExecutorIdleTimeout. Update comments to be more clear.
[SPARK-21097][CORE] make CacheRecoveryManager.checkMem asynchronous
[SPARK-21097][CORE] fix comment and cleanup
[SPARK-21097][CORE] do not replicate disk only blocks
c8f7ad0 to
03ed8a2
Compare
squito
left a comment
I'm still disturbed by the +2 replicas, but lemme post my review so far as I look into it
I know there is a mix in the existing code, but we prefer to use asserts rather than "should" matchers now -- there have been some cases where a small syntax mistake makes should matchers do nothing, though the code still compiles and looks ok. and I think it's less familiar to most people
this isn't actually doing what the test name says, right? The eventually above this will wait until the executors have been killed, not just until they have started moving their cached blocks elsewhere. And why would rdd2 only get created on one exec? I think dynamic allocation will try to spin up 4 executors again, the tasks are just so short they happen to finish on one.
I don't think you're going to be able to test exactly what you want at this level of the api, though -- this probably isn't doing anything more than the "cached data is replicated before dynamic de-allocation" test
though there are 4 tasks, they could run on fewer executors before dynamic allocation spins them all up. This could become a flaky test. I think you should change the waitUntilExecutorsUp to be 4
I think you can probably just use this one test, I don't see much value in splitting it into more tests, and you can even rename this one to "cached data is replicated before dynamic de-allocation".
Since you're not actually removing executors here immediately with the new cache recovery path, you should update the doc to describe that too.
Also we only mention the return type if we've got something interesting to say about it, so you can just skip mentioning it entirely.
I don't see any meaningful difference between this and "replicate blocks until empty and then kill executor". This isn't actually checking the timer is canceled
I don't totally understand what you're simulating here with the FakeBMM. Which exec is each block stored on? If I understand correctly, you're not trying to tie each block to a particular exec, and just saying whatever the next exec we get, say the next block is on that. If so, I'd at least have a comment indicating that. Though I actually think this would be easier to follow if you tied blocks to specific executors, otherwise when there is a failure it's hard to follow exactly what is going on.
I didn't understand the meaning of this variable at all on my first read, I had assumed it was blockId -> replicationCount, and was really confused by that. Perhaps renaming to executorToDrainedBlockCount?
to go along with my comment about never killing executors that fail the check for having enough memory, if you add these checks it will fail:
assert(results.size === 3)
execs.foreach { exec =>
  verify(eam, times(1)).killExecutors(Seq(exec))
}
also to go along with my comment below about tying specific blocks to specific executors in FakeBMM, it would let you make more specific checks here, e.g. bmme.replicated.keySet === Set("1", "2")
you've got the same block 4 times -- while that is possible with replication, probably not what you're trying to test here. Easiest to fix with
val blocks = (1 to 4).map(RDDBlockId(_, 1))
might as well do that in other places too
firstBlock <- if (blocks.isEmpty) None else Some(blocks.maxBy(_.rddId))
replicaSet <- blockLocations.asScala.get(firstBlock)
// Add 2 to force this block to be replicated to one new executor.
maxReps = replicaSet.size + 2
I figured out why you need +2 instead of +1. The existing code wants you to explicitly remove the id of the blockManager you're trying to replicate from in replicaSet. See:
spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
Lines 236 to 239 in cccaaa1
val candidateBMId = blockLocations(i)
blockManagerInfo.get(candidateBMId).foreach { bm =>
  val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
  val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
While the existing code is confusing, I definitely don't like using +2 here as a workaround -- it gets pretty confusing. I'd at least update the comments on BlockManager.replicate() etc., or maybe just change its behavior and update the call sites.
|
thanks for the updates @brad-kaiser. I think I understand and don't have any major concerns. It doesn't seem easy to use the LRU from MemoryStore, so we can set that aside for now. Btw, as you push updates, I'd prefer to just add new commits on top (even merges to master), as that makes it easier for reviewers to see incremental changes. |
|
Test build #88899 has finished for PR 19041 at commit
|
|
That's great, thanks @squito. I will start addressing these comments now. |
val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
val replicationFuture = replicateUntilDone(execId)

Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen {
I don't think this will do what you want. Try this code (if you do it in the Scala REPL, be sure to use -Yrepl-class-based: https://stackoverflow.com/a/23111645/1442961)
import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
val scheduler = Executors.newSingleThreadScheduledExecutor()
val pool = Executors.newSingleThreadExecutor()
implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(pool)
def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
  val p = Promise[T]()
  val runnable = new Runnable {
    def run(): Unit = { println("time's up"); p.success(value) }
  }
  scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
  p.future
}

def printStuff(x: Int): String = {
  (0 until x).foreach { i => println(i); Thread.sleep(1000) }
  "done"
}

// The *value* of the future is correct here, but you'll notice the timer keeps going, we still see "time's up".
// Not so bad in this case ...
Future.firstCompletedOf(Seq(Future(printStuff(2)), returnAfterTimeout("timeout", 5)))

// The final *value* of the future is correct again here -- we get the timeout. But (a) you'll see that printStuff keeps
// running anyway and (b) the future isn't actually ready until printStuff completes, even though it should be ready
// as soon as the timer is up
Future.firstCompletedOf(Seq(Future(printStuff(10)), returnAfterTimeout("timeout", 1)))

// Slightly better, the TimeoutException is thrown as soon as the timer is up, but printStuff keeps running anyway
Await.result(Future(printStuff(10)), 1 second)

So I'd change this to use ThreadUtils.awaitResult and also you need your timer to set some condition which the recovery thread is checking, so it knows to stop trying to replicate more blocks.
Ah, so my original thought was that whatever future finished first, the replication future or the timeout future, the other would complete harmlessly in the background. I didn't realize that the future returned by firstCompletedOf wouldn't complete until both were done. I will fix this. Thanks
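One possible shape for that fix, along the lines squito describes: a timer-set flag that the replication loop checks between blocks, plus ThreadUtils.awaitResult for the per-block wait. Everything other than ThreadUtils.awaitResult below is a placeholder, not the PR's actual code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.spark.util.ThreadUtils

// Placeholder for the PR's per-block replication call.
def replicateOneBlock(execId: String): Future[Boolean] = Future.successful(false)

// Set to true by the force-kill timer (scheduling not shown here).
val timedOut = new AtomicBoolean(false)

// Replicate block by block, checking the flag between blocks so the loop actually
// stops when the timer fires instead of racing on in the background.
def replicateUntilDoneOrTimeout(execId: String): Boolean = {
  var movedOne = true
  while (movedOne && !timedOut.get()) {
    movedOne = ThreadUtils.awaitResult(replicateOneBlock(execId), 10.seconds)
  }
  !timedOut.get()
}
```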
|
hey @brad-kaiser lemme temper what I said in my previous comments a bit -- I understand what you're doing here now and I think it makes sense, I don't see any serious design issues. But this is adding something new to a pretty core area of Spark, so expect some time still on reviews etc. I also think you should probably go through the SPIP process -- though it's not huge, I think it's better to increase visibility a bit: https://spark.apache.org/improvement-proposals.html Anyway, still think this is looking good, but want to set expectations on what is left to do here. |
|
ok @squito , thanks for the heads up, I will start on the SPIP process. |
|
Can one of the admins verify this patch? |
|
I'm closing this for now since it seems to have lost steam. Updating the branch will re-open the PR. |
What changes were proposed in this pull request?
Add an option to recover cached data when using dynamic allocation. Currently when dynamic allocation decommissions an idle executor, any cached RDD blocks on that executor will be lost. Saving that data has the potential for a real performance boost for shared instances of Spark.
If there are any problems saving cached data, this change falls back to regular dynamic allocation behavior and drops the blocks.
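As a usage sketch, enabling the option would presumably look something like the following. The exact config key is not spelled out in this thread, so the name below is an assumption; the other two settings are existing dynamic allocation options.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration; "spark.dynamicAllocation.recoverCachedData" is an assumed key name.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")
  .set("spark.dynamicAllocation.recoverCachedData", "true")
```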
How was this patch tested?
There are unit tests in RecoverCacheShutdownSuite and integration tests in RecoverCachedDataSuite. There are also benchmark results available here: https://docs.google.com/document/d/1E6_rhAAJB8Ww0n52-LYcFTO1zhJBWgfIXzNjLi29730/edit?usp=sharing