[autoscaler] Add a 'request_cores' function for manual autoscaling #4754
Conversation
Note that the latter four commits here are part of #4653; I include them only because without them this PR does not work for me. They can be removed prior to merge.
Test FAILed.
Test FAILed.
@robertnishihara Who is the right person to review this PR?
python/ray/autoscaler/autoscaler.py
Outdated
@@ -48,12 +49,6 @@
    # node. This takes precedence over min_workers.
    "max_workers": (int, REQUIRED),

    # The number of workers to launch initially, in addition to the head node.
    "initial_workers": (int, OPTIONAL),
I actually find this quite useful and would prefer to keep it somewhere in the code, though maybe not in the cluster config.
What's the use case for it now @richardliaw that wouldn't be handled by a request_cores() call in your notebook / script on the head node? The reason we took it out was that we couldn't come up with any situation where request_cores() wouldn't replace it and figured that having a simpler / one-way-to-do-things design would be less confusing for users.
Ah I see; right now, my use case for initial_workers is to start a large cluster and run an experiment overnight, where after a couple of hours the experiment finishes and the cluster should downsize. Importantly, my experiment script was first tested on my local machine and then submitted to a cluster.
I guess I might have misunderstood the functionality of request_cores. It'd be great if you could include both docstrings and a section in the documentation on how to use it?
Right, the way you'd do this (we are actually doing that) is to run request_cores(redis_address, 2000) at the start of the script, which will ensure the cluster has a minimum of 2000 cores available for you to use.
I'll work on documentation!
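For concreteness, a minimal sketch of that pattern, assuming the module path from this PR (python/ray/autoscaler/autoscaler.py) and a placeholder Redis address; the 2000-core figure is just the example from the discussion above:

```python
import ray
from ray.autoscaler.autoscaler import request_cores

# Connect to the existing cluster from the head node (address is a placeholder).
ray.init(redis_address="localhost:6379")

# Ask the autoscaler to make at least 2000 CPU cores available before the
# workload starts submitting tasks, rather than waiting for it to ramp up.
request_cores("localhost:6379", 2000)
```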
Actually, after talking to more users, I think keeping initial_workers is necessary. It's a workaround that allows us to start GPU workers with a non-GPU head, which is a commonly used feature for training.
Ultimately though, I think the non-gpu-head gpu-worker setup should work without initial_workers.
Initial workers is a huge hack, because if your nodes go idle, you now need to restart the entire cluster in order to get them to come back up. I think it's way better to request directly.
request_gpus could be added here if necessary.
I've been too busy with other stuff to get documentation written and this upstreamed, but I'd be happy with a request_gpus option in here.
I can take a glance, but would prefer @hartikainen or @ericl to also go through this. @markgoodhead and @ls-daniel, how do you guys test autoscaler changes? Do you have any integration/end-to-end tests?
# Delete unsupported keys from the node config
try:
    del conf["Resources"]
why this key in particular?
It's added to the config in order for request_cores to know how many workers to start. Scripts / remote functions shouldn't care about the cluster topology; they just care about resources.
In the future it might be possible to extend this so that there are multiple types of worker_node, e.g. one might have CPU resources and another GPU. You could then dynamically request a GPU for a task and have it spin down when done.
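To make the arithmetic concrete, here is a small illustrative sketch (a hypothetical helper, not code from this PR) of how a core request plus a per-worker "Resources" entry translates into a worker count:

```python
import math

def workers_needed(requested_cores, worker_node_config):
    """Hypothetical helper: how many workers cover a given CPU request?

    Assumes the worker node config carries a "Resources" entry such as
    {"CPU": 32}, as described in the comment above.
    """
    cores_per_worker = worker_node_config.get("Resources", {}).get("CPU", 1)
    return math.ceil(requested_cores / cores_per_worker)

# A request for 2000 cores on 32-core workers needs 63 workers.
print(workers_needed(2000, {"Resources": {"CPU": 32}}))
```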
@richardliaw re "how do you guys test autoscaler changes? do you have any integration/end-to-end tests?" We don't have any additional tests as such, over and above what's in the code already (e.g. the testManualAutoscaling case I've added here). We are however working from master or close to it on an ongoing basis so we're spotting issues that way. I think that it's difficult to test that both AWS and GCP function because we don't really have any incentive to deal with GCP and GCP users don't care about AWS; I don't even have an account I can access, and I'm not sure it could be integrated into Travis either. |
@@ -9,16 +9,6 @@ min_workers: 0
# node. This takes precedence over min_workers.
max_workers: 2

# The initial number of worker nodes to launch in addition to the head
This looks like a breaking API change? We should avoid these for non-experimental APIs.
OK, this is fine, I think. Can you revert the changes to the config, making this just an addition of the new request function?
Apologies for the delay here, I've been ill for some time and am catching up :) I'll look at this when I can. It's not a simple config change, because initial_workers interacts with this, e.g. we'd need max(initial_workers, requested_workers, other_stuff), and testing becomes harder as the combinatorial complexity increases.
Force-pushed from b8f00cf to 027843c.
Docstrings done, old API restored, rebased on master. I'll give this a test now.
Test FAILed.
Test FAILed.
Tested and working for me. I'll wait and see what Travis says.
Looks like the test for aggressive autoscaling is failing. Investigating.
Test PASSed.
Test FAILed.
Looks like only a lint fix is required - the other remaining failures seem unrelated.
self.load_metrics.prune_active_ips(
    [self.provider.internal_ip(node_id) for node_id in nodes])
target_workers = self.target_num_workers()

if len(nodes) >= target_workers:
    if "CPU" in self.resource_requests:
        del self.resource_requests["CPU"]
why drop?
We've satisfied the resource request.
It doesn't act as a runtime min_workers - it's a signal to make that level of resource available ephemerally, and scale down once it's no longer used. That way, consumers of the API don't need to think about ramping down their request as tasks complete.
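Put differently (an illustrative sketch with made-up names, not the PR's code), the request acts as a one-shot scale-up target rather than a persistent floor:

```python
resource_requests = {"CPU": 2000}  # populated by a request_cores call

def target_with_request(num_nodes, cores_per_node, min_workers):
    # Size the cluster to cover any outstanding request, bounded below by
    # the configured minimum number of workers.
    requested = resource_requests.get("CPU", 0)
    target = max(min_workers, -(-requested // cores_per_node))  # ceil division
    if num_nodes >= target and "CPU" in resource_requests:
        # The request has been satisfied; drop it so idle workers can later
        # scale back down instead of being pinned up indefinitely.
        del resource_requests["CPU"]
    return target
```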
Test FAILed.
Test FAILed.
Test FAILed.
@richardliaw I think all the outstanding requested changes have been made - do you think this is ready for a merge now?
Who is the correct person to approve this change now that @richardliaw has removed their assignment?
Hey @markgoodhead, there was some offline discussion and we think this could be hard to maintain long term. I think there are a couple of alternatives that could work instead at the application level (even if a bit hacky):
The benefit of these approaches is that they could be implemented outside of Ray without any changes (or perhaps minimal changes).
Hi @ericl, it'd be good to understand more about how you see this area evolving, what the UX looks like in the end, and what's difficult about this feature to maintain. We've already come to rely really heavily on this feature internally (and have moved off initial_workers entirely as it's obsoleted) because it completely changes the user experience on cloud computing platforms to make it dramatically better, to the point that we can't really imagine not using it now. Let me give you some example user workflows we have and why this solution is night and day compared to editing yaml files / initial_workers type stuff.

We have a setup internally now where we frequently bring up a ray cluster for R&D using a very simple command line tool. Starting the head node (which is a small, cheap, on-demand box) and installing all our software takes a couple of minutes on AWS. When that starts we automatically open a jupyter-lab terminal for the user, with the aim of making it feel as close to developing locally as possible. We start no workers on startup and have an aggressive worker timeout, as the many large worker machines are the bulk of the cost of this setup (even as spot instances), so we only want them up when we're definitely going to be using them. From the user's perspective we don't want them thinking about the workers and how to manage them - they should just 'be there' when needed and turn themselves off when not. Normally a user will spend a few minutes setting up their notebook, maybe still running some commands on the head node, during which time any initial workers we had started would already have been killed by our aggressive timeout (which we do really want - it's a great feature!). So regardless we're starting from zero workers each time, and that's actually a good thing for our workload - there's no pressure to race to get your head node active, because you know workers will be there in the end when you need them.

Often we have workloads where the user has a very good idea beforehand of how parallel they can run and how much resource they'll be able to efficiently utilise. This is obviously different across workloads, and we're now at the point where we handle highly different workflows with very different resource requirements by setting our max workers to be basically as high as we feel ray can handle from a performance perspective - indeed we've tried as much as possible to stop users having to change yaml files, because it enforces a very awkward workflow on the user (shutdown/restart your cluster every time you want a different setting - a lot of user pain, as it takes minutes to switch and get things set back up again). With max workers set as large as we feel is technically possible, any workload can be run on the same cluster; it's just a matter of requesting however much resource you feel you need / can use: if it's a 10 core job just request 10 cores, if it's a 5000 core job then request 5000. If the command in this PR is run first, before the main notebook code, then users can rely on getting the resource they want as quickly as possible; waiting for 5000 cores to start from the autoscaler natively takes far too long (going from 1 to 5000 scaling by 20% each time takes quite a while!) and for many highly parallel workloads that slows down the completion time significantly.
The two scaling modes work together well, as users only need to be roughly correct in their resource requirements (in practice we find the user generally knows precisely the resource they can use), and if they are saturating the cluster and could utilise more resource we can rely on the autoscaler to bring up more workers (and take them down when the work is finished). We've actually tuned our autoscaler to be more efficient/conservative and only scale up by 10% at 95%+ utilisation, because we a) care a lot about wasted resource as it's very expensive and b) don't need to rely on the automatic autoscaling so much with this feature, as we've given the users control.

Based on our internal feedback we'd actually like to go one step further and support multiple worker types on the same head node, with the autoscaler selecting the appropriate worker types based on the resource requested: e.g. if the user requests GPUs give them GPU workers, a mix of CPU and GPU might bring up a hybrid set of workers, etc. Again, the goal here is to prevent users from having to think hard (or even at all) about managing the infrastructure, restarting clusters with different yaml files and so on, and to make working on the head node feel very lightweight and as close as possible to just working locally. A simple command of "I need X CPUs / GPUs now please" is a wonderful abstraction from a user perspective as it "just works" (indeed we use the same code locally and on the head node, as it just does nothing if you're not on a cluster) with no restarting / file editing required, and our users frequently work from the same head node all day doing very different workloads without ever having to worry about managing the infrastructure behind all the computation they're doing.

The UX we're aiming for (and we have many more features we can think of that would incrementally get closer to this) is for working on the cluster to feel as easy as working on one machine locally, except this machine just has magic, one-line commands to suddenly become one massive machine with huge resource available to it (handled by ray). I appreciate the suggestions you came back with, but both of those are a significant drop in UX for our workflows, and right now our team can't imagine ever moving back to the 'old way' of doing things; it's just too painful from the perspective of a user who doesn't want to feel like they're having to work around Ray / the autoscaler rather than having it work for them.

As you can see from my very long post (!) we're really passionate about this particular issue, because we've seen dramatic benefits from going down this design path, and I hope this explains why. I think focusing on usability / ease-of-use for cluster computing in Ray really opens up the target market / userbase, as it makes the learning curve much lighter for users who just want to 'get things done' and don't want to feel like they have to learn a big new framework and a whole set of magic tweaks to various files to make things work. One of the really big selling points that brought us to ray in the first place was the wonderfully simple UI for turning any python code parallel - one ray.init() and a @ray.remote decorator and you were good to go; no more knowledge of ray required. I'd really like to bring that same design philosophy to the autoscaler/cluster setup too, and push hard for features that can be expressed in a single line of python code that then 'just works', like we've tried to do with this PR, as I think the upside here is huge.
I'd rather maintain our fork of this forever than have to go back to an initial_workers / yaml-file-editing world :-) Very keen to hear you guys' thoughts on this and on what we could do to encourage the autoscaler to go in this direction in the long term.
@markgoodhead thanks, it makes sense now that this can be a good long-term API for Ray. Let's get it in now as an experimental feature -- I left a couple comments that should be easy to address before merging.
I also agree allowing a mix of different node types would be very useful with such an API (especially so if we improved the scaling to take into account queue lengths from the scheduler).
python/ray/autoscaler/autoscaler.py
Outdated
    redis_address: str -- the redis address of the head ray node (required)
    cores: int -- the number of CPU cores to request (required)
    redis_password: str -- redis password (optional, default None)
You can actually get redis_address and password from ray.worker.global_worker.node.redis_address and ray.worker.global_worker.node.redis_password.
Done!
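A minimal sketch of that suggestion (the attribute names come from the review comment above; the wrapper itself is hypothetical, assuming the PR's module path):

```python
import ray
from ray.autoscaler.autoscaler import request_cores

def request_cores_from_driver(cores):
    # Reuse the connection details of the already-initialised driver instead
    # of asking the caller to pass them in explicitly.
    node = ray.worker.global_worker.node
    return request_cores(node.redis_address, cores,
                         redis_password=node.redis_password)
```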
python/ray/autoscaler/autoscaler.py
Outdated
@@ -824,3 +854,22 @@ def add_hash_of_file(fpath):
    _hash_cache[conf_str] = hasher.hexdigest()

    return _hash_cache[conf_str]


def request_cores(redis_address, cores, redis_password=None):
Can we make this a bit more general: ray.autoscaler.request_resources(num_cpus=None, num_gpus=None)?
Can do although we haven't implemented the GPU bit yet - are you happy for us to just raise a NotImplementedError for now if someone requests those?
That sounds good.
Done!
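A rough sketch of the agreed shape (only the signature and the NotImplementedError behaviour come from the discussion above; the request plumbing is a placeholder):

```python
def request_resources(num_cpus=None, num_gpus=None):
    """Ask the autoscaler to make the given resources available."""
    if num_gpus is not None:
        # GPU requests are not implemented yet, per the discussion above.
        raise NotImplementedError("GPU requests are not supported yet.")
    if num_cpus is not None:
        _publish_cpu_request(num_cpus)

def _publish_cpu_request(num_cpus):
    # Placeholder: the real implementation forwards the request to the
    # autoscaler on the head node (e.g. via Redis).
    pass
```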
def request_cores(redis_address, cores, redis_password=None):
    """Remotely request some CPU cores from the autoscaler.
    This function is to be called e.g. on a node before submitting a bunch of
newline here
I've made the formatting changes requested, although I wasn't able to run the format script on my machine for some reason so some things may still be off.
python/ray/autoscaler/autoscaler.py
Outdated
    Arguments:

    redis_address: str -- the redis address of the head ray node (required)
these should all be indented by 4 spaces
Done
Co-Authored-By: Robert Nishihara <robertnishihara@gmail.com>
Test FAILed.
Test FAILed.
Test FAILed.
Test FAILed.
Test PASSed.
Test PASSed.
This is failing lint (scripts/format.sh).
I tried running lint on my machine but was getting errors - would one of the ray team be able to help out and run it?
Test PASSed.
This PR allows users to manually request resources for the autoscaler. It is aimed at being a more general solution to 'aggressive_autoscaling', 'initial_workers' and other such hacks.
In the future this could possibly be extended to start GPU nodes etc if the autoscaler ever supported multiple node formats.
I have removed those two options as they were originally added by me and I believe they are no longer necessary.
I have added a test for the functionality and removed the tests for now obsoleted code.
Usage:
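The original usage snippet did not survive extraction; the following is a hedged reconstruction using the request_resources(num_cpus=...) form settled on in review (the import path, Redis address, and task function are illustrative):

```python
import ray
from ray.autoscaler.autoscaler import request_resources

ray.init(redis_address="<head-node-redis-address>")

# Ask the autoscaler up front for enough CPUs to run 20 single-CPU tasks.
request_resources(num_cpus=20)

@ray.remote
def work(i):
    return i * i

results = ray.get([work.remote(i) for i in range(20)])
```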
This will then ensure that on the next autoscaler cycle, workers sufficient to run 20 tasks are made available (if not otherwise restricted e.g. by max_workers).
We are using this in production with thousands of workers and it seems able to quickly spin up 50-100 nodes on AWS.