diff --git a/doc/source/api.rst b/doc/source/api.rst index e114d3b3c89a6..897f30c2cfd28 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -37,11 +37,19 @@ The Ray Command Line API :show-nested: .. click:: ray.scripts.scripts:create_or_update - :prog: ray create_or_update + :prog: ray up :show-nested: .. click:: ray.scripts.scripts:teardown - :prog: ray teardown + :prog: ray down + :show-nested: + +.. click:: ray.scripts.scripts:exec_cmd + :prog: ray exec + :show-nested: + +.. click:: ray.scripts.scripts:attach + :prog: ray attach :show-nested: .. click:: ray.scripts.scripts:get_head_ip diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index a3ef9a2ba3826..ab2627b51af64 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -1,7 +1,7 @@ Cloud Setup and Auto-Scaling ============================ -The ``ray create_or_update`` command starts an AWS or GCP Ray cluster from your personal computer. Once the cluster is up, you can then SSH into it to run Ray programs. +The ``ray up`` command starts or updates an AWS or GCP Ray cluster from your personal computer. Once the cluster is up, you can then SSH into it to run Ray programs. Quick start (AWS) ----------------- @@ -18,14 +18,14 @@ SSH into the head node, ``source activate tensorflow_p36``, and then run Ray pro # Create or update the cluster. When the command finishes, it will print # out the command that can be used to SSH into the cluster head node. - $ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml + $ ray up ray/python/ray/autoscaler/aws/example-full.yaml # Reconfigure autoscaling behavior without interrupting running jobs - $ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml \ + $ ray up ray/python/ray/autoscaler/aws/example-full.yaml \ --max-workers=N --no-restart # Teardown the cluster - $ ray teardown ray/python/ray/autoscaler/aws/example-full.yaml + $ ray down ray/python/ray/autoscaler/aws/example-full.yaml Quick start (GCP) ----------------- @@ -41,14 +41,50 @@ SSH into the head node and then run Ray programs with ``ray.init(redis_address=" # Create or update the cluster. When the command finishes, it will print # out the command that can be used to SSH into the cluster head node. - $ ray create_or_update ray/python/ray/autoscaler/gcp/example-full.yaml + $ ray up ray/python/ray/autoscaler/gcp/example-full.yaml # Reconfigure autoscaling behavior without interrupting running jobs - $ ray create_or_update ray/python/ray/autoscaler/gcp/example-full.yaml \ + $ ray up ray/python/ray/autoscaler/gcp/example-full.yaml \ --max-workers=N --no-restart # Teardown the cluster - $ ray teardown ray/python/ray/autoscaler/gcp/example-full.yaml + $ ray down ray/python/ray/autoscaler/gcp/example-full.yaml + +Running commands on new and existing clusters +--------------------------------------------- + +You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(redis_address="localhost:6379")``. + +.. 
code-block:: bash + + # Run a command on the cluster + $ ray exec cluster.yaml 'echo "hello world"' + + # Run a command on the cluster, starting it if needed + $ ray exec cluster.yaml 'echo "hello world"' --start + + # Run a command on the cluster, stopping the cluster after it finishes + $ ray exec cluster.yaml 'echo "hello world"' --stop + + # Run a command on a new cluster called 'experiment-1', stopping it after + $ ray exec cluster.yaml 'echo "hello world"' \ + --start --stop --cluster-name experiment-1 + + # Run a command in a screen (experimental) + $ ray exec cluster.yaml 'echo "hello world"' --screen + +Attaching to the cluster +------------------------ + +You can use ``ray attach`` to attach to an interactive console on the cluster. + +.. code-block:: bash + + # Open a screen on the cluster + $ ray attach cluster.yaml + + # Open a screen on a new cluster called 'session-1' + $ ray attach cluster.yaml --start --cluster-name=session-1 Port-forwarding applications ---------------------------- @@ -62,9 +98,9 @@ To run connect to applications running on the cluster (e.g. Jupyter notebook) us Updating your cluster --------------------- -When you run ``ray create_or_update`` with an existing cluster, the command checks if the local configuration differs from the applied configuration of the cluster. This includes any changes to synced files specified in the ``file_mounts`` section of the config. If so, the new files and config will be uploaded to the cluster. Following that, Ray services will be restarted. +When you run ``ray up`` with an existing cluster, the command checks if the local configuration differs from the applied configuration of the cluster. This includes any changes to synced files specified in the ``file_mounts`` section of the config. If so, the new files and config will be uploaded to the cluster. Following that, Ray services will be restarted. -You can also run ``ray create_or_update`` to restart a cluster if it seems to be in a bad state (this will restart all Ray services even if there are no config changes). +You can also run ``ray up`` to restart a cluster if it seems to be in a bad state (this will restart all Ray services even if there are no config changes). If you don't want the update to restart services (e.g. because the changes don't require a restart), pass ``--no-restart`` to the update call. @@ -115,11 +151,11 @@ A common use case is syncing a particular local git branch to all workers of the - test -e || git clone https://github.com//.git - cd && git fetch && git checkout `cat /tmp/current_branch_sha` -This tells ``ray create_or_update`` to sync the current git branch SHA from your personal computer to a temporary file on the cluster (assuming you've pushed the branch head already). Then, the setup commands read that file to figure out which SHA they should checkout on the nodes. Note that each command runs in its own session. The final workflow to update the cluster then becomes just this: +This tells ``ray up`` to sync the current git branch SHA from your personal computer to a temporary file on the cluster (assuming you've pushed the branch head already). Then, the setup commands read that file to figure out which SHA they should checkout on the nodes. Note that each command runs in its own session. The final workflow to update the cluster then becomes just this: 1. Make local changes to a git branch 2. Commit the changes with ``git commit`` and ``git push`` -3. Update files on your Ray cluster with ``ray create_or_update`` +3. 
Update files on your Ray cluster with ``ray up`` Common cluster configurations ----------------------------- @@ -174,7 +210,7 @@ with GPU worker nodes instead. .. code-block:: yaml - min_workers: 0 + min_workers: 1 # must have at least 1 GPU worker (issue #2106) max_workers: 10 head_node: InstanceType: m4.large diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index 7dbe039ab744a..19b883746ed16 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -73,3 +73,11 @@ Package Reference * `ray.rllib.models `__ * `ray.rllib.optimizers `__ * `ray.rllib.utils `__ + +Troubleshooting +--------------- + +If you encounter errors like +`blas_thread_init: pthread_create: Resource temporarily unavailable` when using many workers, +try setting ``OMP_NUM_THREADS=1``. Similarly, check configured system limits with +`ulimit -a` for other resource limit errors. diff --git a/doc/source/using-ray-on-a-cluster.rst b/doc/source/using-ray-on-a-cluster.rst index fb52d0939099f..29c2585ac7cfe 100644 --- a/doc/source/using-ray-on-a-cluster.rst +++ b/doc/source/using-ray-on-a-cluster.rst @@ -1,9 +1,9 @@ -Using Ray on a Cluster -====================== +Manual Cluster Setup +==================== .. note:: - If you're using AWS you can use the automated `setup commands `__. + If you're using AWS or GCP you should use the automated `setup commands `__. The instructions in this document work well for small clusters. For larger clusters, follow the instructions for `managing a cluster with parallel ssh`_. diff --git a/doc/source/using-ray-on-a-large-cluster.rst b/doc/source/using-ray-on-a-large-cluster.rst index dfbc0011971e2..c3d6d8a8d2389 100644 --- a/doc/source/using-ray-on-a-large-cluster.rst +++ b/doc/source/using-ray-on-a-large-cluster.rst @@ -1,9 +1,9 @@ -Using Ray on a Large Cluster -============================ +Manual Cluster Setup on a Large Cluster +======================================= .. note:: - If you're using AWS you can use the automated `setup commands `__. + If you're using AWS or GCP you should use the automated `setup commands `__. Deploying Ray on a cluster requires a bit of manual work. 
The instructions here illustrate how to use parallel ssh commands to simplify the process of running diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 873aa48adcf8b..91c8eb1bde387 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -188,6 +188,9 @@ def _info(self): max_frac = frac nodes_used += max_frac idle_times = [now - t for t in self.last_used_time_by_ip.values()] + heartbeat_times = [ + now - t for t in self.last_heartbeat_time_by_ip.values() + ] return { "ResourceUsage": ", ".join([ "{}/{} {}".format( @@ -201,6 +204,10 @@ def _info(self): int(np.min(idle_times)) if idle_times else -1, int(np.mean(idle_times)) if idle_times else -1, int(np.max(idle_times)) if idle_times else -1), + "TimeSinceLastHeartbeat": "Min={} Mean={} Max={}".format( + int(np.min(heartbeat_times)) if heartbeat_times else -1, + int(np.mean(heartbeat_times)) if heartbeat_times else -1, + int(np.max(heartbeat_times)) if heartbeat_times else -1), } @@ -504,14 +511,17 @@ def update_if_needed(self, node_id): return if self.files_up_to_date(node_id): return - if self.config.get("no_restart", False) and \ - self.num_successful_updates.get(node_id, 0) > 0: + successful_updated = self.num_successful_updates.get(node_id, 0) > 0 + if successful_updated and self.config.get("restart_only", False): + init_commands = self.config["worker_start_ray_commands"] + elif successful_updated and self.config.get("no_restart", False): init_commands = (self.config["setup_commands"] + self.config["worker_setup_commands"]) else: init_commands = (self.config["setup_commands"] + self.config["worker_setup_commands"] + self.config["worker_start_ray_commands"]) + updater = self.node_updater_cls( node_id, self.config["provider"], diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index c3b103a0e0ce3..236541ec1abf9 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -3,7 +3,9 @@ from __future__ import print_function import copy +import hashlib import json +import os import tempfile import time import sys @@ -24,17 +26,31 @@ def create_or_update_cluster(config_file, override_min_workers, - override_max_workers, no_restart, yes): + override_max_workers, no_restart, restart_only, + yes, override_cluster_name): """Create or updates an autoscaling Ray cluster from a config json.""" config = yaml.load(open(config_file).read()) - validate_config(config) - config = fillout_defaults(config) - if override_min_workers is not None: config["min_workers"] = override_min_workers if override_max_workers is not None: config["max_workers"] = override_max_workers + if override_cluster_name is not None: + config["cluster_name"] = override_cluster_name + config = _bootstrap_config(config) + get_or_create_head_node(config, config_file, no_restart, restart_only, yes) + + +def _bootstrap_config(config): + hasher = hashlib.sha1() + hasher.update(json.dumps([config], sort_keys=True).encode("utf-8")) + cache_key = os.path.join(tempfile.gettempdir(), + "ray-config-{}".format(hasher.hexdigest())) + if os.path.exists(cache_key): + print("Cached settings:", cache_key) + return json.loads(open(cache_key).read()) + validate_config(config) + config = fillout_defaults(config) importer = NODE_PROVIDERS.get(config["provider"]["type"]) if not importer: @@ -42,36 +58,41 @@ def create_or_update_cluster(config_file, override_min_workers, config["provider"])) bootstrap_config, _ = importer() - config = bootstrap_config(config) - 
get_or_create_head_node(config, no_restart, yes) + resolved_config = bootstrap_config(config) + with open(cache_key, "w") as f: + f.write(json.dumps(resolved_config)) + return resolved_config -def teardown_cluster(config_file, yes): +def teardown_cluster(config_file, yes, workers_only, override_cluster_name): """Destroys all nodes of a Ray cluster described by a config json.""" config = yaml.load(open(config_file).read()) + if override_cluster_name is not None: + config["cluster_name"] = override_cluster_name validate_config(config) config = fillout_defaults(config) confirm("This will destroy your cluster", yes) provider = get_node_provider(config["provider"], config["cluster_name"]) - head_node_tags = { - TAG_RAY_NODE_TYPE: "head", - } - for node in provider.nodes(head_node_tags): - print("Terminating head node {}".format(node)) - provider.terminate_node(node) - nodes = provider.nodes({}) + + if not workers_only: + for node in provider.nodes({TAG_RAY_NODE_TYPE: "head"}): + print("Terminating head node {}".format(node)) + provider.terminate_node(node) + + nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) while nodes: for node in nodes: print("Terminating worker {}".format(node)) provider.terminate_node(node) time.sleep(5) - nodes = provider.nodes({}) + nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) -def get_or_create_head_node(config, no_restart, yes): +def get_or_create_head_node(config, config_file, no_restart, restart_only, + yes): """Create the cluster head node, which in turn creates the workers.""" provider = get_node_provider(config["provider"], config["cluster_name"]) @@ -133,7 +154,9 @@ def get_or_create_head_node(config, no_restart, yes): "~/ray_bootstrap_config.yaml": remote_config_file.name }) - if no_restart: + if restart_only: + init_commands = config["head_start_ray_commands"] + elif no_restart: init_commands = ( config["setup_commands"] + config["head_setup_commands"]) else: @@ -170,20 +193,82 @@ def get_or_create_head_node(config, no_restart, yes): monitor_str = "docker exec {} /bin/sh -c {}".format( config["docker"]["container_name"], quote(monitor_str)) print("To monitor auto-scaling activity, you can run:\n\n" - " ssh -i {} {}@{} {}\n".format(config["auth"]["ssh_private_key"], - config["auth"]["ssh_user"], - provider.external_ip(head_node), - quote(monitor_str))) + " ray exec {} {} --cluster-name={}\n".format( + config_file, quote(monitor_str), quote(config["cluster_name"]))) print("To login to the cluster, run:\n\n" " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"], config["auth"]["ssh_user"], provider.external_ip(head_node))) -def get_head_node_ip(config_file): +def attach_cluster(config_file, start, override_cluster_name): + """Attaches to a screen for the specified cluster. + + Arguments: + config_file: path to the cluster yaml + start: whether to start the cluster if it isn't up + override_cluster_name: set the name of the cluster + """ + + exec_cluster(config_file, "screen -L -xRR", False, False, start, + override_cluster_name) + + +def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name): + """Runs a command on the specified cluster. 
+ + Arguments: + config_file: path to the cluster yaml + cmd: command to run + screen: whether to run in a screen + stop: whether to stop the cluster after command run + start: whether to start the cluster if it isn't up + override_cluster_name: set the name of the cluster + """ + + config = yaml.load(open(config_file).read()) + if override_cluster_name is not None: + config["cluster_name"] = override_cluster_name + config = _bootstrap_config(config) + head_node = _get_head_node(config, config_file, create_if_needed=start) + updater = NodeUpdaterProcess( + head_node, + config["provider"], + config["auth"], + config["cluster_name"], + config["file_mounts"], [], + "", + redirect_output=False) + if stop: + cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes " + "--workers-only; sudo shutdown -h now") + _exec(updater, cmd, screen, expect_error=stop) + + +def _exec(updater, cmd, screen, expect_error=False): + if cmd: + if screen: + cmd = [ + "screen", "-L", "-dm", "bash", "-c", + quote(cmd + "; exec bash") + ] + cmd = " ".join(cmd) + updater.ssh_cmd( + cmd, verbose=True, allocate_tty=True, expect_error=expect_error) + + +def get_head_node_ip(config_file, override_cluster_name): """Returns head node IP for given configuration file if exists.""" config = yaml.load(open(config_file).read()) + if override_cluster_name is not None: + config["cluster_name"] = override_cluster_name + provider = get_node_provider(config["provider"], config["cluster_name"]) + head_node = _get_head_node(config, config_file) + return provider.external_ip(head_node) + + +def _get_head_node(config, config_file, create_if_needed=False): provider = get_node_provider(config["provider"], config["cluster_name"]) head_node_tags = { TAG_RAY_NODE_TYPE: "head", @@ -191,7 +276,15 @@ def get_head_node_ip(config_file): nodes = provider.nodes(head_node_tags) if len(nodes) > 0: head_node = nodes[0] - return provider.external_ip(head_node) + return head_node + elif create_if_needed: + get_or_create_head_node( + config, + config_file, + restart_only=False, + no_restart=False, + yes=True) + return _get_head_node(config, config_file, create_if_needed=False) else: print("Head node of cluster ({}) not found!".format( config["cluster_name"])) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 9972d2424e8ae..d5838f7c1b622 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -181,19 +181,34 @@ def do_update(self): for cmd in self.setup_cmds: self.ssh_cmd(cmd, verbose=True) - def ssh_cmd(self, cmd, connect_timeout=120, redirect=None, verbose=False): + def ssh_cmd(self, + cmd, + connect_timeout=120, + redirect=None, + verbose=False, + allocate_tty=False, + emulate_interactive=True, + expect_error=False): if verbose: print( "NodeUpdater: running {} on {}...".format( pretty_cmd(cmd), self.ssh_ip), file=self.stdout) - force_interactive = "set -i || true && source ~/.bashrc && " - self.process_runner.check_call( - [ - "ssh", "-o", "ConnectTimeout={}s".format(connect_timeout), - "-o", "StrictHostKeyChecking=no", "-i", self.ssh_private_key, - "{}@{}".format(self.ssh_user, self.ssh_ip), - "bash --login -c {}".format(quote(force_interactive + cmd)) + ssh = ["ssh"] + if allocate_tty: + ssh.append("-tt") + if emulate_interactive: + force_interactive = "set -i || true && source ~/.bashrc && " + cmd = "bash --login -c {}".format(quote(force_interactive + cmd)) + if expect_error: + call = self.process_runner.call + else: + call = self.process_runner.check_call + call( + ssh + 
[ + "-o", "ConnectTimeout={}s".format(connect_timeout), "-o", + "StrictHostKeyChecking=no", "-i", self.ssh_private_key, + "{}@{}".format(self.ssh_user, self.ssh_ip), cmd ], stdout=redirect or self.stdout, stderr=redirect or self.stderr) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index be9d0f25202db..70743dfd70729 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -316,8 +316,8 @@ def local_scheduler_info_handler(self, unused_channel, data): if ip: self.load_metrics.update(ip, static_resources, dynamic_resources) else: - print("Warning: could not find ip for client {}." - .format(client_id)) + print("Warning: could not find ip for client {} in {}.".format( + client_id, self.local_scheduler_id_to_ip_map)) def xray_heartbeat_handler(self, unused_channel, data): """Handle an xray heartbeat message from Redis.""" @@ -342,8 +342,8 @@ def xray_heartbeat_handler(self, unused_channel, data): if ip: self.load_metrics.update(ip, static_resources, dynamic_resources) else: - print("Warning: could not find ip for client {}." - .format(client_id)) + print("Warning: could not find ip for client {} in {}.".format( + client_id, self.local_scheduler_id_to_ip_map)) def plasma_manager_heartbeat_handler(self, unused_channel, data): """Handle a plasma manager heartbeat from Redis. diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index c7c3d2b7889a2..a15295d84f955 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -8,7 +8,8 @@ import subprocess import ray.services as services -from ray.autoscaler.commands import (create_or_update_cluster, +from ray.autoscaler.commands import (attach_cluster, exec_cluster, + create_or_update_cluster, teardown_cluster, get_head_node_ip) import ray.utils @@ -370,6 +371,12 @@ def stop(): default=False, help=("Whether to skip restarting Ray services during the update. " "This avoids interrupting running jobs.")) +@click.option( + "--restart-only", + is_flag=True, + default=False, + help=("Whether to skip running setup commands and only restart Ray. " + "This cannot be used with 'no-restart'.")) @click.option( "--min-workers", required=False, @@ -380,6 +387,11 @@ def stop(): required=False, type=int, help=("Override the configured max worker node count for the cluster.")) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) @click.option( "--yes", "-y", @@ -387,33 +399,98 @@ def stop(): default=False, help=("Don't ask for confirmation.")) def create_or_update(cluster_config_file, min_workers, max_workers, no_restart, - yes): + restart_only, yes, cluster_name): + if restart_only or no_restart: + assert restart_only != no_restart, "Cannot set both 'restart_only' " \ + "and 'no_restart' at the same time!" 
create_or_update_cluster(cluster_config_file, min_workers, max_workers, - no_restart, yes) + no_restart, restart_only, yes, cluster_name) @click.command() @click.argument("cluster_config_file", required=True, type=str) +@click.option( + "--workers-only", + is_flag=True, + default=False, + help=("Only destroy the workers.")) @click.option( "--yes", "-y", is_flag=True, default=False, help=("Don't ask for confirmation.")) -def teardown(cluster_config_file, yes): - teardown_cluster(cluster_config_file, yes) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def teardown(cluster_config_file, yes, workers_only, cluster_name): + teardown_cluster(cluster_config_file, yes, workers_only, cluster_name) @click.command() @click.argument("cluster_config_file", required=True, type=str) -def get_head_ip(cluster_config_file): - click.echo(get_head_node_ip(cluster_config_file)) +@click.option( + "--start", + is_flag=True, + default=False, + help=("Start the cluster if needed.")) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def attach(cluster_config_file, start, cluster_name): + attach_cluster(cluster_config_file, start, cluster_name) + + +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.argument("cmd", required=True, type=str) +@click.option( + "--stop", + is_flag=True, + default=False, + help=("Stop the cluster after the command finishes running.")) +@click.option( + "--start", + is_flag=True, + default=False, + help=("Start the cluster if needed.")) +@click.option( + "--screen", + is_flag=True, + default=False, + help=("Run the command in a screen.")) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def exec_cmd(cluster_config_file, cmd, screen, stop, start, cluster_name): + exec_cluster(cluster_config_file, cmd, screen, stop, start, cluster_name) + + +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def get_head_ip(cluster_config_file, cluster_name): + click.echo(get_head_node_ip(cluster_config_file, cluster_name)) cli.add_command(start) cli.add_command(stop) cli.add_command(create_or_update) +cli.add_command(create_or_update, name="up") +cli.add_command(attach) +cli.add_command(exec_cmd, name="exec") cli.add_command(teardown) +cli.add_command(teardown, name="down") cli.add_command(get_head_ip)
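The new ``ray exec`` and ``ray attach`` subcommands added above are thin wrappers around ``exec_cluster`` and ``attach_cluster`` in ``ray.autoscaler.commands``, so the same workflow can also be driven from Python. Below is a minimal sketch based on the ``exec_cluster`` signature introduced in this diff; ``cluster.yaml`` and ``python train.py`` are hypothetical placeholders for your own cluster config and workload.

.. code-block:: python

    from ray.autoscaler.commands import exec_cluster

    # Roughly equivalent to:
    #   ray exec cluster.yaml 'python train.py' --start --stop --screen \
    #       --cluster-name experiment-1
    exec_cluster(
        "cluster.yaml",                        # placeholder: path to the cluster yaml
        "python train.py",                     # placeholder: command to run on the head node
        screen=True,                           # run the command inside a screen session
        stop=True,                             # stop the cluster when the command finishes
        start=True,                            # create the head node first if the cluster is not up
        override_cluster_name="experiment-1")  # same as --cluster-name on the CLI

Note that when ``stop`` is set, ``exec_cluster`` appends ``ray stop``, a ``--workers-only`` teardown, and ``sudo shutdown -h now`` to the command before running it, which is why the SSH call is made with ``expect_error=True`` in that case.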