
[SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned #27864


Closed

Conversation

@prakharjain09 (Contributor) commented Mar 10, 2020

What changes were proposed in this pull request?

After changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks on it. We should also decommission the corresponding block managers in the same way, i.e. move the cached RDD blocks from those executors to other active executors.

Why are the changes needed?

We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost in case the executors are taken away forcefully after some timeout (because of spot loss, pre-emptible VMs, etc.). It is good to save as much cache data as possible.

Also, in the future, once the decommissioning signal comes from the cluster manager (say YARN/Mesos etc.), dynamic allocation plus this change gives us the opportunity to downscale executors faster by making them free of cache data.

Note that this is a best-effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available for caching, then a decommissioning executor will keep any cache blocks it was not able to move and will still be able to serve them.

Current overall Flow:

  1. CoarseGrainedSchedulerBackend receives a signal to decommission an executor. On receiving the signal, it does two things: it stops assigning new tasks (SPARK-20628) and sends another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

  2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving this, it moves the corresponding block managers to a "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call, which is used for replication. Each decommissioning BM is also sent a message from BlockManagerMasterEndpoint to start the decommissioning process on itself.

  3. The BlockManager on the worker (say BM-x) receives the "DecommissionBlockManager" message. It then starts a BlockManagerDecommissionManager thread to offload all the cached RDD blocks. This thread can make multiple attempts to offload the existing cache blocks (multiple attempts might be needed as there might not be sufficient space on other active BMs initially). A simplified driver-side sketch of this flow follows below.
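
The following is only an illustrative sketch of steps 1-2 and the getPeers exclusion; the class, message, and method names are simplified stand-ins rather than the exact code in this PR:

import scala.collection.mutable

// Illustrative messages (stand-ins for the real ones touched by this PR).
case class DecommissionBlockManagers(executorIds: Seq[String])
case object DecommissionBlockManager

class BlockManagerMasterEndpointSketch {
  // All registered BMs, keyed by executor id.
  private val allBlockManagers = mutable.Set[String]()
  // BMs currently decommissioning; excluded from getPeers so that
  // replication never targets a decommissioning executor.
  private val decommissioningBlockManagers = mutable.Set[String]()

  def receive(msg: Any): Unit = msg match {
    case DecommissionBlockManagers(executorIds) =>
      decommissioningBlockManagers ++= executorIds
      // Tell each affected BM to start offloading its cached RDD blocks.
      executorIds.foreach(id => sendToBlockManager(id, DecommissionBlockManager))
    case _ => // other messages elided
  }

  // Peers handed out for replication exclude decommissioning BMs and the requester.
  def getPeers(requesterId: String): Seq[String] =
    (allBlockManagers -- decommissioningBlockManagers - requesterId).toSeq

  private def sendToBlockManager(executorId: String, msg: Any): Unit = {
    // In Spark this would be an RPC to the BlockManager endpoint on that executor.
  }
}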

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added UTs.

@prakharjain09 (Contributor Author)

cc - @cloud-fan @holdenk

@holdenk (Contributor) commented Mar 10, 2020

Jenkins ok to test

@dongjoon-hyun (Member)

Thank you for making a PR, @prakharjain09 .

@SparkQA commented Mar 10, 2020

Test build #119623 has finished for PR 27864 at commit 6a47615.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09 (Contributor Author) commented Mar 13, 2020

@holdenk @cloud-fan This PR currently does not handle running tasks: what should be done with tasks that are already running on these decommissioned executors? If we start BlockManager decommissioning and stop accepting new cached RDD blocks, then these already-running tasks might fail (in case they try to cache something).

One possible approach: kill these already-running tasks after some configurable timeout.

Executor decommissioning can be broken into two steps:

  1. Compute decommissioning: stop assigning new tasks; kill existing running tasks, maybe after some timeout.
  2. Storage decommissioning: the BlockManager stops taking new cache blocks and offloads old cache blocks.

First we can do compute decommissioning, i.e. stop assigning new tasks (already done as part of SPARK-20628) and kill running tasks after some timeout (TODO).
Then we can do storage decommissioning, i.e. stop accepting new RDD cache blocks (done as part of this PR), move existing RDD cache blocks to other possible locations (done as part of this PR), and maybe drop the blocks if the executor is not able to offload them after some timeout, say after 5-10 cycles/retries. A rough sketch of this sequence is below.
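
The sketch only restates the proposed sequence; every helper name in it is a hypothetical placeholder, not an API from this PR:

object DecommissionPhasesSketch {
  def decommissionExecutor(executorId: String): Unit = {
    // Phase 1: compute decommissioning
    stopSchedulingNewTasks(executorId)          // already done in SPARK-20628
    killRunningTasksAfterTimeout(executorId)    // TODO, after a configurable timeout

    // Phase 2: storage decommissioning
    stopAcceptingNewRDDCacheBlocks(executorId)  // proposed in this PR
    offloadExistingRDDCacheBlocks(executorId)   // this PR, best effort with retries
    dropRemainingBlocksAfterRetries(executorId) // possible follow-up after N retries
  }

  // Placeholder stubs so the sketch is self-contained.
  private def stopSchedulingNewTasks(id: String): Unit = ()
  private def killRunningTasksAfterTimeout(id: String): Unit = ()
  private def stopAcceptingNewRDDCacheBlocks(id: String): Unit = ()
  private def offloadExistingRDDCacheBlocks(id: String): Unit = ()
  private def dropRemainingBlocksAfterRetries(id: String): Unit = ()
}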

Any thoughts/suggestions?

}

// Maximum number of storage replication failures which replicateBlock can handle
// before giving up for one block
Member

  1. We had better have this message at the configuration declaration part.
  2. We may need to revise the conf name to explicitly show this clause, before giving up for one block.

@prakharjain09 (Contributor Author) Apr 2, 2020

@dongjoon-hyun

  1. Added the message as part of the conf description.
  2. Renamed it to "spark.storage.decommission.maxReplicationFailuresPerBlock" (example usage below). Any suggestions on some other name?
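
For reference, this would be set like any other Spark conf; the value shown is only an example, not the default:

import org.apache.spark.SparkConf

// Tolerate up to 5 replication failures per block in each decommission iteration.
val conf = new SparkConf()
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "5")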

@SparkQA commented Mar 13, 2020

Test build #119763 has finished for PR 27864 at commit 622e1ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 14, 2020

Test build #119774 has finished for PR 27864 at commit d792092.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@prakharjain09 (Contributor Author) commented Mar 20, 2020

@Dooyoung-Hwang Thanks for the review! I have addressed most of the review comments.

@Dooyoung-Hwang @cloud-fan @holdenk Please review the changes and provide feedback.

@holdenk (Contributor) commented Mar 26, 2020

So maybe not in this PR, but rather than killing tasks which have a chance of succeeding, what about if we cached with replication level 2? The logic might get a bit funky, so I think a separate follow-up PR there would be better, but this way we get the best of both worlds.

@holdenk (Contributor) left a comment

Thank you so much for working on this, I'm really glad you picked it up. I have some questions about the design and some small places for improvement, but I'm really excited.

  private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
    ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
      .doc("Maximum number of failures to tolerate for offloading " +
        "one block in single decommission cache blocks iteration")
Contributor

I think we could work on the wording here but not critical.

@@ -1281,6 +1285,9 @@ private[spark] class BlockManager(

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")
    if (blockManagerDecommissioning && blockId.isRDD) {
Contributor

I don't think we should reject the block?

Contributor Author

Added overall design to explain why I went ahead with this - #27864 (comment) .

Contributor

Still not sure about this part of the design. I'd like to see a test to make sure this won't cause task failures.

Contributor Author

@holdenk Fixed this. Removed the code for not accepting new RDD cache blocks on a decommissioning executor. Also added a test in BlockManagerDecommissionSuite to show that ongoing tasks which will cache data later succeed.

    if (!blockManagerDecommissioning) {
      logInfo("Starting block manager decommissioning process")
      blockManagerDecommissioning = true
      decommissionManager = Some(new BlockManagerDecommissionManager)
Contributor

I guess I'm not sure why we're doing this in this manner? It seems like if we did this as a blocking call and returned true when we were done, we could move on to the next phase of decommissioning once the blocks were replicated in the driver?

Contributor Author

@holdenk I have added the overall design for this change. Please review the same and provide your suggestions/feedback.

   */
  private class BlockManagerDecommissionManager {
    @volatile private var stopped = false
    private val cacheReplicationThread = new Thread {
Contributor

Same comment about the thread up above, I'm not sure why we need to be in a separate thread (but maybe I've missed something).

Contributor Author

Tried to explain why we have created a thread here - #27864 (comment). Basically this BlockManager will keep on retrying offloading of cache blocks every 30 seconds.

Contributor

Makes sense. I guess we can just (later on) send a message back once all the blocks are done migrating.

@prakharjain09 (Contributor Author) Apr 6, 2020

When all the blocks are done migrating, the driver will automatically know. BlockManagerMaster is continuously receiving block updates from this BlockManager (via the UpdateBlockInfo message). So when all blocks are offloaded, dynamic allocation will automatically know that this executor doesn't have cache data, and if other criteria are met (like executorIdleTimeout etc.), the executor will automatically be removed by the ExecutorMonitor class of dynamic allocation.

@holdenk (Contributor) commented Mar 27, 2020

Also, how would you feel about adding an integration test with K8s? We could use a local object which errors on recompute and verify that decommissioning and a second count doesn't trigger it.

@prakharjain09 (Contributor Author) commented Mar 29, 2020

Thank you so much for working on this, I'm really glad you picked it up. I have some questions about the design and some small places for improvement, but I'm really excited.

@holdenk Thanks for the review. My bad, I should have added the overall design at the start.

Current overall design:

  1. CoarseGrainedSchedulerBackend receives a signal to decommission an executor. On receiving the signal, it does two things: it stops assigning new tasks (SPARK-20628) and sends another message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

  2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving this,

  • it updates the "decommissioningBlockManagerSet". This set contains all BMs which are undergoing decommissioning, and it is maintained to give the correct peer list to the active BMs. Active BMs keep asking BlockManagerMasterEndpoint for the peer list, and we should only return active BMs as part of that list.

  • it sends a message to the corresponding BlockManager to start the decommissioning process.

  3. The BlockManager on the worker (say BM-x) receives the "DecommissionBlockManager" message. It takes the following 2 actions on receiving the message:
  • BM-x will stop accepting new RDD cache blocks. Since this block manager is in decommissioning state, it shouldn't store any new cache data (this is similar to DataNode decommissioning in HDFS). If it accepted new blocks, those blocks would also need to be offloaded.
  • BM-x will start a BlockManagerDecommissionManager thread. This thread will try to offload any cache blocks on this BM every 30 seconds (or a configured interval). In the first attempt it is possible that not all cache blocks are offloaded because of limited space on other BMs, so it can retry offloading after 30 seconds, and so on. The available cache space in the application can keep changing, because of dynamic allocation or because the user explicitly uncaches some other RDD, so we can make multiple attempts to offload RDD cache blocks from BM-x. Steps performed in a single attempt (see the sketch below):
    a) Ask BlockManagerMasterEndpoint for the replication info of all the cache blocks on BM-x. This lets BM-x know which blocks are already replicated and where, so that we avoid replicating to those places again.
    b) For each block, try to replicate it to peers. Drop the block if it is successfully replicated to one of the peers, else keep it. If a block is replicated and dropped from BM-x, that is automatically communicated to the driver, and dynamic allocation (the ExecutorMonitor class) will update its bookkeeping and remove the executor from the system.
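
A minimal sketch of the BlockManagerDecommissionManager loop described above; the names and the 30-second interval mirror this description, while the real implementation in the PR differs in detail (RPC plumbing, conf values, error handling):

class BlockManagerDecommissionManagerSketch(sleepIntervalMs: Long = 30000L) {

  @volatile private var stopped = false

  private val offloadThread = new Thread("block-manager-decommission-sketch") {
    override def run(): Unit = {
      while (!stopped) {
        // 1. Ask the master which RDD cache blocks still live on this BM and
        //    where they are already replicated, so those peers are skipped.
        val blocksToOffload = getReplicateInfoForRDDBlocks()
        // 2. Try to replicate each block to an active peer; drop the local
        //    copy only on success, otherwise keep it and retry next round.
        blocksToOffload.foreach { block =>
          if (replicateToPeer(block)) dropLocalBlock(block)
        }
        try Thread.sleep(sleepIntervalMs)
        catch { case _: InterruptedException => () } // woken up by stop()
      }
    }
  }
  offloadThread.setDaemon(true)

  def start(): Unit = offloadThread.start()
  def stop(): Unit = { stopped = true; offloadThread.interrupt() }

  // Stubs standing in for the real BlockManager / BlockManagerMaster calls.
  private def getReplicateInfoForRDDBlocks(): Seq[String] = Seq.empty
  private def replicateToPeer(block: String): Boolean = true
  private def dropLocalBlock(block: String): Unit = ()
}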

Please review the same and provide your suggestions.

@SparkQA commented Apr 23, 2020

Test build #121678 has finished for PR 27864 at commit bb324f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented Apr 23, 2020

Jenkins retest this please

@holdenk (Contributor) left a comment

I think this is really good progress. I'd like to have a follow-up JIRA to explore whether there is a better way to pick the peers / parallelize block migration, but I think that can be a follow-on PR.
cc @dongjoon-hyun have your issues been addressed?

    // so that we end up prioritizing them over each other
    val blocksFailedReplication = replicateBlocksInfo.filterNot {
      case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
        val replicatedSuccessfully = replicateBlock(
Contributor

Network congestion is certainly a possibility. I think that for now this strikes a good balance between simple code and avoiding hanging all transfers if we have one slow target host. We can revisit this in the future if it turns out we need more control in production environments. Sound good?

@dongjoon-hyun (Member) commented Apr 23, 2020

Yes, @holdenk . Please feel free to proceed to merge if you think this is good to you.
cc @dbtsai

@holdenk (Contributor) commented Apr 23, 2020

I'll take another look tomorrow with fresh eyes and make sure, I'm a little too sleepy to merge any code today.

@SparkQA commented Apr 23, 2020

Test build #121700 has finished for PR 27864 at commit bb324f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor) commented Apr 24, 2020

Jenkins retest this please

@SparkQA commented Apr 24, 2020

Test build #121772: Deflake Build #121700 has started for PR 27864 at commit bb324f9.

@holdenk (Contributor) commented Apr 24, 2020

I've filed SPARK-31555 for the follow up work we've been discussing in this PR.

@holdenk (Contributor) left a comment

Looks good to me.

@asfgit closed this in 249b214 Apr 24, 2020
@dongjoon-hyun (Member)

Thank you all!

@SparkQA commented Apr 24, 2020

Test build #121775 has finished for PR 27864 at commit bb324f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 24, 2020

Test build #121772: Deflake Build #121700 has finished for PR 27864 at commit bb324f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Hi, @holdenk, does this pass Jenkins? I cannot find a successful run for the last commit, bb324f9.

@dongjoon-hyun (Member) commented Apr 25, 2020

@holdenk (Contributor) commented Apr 25, 2020

There was, I think, unless I misread the Jenkins bot's comments:
[image]

@dongjoon-hyun (Member) commented Apr 25, 2020

The first two are start signals. The last one is the K8s integration test Jenkins job, which doesn't run the UTs.

@holdenk (Contributor) commented Apr 25, 2020

Ah damn it, I misread the K8s one as the regular Jenkins. It looks like it's the shutdown hook getting in the way, but I'll just revert this in master since it's Friday night and we can revisit the PR without the shutdown hook.

@dongjoon-hyun (Member)

+1 for reverting. Thank you so much!

@holdenk (Contributor) commented Apr 25, 2020

Thanks for catching it :)

@holdenk (Contributor) commented Apr 25, 2020

Reverted in #28337

@dongjoon-hyun (Member)

Thank you!

@prakharjain09 (Contributor Author) commented Apr 25, 2020

@dongjoon-hyun @holdenk Thanks a lot for following up on this.

I see 4 test failures in https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/551/ and all of them seem unrelated:

Test Result (4 failures / +4)
org.apache.spark.sql.hive.thriftserver.CliSuite.path command
org.apache.spark.sql.hive.thriftserver.CliSuite.Pad Decimal numbers with trailing zeros to the scale of the column
org.apache.spark.sql.hive.thriftserver.CliSuite.SPARK-30049 Should not complain for quotes in commented lines
org.apache.spark.sql.hive.thriftserver.CliSuite.SPARK-30049 Should not complain for quotes in commented with multi-lines

What should the next step be? Is there a way to reopen this pull request, or should I open a new pull request where I will try fixing the tests?

It looks like it's the shutdown hook getting in the way

@holdenk Can you please share more pointers on what might be causing the test failures?

@dongjoon-hyun (Member)

@prakharjain09, please create the same PR once more. Jenkins will validate your PR.

@dongjoon-hyun (Member)

In this PR, there is no evidence that the last commit passes all the unit tests.
