Update cluster guide (ray-project#347)
* clarify cluster setup instructions

* update multinode documentation, update cluster script, fix minor bug in worker.py

* clarify cluster documentation and fix update_user_code
jssmith authored and robertnishihara committed Aug 4, 2016
1 parent de200ff commit 3ee0fd8
Showing 4 changed files with 106 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -57,3 +57,6 @@
*_pb2.py
*.pb.h
*.pb.cc

# Ray cluster configuration
scripts/nodes.txt
108 changes: 73 additions & 35 deletions doc/using-ray-on-a-cluster.md
@@ -21,12 +21,18 @@ permissions for the private key file to `600` (i.e. only you can read and write
it) so that `ssh` will work.
- Whenever you want to use the `ec2.py` script, set the environment variables
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your Amazon EC2 access key ID
and secret access key. These can be obtained from the [AWS
homepage](http://aws.amazon.com/) by clicking Account > Security Credentials >
Access Credentials.
and secret access key. These can be generated from the [AWS
homepage](http://aws.amazon.com/) by clicking My Account > Security Credentials >
Access Keys, or by [creating an IAM user](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html).

### Launching a Cluster

- Install the required dependencies on the machine you will be using to run the
cluster launch scripts.
```
sudo pip install --upgrade boto
```

- Go into the `ray/scripts` directory.
- Run `python ec2.py -k <keypair> -i <key-file> -s <num-slaves> launch
<cluster-name>`, where `<keypair>` is the name of your EC2 key pair (that you
@@ -39,7 +45,13 @@ and `<cluster-name>` is the name to give to your cluster.
```bash
export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
python ec2.py --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 launch my-ray-cluster
python ec2.py --key-pair=awskey \
--identity-file=awskey.pem \
--region=us-west-1 \
--instance-type=c4.4xlarge \
--spot-price=2.50 \
--slaves=1 \
launch my-ray-cluster
```

The following options are worth pointing out:
@@ -55,6 +67,9 @@ capacity in one zone, and you should try to launch in another.
- `--spot-price=<price>` will launch the worker nodes as [Spot
Instances](http://aws.amazon.com/ec2/spot-instances/), bidding for the given
maximum price (in dollars).
- `--slaves=<num-slaves>` will launch a cluster with `(1 + num_slaves)` instances.
The first instance is the head node, which in addition to hosting workers runs the
Ray scheduler and application driver programs.

## Getting started with Ray on a cluster

@@ -70,48 +85,75 @@ cluster. For example
12.34.56.789
12.34.567.89
The first node in the file is the "head" node. The scheduler will be started on
the head node, and the driver should run on the head node as well.
the head node, and the driver should run on the head node as well. If the nodes
have public and private IP addresses (as is the case for EC2 instances), you can
list `<public-ip-address>, <private-ip-address>` pairs in `nodes.txt`, like

12.34.56.789, 98.76.54.321
12.34.567.89, 98.76.543.21
The `cluster.py` administrative script will use the public IP addresses to ssh
to the nodes. Ray will use the private IP addresses to send messages between the
nodes during execution.

2. Make sure that the nodes can all communicate with one another. On EC2, this
can be done by creating a new security group and adding the inbound rule "all
traffic" and adding the outbound rule "all traffic". Then add all of the nodes
in your cluster to that security group.
can be done by creating a new security group with the appropriate inbound and
outbound rules and adding all of the nodes in your cluster to that security
group. This is done automatically by the `ec2.py` script. If you have used the
`ec2.py` script you can log into the hosts with the username `ubuntu`.

3. Run something like
3. From the `ray/scripts` directory, run something like

```
python scripts/cluster.py --nodes nodes.txt \
--key-file key.pem \
--username ubuntu \
--installation-directory /home/ubuntu/
python cluster.py --nodes=nodes.txt \
--key-file=awskey.pem \
--username=ubuntu
```
where you replace `nodes.txt`, `key.pem`, `ubuntu`, and `/home/ubuntu/` by the
appropriate values. This assumes that you can connect to each IP address
`<ip-address>` in `nodes.txt` with the command
where you replace `nodes.txt`, `key.pem`, and `ubuntu` by the appropriate
values. This assumes that you can connect to each IP address `<ip-address>` in
`nodes.txt` with the command
```
ssh -i <key-file> <username>@<ip-address>
```
4. The previous command should open a Python interpreter. To install Ray on the
cluster, run `cluster.install_ray()` in the interpreter. The interpreter should
block until the installation has completed. The standard output from the nodes
will be redirected to your terminal.
5. To check that the installation succeeded, you can ssh to each node, cd into
the directory `ray/test/`, and run the tests (e.g., `python runtest.py`).
6. Start the cluster (the scheduler, object stores, and workers) with the
command `cluster.start_ray("~/example_ray_code")`, where the argument is
the local path to the directory that contains your Python code. This command will
copy this source code to each node and will start the cluster. Each worker that
is started will have a local copy of the ~/example_ray_code directory in their
PYTHONPATH. After completing successfully, you can ssh to the
head node and attach a shell to the cluster by first running the following code.
5. To check that the installation succeeded, you can ssh to each node and run
the tests.
```
cd "$RAY_HOME/../user_source_files/example_ray_code";
source "$RAY_HOME/setup-env.sh";
cd $HOME/ray/
source setup-env.sh # Add Ray to your Python path.
python test/runtest.py # This tests basic functionality.
python test/array_test.py # This tests some array libraries.
```
6. Start the cluster with `cluster.start_ray()`. If you would like to deploy
source code to it, you can pass in the local path to the directory that contains
your Python code. For example, `cluster.start_ray("~/example_ray_code")`. This
will copy your source code to each node on the cluster, placing it in a
directory on the PYTHONPATH.
The `cluster.start_ray` command will start the Ray scheduler, object stores, and
workers, and before finishing it will print instructions for connecting to the
cluster via ssh.
7. To connect to the cluster (either with a Python shell or with a script), ssh
to the cluster's head node (as described by the output of the
`cluster.start_ray` command). E.g.,
```
Then within Python, run the following.
```python
import ray
ray.init(scheduler_address="12.34.56.789:10001", objstore_address="12.34.56.789:20001", driver_address="12.34.56.789:30001")
The cluster has been started. You can attach to the cluster by sshing to the head node with the following command.
ssh -i awskey.pem ubuntu@12.34.56.789
Then run the following commands.
cd $HOME/ray
source $HOME/ray/setup-env.sh # Add Ray to your Python path.
Then within a Python interpreter, run the following commands.
import ray
ray.init(scheduler_address="98.76.54.321:10001", objstore_address="98.76.54.321:20001", driver_address="98.76.54.321:30001")
```
7. Note that there are several more commands that can be run from within
@@ -126,7 +168,3 @@ Then within Python, run the following.
processes).
- `cluster.update_ray()` - This pulls the latest Ray source code and builds
it.
Once you've started a Ray cluster using the above instructions, to actually use
Ray, ssh to the head node (the first node listed in the `nodes.txt` file) and
attach a shell to the already running cluster.
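Putting steps 4 through 7 above together, the interactive session inside the `cluster.py` interpreter amounts to the following condensed sketch. It only restates commands already described in this guide; the source path is the illustrative `~/example_ray_code` used above.

```python
# Inside the interpreter opened by `python cluster.py --nodes=nodes.txt ...` (step 3):
cluster.install_ray()                     # step 4: install Ray on every node; blocks until done
cluster.start_ray("~/example_ray_code")   # step 6: deploy user code, start scheduler, object stores, workers
# ... ssh to the head node and run your driver as described in step 7 ...
cluster.stop_ray()                        # kill the scheduler, object store, and worker processes
cluster.update_ray()                      # pull the latest Ray source code and rebuild it
```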
3 changes: 2 additions & 1 deletion lib/python/ray/worker.py
@@ -658,7 +658,7 @@ def register_module(module, worker=global_worker):
_logger().info("registering {}.".format(val.func_name))
worker.register_function(val)

def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=SCRIPT_MODE):
def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -694,6 +694,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_add
if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]:
raise Exception("If start_ray_local=True, then driver_mode must be in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE].")
num_workers = 1 if num_workers is None else num_workers
num_objstores = 1 if num_objstores is None else num_objstores
# Start the scheduler, object store, and some workers. These will be killed
# by the call to cleanup(), which happens when the Python script exits.
scheduler_address, objstore_addresses, driver_addresses = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None)
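For context on this change, here is a sketch of the two calling patterns `init` supports, as described by the docstring and by the cluster guide above; the addresses and worker count are illustrative only.

```python
import ray

# Case 1: start a local scheduler, object store, and workers from this process.
# With this commit, num_objstores (like num_workers) defaults to None and is set
# to 1 internally when not given.
ray.init(start_ray_local=True, num_workers=4)

# Case 2: connect to an already running cluster, using the addresses printed by
# cluster.start_ray().
ray.init(scheduler_address="98.76.54.321:10001",
         objstore_address="98.76.54.321:20001",
         driver_address="98.76.54.321:30001")
# (In a real script you would call ray.init exactly once, picking one of these cases.)
```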
47 changes: 28 additions & 19 deletions scripts/cluster.py
@@ -10,7 +10,6 @@
parser.add_argument("--nodes", type=str, required=True, help="Test file with node IP addresses, one line per address.")
parser.add_argument("--key-file", type=str, required=True, help="Path to the file that contains the private key.")
parser.add_argument("--username", type=str, required=True, help="User name for logging in.")
parser.add_argument("--installation-directory", type=str, required=True, help="The directory in which to install Ray.")

class RayCluster(object):
"""A class for setting up, starting, and stopping Ray on a cluster.
@@ -130,7 +129,7 @@ def install_ray(self):
""".format(self.installation_directory, self.installation_directory)
self._run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command)

def start_ray(self, user_source_directory, num_workers_per_node=10):
def start_ray(self, user_source_directory=None, num_workers_per_node=10):
"""Start Ray on a cluster.
This method is used to start Ray on a cluster. It will ssh to the head node,
@@ -139,13 +138,14 @@ def start_ray(self, user_source_directory, num_workers_per_node=10):
workers.
Args:
user_source_directory (str): The path to the local directory containing the
user's source code. Files and directories in this directory can be used
as modules in remote functions.
user_source_directory (Optional[str]): The path to the local directory
containing the user's source code. If provided, files and directories in
this directory can be used as modules in remote functions.
num_workers_per_node (int): The number of workers to start on each node.
"""
# First update the worker code on the nodes.
remote_user_source_directory = self._update_user_code(user_source_directory)
if user_source_directory is not None:
remote_user_source_directory = self._update_user_code(user_source_directory)

scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
# Start the scheduler
@@ -160,16 +160,18 @@ def start_ray(self, user_source_directory, num_workers_per_node=10):
# Start the workers on each node
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
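# For intuition on the escaping: after str.format and both rounds, the command run
# on node i is roughly
#   python -c "import ray; ray.services.start_node(\"<head-private-ip>:10001\", \"<node-i-private-ip>\", <num-workers>, user_source_directory=\"<remote-dir>\")"
# (or with user_source_directory=None when no user source directory was given);
# the angle-bracket placeholders stand in for the formatted values.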
start_workers_commands = []
remote_user_source_directory_str = "\\\"{}\\\"".format(remote_user_source_directory) if user_source_directory is not None else "None"
for i, node_ip_address in enumerate(self.node_ip_addresses):
start_workers_command = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, user_source_directory=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory)
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory_str)
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)

setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh")
cd_location = remote_user_source_directory if user_source_directory is not None else os.path.join(self.installation_directory, "ray")
print """
The cluster has been started. You can attach to the cluster by sshing to the head node with the following command.
@@ -178,13 +180,13 @@ def start_ray(self, user_source_directory, num_workers_per_node=10):
Then run the following commands.
cd {}
source {}
source {} # Add Ray to your Python path.
Then within a Python interpreter, run the following commands.
Then within a Python interpreter or script, run the following commands.
import ray
ray.init(scheduler_address="{}:10001", objstore_address="{}:20001", driver_address="{}:30001")
""".format(self.key_file, self.username, self.node_ip_addresses[0], remote_user_source_directory, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
""".format(self.key_file, self.username, self.node_ip_addresses[0], cd_location, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])

def stop_ray(self):
"""Kill all of the processes in the Ray cluster.
@@ -195,33 +197,39 @@ def stop_ray(self):
kill_cluster_command = "killall scheduler objstore python > /dev/null 2> /dev/null"
self._run_command_over_ssh_on_all_nodes_in_parallel(kill_cluster_command)

def update_ray(self):
def update_ray(self, branch=None):
"""Pull the latest Ray source code and rebuild Ray.
This method is used for updating the Ray source code on a Ray cluster. It
will ssh to each node, will pull the latest source code from the Ray
repository, and will rerun the build script (though currently it will not
rebuild the third party libraries).
Args:
branch (Optional[str]): The branch to check out. If omitted, then stay on
the current branch.
"""
ray_directory = os.path.join(self.installation_directory, "ray")
change_branch_command = "git checkout -f {}".format(branch) if branch is not None else ""
update_cluster_command = """
cd "{}" &&
{}
git fetch &&
git reset --hard "@{{upstream}}" -- &&
(make -C "./build" clean || rm -rf "./build") &&
./build.sh
""".format(ray_directory)
""".format(ray_directory, change_branch_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command)

def _update_user_code(self, user_source_directory):
"""Update the user's source code on each node in the cluster.
This method is used to update the user's source code on each node in the
cluster. The local user_source_directory will be copied under ray_source_files in
the ray installation directory on the worker node. For example, if the ray
installation directory is "/d/e/f" and we call _update_source_code("~/a/b/c"),
then the contents of "~/a/b/c" on the local machine will be copied to
"/d/e/f/user_source_files/c" on each node in the cluster.
cluster. The local user_source_directory will be copied under
user_source_files in the home directory on the worker node. For example, if
we call _update_user_code("~/a/b/c"), then the contents of "~/a/b/c" on
the local machine will be copied to "~/user_source_files/c" on each
node in the cluster.
Args:
user_source_directory (str): The path on the local machine to the directory
@@ -236,7 +244,7 @@ def _update_user_code(self, user_source_directory):
raise Exception("Directory {} does not exist.".format(user_source_directory))
# If user_source_directory is "/a/b/c", then local_directory_name is "c".
local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1]
remote_directory = os.path.join(self.installation_directory, "user_source_files", local_directory_name)
remote_directory = os.path.join("user_source_files", local_directory_name)
# Remove and recreate the directory on the node.
recreate_directory_command = """
rm -r "{}";
@@ -290,7 +298,8 @@ def _check_ip_addresses(node_ip_addresses):
args = parser.parse_args()
username = args.username
key_file = args.key_file
installation_directory = args.installation_directory
# Install Ray in the user's home directory on the cluster.
installation_directory = "$HOME"
node_ip_addresses = []
node_private_ip_addresses = []
for line in open(args.nodes).readlines():
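The diff is truncated at this loop. For illustration only, here is a minimal, self-contained sketch (an assumption about the parsing, not the code hidden by the truncation) of how each `nodes.txt` line of the form `<public-ip-address>, <private-ip-address>` can be split into the two address lists used above:

```python
# Hypothetical sketch: parse nodes.txt into public and private IP address lists.
node_ip_addresses = []
node_private_ip_addresses = []
for line in open("nodes.txt").readlines():
    parts = [part.strip() for part in line.split(",") if part.strip() != ""]
    if not parts:
        continue  # skip blank lines
    node_ip_addresses.append(parts[0])
    # Fall back to the public address when no private address is listed.
    node_private_ip_addresses.append(parts[1] if len(parts) > 1 else parts[0])
```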
