Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[autoscaler] Kubernetes autoscaler backend #5492

Merged
merged 48 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
86d9c6f
Add Kubernetes NodeProvider to autoscaler
edoakes Aug 12, 2019
be1b96b
Split off SSHCommandRunner
edoakes Aug 19, 2019
ffe00a6
Add KubernetesCommandRunner
edoakes Aug 20, 2019
0915df9
Cleanup
edoakes Aug 22, 2019
4558a76
More config options
edoakes Aug 22, 2019
15d1856
Check if auth present
edoakes Aug 22, 2019
8ed1ed9
More auth checks
edoakes Aug 22, 2019
8ec5f23
Better output
edoakes Aug 22, 2019
3c454c2
Always bootstrap config
edoakes Aug 22, 2019
56e3bf0
All working
edoakes Aug 23, 2019
4379355
Add k8s-rsync comment
edoakes Aug 23, 2019
44327bb
Clean up manual k8s examples
edoakes Aug 23, 2019
2d6fcc3
Fix up submit.yaml
edoakes Aug 23, 2019
9c98da7
Automatically configure permissions
edoakes Aug 24, 2019
a7265ad
Fix get_node_provider arg
edoakes Aug 24, 2019
3174aa1
Fix permissions
edoakes Aug 24, 2019
aa13c22
Fill in empty auth
edoakes Aug 26, 2019
158876f
Merge branch 'master' into k8s
edoakes Aug 26, 2019
6ff3c2a
Remove ray-cluster from this PR
edoakes Aug 26, 2019
c218965
No hard dep on kubernetes library
edoakes Aug 26, 2019
f904982
Move permissions into autoscaler config
edoakes Aug 26, 2019
2452e1b
lint
edoakes Aug 26, 2019
a4e16d6
Fix indentation
edoakes Aug 26, 2019
e13f4a1
namespace validation
edoakes Aug 28, 2019
0fc7a0f
Use cluster name tag
edoakes Aug 28, 2019
9dd310f
Remove kubernetes from setup.py
edoakes Aug 28, 2019
da0ad01
Comment in example configs
edoakes Aug 28, 2019
34b6592
Same default autoscaling config as aws
edoakes Aug 28, 2019
8a0bb24
Add Kubernetes quickstart
edoakes Aug 28, 2019
6df676c
lint
edoakes Aug 28, 2019
15d3573
Revert changes to submit.yaml (other PR)
edoakes Aug 28, 2019
c610f34
Install kubernetes in travis
edoakes Aug 29, 2019
0cc97d6
address comments
edoakes Aug 29, 2019
e1c6fa2
Improve autoscaling doc
edoakes Aug 29, 2019
3bb44d6
Merge remote-tracking branch 'upstream/master' into k8s
edoakes Aug 29, 2019
2e449ea
kubectl command in setup
edoakes Sep 3, 2019
8fa6126
Force use_internal_ips
edoakes Sep 3, 2019
6559635
Merge remote-tracking branch 'upstream/master' into k8s
edoakes Sep 3, 2019
3fdb850
comments
edoakes Sep 3, 2019
c01d471
backend env in docs
edoakes Sep 4, 2019
670d819
Merge remote-tracking branch 'upstream/master' into k8s
edoakes Sep 4, 2019
86d5cc4
Change namespace config
edoakes Sep 4, 2019
1b80017
comments
edoakes Sep 18, 2019
16adb26
Merge remote-tracking branch 'upstream/master' into k8s
edoakes Sep 18, 2019
e88ec5f
comments
edoakes Sep 18, 2019
fee41e4
Merge branch 'master' into k8s
edoakes Sep 20, 2019
734320a
Merge branch 'master' into k8s
edoakes Oct 2, 2019
b29d172
Fix yaml test
edoakes Oct 3, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ci/travis/install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout pytest-sugar mock flaky networkx tabulate psutil
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout pytest-sugar mock flaky networkx tabulate psutil kubernetes
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y python-dev python-numpy build-essential curl unzip tmux gdb
Expand All @@ -34,21 +34,21 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout pytest-sugar flaky networkx tabulate psutil aiohttp
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout pytest-sugar flaky networkx tabulate psutil aiohttp kubernetes
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout pytest-sugar mock flaky networkx tabulate psutil
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout pytest-sugar mock flaky networkx tabulate psutil kubernetes
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda3-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout pytest-sugar flaky networkx tabulate psutil aiohttp
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout pytest-sugar flaky networkx tabulate psutil aiohttp kubernetes
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y build-essential curl unzip
Expand Down
199 changes: 123 additions & 76 deletions doc/source/autoscaling.rst

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions python/ray/autoscaler/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,23 @@
"head_ip": (str, OPTIONAL), # local cluster head node
"worker_ips": (list, OPTIONAL), # local cluster worker nodes
"use_internal_ips": (bool, OPTIONAL), # don't require public ips
"extra_config": (dict, OPTIONAL), # provider-specific config
"namespace": (dict, OPTIONAL), # k8s namespace, if using k8s

# k8s autoscaler permissions, if using k8s
"autoscaler_service_account": (dict, OPTIONAL),
"autoscaler_role": (dict, OPTIONAL),
"autoscaler_role_binding": (dict, OPTIONAL),
},
REQUIRED),

# How Ray will authenticate with newly launched nodes.
"auth": (
{
"ssh_user": (str, REQUIRED), # e.g. ubuntu
"ssh_user": (str, OPTIONAL), # e.g. ubuntu
"ssh_private_key": (str, OPTIONAL),
"kubernetes_config": (dict, OPTIONAL),
},
REQUIRED),
OPTIONAL),

# Docker configuration. If this is specified, all setup and start commands
# will be executed in the container.
Expand Down Expand Up @@ -799,6 +805,7 @@ def fillout_defaults(config):
defaults.update(config)
merge_setup_commands(defaults)
dockerize_if_needed(defaults)
defaults["auth"] = defaults.get("auth", {})
return defaults


Expand Down
24 changes: 13 additions & 11 deletions python/ray/autoscaler/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
config = yaml.safe_load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
validate_config(config)
config = fillout_defaults(config)
validate_config(config)

confirm("This will destroy your cluster", yes)

provider = get_node_provider(config["provider"], config["cluster_name"])

try:

def remaining_nodes():
Expand Down Expand Up @@ -211,9 +210,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
logger.info("get_or_create_head_node: Updating files on head node...")

# Rewrite the auth config so that the head node can update the workers
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config = copy.deepcopy(config)
remote_config["auth"]["ssh_private_key"] = remote_key_path
if config["provider"]["type"] != "kubernetes":
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path

# Adjust for new file locations
new_mounts = {}
Expand All @@ -228,9 +228,12 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if config["provider"]["type"] != "kubernetes":
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})

if restart_only:
init_commands = config["head_start_ray_commands"]
Expand Down Expand Up @@ -271,7 +274,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
"Head node up-to-date, IP address is: {}".format(head_node_ip))

monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
use_docker = bool(config["docker"]["container_name"])
use_docker = "docker" in config and bool(
config["docker"]["container_name"])
if override_cluster_name:
modifiers = " --cluster-name={}".format(
quote(override_cluster_name))
Expand All @@ -284,10 +288,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))

print("To ssh manually to the cluster, run:\n\n"
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
config["auth"]["ssh_user"],
head_node_ip))
print("To get a remote shell to the cluster manually, run:\n\n"
" {}\n".format(updater.cmd_runner.remote_shell_command_str()))
finally:
provider.cleanup()

Expand Down Expand Up @@ -409,7 +411,7 @@ def _exec(updater, cmd, screen, tmux, port_forward=None):
quote(cmd + "; exec bash")
]
cmd = " ".join(cmd)
updater.ssh_cmd(
updater.cmd_runner.run(
cmd,
allocate_tty=True,
exit_on_fail=True,
Expand Down
15 changes: 15 additions & 0 deletions python/ray/autoscaler/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import kubernetes
from kubernetes.config.config_exception import ConfigException

# Prefer in-cluster credentials (available when running inside a pod with a
# mounted service account); fall back to the local kubeconfig file when the
# in-cluster config is unavailable (e.g. running `ray up` from a laptop).
try:
    kubernetes.config.load_incluster_config()
except ConfigException:
    kubernetes.config.load_kube_config()
# Shared API clients used by the Kubernetes node provider and config helpers.
core_api = kubernetes.client.CoreV1Api()  # namespaces, pods, service accounts
auth_api = kubernetes.client.RbacAuthorizationV1Api()  # roles, role bindings

log_prefix = "KubernetesNodeProvider: "
152 changes: 152 additions & 0 deletions python/ray/autoscaler/kubernetes/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

from ray.autoscaler.kubernetes import auth_api, core_api, log_prefix

logger = logging.getLogger(__name__)


class InvalidNamespaceError(ValueError):
    """Raised when a resource's metadata namespace disagrees with the
    namespace configured at the provider level."""

    def __init__(self, field_name, namespace):
        template = ("Namespace of {} config doesn't match provided "
                    "namespace '{}'. Either set it to {} or remove the "
                    "field")
        self.message = template.format(field_name, namespace, namespace)

    def __str__(self):
        return self.message


def using_existing_msg(resource_type, name):
    """Log message for reusing an already-existing cluster resource."""
    return "using existing %s '%s'" % (resource_type, name)


def not_found_msg(resource_type, name):
    """Log message for a resource that is missing and about to be created."""
    return "%s '%s' not found, attempting to create it" % (resource_type,
                                                           name)


def created_msg(resource_type, name):
    """Log message emitted after a resource has been created."""
    return "successfully created %s '%s'" % (resource_type, name)


def not_provided_msg(resource_type):
    """Log message for a resource omitted from the config entirely."""
    return "no %s config provided, must already exist" % resource_type


def bootstrap_kubernetes(config):
    """Prepare Kubernetes resources required by the autoscaler.

    Ensures the configured namespace and the autoscaler's RBAC objects
    (service account, role, role binding) exist, creating them if needed.

    Args:
        config: The full autoscaler config dict.

    Returns:
        The config dict (provider sub-config may have been filled in).

    Raises:
        ValueError: If "use_internal_ips" is not enabled. Pods only have
            cluster-internal IPs, so internal IPs are mandatory.
    """
    if not config["provider"]["use_internal_ips"]:
        # BUG FIX: the original *returned* the ValueError instead of raising
        # it (so validation silently did nothing), and the message told the
        # user to set the flag to false when the check requires true.
        raise ValueError("Exposing external IP addresses for ray pods isn't "
                         "currently supported. Please set "
                         "'use_internal_ips' to true.")
    namespace = _configure_namespace(config["provider"])
    _configure_autoscaler_service_account(namespace, config["provider"])
    _configure_autoscaler_role(namespace, config["provider"])
    _configure_autoscaler_role_binding(namespace, config["provider"])
    return config


def _configure_namespace(provider_config):
    """Ensure the configured Kubernetes namespace exists.

    Looks the namespace up by name and creates it when absent.

    Returns:
        The namespace name (str).

    Raises:
        ValueError: If no "namespace" entry is present in the provider
            config.
    """
    field = "namespace"
    if field not in provider_config:
        raise ValueError("Must specify namespace in Kubernetes config.")

    namespace_config = provider_config[field]
    name = namespace_config["metadata"]["name"]
    selector = "metadata.name={}".format(name)
    matches = core_api.list_namespace(field_selector=selector).items
    if matches:
        # Names are unique, so the selector can match at most one namespace.
        assert len(matches) == 1
        logger.info(log_prefix + using_existing_msg(field, name))
    else:
        logger.info(log_prefix + not_found_msg(field, name))
        core_api.create_namespace(namespace_config)
        logger.info(log_prefix + created_msg(field, name))
    return name


def _configure_autoscaler_service_account(namespace, provider_config):
    """Ensure the autoscaler's service account exists in the namespace.

    If the config omits the service account entirely, it is assumed to
    already exist. A namespace mismatch between the account metadata and the
    provider namespace is an error; a missing namespace is filled in.
    """
    field = "autoscaler_service_account"
    if field not in provider_config:
        logger.info(log_prefix + not_provided_msg(field))
        return

    account = provider_config[field]
    metadata = account["metadata"]
    if "namespace" not in metadata:
        metadata["namespace"] = namespace
    elif metadata["namespace"] != namespace:
        raise InvalidNamespaceError(field, namespace)

    name = metadata["name"]
    selector = "metadata.name={}".format(name)
    matches = core_api.list_namespaced_service_account(
        namespace, field_selector=selector).items
    if matches:
        # Names are unique within a namespace: at most one match.
        assert len(matches) == 1
        logger.info(log_prefix + using_existing_msg(field, name))
        return

    logger.info(log_prefix + not_found_msg(field, name))
    core_api.create_namespaced_service_account(namespace, account)
    logger.info(log_prefix + created_msg(field, name))


def _configure_autoscaler_role(namespace, provider_config):
    """Ensure the autoscaler's RBAC role exists in the namespace.

    If the config omits the role entirely, it is assumed to already exist.
    A namespace mismatch between the role metadata and the provider
    namespace is an error; a missing namespace is filled in.
    """
    field = "autoscaler_role"
    if field not in provider_config:
        logger.info(log_prefix + not_provided_msg(field))
        return

    role = provider_config[field]
    metadata = role["metadata"]
    if "namespace" not in metadata:
        metadata["namespace"] = namespace
    elif metadata["namespace"] != namespace:
        raise InvalidNamespaceError(field, namespace)

    name = metadata["name"]
    selector = "metadata.name={}".format(name)
    matches = auth_api.list_namespaced_role(
        namespace, field_selector=selector).items
    if matches:
        # Names are unique within a namespace: at most one match.
        assert len(matches) == 1
        logger.info(log_prefix + using_existing_msg(field, name))
        return

    logger.info(log_prefix + not_found_msg(field, name))
    auth_api.create_namespaced_role(namespace, role)
    logger.info(log_prefix + created_msg(field, name))


def _configure_autoscaler_role_binding(namespace, provider_config):
    """Ensure the autoscaler's RBAC role binding exists in the namespace.

    If the config omits the binding entirely, it is assumed to already
    exist. Namespace mismatches in either the binding metadata or any of
    its subjects are errors; missing namespaces are filled in.
    """
    field = "autoscaler_role_binding"
    if field not in provider_config:
        logger.info(log_prefix + not_provided_msg(field))
        return

    binding = provider_config[field]
    metadata = binding["metadata"]
    if "namespace" not in metadata:
        metadata["namespace"] = namespace
    elif metadata["namespace"] != namespace:
        raise InvalidNamespaceError(field, namespace)
    for subject in binding["subjects"]:
        if "namespace" not in subject:
            subject["namespace"] = namespace
        elif subject["namespace"] != namespace:
            raise InvalidNamespaceError(
                field + " subject '{}'".format(subject["name"]), namespace)

    name = metadata["name"]
    selector = "metadata.name={}".format(name)
    matches = auth_api.list_namespaced_role_binding(
        namespace, field_selector=selector).items
    if matches:
        # Names are unique within a namespace: at most one match.
        assert len(matches) == 1
        logger.info(log_prefix + using_existing_msg(field, name))
        return

    logger.info(log_prefix + not_found_msg(field, name))
    auth_api.create_namespaced_role_binding(namespace, binding)
    logger.info(log_prefix + created_msg(field, name))
Loading