[SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned #27864
Conversation
cc - @cloud-fan @holdenk
Jenkins ok to test
Thank you for making a PR, @prakharjain09.
Test build #119623 has finished for PR 27864 at commit
@holdenk @cloud-fan This PR currently does not handle running tasks. What should we do with tasks that are already running on these decommissioned executors? If we start BlockManager decommissioning and stop accepting new cached RDD blocks, then these already running tasks might fail (in case they try to cache something). One possible approach is to kill these already running tasks after some configurable timeout. Executor decommissioning can be broken into two steps:
Firstly we can do … Any thoughts/suggestions?
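A minimal sketch of what such a configurable timeout could look like from the user side, assuming a hypothetical key (the key name below is illustrative and is not introduced by this PR):

```scala
import org.apache.spark.SparkConf

// Hypothetical knob (not part of this PR): how long already-running tasks on a
// decommissioning executor are allowed to continue before being killed.
val conf = new SparkConf()
  .setAppName("decommission-timeout-example")
  .set("spark.executor.decommission.killRunningTasksTimeout", "120s")
```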
```scala
}

// Maximum number of storage replication failure which replicateBlock can handle
// before giving up for one block
```
- We had better have this message at the configuration declaration part.
- We may need to revise the conf name to explicitly show this clause, "before giving up for one block".
- Added message as part of description of conf.
- Renamed to "spark.storage.decommission.maxReplicationFailuresPerBlock". Any suggestions on some other name?
Test build #119763 has finished for PR 27864 at commit
Test build #119774 has finished for PR 27864 at commit
@Dooyoung-Hwang Thanks for the review! I have addressed most of the review comments. @Dooyoung-Hwang @cloud-fan @holdenk Please review the changes and provide feedback.
So maybe not in this PR, but rather than killing tasks which have a chance of succeeding, what about caching with replication level 2? The logic might get a bit funky, so I think a separate follow-up PR would be better there, but this way we get the best of both worlds.
Thank you so much for working on this, I'm really glad you picked it up. I have some questions about the design and some small places for improvement, but really excited.
```scala
private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
  ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
    .doc("Maximum number of failures to tolerate for offloading " +
      "one block in single decommission cache blocks iteration")
```
I think we could work on the wording here but not critical.
```
@@ -1281,6 +1285,9 @@ private[spark] class BlockManager(

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")
    if (blockManagerDecommissioning && blockId.isRDD) {
```
I don't think we should reject the block?
Added overall design to explain why I went ahead with this: #27864 (comment).
Still not sure about this part of the design. I'd like to see a test to make sure this won't cause task failures.
@holdenk Fixed this. Removed the code for not accepting new RDD cache blocks on a decommissioning executor. Also added a test in BlockManagerDecommissionSuite to show that ongoing tasks which are going to cache data in the future still succeed.
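A rough sketch of the scenario such a test exercises (illustrative only, not the actual suite code; the `local-cluster` master and the sleep are just a way to let decommissioning overlap the running tasks):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Tasks that cache data while their executor is being decommissioned should still succeed.
object CacheDuringDecommissionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]")
      .setAppName("cache-during-decommission"))
    try {
      val rdd = sc.parallelize(1 to 1000, 4)
        .map { i => Thread.sleep(10); i }        // slow the tasks down a little
        .persist(StorageLevel.MEMORY_ONLY)
      // In the real suite, one executor is decommissioned while these tasks run;
      // the assertion is simply that the job still completes with correct results.
      assert(rdd.count() == 1000)
    } finally {
      sc.stop()
    }
  }
}
```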
```scala
if (!blockManagerDecommissioning) {
  logInfo("Starting block manager decommissioning process")
  blockManagerDecommissioning = true
  decommissionManager = Some(new BlockManagerDecommissionManager)
```
I guess I'm not sure why we're doing this in this manner? It seems like if we did this as a blocking call and returned true when we were done, we could move on to the next phase of decommissioning in the driver once the blocks were replicated?
@holdenk I have added the overall design for this change. Please review the same and provide your suggestions/feedback.
```scala
 */
private class BlockManagerDecommissionManager {
  @volatile private var stopped = false
  private val cacheReplicationThread = new Thread {
```
Same comment about the thread up above, I'm not sure why we need to be in a separate thread (but maybe I've missed something).
Tried to explain why we have created a thread here: #27864 (comment). Basically this BlockManager will keep retrying the offloading of cache blocks every 30 seconds.
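A self-contained sketch of that retry behaviour (the class and callback names below are illustrative, not the PR's exact code; only the "retry every 30 seconds until stopped" shape is taken from the discussion):

```scala
// Keep trying to offload cached RDD blocks every 30 seconds until told to stop.
// offloadCachedRddBlocks stands in for the PR's actual replication logic.
class DecommissionRetrySketch(offloadCachedRddBlocks: () => Unit) {
  @volatile private var stopped = false

  private val cacheReplicationThread = new Thread("cache-replication-sketch") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          offloadCachedRddBlocks()
          Thread.sleep(30 * 1000L)   // 30-second retry interval from the discussion
        } catch {
          case _: InterruptedException =>   // stop() interrupts the sleep; loop condition exits
          case e: Exception => println(s"Offload attempt failed, will retry: $e")
        }
      }
    }
  }
  cacheReplicationThread.setDaemon(true)

  def start(): Unit = cacheReplicationThread.start()
  def stop(): Unit = { stopped = true; cacheReplicationThread.interrupt() }
}
```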
Makes sense. I guess we can just (later on) send a message back once all the blocks are done migrating.
When all the blocks are done migrating, the driver will automatically know. BlockManagerMaster is continuously receiving block updates from this BlockManager (via the UpdateBlockInfo message). So when all blocks are offloaded, dynamic allocation will automatically know that this executor doesn't have cache data, and if the other criteria are met (like executorIdleTimeout etc.), the executor will automatically be removed by the ExecutorMonitor class of dynamic allocation.
Also, how would you feel about adding an integration test with K8s? We could use a local object which errors on recompute and verify that decommissioning plus a second count doesn't trigger it.
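One way that check could be written (a sketch under the stated assumptions, not the eventual integration test; it detects recomputation with an accumulator instead of an object that errors, and the decommission step is left as a comment):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Count how many times the map function actually runs. If decommissioning migrated
// the cached blocks, the second count should not increase the accumulator.
object NoRecomputeAfterDecommissionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]")
      .setAppName("no-recompute-sketch"))
    try {
      val computeCount = sc.longAccumulator("computeCount")
      val rdd = sc.parallelize(1 to 100, 4)
        .map { i => computeCount.add(1); i }
        .persist(StorageLevel.MEMORY_ONLY)

      rdd.count()                               // materialize the cache
      val afterFirstCount = computeCount.value

      // In the real test, one executor is decommissioned here and its cached
      // blocks are migrated to the surviving executor.

      rdd.count()                               // should be served from the cache
      assert(computeCount.value == afterFirstCount, "cached blocks were recomputed")
    } finally {
      sc.stop()
    }
  }
}
```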
@holdenk Thanks for the review. My bad, I should have added the overall design initially. Current overall design:
Please review the same and provide your suggestions.
Test build #121678 has finished for PR 27864 at commit
Jenkins retest this please
I think this is really good progress. I'd like to have a follow up JIRA to explore if there is a better way to pick the peers / parallelize block migration but I think that can be a follow on PR.
cc @dongjoon-hyun have your issues been addressed?
```scala
// so that we end up prioritize them over each other
val blocksFailedReplication = replicateBlocksInfo.filterNot {
  case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
    val replicatedSuccessfully = replicateBlock(
```
Network congestion is certainly a possibility. I think that for now this strikes a good balance between simple code and avoiding hanging all transfers if we have one slow target host. We can revisit this in the future if it turns out we need more control in production environments. Sound good?
I'll take another look tomorrow with fresh eyes and make sure; I'm a little too sleepy to merge any code today.
Test build #121700 has finished for PR 27864 at commit
Jenkins retest this please
Test build #121772: Deflake Build #121700 has started for PR 27864 at commit
I've filed SPARK-31555 for the follow up work we've been discussing in this PR.
Looks good to me.
Thank you all!
Test build #121775 has finished for PR 27864 at commit
Test build #121772: Deflake Build #121700 has finished for PR 27864 at commit
This seems to break the master branch
The first two things are
Ah damn it, I misread the K8s one as the regular Jenkins. It looks like it's the shutdown hook getting in the way, but I'll just revert this in master since it's Friday night and we can revisit the PR without the shutdown hook.
+1 for reverting. Thank you so much!
Thanks for catching it :)
Reverted in #28337
Thank you!
@dongjoon-hyun @holdenk Thanks a lot for following up on this. I see 4 test failures (https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/551/) and all of them seem unrelated:
What should the next step be? Is there a way to reopen this pull request, or should I open a new pull request where I will try fixing the tests?
@holdenk Can you please share more pointers on what might be causing the test failures?
@prakharjain09 Please create the same PR once more. Jenkins will validate your PR.
In this PR, there is no evidence that the last commit passes all the unit tests.
What changes were proposed in this pull request?
After the changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks to it. We should also decommission the corresponding block managers in the same way, i.e. move the cached RDD blocks from those executors to other active executors.
Why are the changes needed?
We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spot loss, pre-emptible VMs, etc.). It's good to save as much cache data as possible.
Also, in the future, once the decommissioning signal comes from the cluster manager (say YARN/Mesos etc.), dynamic allocation plus this change gives us the opportunity to downscale the executors faster by making the executors free of cache data.
Note that this is a best-effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available for caching, then a decommissioning executor will keep the cache blocks it was not able to move and will still be able to serve them.
Current overall Flow:
1. CoarseGrainedSchedulerBackend receives a decommissionExecutor signal. On receiving the signal, it does two things: it stops assigning new tasks (SPARK-20628), and it sends another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.
2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call which is used for replication. All these decommissioning BMs are also sent a message from BlockManagerMasterEndpoint to start the decommissioning process on themselves.
3. BlockManager on the worker (say BM-x) receives the "DecommissionBlockManager" message. Now it will start a BlockManagerDecommissionManager thread to offload all the cached RDD blocks. This thread can make multiple attempts to decommission the existing cache blocks (multiple attempts might be needed, as there might not be sufficient space in other active BMs initially). A condensed sketch of this flow is shown below.
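A condensed sketch of the driver-side part of that flow, with simplified message and handler shapes (the message names mirror the description above, but the bodies are illustrative, not the actual implementation):

```scala
// Simplified shapes of the messages and master-endpoint handling in the flow above.
case class DecommissionBlockManagers(executorIds: Seq[String])   // driver-side message
case object DecommissionBlockManager                              // sent to each worker BM

class BlockManagerMasterEndpointSketch {
  private val decommissioningBlockManagers = scala.collection.mutable.Set[String]()

  // Step 2: mark the block managers as decommissioning and tell them to start.
  def receive(msg: Any): Unit = msg match {
    case DecommissionBlockManagers(executorIds) =>
      decommissioningBlockManagers ++= executorIds
      executorIds.foreach(id => sendToBlockManager(id, DecommissionBlockManager))
    case _ =>
  }

  // getPeers excludes decommissioning BMs, so nothing new is replicated onto them.
  def getPeers(allBlockManagers: Seq[String]): Seq[String] =
    allBlockManagers.filterNot(decommissioningBlockManagers.contains(_))

  private def sendToBlockManager(executorId: String, msg: Any): Unit = ()  // RPC stub
}
```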
Does this PR introduce any user-facing change?
NO
How was this patch tested?
Added UTs.