From 3ee0fd8f34e11e445c51dbdf0ad7798c5c2637dc Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Thu, 4 Aug 2016 09:14:20 -0700 Subject: [PATCH] Update cluster guide (#347) * clarify cluster setup instructions * update multinode documentation, update cluster script, fix minor bug in worker.py * clarify cluster documentation and fix update_user_code --- .gitignore | 3 + doc/using-ray-on-a-cluster.md | 108 +++++++++++++++++++++++----------- lib/python/ray/worker.py | 3 +- scripts/cluster.py | 47 +++++++++------ 4 files changed, 106 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index e9282b725fed..a7f8b5620548 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,6 @@ *_pb2.py *.pb.h *.pb.cc + +# Ray cluster configuration +scripts/nodes.txt diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index c15f2ab7b746..bf31a923f4c0 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -21,12 +21,18 @@ permissions for the private key file to `600` (i.e. only you can read and write it) so that `ssh` will work. - Whenever you want to use the `ec2.py` script, set the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your Amazon EC2 access key ID -and secret access key. These can be obtained from the [AWS -homepage](http://aws.amazon.com/) by clicking Account > Security Credentials > -Access Credentials. +and secret access key. These can be generated from the [AWS +homepage](http://aws.amazon.com/) by clicking My Account > Security Credentials > +Access Keys, or by [creating an IAM user](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html). ### Launching a Cluster +- Install the required dependencies on the machine you will be using to run the +cluster launch scripts. + ``` + sudo pip install --upgrade boto + ``` + - Go into the `ray/scripts` directory. - Run `python ec2.py -k -i -s launch `, where `` is the name of your EC2 key pair (that you @@ -39,7 +45,13 @@ and `` is the name to give to your cluster. ```bash export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123 - python ec2.py --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 launch my-ray-cluster + python ec2.py --key-pair=awskey \ + --identity-file=awskey.pem \ + --region=us-west-1 \ + --instance-type=c4.4xlarge \ + --spot-price=2.50 \ + --slaves=1 \ + launch my-ray-cluster ``` The following options are worth pointing out: @@ -55,6 +67,9 @@ capacity in one zone, and you should try to launch in another. - `--spot-price=` will launch the worker nodes as [Spot Instances](http://aws.amazon.com/ec2/spot-instances/), bidding for the given maximum price (in dollars). +- `--slaves=` will launch a cluster with `(1 + num_slaves)` instances. +The first instance is the head node, which in addition to hosting workers runs the +Ray scheduler and application driver programs. ## Getting started with Ray on a cluster @@ -70,24 +85,32 @@ cluster. For example 12.34.56.789 12.34.567.89 The first node in the file is the "head" node. The scheduler will be started on -the head node, and the driver should run on the head node as well. +the head node, and the driver should run on the head node as well. If the nodes +have public and private IP addresses (as in the case of EC2 instances), you can +list the `, ` in `nodes.txt` like + + 12.34.56.789, 98.76.54.321 + 12.34.567.89, 98.76.543.21 +The `cluster.py` administrative script will use the public IP addresses to ssh +to the nodes. Ray will use the private IP addresses to send messages between the +nodes during execution. 2. Make sure that the nodes can all communicate with one another. On EC2, this -can be done by creating a new security group and adding the inbound rule "all -traffic" and adding the outbound rule "all traffic". Then add all of the nodes -in your cluster to that security group. +can be done by creating a new security group with the appropriate inbound and +outbound rules and adding all of the nodes in your cluster to that security +group. This is done automatically by the `ec2.py` script. If you have used the +`ec2.py` script you can log into the hosts with the username `ubuntu`. -3. Run something like +3. From the `ray/scripts` directory, run something like ``` - python scripts/cluster.py --nodes nodes.txt \ - --key-file key.pem \ - --username ubuntu \ - --installation-directory /home/ubuntu/ + python cluster.py --nodes=nodes.txt \ + --key-file=awskey.pem \ + --username=ubuntu ``` -where you replace `nodes.txt`, `key.pem`, `ubuntu`, and `/home/ubuntu/` by the -appropriate values. This assumes that you can connect to each IP address -`` in `nodes.txt` with the command +where you replace `nodes.txt`, `key.pem`, and `ubuntu` by the appropriate +values. This assumes that you can connect to each IP address `` in +`nodes.txt` with the command ``` ssh -i @ ``` @@ -95,23 +118,42 @@ appropriate values. This assumes that you can connect to each IP address cluster, run `cluster.install_ray()` in the interpreter. The interpreter should block until the installation has completed. The standard output from the nodes will be redirected to your terminal. -5. To check that the installation succeeded, you can ssh to each node, cd into -the directory `ray/test/`, and run the tests (e.g., `python runtest.py`). -6. Start the cluster (the scheduler, object stores, and workers) with the -command `cluster.start_ray("~/example_ray_code")`, where the argument is -the local path to the directory that contains your Python code. This command will -copy this source code to each node and will start the cluster. Each worker that -is started will have a local copy of the ~/example_ray_code directory in their -PYTHONPATH. After completing successfully, you can connect to the ssh to the -head node and attach a shell to the cluster by first running the following code. +5. To check that the installation succeeded, you can ssh to each node and run +the tests. ``` - cd "$RAY_HOME/../user_source_files/example_ray_code"; - source "$RAY_HOME/setup-env.sh"; + cd $HOME/ray/ + source setup-env.sh # Add Ray to your Python path. + python test/runtest.py # This tests basic functionality. + python test/array_test.py # This tests some array libraries. + ``` + +6. Start the cluster with `cluster.start_ray()`. If you would like to deploy +source code to it, you can pass in the local path to the directory that contains +your Python code. For example, `cluster.start_ray("~/example_ray_code")`. This +will copy your source code to each node on the cluster, placing it in a +directory on the PYTHONPATH. + +The `cluster.start_ray` command will start the Ray scheduler, object stores, and +workers, and before finishing it will print instructions for connecting to the +cluster via ssh. + +7. To connect to the cluster (either with a Python shell or with a script), ssh +to the cluster's head node (as described by the output of the +`cluster.start_ray` command. E.g., ``` -Then within Python, run the following. - ```python - import ray - ray.init(scheduler_address="12.34.56.789:10001", objstore_address="12.34.56.789:20001", driver_address="12.34.56.789:30001") + The cluster has been started. You can attach to the cluster by sshing to the head node with the following command. + + ssh -i awskey.pem ubuntu@12.34.56.789 + + Then run the following commands. + + cd $HOME/ray + source $HOME/ray/setup-env.sh # Add Ray to your Python path. + + Then within a Python interpreter, run the following commands. + + import ray + ray.init(scheduler_address="98.76.54.321:10001", objstore_address="98.76.54.321:20001", driver_address="98.76.54.321:30001") ``` 7. Note that there are several more commands that can be run from within @@ -126,7 +168,3 @@ Then within Python, run the following. processes). - `cluster.update_ray()` - This pulls the latest Ray source code and builds it. - -Once you've started a Ray cluster using the above instructions, to actually use -Ray, ssh to the head node (the first node listed in the `nodes.txt` file) and -attach a shell to the already running cluster. diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index c750327bd402..87ae3a76a3d2 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -658,7 +658,7 @@ def register_module(module, worker=global_worker): _logger().info("registering {}.".format(val.func_name)) worker.register_function(val) -def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=SCRIPT_MODE): +def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=SCRIPT_MODE): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -694,6 +694,7 @@ def init(start_ray_local=False, num_workers=None, num_objstores=1, scheduler_add if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: raise Exception("If start_ray_local=True, then driver_mode must be in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE].") num_workers = 1 if num_workers is None else num_workers + num_objstores = 1 if num_objstores is None else num_objstores # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. scheduler_address, objstore_addresses, driver_addresses = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None) diff --git a/scripts/cluster.py b/scripts/cluster.py index ddf29639e138..8d656d68aa0a 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -10,7 +10,6 @@ parser.add_argument("--nodes", type=str, required=True, help="Test file with node IP addresses, one line per address.") parser.add_argument("--key-file", type=str, required=True, help="Path to the file that contains the private key.") parser.add_argument("--username", type=str, required=True, help="User name for logging in.") -parser.add_argument("--installation-directory", type=str, required=True, help="The directory in which to install Ray.") class RayCluster(object): """A class for setting up, starting, and stopping Ray on a cluster. @@ -130,7 +129,7 @@ def install_ray(self): """.format(self.installation_directory, self.installation_directory) self._run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command) - def start_ray(self, user_source_directory, num_workers_per_node=10): + def start_ray(self, user_source_directory=None, num_workers_per_node=10): """Start Ray on a cluster. This method is used to start Ray on a cluster. It will ssh to the head node, @@ -139,13 +138,14 @@ def start_ray(self, user_source_directory, num_workers_per_node=10): workers. Args: - user_source_directory (str): The path to the local directory containing the - user's source code. Files and directories in this directory can be used - as modules in remote functions. + user_source_directory (Optional[str]): The path to the local directory + containing the user's source code. If provided, files and directories in + this directory can be used as modules in remote functions. num_workers_per_node (int): The number workers to start on each node. """ # First update the worker code on the nodes. - remote_user_source_directory = self._update_user_code(user_source_directory) + if user_source_directory is not None: + remote_user_source_directory = self._update_user_code(user_source_directory) scripts_directory = os.path.join(self.installation_directory, "ray/scripts") # Start the scheduler @@ -160,16 +160,18 @@ def start_ray(self, user_source_directory, num_workers_per_node=10): # Start the workers on each node # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " start_workers_commands = [] + remote_user_source_directory_str = "\\\"{}\\\"".format(remote_user_source_directory) if user_source_directory is not None else "None" for i, node_ip_address in enumerate(self.node_ip_addresses): start_workers_command = """ cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, user_source_directory=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory) + """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory_str) start_workers_commands.append(start_workers_command) self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh") + cd_location = remote_user_source_directory if user_source_directory is not None else os.path.join(self.installation_directory, "ray") print """ The cluster has been started. You can attach to the cluster by sshing to the head node with the following command. @@ -178,13 +180,13 @@ def start_ray(self, user_source_directory, num_workers_per_node=10): Then run the following commands. cd {} - source {} + source {} # Add Ray to your Python path. - Then within a Python interpreter, run the following commands. + Then within a Python interpreter or script, run the following commands. import ray ray.init(scheduler_address="{}:10001", objstore_address="{}:20001", driver_address="{}:30001") - """.format(self.key_file, self.username, self.node_ip_addresses[0], remote_user_source_directory, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0]) + """.format(self.key_file, self.username, self.node_ip_addresses[0], cd_location, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0]) def stop_ray(self): """Kill all of the processes in the Ray cluster. @@ -195,33 +197,39 @@ def stop_ray(self): kill_cluster_command = "killall scheduler objstore python > /dev/null 2> /dev/null" self._run_command_over_ssh_on_all_nodes_in_parallel(kill_cluster_command) - def update_ray(self): + def update_ray(self, branch=None): """Pull the latest Ray source code and rebuild Ray. This method is used for updating the Ray source code on a Ray cluster. It will ssh to each node, will pull the latest source code from the Ray repository, and will rerun the build script (though currently it will not rebuild the third party libraries). + + Args: + branch (Optional[str]): The branch to check out. If omitted, then stay on + the current branch. """ ray_directory = os.path.join(self.installation_directory, "ray") + change_branch_command = "git checkout -f {}".format(branch) if branch is not None else "" update_cluster_command = """ cd "{}" && + {} git fetch && git reset --hard "@{{upstream}}" -- && (make -C "./build" clean || rm -rf "./build") && ./build.sh - """.format(ray_directory) + """.format(ray_directory, change_branch_command) self._run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command) def _update_user_code(self, user_source_directory): """Update the user's source code on each node in the cluster. This method is used to update the user's source code on each node in the - cluster. The local user_source_directory will be copied under ray_source_files in - the ray installation directory on the worker node. For example, if the ray - installation directory is "/d/e/f" and we call _update_source_code("~/a/b/c"), - then the contents of "~/a/b/c" on the local machine will be copied to - "/d/e/f/user_source_files/c" on each node in the cluster. + cluster. The local user_source_directory will be copied under + ray_source_files in the home directory on the worker node. For example, if + we call _update_source_code("~/a/b/c"), then the contents of "~/a/b/c" on + the local machine will be copied to "~/user_source_files/c" on each + node in the cluster. Args: user_source_directory (str): The path on the local machine to the directory @@ -236,7 +244,7 @@ def _update_user_code(self, user_source_directory): raise Exception("Directory {} does not exist.".format(user_source_directory)) # If user_source_directory is "/a/b/c", then local_directory_name is "c". local_directory_name = os.path.split(os.path.realpath(user_source_directory))[1] - remote_directory = os.path.join(self.installation_directory, "user_source_files", local_directory_name) + remote_directory = os.path.join("user_source_files", local_directory_name) # Remove and recreate the directory on the node. recreate_directory_command = """ rm -r "{}"; @@ -290,7 +298,8 @@ def _check_ip_addresses(node_ip_addresses): args = parser.parse_args() username = args.username key_file = args.key_file - installation_directory = args.installation_directory + # Install Ray in the user's home directory on the cluster. + installation_directory = "$HOME" node_ip_addresses = [] node_private_ip_addresses = [] for line in open(args.nodes).readlines():