[WIP][SPARK-20628][CORE] Blacklist nodes when they transition to DECOMMISSIONING state in YARN #19267
Conversation
@juanrh do you plan on working more on this before removing the "WIP"? Not sure what your expectation is here. People generally pass over "WIP"s when there are so many other PRs to look at.
Hi @vanzin, thanks for taking a look. This was part of a discussion with @holdenk about SPARK-20628. I have attached the document Spark_Blacklisting_on_decommissioning-Scope.pdf with our approach. The basic idea is implementing a mechanism similar to YARN's graceful decommission, but for Spark. In the PR we define a […]

This code works on a modified version of Hadoop 2.7.3 with patches to support YARN-4676 (basic graceful decommission) and an approximation to YARN-3224 (when a node transitions into the DECOMMISSIONING state, the resource manager notifies each relevant application master by adding the node to the list of updated nodes in the AllocateResponse returned by the RM in response to the AM heartbeat). For these reasons, this code won't work as-is on vanilla Hadoop. The main problem is that the decommissioning mechanism for YARN is not completely implemented (see YARN-914), and some of the parts that are implemented are only available starting with YARN 2.9.0 (see YARN-4676). To cope with this, we propose implementing an administrative command to send node transitions directly to the Spark driver, as […]

We would like to get some feedback on this approach in general, and on the administrative command solution in particular. If that sounds good, we will work on modifying this PR so it works on vanilla Hadoop 2.7, and on implementing the administrative command.
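For context, a minimal sketch of how the notification described above can be read from the AM heartbeat: the RM reports node-state changes through the updated-nodes list of the AllocateResponse. This assumes a Hadoop build whose NodeState enum includes DECOMMISSIONING (the patched 2.7.3 mentioned above, or a later release), and handleDecommissioningHosts is a hypothetical stand-in for whatever the driver does with the information.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.api.records.NodeState

object DecommissionMonitor {
  /** Hosts reported as DECOMMISSIONING in the AM heartbeat response. */
  def decommissioningHosts(response: AllocateResponse): Seq[String] =
    response.getUpdatedNodes.asScala
      .filter(_.getNodeState == NodeState.DECOMMISSIONING)
      .map(_.getNodeId.getHost)
      .toSeq

  /** Hypothetical hook: forward the affected hosts to the driver-side blacklisting logic. */
  def handleDecommissioningHosts(hosts: Seq[String]): Unit =
    hosts.foreach(h => println(s"node $h is DECOMMISSIONING; stop scheduling tasks on it"))
}
```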
Hi @vanzin, do you have any comments on the design document attached above? Thanks
Sorry, I haven't had time to look at anything around here lately. I recommend you make some noise on the mailing list or ping other people if you want to draw attention to this.
Just reading through your description here: since all the YARN pieces aren't in place, you have an admin-type command to signal Spark that a node is being decommissioned. But that means someone has to run that command for every single Spark application running on that cluster, correct? That doesn't seem very feasible on any relatively large cluster. One big question I have is: are there enough use cases where just the command is useful? Otherwise we are temporarily adding a command that we will have to keep supporting forever (or at least until the next major release). Or does it make sense to wait for YARN (or another resource manager) to have full support for this?
Hi Tom, thanks for your answer. Regarding use cases for the Spark admin command, I think it would be a good fit for cloud environments, where single-job clusters are common because creating and destroying clusters is easy. Also, to cover the case of clusters with many applications, we could add an option to the admin command to decommission the specified nodes for all Spark applications running in the cluster, and implement that using features of the configured cluster manager to discover Spark applications. For example, for YARN we could use […]

It would be nice not having to support a temporary command, but an advantage of doing so is that it lets us get this feature into Spark without having to wait for it to be available in the cluster manager. Also, for Spark Standalone mode I don't see any option other than an admin command, along the lines of existing commands like […]
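As a concrete but purely illustrative example of the discovery step mentioned above, the admin command could ask the ResourceManager REST API for the running Spark applications and then notify each of them; the RM address below is just a placeholder.

```scala
import scala.io.Source

object RunningSparkApps {
  def main(args: Array[String]): Unit = {
    // Placeholder RM address; on a real cluster this would come from configuration.
    val rm = "http://resourcemanager:8088"
    val url = s"$rm/ws/v1/cluster/apps?states=RUNNING&applicationTypes=SPARK"
    // Returns a JSON document listing the running Spark applications; a real
    // implementation would parse it and contact each application's driver.
    val json = Source.fromURL(url).mkString
    println(json)
  }
}
```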
Hi @vanzin and @tgravescs, do you have any other comments on this proposal? Thanks, Juan
For the admin-type command we would need an RPC between the client command and the driver. There is a JIRA for that already (https://issues.apache.org/jira/browse/SPARK-21737), but it's not done yet. I can see the admin command being useful, as it could be used with any cluster manager: YARN, Mesos, standalone. It also doesn't appear the YARN support will be done any time soon unless we push on it, so the only option would be that admin-type command. So overall I'm fine with that approach; it just needs more details about how it would work. The admin command on YARN wouldn't necessarily need to look at all apps: if you are decommissioning a single NM, it could go to the REST API, get the list of apps running on it, and only go to those. I'm curious: are you running into a particular case of this, and if so, in what environment?
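To make the discussion of that admin command more concrete, here is a rough sketch of the message shapes the client could send to the driver once a client-to-driver RPC channel exists (SPARK-21737); the names are made up for illustration and do not correspond to anything in this PR.

```scala
// Hypothetical messages for the proposed admin command.
sealed trait NodeStateMessage
case class NodeDecommissioning(host: String) extends NodeStateMessage
case class NodeDecommissioned(host: String) extends NodeStateMessage

trait DecommissionHandler {
  /** React to a node-state transition reported by the admin command or the cluster manager. */
  def handle(msg: NodeStateMessage): Unit = msg match {
    case NodeDecommissioning(host) =>
      // Stop scheduling new tasks on this host, e.g. via node-level blacklisting.
      println(s"$host entering DECOMMISSIONING: blacklist it for new tasks")
    case NodeDecommissioned(host) =>
      // Shuffle blocks on this host are no longer reachable.
      println(s"$host DECOMMISSIONED: treat its shuffle output as lost")
  }
}
```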
Hi @tgravescs, thanks again for your feedback. Regarding concrete use cases, this change could be used to extend the graceful decommission mechanism that has been available in AWS EMR for a while. That mechanism was originally designed for MapReduce, with a one-to-one correspondence between tasks and YARN containers, so it has to be adapted to Spark, where an executor running in a single YARN container is able to run many Spark tasks. This could be useful for reacting to cluster resizes or Spot instance terminations that happen in the middle of a Spark job execution. As described in the attached document, this PR sets up the base framework for graceful decommission on Spark and performs the first action to react to node decommission, but additional actions could be added in future PRs to react to other transitions, for example unregistering shuffle blocks when a node transitions to the DECOMMISSIONED state.
@tgravescs I have opened apache/hadoop#289 with the YARN changes needed to get a notification in the AM when a node transitions to DECOMMISSIONING. This should already be useful for this change, but I'll also take a look at https://issues.apache.org/jira/browse/SPARK-21737 so all cluster managers can be supported.
@tgravescs I was finally able to contribute apache/hadoop#289, which solves YARN-6483. With that patch, and the code in this pull request, in […]

But for now YARN-6483 has only been accepted for Hadoop 3.1.0, so I'll work on SPARK-21737 to have an alternative solution that doesn't rely on the cluster manager.
Can one of the admins verify this patch?
I'm going to close this for now since, based on the last few comments, it feels like this needs at least some rework. Also because this is attached to a bug that has another open PR which is being actively worked on. If you reopen this, I suggest looking at that PR and opening a new bug if the two don't really overlap.
What changes were proposed in this pull request?
Dynamic cluster configurations, where cluster nodes are added and removed frequently, are common in public cloud environments, so losing nodes in the middle of a job has become a problem for Spark users. To cope with this we propose implementing a mechanism along the lines of YARN's support for graceful node decommission or Mesos maintenance primitives. These changes allow cluster nodes to be transitioned to a 'decommissioning' state, at which point no more tasks will be scheduled on the executors running on those nodes. After a configurable drain time, nodes in the 'decommissioning' state transition to a 'decommissioned' state, where their shuffle blocks are no longer available. Shuffle blocks stored on nodes in the 'decommissioning' state remain available to other executors. By preventing more tasks from running on nodes in the 'decommissioning' state we avoid creating more shuffle blocks on those nodes, as those blocks won't be available once the nodes eventually transition to the 'decommissioned' state.
We have implemented a first version of this proposal for YARN, using Spark's blacklisting mechanism for task scheduling (available at the node level since Spark 2.2.0) to ensure tasks are not scheduled on nodes in the 'decommissioning' state. With this solution it is the cluster manager, not the Spark application, that tracks the status of the node and handles the transition from 'decommissioning' to 'decommissioned'. The Spark driver simply reacts to the node state transitions.
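For reference, the node-level blacklisting mechanism this proposal reuses is normally driven by task failures and configured through the spark.blacklist.* settings added in Spark 2.2; a minimal sketch of enabling it (the threshold values below are illustrative only) looks like this:

```scala
import org.apache.spark.SparkConf

object BlacklistingConf {
  // Illustrative values only; the keys are the application-level and node-level
  // blacklisting settings that this proposal builds on.
  val conf: SparkConf = new SparkConf()
    .setAppName("decommission-aware-job")
    .set("spark.blacklist.enabled", "true")
    .set("spark.blacklist.timeout", "1h")
    .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
    .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
}
```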
How was this patch tested?
All functionality has been tested with unit tests, with an integration test based on BaseYarnClusterSuite, and with manual testing on a cluster.