Commit 9cf4840: Refactor autoscaler tagging to support multiple tag specs

hartikainen committed May 19, 2018
1 parent d71d4e4 commit 9cf4840

Showing 8 changed files with 118 additions and 58 deletions.
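In short, the commit replaces the shared module-level TAG_* constants from ray/autoscaler/tags.py with per-provider tag_keys and tag_values dictionaries, so each backend (EC2 tags vs. GCE labels) can supply names that satisfy its own naming rules. A minimal sketch of the calling pattern after this change; the provider config values are illustrative and not taken from the diff, and constructing the AWS provider needs working credentials:

from ray.autoscaler.node_provider import get_node_provider

# Hypothetical provider section of a cluster config; a real config carries
# more fields.
provider = get_node_provider({"type": "aws", "region": "us-west-2"},
                             "example-cluster")

# Tags for a new worker are built from the provider's tag spec, mirroring
# launch_new_node() in the autoscaler.py diff below.
worker_tags = {
    provider.tag_keys["node-name"]: "ray-example-cluster-worker",
    provider.tag_keys["node-type"]: provider.tag_values["worker"],
    provider.tag_keys["node-status"]: provider.tag_values["uninitialized"],
}

# Listing workers filters on the same keys and values.
workers = provider.nodes(tag_filters={
    provider.tag_keys["node-type"]: provider.tag_values["worker"],
})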
17 changes: 8 additions & 9 deletions python/ray/autoscaler/autoscaler.py
@@ -22,8 +22,6 @@
get_default_config
from ray.autoscaler.updater import NodeUpdaterProcess
from ray.autoscaler.docker import dockerize_if_needed
from ray.autoscaler.tags import TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, TAG_NAME
import ray.services as services

REQUIRED, OPTIONAL = True, False
@@ -375,13 +373,14 @@ def target_num_workers(self):

def launch_config_ok(self, node_id):
launch_conf = self.provider.node_tags(node_id).get(
TAG_RAY_LAUNCH_CONFIG)
self.provider.tag_keys['launch-config'])
if self.launch_hash != launch_conf:
return False
return True

def files_up_to_date(self, node_id):
applied = self.provider.node_tags(node_id).get(TAG_RAY_RUNTIME_CONFIG)
applied = self.provider.node_tags(node_id).get(
self.provider.tag_keys['runtime-config'])
if applied != self.runtime_hash:
print(
"StandardAutoscaler: {} has runtime state {}, want {}".format(
@@ -451,17 +450,17 @@ def launch_new_node(self, count):
num_before = len(self.workers())
self.provider.create_node(
self.config["worker_nodes"], {
TAG_NAME: "ray-{}-worker".format(self.config["cluster_name"]),
TAG_RAY_NODE_TYPE: "Worker",
TAG_RAY_NODE_STATUS: "Uninitialized",
TAG_RAY_LAUNCH_CONFIG: self.launch_hash,
self.provider.tag_keys['node-name']: "ray-{}-worker".format(self.config["cluster_name"]),
self.provider.tag_keys['node-type']: self.provider.tag_values["worker"],
self.provider.tag_keys['node-status']: self.provider.tag_values["uninitialized"],
self.provider.tag_keys['launch-config']: self.launch_hash,
}, count)
if len(self.workers()) <= num_before:
print("Warning: Num nodes failed to increase after node creation")

def workers(self):
return self.provider.nodes(tag_filters={
TAG_RAY_NODE_TYPE: "Worker",
self.provider.tag_keys['node-type']: self.provider.tag_values["worker"],
})

def debug_string(self, nodes=None):
9 changes: 6 additions & 3 deletions python/ray/autoscaler/aws/node_provider.py
@@ -6,7 +6,7 @@
from botocore.config import Config

from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME
from ray.autoscaler.aws.tags import TAG_KEYS, TAG_VALUES
from ray.ray_constants import BOTO_MAX_RETRIES


@@ -17,6 +17,9 @@ def __init__(self, provider_config, cluster_name):
self.ec2 = boto3.resource(
"ec2", region_name=provider_config["region"], config=config)

self.tag_keys = TAG_KEYS.copy()
self.tag_values = TAG_VALUES.copy()

# Cache of node objects from the last nodes() call. This avoids
# excessive DescribeInstances requests.
self.cached_nodes = {}
@@ -32,7 +35,7 @@ def nodes(self, tag_filters):
"Values": ["pending", "running"],
},
{
"Name": "tag:{}".format(TAG_RAY_CLUSTER_NAME),
"Name": "tag:{}".format(self.tag_keys['cluster-name']),
"Values": [self.cluster_name],
},
]
@@ -92,7 +95,7 @@ def set_node_tags(self, node_id, tags):
def create_node(self, node_config, tags, count):
conf = node_config.copy()
tag_pairs = [{
"Key": TAG_RAY_CLUSTER_NAME,
"Key": self.tag_keys['cluster-name'],
"Value": self.cluster_name,
}]
for k, v in tags.items():
31 changes: 31 additions & 0 deletions python/ray/autoscaler/aws/tags.py
@@ -0,0 +1,31 @@
"""The Ray autoscaler uses tags/labels to associate metadata with instances."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

TAG_KEYS = {
# Tag for the name of the node
'node-name': 'ray:NodeName',
# Tag for the type of node (e.g. Head, Worker)
'node-type': 'ray:NodeType',
# Tag that reports the current state of the node (e.g. Updating, Up-to-date)
'node-status': 'ray:NodeStatus',
# Tag uniquely identifying all nodes of a cluster
'cluster-name': 'ray:ClusterName',
# Hash of the node launch config, used to identify out-of-date nodes
'launch-config': 'ray:LaunchConfig',
# Hash of the node runtime config, used to determine if updates are needed
'runtime-config': 'ray:RuntimeConfig',
}


TAG_VALUES = {
'head': 'Head',
'waiting-for-ssh': 'WaitingForSSH',
'update-failed': 'UpdateFailed',
'setting-up': 'SettingUp',
'syncing-files': 'SyncingFiles',
'up-to-date': 'Up-To-Date',
'worker': 'Worker',
'uninitialized': 'Uninitialized',
}
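For reference, a minimal sketch of how these constants feed into the EC2 API calls made by AWSNodeProvider (see the node_provider.py diff above); the helper functions are hypothetical and not part of the commit:

from ray.autoscaler.aws.tags import TAG_KEYS, TAG_VALUES


def ec2_tag_pairs(cluster_name, tags):
    # Key/Value pairs in the shape create_node() passes on to EC2.
    pairs = [{"Key": TAG_KEYS["cluster-name"], "Value": cluster_name}]
    for key, value in tags.items():
        pairs.append({"Key": key, "Value": value})
    return pairs


def ec2_worker_filters(cluster_name):
    # DescribeInstances filters, roughly as nodes() builds them above.
    return [
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
        {"Name": "tag:{}".format(TAG_KEYS["cluster-name"]),
         "Values": [cluster_name]},
        {"Name": "tag:{}".format(TAG_KEYS["node-type"]),
         "Values": [TAG_VALUES["worker"]]},
    ]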
16 changes: 8 additions & 8 deletions python/ray/autoscaler/commands.py
@@ -18,8 +18,6 @@
from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \
hash_launch_conf, fillout_defaults
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
TAG_NAME
from ray.autoscaler.updater import NodeUpdaterProcess


@@ -57,7 +55,7 @@ def teardown_cluster(config_file, yes):

provider = get_node_provider(config["provider"], config["cluster_name"])
head_node_tags = {
TAG_RAY_NODE_TYPE: "Head",
provider.tag_keys['node-type']: provider.tag_values['head'],
}
for node in provider.nodes(head_node_tags):
print("Terminating head node {}".format(node))
@@ -76,7 +74,7 @@ def get_or_create_head_node(config, no_restart, yes):

provider = get_node_provider(config["provider"], config["cluster_name"])
head_node_tags = {
TAG_RAY_NODE_TYPE: "Head",
provider.tag_keys['node-type']: provider.tag_values['head'],
}
nodes = provider.nodes(head_node_tags)
if len(nodes) > 0:
@@ -91,14 +89,16 @@

launch_hash = hash_launch_conf(config["head_node"], config["auth"])
if head_node is None or provider.node_tags(head_node).get(
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
provider.tag_keys['launch-config']) != launch_hash:
if head_node is not None:
confirm("Head node config out-of-date. It will be terminated", yes)
print("Terminating outdated head node {}".format(head_node))
provider.terminate_node(head_node)
print("Launching new head node...")
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_NAME] = "ray-{}-head".format(config["cluster_name"])
head_node_tags.update({
provider.tag_keys['launch-config']: launch_hash,
provider.tag_keys['node-name']: "ray-{}-head".format(config["cluster_name"])
})
provider.create_node(config["head_node"], head_node_tags, 1)

nodes = provider.nodes(head_node_tags)
@@ -187,7 +187,7 @@ def get_head_node_ip(config_file):
config = yaml.load(open(config_file).read())
provider = get_node_provider(config["provider"], config["cluster_name"])
head_node_tags = {
TAG_RAY_NODE_TYPE: "Head",
provider.tag_keys['node-type']: provider.tag_values['head'],
}
nodes = provider.nodes(head_node_tags)
if len(nodes) > 0:
12 changes: 8 additions & 4 deletions python/ray/autoscaler/gcp/node_provider.py
@@ -6,8 +6,8 @@
compute = discovery.build('compute', 'v1')

from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_NAME
from ray.autoscaler.gcp.config import wait_for_compute_zone_operation
from ray.autoscaler.gcp.tags import TAG_KEYS, TAG_VALUES
from ray.ray_constants import BOTO_MAX_RETRIES

TERMINATED_STATES = (
@@ -22,6 +22,9 @@ class GCPNodeProvider(NodeProvider):
def __init__(self, provider_config, cluster_name):
NodeProvider.__init__(self, provider_config, cluster_name)

self.tag_keys = TAG_KEYS.copy()
self.tag_values = TAG_VALUES.copy()

# Cache of node objects from the last nodes() call. This avoids
# excessive DescribeInstances requests.
self.cached_nodes = {}
@@ -46,7 +49,8 @@ def nodes(self, label_filters):

cluster_name_filter_expr = (
'(labels.{key} = {value})'
''.format(key=TAG_RAY_CLUSTER_NAME, value=self.cluster_name))
''.format(key=self.tag_keys['cluster-name'],
value=self.cluster_name))

not_empty_filters = [
f for f in [
@@ -133,7 +137,7 @@ def create_node(self, base_config, labels, count):
availability_zone = self.provider_config['availability_zone']

config = base_config.copy()
config['name'] = labels[TAG_NAME]
config['name'] = labels[self.tag_keys['node-name']]
config['machineType'] = (
'zones/{zone}/machineTypes/{machine_type}'
''.format(zone=availability_zone,
@@ -142,7 +146,7 @@
config['labels'] = dict(
config.get('labels', {}),
**labels,
**{TAG_RAY_CLUSTER_NAME: self.cluster_name})
**{self.tag_keys['cluster-name']: self.cluster_name})

if count != 1: raise NotImplementedError(count)

35 changes: 35 additions & 0 deletions python/ray/autoscaler/gcp/tags.py
@@ -0,0 +1,35 @@
"""The Ray autoscaler uses tags/labels to associate metadata with instances.
Notice that the name 'tag' here is somewhat misnomer, since gcp uses the name
'label' for key-value pairs, and 'tag' for value-only tags.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

TAG_KEYS = {
# Tag for the name of the node
'node-name': 'ray_node-name',
# Tag for the type of node (e.g. Head, Worker)
'node-type': 'ray_node-type',
# Tag that reports the current state of the node (e.g. Updating, Up-to-date)
'node-status': 'ray_node-status',
# Tag uniquely identifying all nodes of a cluster
'cluster-name': 'ray_cluster-name',
# Hash of the node launch config, used to identify out-of-date nodes
'launch-config': 'ray_launch-config',
# Hash of the node runtime config, used to determine if updates are needed
'runtime-config': 'ray_runtime-config',
}


TAG_VALUES = {
'head': 'head',
'waiting-for-ssh': 'waiting-for-ssh',
'update-failed': 'update-failed',
'setting-up': 'setting-up',
'syncing-files': 'syncing-files',
'up-to-date': 'up-to-date',
'worker': 'worker',
'uninitialized': 'uninitialized',
}
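The lowercase, underscore-separated names here are presumably chosen to fit GCE's label restrictions (label keys and values are limited to lowercase letters, digits, underscores, and dashes, so the colon-style AWS keys cannot be reused). A minimal sketch of the label filter expression that GCPNodeProvider.nodes() composes from these keys (see the diff above); the helper name and the AND join are illustrative assumptions:

from ray.autoscaler.gcp.tags import TAG_KEYS, TAG_VALUES


def gcp_worker_filter(cluster_name):
    # One "(labels.key = value)" term per label, as nodes() composes above;
    # the provider joins the non-empty terms into a single filter string.
    terms = [
        "(labels.{} = {})".format(TAG_KEYS["cluster-name"], cluster_name),
        "(labels.{} = {})".format(TAG_KEYS["node-type"], TAG_VALUES["worker"]),
    ]
    return " AND ".join(terms)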
23 changes: 0 additions & 23 deletions python/ray/autoscaler/tags.py

This file was deleted.

33 changes: 22 additions & 11 deletions python/ray/autoscaler/updater.py
@@ -13,7 +13,6 @@
from threading import Thread

from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG

# How long to wait for a node to start, in seconds
NODE_START_WAIT_S = 300
@@ -72,26 +71,31 @@ def run(self):
"NodeUpdater: Error updating {}"
"See {} for remote logs.".format(error_str, self.output_name),
file=self.stdout)
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "UpdateFailed"})
self.provider.set_node_tags(
self.node_id,
{self.provider.tag_keys['node-status']: self.provider.tag_values['update-failed']})
if self.logfile is not None:
print("----- BEGIN REMOTE LOGS -----\n" +
open(self.logfile.name).read() +
"\n----- END REMOTE LOGS -----")
raise e
self.provider.set_node_tags(
self.node_id, {
TAG_RAY_NODE_STATUS: "Up-to-date",
TAG_RAY_RUNTIME_CONFIG: self.runtime_hash
self.provider.tag_keys['node-status']: self.provider.tag_values['up-to-date'],
self.provider.tag_keys['runtime-config']: self.runtime_hash
})
print(
"NodeUpdater: Applied config {} to node {}".format(
self.runtime_hash, self.node_id),
file=self.stdout)

def do_update(self):
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "WaitingForSSH"})
self.provider.set_node_tags(
self.node_id,
{
self.provider.tag_keys['node-status']:
self.provider.tag_values['waiting-for-ssh']
})
deadline = time.time() + NODE_START_WAIT_S

# Wait for external IP
@@ -136,8 +140,12 @@ def do_update(self):
assert ssh_ok, "Unable to SSH to node"

# Rsync file mounts
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "SyncingFiles"})
self.provider.set_node_tags(
self.node_id,
{
self.provider.tag_keys['node-status']:
self.provider.tag_values['syncing-files']
})
for remote_path, local_path in self.file_mounts.items():
print(
"NodeUpdater: Syncing {} to {}...".format(
@@ -161,8 +169,11 @@
stderr=self.stderr)

# Run init commands
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "SettingUp"})
self.provider.set_node_tags(
self.node_id,
{self.provider.tag_keys['node-status']:
self.provider.tag_values['setting-up']}
)
for cmd in self.setup_cmds:
self.ssh_cmd(cmd, verbose=True)

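For orientation, the status values the updater walks a node through via set_node_tags(), per the calls in this diff; the list below is only a summary, not a structure defined by the commit:

# Node status lifecycle applied by NodeUpdaterProcess through
# provider.set_node_tags(); 'update-failed' is set instead if a step raises.
STATUS_SEQUENCE = [
    "waiting-for-ssh",  # before SSH is reachable
    "syncing-files",    # while rsyncing file mounts
    "setting-up",       # while running the setup commands
    "up-to-date",       # on success, together with the runtime-config hash
]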
