EC2 cluster setup scripts and initial version of auto-scaler #1311
Conversation
Slick stuff; some comments below, and I'll try out the spot instance functionality.
import time
import sys

import yaml
Is this an (official) dependency now?
How do I make it one?
I think add it to requirements.txt and all the test setup scripts?
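Purely as illustration (not part of the PR, and the message text is made up): if PyYAML ends up as a soft dependency rather than a hard requirement, the import could at least fail with a pointer to the fix.

```python
# Hypothetical guard around the module-level import; the error text is illustrative.
try:
    import yaml  # provided by the PyYAML package
except ImportError:
    raise ImportError(
        "The Ray autoscaler requires PyYAML. Run `pip install pyyaml`, "
        "or add pyyaml to requirements.txt and the test setup scripts.")
```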
python/ray/scripts/scripts.py (Outdated)
@click.option(
    "--max-workers", required=False, type=int, help=(
        "Override the configured max worker count for the cluster."))
def create_or_update(
not a fan of the naming but this is just aesthetics
What would you call it? E.g., ray start_cluster and ray terminate_cluster?
Though I guess start_cluster doesn't capture the possibility of updating an existing cluster.
One idea is ray cluster --start and ray cluster --update (similar to ray start --head and ray start --redis-address ...).
ray create_or_update_cluster <>
....
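Whichever name wins, only the string in the click declaration changes; a minimal sketch below (the `cli` group and the single option here are illustrative, not the PR's exact code).

```python
import click

@click.group()
def cli():
    """Toy stand-in for the `ray` command group."""
    pass

@cli.command("create_or_update")  # renaming the CLI verb only changes this string
@click.option("--max-workers", required=False, type=int,
              help="Override the configured max worker node count for the cluster.")
def create_or_update(max_workers):
    click.echo("would create or update the cluster, max_workers={}".format(max_workers))

if __name__ == "__main__":
    cli()
```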
python/ray/scripts/scripts.py (Outdated)
"Override the configured min worker count for the cluster.")) | ||
@click.option( | ||
"--max-workers", required=False, type=int, help=( | ||
"Override the configured max worker count for the cluster.")) |
"max worker node count"? (and "min worker node count" in docs above - can be confused with ray workers)
Done
    self.runtime_hash, self.node_id),
    file=self.stdout)

def do_update(self):
Do you mind adding some comments for this?
Done
    print(self.debug_string())
    return
else:
    # If enough nodes, terminate an out-of-date node.
So you kill nodes first, then you start nodes, then you kill out-of-date nodes - doesn't this result in fewer nodes than the target?
I guess this will be addressed in the next loop, but it's a little weird being out of sync.
You're right, it's kind of an arbitrary order. I'm not sure there is a right answer here.
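For readers following along, a rough sketch of the pass being discussed (the helper callables are hypothetical stand-ins for the provider API, not the PR's actual code); because step 3 terminates a stale node without immediately launching its replacement, the cluster can sit one node below target until the next pass picks it back up.

```python
def update_once(nodes, target_num_workers, launch, terminate, up_to_date):
    """One reconciliation pass, in the order described above."""
    # 1. Terminate nodes beyond the target count.
    while len(nodes) > target_num_workers:
        terminate(nodes.pop())
    # 2. Launch nodes up to the target count.
    while len(nodes) < target_num_workers:
        nodes.append(launch())
    # 3. Recycle at most one out-of-date node; its replacement only
    #    arrives on the next pass, hence the transient undershoot.
    stale = [node for node in nodes if not up_to_date(node)]
    if stale:
        terminate(stale[0])
        nodes.remove(stale[0])
    return nodes
```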
python/ray/autoscaler/updater.py (Outdated)
        NodeUpdater.__init__(self, *args, **kwargs)


class NodeUpdaterThread(NodeUpdater, Thread):
is this used anywhere?
yeah it's used for the unit test, to allow the provider to be mocked in a single process
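For context, the pattern is roughly the one sketched below (illustrative, not the PR's exact classes): because NodeUpdater.run comes first in the MRO, Thread.start() ends up running the update logic in the same process, so a test can hand it a plain mock provider object.

```python
from threading import Thread

class NodeUpdater(object):
    def __init__(self, node_id, provider):
        self.node_id = node_id
        self.provider = provider

    def run(self):
        # The real updater would rsync files and run setup commands over SSH;
        # a mock provider can simply record that the node was updated.
        self.provider.mark_updated(self.node_id)

class NodeUpdaterThread(NodeUpdater, Thread):
    def __init__(self, node_id, provider):
        NodeUpdater.__init__(self, node_id, provider)
        Thread.__init__(self)
        # start() calls self.run(), which resolves to NodeUpdater.run.
```

A unit test can then do updater = NodeUpdaterThread("node-1", mock_provider); updater.start(); updater.join() without leaving the process.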
python/ray/autoscaler/autoscaler.py (Outdated)
    del self.updaters[node_id]
    print(self.debug_string())

def reload_config(self, errors_fatal):
Everywhere, errors_fatal is a key value - update the signature to match?
Done
python/ray/autoscaler/autoscaler.py (Outdated)
There are two ways to start an autoscaling cluster: manually by running
`ray start --head --autoscaling-config=/path/to/config.json` on an
instance that has permission to launch other instances, or you can also use
`ray bootstrap /path/to/config.json` from your laptop, which will configure
out of date?
Fixed
python/ray/autoscaler/commands.py (Outdated)
remote_key_path = "~/ray_bootstrap_key.pem".format(
    config["auth"]["ssh_user"])
cluster_config_path = "~/ray_bootstrap_config.yaml".format(
    config["auth"]["ssh_user"])
Does this do anything? There's no {} in the above two strings.
Fixed
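For the record, str.format() with no replacement fields is a silent no-op, so the ssh_user argument never did anything; a quick sketch of the behavior and one straightforward fix (whether the PR drops the calls or adds placeholders isn't shown here):

```python
config = {"auth": {"ssh_user": "ubuntu"}}  # illustrative value

# Before: the string contains no {}, so .format() returns it unchanged.
path = "~/ray_bootstrap_key.pem".format(config["auth"]["ssh_user"])
assert path == "~/ray_bootstrap_key.pem"

# After: plain string literals, no .format() needed.
remote_key_path = "~/ray_bootstrap_key.pem"
cluster_config_path = "~/ray_bootstrap_config.yaml"
```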
python/ray/autoscaler/commands.py (Outdated)
new_mounts = {}
for remote_path in config["file_mounts"].keys():
    new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
Nit: I don't think keys() is needed - also, below is less verbose:
remote_config["file_mounts"] = {
    path: path for path in config["file_mounts"]}
Removed keys.
print(
    "StandardAutoscaler: Terminating unneeded node: "
    "{}".format(nodes[-1]))
self.provider.terminate_node(nodes[-1])
Naive question - is this OK? Will this have weird effects on fault tolerance / do you need to drain?
I'm guessing the fault tolerance will not be very happy with this since we don't have task checkpoints. That's fine for now, GCS integration is not implemented.
@richardliaw what do you mean by "drain"? Do you mean transfer objects away from that node to other nodes?
Oh I just meant letting tasks finish while not scheduling any more tasks on that node
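In code terms, that kind of drain would look something like the sketch below; mark_unschedulable and has_running_tasks are entirely hypothetical helpers (only terminate_node exists in this PR), and doing this properly would presumably wait for GCS integration.

```python
import time

def drain_and_terminate(provider, node_id, poll_interval_s=5.0, timeout_s=300.0):
    provider.mark_unschedulable(node_id)       # stop scheduling new tasks on it
    deadline = time.time() + timeout_s
    while provider.has_running_tasks(node_id) and time.time() < deadline:
        time.sleep(poll_interval_s)            # let in-flight tasks finish
    provider.terminate_node(node_id)           # what the autoscaler does today
```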
@robertnishihara SSH key creation should be more robust now
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
    InstanceType: m5.large
    ImageId: ami-212d465b
Yeah that could work. Is it possible to pip install a particular travis build on an unmerged PR?
@@ -0,0 +1,326 @@
from __future__ import absolute_import
Yes, I'm always forgetting...
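For reference, the compatibility header in question; modules in the codebase typically open with these three __future__ imports:

```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
```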
Merged build finished. Test PASSed.
Test failed on Travis with
It should be
Oops! fixed
Merged build finished. Test PASSed.
def _update(self):
    nodes = self.workers()
    target_num_workers = self.config["max_workers"]
I like target_num_workers, whatever happened to that?
What do these changes do?
This adds an autoscaler component to Ray, which can (eventually) autonomously add and remove worker nodes. This also simplifies cluster setup -- all we need to do is bootstrap a head node, which in turn can set up the worker nodes.
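As a rough illustration of the intended control loop (the class below is a toy stand-in, not the PR's StandardAutoscaler API): a monitor process on the head node periodically reconciles the running worker count against the configured target.

```python
import time

class ToyAutoscaler(object):
    """Toy stand-in: launches or terminates workers to match max_workers."""

    def __init__(self, config):
        self.config = config
        self.workers = []

    def update(self):
        target = self.config["max_workers"]
        while len(self.workers) < target:          # scale up
            self.workers.append("worker-{}".format(len(self.workers)))
        while len(self.workers) > target:          # scale down
            self.workers.pop()

autoscaler = ToyAutoscaler({"max_workers": 2})
for _ in range(3):                                 # the monitor loop
    autoscaler.update()
    time.sleep(0.1)
print(autoscaler.workers)  # ['worker-0', 'worker-1']
```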