EC2 cluster setup scripts and initial version of auto-scaler #1311

Merged (81 commits) on Dec 16, 2017

Conversation

@ericl (Contributor, Author) commented Dec 11, 2017

What do these changes do?

This adds an autoscaler component to Ray, which can autonomously (eventually) add and remove worker nodes. This also simplifies cluster setup -- all we need to do is bootstrap a head node that in turn can set up the worker nodes.
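
At a high level, the autoscaler behaves like a periodic reconciliation loop running on the head node. The sketch below is only a hedged illustration of that idea; the class name ToyAutoscaler and the provider methods (nodes, create_node, terminate_node) are assumptions made for the example, not the exact API introduced by this PR.

# Hedged sketch: the reconciliation idea behind an autoscaler.
# Names here are illustrative, not the PR's actual classes.
import time

class ToyAutoscaler(object):
    def __init__(self, provider, min_workers, max_workers):
        self.provider = provider          # launches/terminates cloud nodes
        self.min_workers = min_workers
        self.max_workers = max_workers

    def update(self):
        workers = self.provider.nodes()   # currently running worker nodes
        if len(workers) < self.min_workers:
            self.provider.create_node()                # scale up
        elif len(workers) > self.max_workers:
            self.provider.terminate_node(workers[-1])  # scale down

def run_forever(autoscaler, interval_s=5):
    # The head node would run a loop like this in the background.
    while True:
        autoscaler.update()
        time.sleep(interval_s)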

@richardliaw (Contributor) left a comment:

Slick stuff. Some comments below, and I will try out the spot instance functionality.

import time
import sys

import yaml
Contributor: Is this an (official) dependency now?

ericl (Author): How do I make it one?

Contributor: I think add it to requirements.txt and all the test setup scripts?

@click.option(
    "--max-workers", required=False, type=int, help=(
        "Override the configured max worker count for the cluster."))
def create_or_update(
Contributor: Not a fan of the naming, but this is just aesthetics.

Collaborator: What would you call it? E.g.,

ray start_cluster
ray terminate_cluster

Collaborator: Though I guess start_cluster doesn't capture the possibility of updating an existing cluster.

Contributor: One idea is ray cluster --start and ray cluster --update (similar to ray start --head and ray start --redis-address ...).

ericl (Author): ray create_or_update_cluster <>....

"Override the configured min worker count for the cluster."))
@click.option(
"--max-workers", required=False, type=int, help=(
"Override the configured max worker count for the cluster."))
Contributor: "max worker node count"? (And "min worker node count" in the docs above; this can be confused with Ray workers.)

ericl (Author): Done.
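
For context, here is a minimal, self-contained sketch of click options using the clarified "worker node count" wording; the command body is an illustrative stub, not the PR's actual implementation.

# Hedged sketch: illustrative click command, not the PR's exact CLI.
import click

@click.command()
@click.option(
    "--min-workers", required=False, type=int,
    help="Override the configured min worker node count for the cluster.")
@click.option(
    "--max-workers", required=False, type=int,
    help="Override the configured max worker node count for the cluster.")
def create_or_update(min_workers, max_workers):
    """Create or update a Ray cluster (stub for illustration)."""
    click.echo("min={}, max={}".format(min_workers, max_workers))

if __name__ == "__main__":
    create_or_update()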

self.runtime_hash, self.node_id),
file=self.stdout)

def do_update(self):
Contributor: Do you mind adding some comments for this?

ericl (Author): Done.
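
As a hedged illustration of the kind of commentary being asked for, the sketch below shows the typical shape of a node update step; the helper names (wait_for_ssh, sync_files, run_setup_commands, mark_up_to_date) are hypothetical stand-ins, not the methods in this diff.

# Hedged sketch of a node update step, with explanatory comments.
# Helper names are hypothetical.
def do_update(self):
    # Wait until the node is reachable over SSH before doing anything.
    self.wait_for_ssh()

    # Copy the configured file mounts (e.g. the cluster config and the
    # bootstrap SSH key) onto the node.
    self.sync_files()

    # Run the user-specified setup/init commands on the node.
    self.run_setup_commands()

    # Record the hash of the applied config so the autoscaler can tell
    # up-to-date nodes apart from out-of-date ones.
    self.mark_up_to_date(self.runtime_hash)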

    print(self.debug_string())
    return
else:
    # If enough nodes, terminate an out-of-date node.
Contributor: So you kill nodes first, then you start nodes, then you kill out-of-date nodes. Doesn't this result in fewer nodes than the target?

Contributor: I guess this will be addressed in the next loop, but it's a little weird being out of sync.

ericl (Author): You're right, it's kind of an arbitrary order. I'm not sure there is a right answer here.
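
To make the ordering being discussed concrete, here is a hedged sketch of one pass of such an update loop, under the assumption (not taken verbatim from the diff) that it first trims excess nodes, then launches up to the target, and only then recycles a single out-of-date node; the temporary undershoot is corrected on the next pass.

# Hedged sketch of one reconciliation pass; names are illustrative.
def update_once(provider, target, up_to_date):
    nodes = provider.workers()

    # 1. Terminate nodes beyond the target count.
    while len(nodes) > target:
        provider.terminate_node(nodes.pop())

    # 2. Launch nodes until the target count is reached.
    while len(nodes) < target:
        nodes.append(provider.create_node())

    # 3. At the target, recycle at most one out-of-date node per pass.
    #    This briefly drops below target; the next pass relaunches it.
    stale = [n for n in nodes if not up_to_date(n)]
    if stale:
        provider.terminate_node(stale[-1])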

NodeUpdater.__init__(self, *args, **kwargs)


class NodeUpdaterThread(NodeUpdater, Thread):
Contributor: Is this used anywhere?

ericl (Author): Yeah, it's used for the unit test, to allow the provider to be mocked in a single process.
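
A minimal sketch of the mix-in pattern being described, assuming the updater exposes a run() method that performs the update; the MockProvider and the mark_updated call are purely illustrative.

# Hedged sketch: combining an updater with threading.Thread so unit
# tests can drive updates in-process against a mocked provider.
from threading import Thread

class NodeUpdater(object):
    def __init__(self, node_id, provider):
        self.node_id = node_id
        self.provider = provider

    def run(self):
        # The real updater would SSH to the node and apply updates;
        # this stand-in just records the call on the (mock) provider.
        self.provider.mark_updated(self.node_id)

class NodeUpdaterThread(NodeUpdater, Thread):
    def __init__(self, *args, **kwargs):
        Thread.__init__(self)
        NodeUpdater.__init__(self, *args, **kwargs)
        # start() invokes self.run(), which resolves to NodeUpdater.run()
        # via the MRO, so the update logic runs on a background thread.

class MockProvider(object):
    def __init__(self):
        self.updated = []

    def mark_updated(self, node_id):
        self.updated.append(node_id)

provider = MockProvider()
updater = NodeUpdaterThread("node-1", provider)
updater.start()
updater.join()
assert provider.updated == ["node-1"]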

del self.updaters[node_id]
print(self.debug_string())

def reload_config(self, errors_fatal):
@richardliaw (Contributor), Dec 15, 2017: Everywhere else, errors_fatal is passed as a key=value (keyword) argument; update the signature to match?

ericl (Author): Done.
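
For illustration, a hedged sketch of the keyword-style signature the comment asks for (a default value lets call sites pass errors_fatal by name); this is not necessarily the diff's final form.

class StandardAutoscaler(object):
    # Hedged sketch: default value so callers pass errors_fatal by keyword.
    def reload_config(self, errors_fatal=False):
        pass

# At a call site:
# autoscaler.reload_config(errors_fatal=True)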

There are two ways to start an autoscaling cluster: manually by running
`ray start --head --autoscaling-config=/path/to/config.json` on a
instance that has permission to launch other instances, or you can also use
`ray bootstrap /path/to/config.json` from your laptop, which will configure
Contributor: Out of date?

ericl (Author): Fixed.

remote_key_path = "~/ray_bootstrap_key.pem".format(
    config["auth"]["ssh_user"])
cluster_config_path = "~/ray_bootstrap_config.yaml".format(
    config["auth"]["ssh_user"])
Contributor: Does this do anything? There's no {} in the above two strings.

ericl (Author): Fixed.
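
For the record, str.format() with no {} placeholders simply returns the string unchanged, so the two calls above had no effect; a small demonstration (the "ubuntu" argument is just a stand-in value, not taken from the config):

# .format() without placeholders is a no-op on the string.
remote_key_path = "~/ray_bootstrap_key.pem".format("ubuntu")
assert remote_key_path == "~/ray_bootstrap_key.pem"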

new_mounts = {}
for remote_path in config["file_mounts"].keys():
    new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
Contributor: Nit: I don't think keys() is needed; also, the following is less verbose:

remote_config["file_mounts"] = {
    path: path for path in config["file_mounts"]}

ericl (Author): Removed keys.
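
A tiny self-contained check of the suggested comprehension; the sample paths are made up for illustration.

# Identity mapping of file mounts, as suggested above.
config = {"file_mounts": {"/remote/app": "/local/app",
                          "/remote/data": "/local/data"}}

remote_config = dict(config)
remote_config["file_mounts"] = {
    path: path for path in config["file_mounts"]}

assert remote_config["file_mounts"] == {
    "/remote/app": "/remote/app",
    "/remote/data": "/remote/data"}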

print(
    "StandardAutoscaler: Terminating unneeded node: "
    "{}".format(nodes[-1]))
self.provider.terminate_node(nodes[-1])
Contributor: Naive question: is this OK? Will this have weird effects on fault tolerance, or do you need to drain?

ericl (Author): I'm guessing fault tolerance will not be very happy with this, since we don't have task checkpoints. That's fine for now; GCS integration is not implemented.

Collaborator: @richardliaw, what do you mean by "drain"? Do you mean transferring objects away from that node to other nodes?

Contributor: Oh, I just meant letting tasks finish while not scheduling any more tasks on that node.

@ericl (Author) left a comment:

@robertnishihara SSH key creation should be more robust now.

# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
    InstanceType: m5.large
    ImageId: ami-212d465b
ericl (Author): Yeah, that could work. Is it possible to pip install a particular Travis build on an unmerged PR?

@@ -0,0 +1,326 @@
from __future__ import absolute_import
ericl (Author): Yes, I'm always forgetting...
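
For reference, a hedged reminder of the full Python 2/3 compatibility header that was commonly used across the codebase at the time; only the first line is quoted from this diff.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function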

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2810/

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2811/

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2812/

@robertnishihara (Collaborator) commented Dec 15, 2017:

Test failed on Travis with

$ python test/autoscaling_test.py
python: can't open file 'test/autoscaling_test.py': [Errno 2] No such file or directory

The command "python test/autoscaling_test.py" exited with 2.

It should be autoscaler_test.py.

@ericl (Author) commented Dec 15, 2017:

Oops! Fixed.

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2815/

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2816/

@AmplabJenkins: Merged build finished. Test PASSed.

@AmplabJenkins: Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2818/


def _update(self):
    nodes = self.workers()
    target_num_workers = self.config["max_workers"]
@DmitriGekhtman (Contributor), Aug 27, 2021: I like target_num_workers; whatever happened to that?
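
For contrast, a hedged sketch of how a worker target could instead be clamped between the configured bounds; this is not the PR's logic, which at this point simply targets max_workers.

# Hedged sketch: clamp a requested worker count between min and max.
def clamp_target_workers(requested, config):
    return max(config["min_workers"],
               min(requested, config["max_workers"]))

# e.g. clamp_target_workers(10, {"min_workers": 1, "max_workers": 4}) == 4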
