
[WIP][SPARK-20628][CORE] Blacklist nodes when they transition to DECOMMISSIONING state in YARN #19267


Conversation

@juanrh commented Sep 18, 2017

What changes were proposed in this pull request?

Dynamic cluster configurations where cluster nodes are added and removed frequently are common in public cloud environments, so this has become a problem for Spark users. To cope with this, we propose a mechanism along the lines of YARN’s support for graceful node decommission or Mesos maintenance primitives. These changes allow cluster nodes to be transitioned to a ‘decommissioning’ state, at which point no more tasks will be scheduled on the executors running on those nodes. After a configurable drain time, nodes in the ‘decommissioning’ state transition to a ‘decommissioned’ state, where shuffle blocks are no longer available. Shuffle blocks stored on nodes in the ‘decommissioning’ state remain available to other executors. By preventing more tasks from running on nodes in the ‘decommissioning’ state we avoid creating more shuffle blocks on those nodes, since those blocks won’t be available once the nodes eventually transition to the ‘decommissioned’ state.

We have implemented a first version of this proposal for YARN, using Spark’s blacklisting mechanism for task scheduling (available at the node level since Spark 2.2.0) to ensure tasks are not scheduled on nodes in the ‘decommissioning’ state. With this solution it is the cluster manager, not the Spark application, that tracks the status of the node and handles the transition from ‘decommissioning’ to ‘decommissioned’. The Spark driver simply reacts to the node state transitions.
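A minimal sketch of the intended reaction to these transitions (the type and method names below are illustrative only, not the actual identifiers introduced by this PR):

```scala
// Illustrative sketch only: these names mirror the description above but are
// hypothetical, not the code actually added by this PR.
sealed trait HostState
object HostState {
  case object Running         extends HostState // healthy, tasks can be scheduled
  case object Decommissioning extends HostState // draining: no new tasks, shuffle blocks still readable
  case object Decommissioned  extends HostState // executors gone, shuffle blocks no longer available
}

// Reaction in the scheduler backend when the cluster manager reports a transition.
def handleUpdatedHostState(host: String, state: HostState): Unit = state match {
  case HostState.Decommissioning => blacklistNode(host)   // stop placing new tasks on the node
  case HostState.Running         => unblacklistNode(host) // node is healthy again
  case HostState.Decommissioned  => ()                    // future work: drop shuffle/cache state
}

// Hypothetical hooks into the node-level blacklist available since Spark 2.2.0.
def blacklistNode(host: String): Unit = ???
def unblacklistNode(host: String): Unit = ???
```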

How was this patch tested?

All functionality has been tested with unit tests, with an integration test based on BaseYarnClusterSuite, and with manual testing on a cluster.

@vanzin (Contributor) commented Sep 28, 2017

@juanrh do you plan on working more on this before removing the "WIP"? I'm not sure what your expectation is here. People generally pass over "WIP" PRs when there are so many other PRs to look at.

@juanrh (Author) commented Oct 2, 2017

Hi @vanzin, thanks for taking a look.

This was part of a discussion with @holdenk about SPARK-20628. I have attached the document Spark_Blacklisting_on_decommissioning-Scope.pdf with our approach. The basic idea is implementing a mechanism similar to YARN's graceful decommission, but for Spark. In the PR we define a HostState type to represent the state of the cluster nodes, and take actions in CoarseGrainedSchedulerBackend.handleUpdatedHostState when a node transitions into a state where it becomes partially or totally unavailable. Just like YARN or Mesos, we propose a decommission mechanism with 2 phases: first a drain phase where the node is still running but not accepting further work (DECOMMISSIONING in YARN, DRAIN in Mesos), followed by a second phase where the executors on the node are forcibly shut down (DECOMMISSIONED in YARN, DOWN in Mesos). In this PR we focus only on YARN, and on the actions taken when a node transitions into the DECOMMISSIONING state: blacklisting the node when it transitions to DECOMMISSIONING, and un-blacklisting it when it returns to the normal, healthy RUNNING state.
The decommissioning process would not be initiated by Spark, but by an operator or an automated system (e.g. the cloud environment where YARN is running), in response to some relevant event (e.g. a cluster resize event), and it would consist of calling the YARN administrative command yarn rmadmin -refreshNodes -g for the affected node. Spark would just react to the node state transition events it receives from the cluster manager.
To make this extensible to other cluster managers besides YARN, we define the HostState type in Spark, and keep the interaction with the specifics of each cluster manager in the corresponding packages. For example, for YARN, when YarnAllocator gets a node state transition event, it converts the event from the YARN-specific NodeState into HostState, wraps it in a HostStatusUpdate message, and sends it to the CoarseGrainedSchedulerBackend, which then performs the required actions for that node.
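Roughly, that translation could look like the following (a sketch reusing the hypothetical HostState type from the description above, not the literal patch):

```scala
// Sketch of the YARN-specific translation; HostState is the hypothetical type
// sketched earlier, and HostStatusUpdate is the driver-bound message described above.
import org.apache.hadoop.yarn.api.records.NodeState

case class HostStatusUpdate(host: String, state: HostState)

// Map YARN's NodeState onto the cluster-manager-agnostic HostState.
// NodeState.DECOMMISSIONING only exists on Hadoop versions carrying YARN-4676.
def toHostState(nodeState: NodeState): Option[HostState] = nodeState match {
  case NodeState.RUNNING         => Some(HostState.Running)
  case NodeState.DECOMMISSIONING => Some(HostState.Decommissioning)
  case NodeState.DECOMMISSIONED  => Some(HostState.Decommissioned)
  case _                         => None // other states are handled elsewhere
}
```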

This code works on a modified version of Hadoop 2.7.3 with patches to support YARN-4676 (basic graceful decommission), and an approximation to YARN-3224 (when a node transitions into the DECOMMISSIONING state, the resource manager notifies each relevant application master by adding the node to the list of updated nodes available in the AllocateResponse returned by the RM in response to the AM heartbeat). For these reasons, this code won't work as-is on vanilla Hadoop. The main problem is that the decommissioning mechanism for YARN is not completely implemented (see YARN-914), and some of the parts that are implemented are only available from YARN 2.9.0 (see YARN-4676). To cope with this, we propose implementing an administrative command to send node transitions directly to the Spark driver, as HostStatusUpdate messages addressed to the CoarseGrainedSchedulerBackend. This command would be similar to yarn rmadmin -refreshNodes -g, which is currently used for decommissioning nodes in YARN. When YARN-914 is complete, this could still be used as a secondary interface for decommissioning nodes, so node transitions could be signaled either by the cluster manager or by the administrative command (either manually or through some automation implemented by the cloud environment).

We would like to get some feedback on this approach in general, and on the administrative command solution in particular. If that sounds good, we will work on modifying this PR so that it works on vanilla Hadoop 2.7, and on implementing the administrative command.

@juanrh (Author) commented Oct 6, 2017

Hi @vanzin, do you have any comments on the design document attached above?

Thanks

@vanzin (Contributor) commented Oct 6, 2017

Sorry, I haven't had time to look at anything around here lately. I recommend you make some noise on the mailing list or ping other people if you want to get attention on this.

@tgravescs (Contributor) commented:
Just reading through your description here: all the YARN pieces aren't in place, so you have an admin-type command to signal Spark that a node is being decommissioned. But that means someone has to run that command for every single Spark application running on that cluster, correct? That doesn't seem very feasible on any relatively large cluster. One big question I have is: are there enough use cases where just the command is useful? Otherwise we are temporarily adding a command that we will have to keep supporting forever (or perhaps only until the next major release). Or does it make sense to wait for YARN (or another resource manager) to have full support for this?

@juanrh (Author) commented Oct 17, 2017

Hi Tom, thanks for your answer.

Regarding use cases for the Spark admin command, I think it would be a good fit for cloud environments, where single-job clusters are common because creating and destroying clusters is easy. Also, to cover the case of clusters with many applications, we could add an option to the admin command to decommission the specified nodes for all Spark applications running in the cluster, and implement that using features of the configured cluster manager to discover Spark applications. For example, for YARN we could use YarnClient.getApplications to discover the application master / Spark driver address for all running Spark applications, using ApplicationReport.getApplicationType to consider only applications with type "SPARK".
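A rough sketch of that discovery step through the YARN client API (error handling and the actual notification to each driver are omitted):

```scala
// Sketch: find all running Spark applications via the YARN client API.
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()

// Restrict the listing to applications registered with type "SPARK".
val sparkApps = yarnClient.getApplications(Set("SPARK").asJava).asScala

sparkApps.foreach { report =>
  // report.getHost / report.getRpcPort locate the application master,
  // which is where a decommission notification would be delivered.
  println(s"${report.getApplicationId} -> ${report.getHost}:${report.getRpcPort}")
}

yarnClient.stop()
```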

It would be nice not to have to support a temporary command, but an advantage of doing so is that it gets this feature into Spark without having to wait for it to be available in the cluster manager. Also, for Spark Standalone mode I don't see any option other than an admin command, along the lines of existing commands like ./sbin/start-master.sh, because in that case Spark also provides the cluster manager.

@juanrh (Author) commented Oct 20, 2017

Hi @vanzin and @tgravescs, do you have any other comments on this proposal?

Thanks,

Juan

@tgravescs (Contributor) commented:
For the admin-type command we would need an RPC between the client command and the driver. There is a JIRA for that already (https://issues.apache.org/jira/browse/SPARK-21737) but it's not done yet. I can see the admin command being useful, as it could be used on any cluster: YARN, Mesos, standalone. It also doesn't appear the YARN support will be done any time soon unless we push on it, so the only option would be that admin-type command. So overall I'm fine with that approach; it just needs more details about how it would work. The admin command on YARN wouldn't necessarily need to look at all apps. If you are decommissioning a single NM, it could go to the REST API, get the list of apps running on it, and just go to those.
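A rough sketch of that per-node lookup, assuming the NodeManager's web port (8042 by default) and its REST endpoint for listing applications are reachable; JSON parsing is left out:

```scala
// Sketch: list the applications running on a single NodeManager through its REST API.
import scala.io.Source

def appsOnNode(nodeManagerHost: String, webPort: Int = 8042): String = {
  val url = s"http://$nodeManagerHost:$webPort/ws/v1/node/apps"
  val source = Source.fromURL(url)
  try source.mkString finally source.close()
}

// The returned JSON can then be filtered down to Spark applications, so only
// the drivers with work on the decommissioning node need to be contacted.
```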

I'm curious: are you running into a particular case of this, and if so, in what environment?

@juanrh (Author) commented Oct 26, 2017

Hi @tgravescs, thanks again for your feedback. Regarding concrete use cases, this change could be used to extend the graceful decommission mechanism that has been available in AWS EMR for a while. That mechanism was originally designed for MapReduce, with a 1-to-1 correspondence between tasks and YARN containers, so it has to be adapted for Spark, where an executor running in a single YARN container is able to run many Spark tasks. This could be useful to react to cluster resizes or Spot instance terminations that happen in the middle of a Spark job execution. As described in the attached document, this PR sets up the base framework for graceful decommission in Spark and performs the first action to react to node decommission, but additional actions could be added in future PRs to react to other transitions, for example unregistering shuffle blocks when a node transitions to the DECOMMISSIONED state.

@juanrh (Author) commented Nov 7, 2017

@tgravescs I have opened apache/hadoop#289 with the YARN changes to get a notification in the AM when a node transitions to DECOMMISSIONING. This should already be useful for this change, but I'll also take a look at https://issues.apache.org/jira/browse/SPARK-21737 so all cluster managers can be supported.

@juanrh (Author) commented Nov 27, 2017

@tgravescs I was finally able to contribute apache/hadoop#289, which resolves YARN-6483. With that patch and the code in this pull request, in YarnAllocator.allocateResources we will receive a NodeReport entry in allocateResponse.getUpdatedNodes for each node moved to the DECOMMISSIONING state using Hadoop's graceful decommission, which would trigger blacklisting for those nodes.
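For reference, the handling on the allocator side could look roughly like this (a sketch building on the toHostState/HostStatusUpdate sketch earlier in this thread, not the actual diff):

```scala
// Sketch: react to updated-node reports received on the AM heartbeat.
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse

def handleUpdatedNodes(allocateResponse: AllocateResponse,
                       sendToDriver: HostStatusUpdate => Unit): Unit = {
  // One NodeReport per node whose state changed since the previous heartbeat,
  // including nodes moved to DECOMMISSIONING via `yarn rmadmin -refreshNodes -g`.
  allocateResponse.getUpdatedNodes.asScala.foreach { report =>
    toHostState(report.getNodeState).foreach { state =>
      sendToDriver(HostStatusUpdate(report.getNodeId.getHost, state))
    }
  }
}
```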

For now, YARN-6483 has only been accepted for Hadoop 3.1.0, so I'll work on SPARK-21737 to provide an alternative solution that doesn't rely on the cluster manager.

@AmplabJenkins commented:
Can one of the admins verify this patch?

@vanzin (Contributor) commented Dec 15, 2018

I'm going to close this for now since, based on the last few comments, it feels like this needs at least some rework, and also because it is attached to a bug that has another open PR which is being actively worked on.

If reopening, I suggest looking at that PR and opening a new bug if they don't really overlap.
