Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] sky status --k8s refactor #4079

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from sky import exceptions
from sky import global_user_state
from sky import provision as provision_lib
from sky import resources as resources_lib
romilbhardwaj marked this conversation as resolved.
Show resolved Hide resolved
from sky import sky_logging
from sky import skypilot_config
from sky import status_lib
Expand All @@ -56,7 +57,6 @@
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import resources
from sky import task as task_lib
from sky.backends import cloud_vm_ray_backend
from sky.backends import local_docker_backend
Expand Down Expand Up @@ -751,7 +751,7 @@ def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):
# TODO: too many things happening here - leaky abstraction. Refactor.
@timeline.event
def write_cluster_config(
to_provision: 'resources.Resources',
to_provision: resources_lib.Resources,
num_nodes: int,
cluster_config_template: str,
cluster_name: str,
Expand Down
19 changes: 6 additions & 13 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1464,21 +1464,14 @@ def _status_kubernetes(show_all: bool):
Args:
show_all (bool): Show all job information (e.g., start time, failures).
"""
context = kubernetes_utils.get_current_kube_config_context_name()
try:
pods = kubernetes_utils.get_skypilot_pods(context)
except exceptions.ResourcesUnavailableError as e:
with ux_utils.print_exception_no_traceback():
raise ValueError('Failed to get SkyPilot pods from '
f'Kubernetes: {str(e)}') from e
all_clusters, jobs_controllers, serve_controllers = (
status_utils.process_skypilot_pods(pods, context))
all_clusters, jobs_controllers, serve_controllers, context = (
core.status_kubernetes())
all_jobs = []
with rich_utils.safe_status(
'[bold cyan]Checking in-progress managed jobs[/]') as spinner:
for i, (_, job_controller_info) in enumerate(jobs_controllers.items()):
user = job_controller_info['user']
pod = job_controller_info['pods'][0]
for i, job_controller_info in enumerate(jobs_controllers):
user = job_controller_info.user
pod = job_controller_info.pods[0]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the spinner below, please use ux_utils.spinner_message() for the style of the messaage

status_message = ('[bold cyan]Checking managed jobs controller')
if len(jobs_controllers) > 1:
status_message += f's ({i+1}/{len(jobs_controllers)})'
Expand Down Expand Up @@ -1510,7 +1503,7 @@ def _status_kubernetes(show_all: bool):
managed_job_cluster_names.add(managed_cluster_name)
unmanaged_clusters = [
c for c in all_clusters
if c['cluster_name'] not in managed_job_cluster_names
if c.cluster_name not in managed_job_cluster_names
]
click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}'
f'Kubernetes cluster state (context: {context})'
Expand Down
27 changes: 27 additions & 0 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sky import status_lib
from sky import task
from sky.backends import backend_utils
from sky.provision.kubernetes import utils as kubernetes_utils
from sky.skylet import constants
from sky.skylet import job_lib
from sky.usage import usage_lib
Expand Down Expand Up @@ -111,6 +112,32 @@ def status(cluster_names: Optional[Union[str, List[str]]] = None,
cluster_names=cluster_names)


def status_kubernetes():
romilbhardwaj marked this conversation as resolved.
Show resolved Hide resolved
"""Get all SkyPilot clusters and controllers in the Kubernetes cluster.

Managed jobs and services are also included in the clusters returned.
The caller must parse the controllers to identify which clusters are run
as managed jobs or services.

Returns:
A tuple containing:
- List of KubernetesClusterInfo with all cluster information.
- List of KubernetesClusterInfo with job controller information.
- List of KubernetesClusterInfo with serve controller information.
- Context used to fetch the cluster information.
"""
context = kubernetes_utils.get_current_kube_config_context_name()
try:
pods = kubernetes_utils.get_skypilot_pods(context)
except exceptions.ResourcesUnavailableError as e:
with ux_utils.print_exception_no_traceback():
raise ValueError('Failed to get SkyPilot pods from '
f'Kubernetes: {str(e)}') from e
all_clusters, jobs_controllers, serve_controllers = (
kubernetes_utils.process_skypilot_pods(pods, context))
return all_clusters, jobs_controllers, serve_controllers, context


def endpoints(cluster: str,
port: Optional[Union[int, str]] = None) -> Dict[int, str]:
"""Gets the endpoint for a given cluster and port number (endpoint).
Expand Down
112 changes: 112 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import yaml

import sky
from sky import clouds
from sky import exceptions
from sky import sky_logging
from sky import skypilot_config
from sky import status_lib
from sky.adaptors import kubernetes
from sky.provision import constants as provision_constants
from sky.provision.kubernetes import network_utils
Expand All @@ -30,6 +32,7 @@

if typing.TYPE_CHECKING:
from sky import backends
from sky import resources as resources_lib

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'
Expand Down Expand Up @@ -2023,3 +2026,112 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]:
'kubectl get pods --selector=skypilot-cluster --all-namespaces'
) from None
return pods


@dataclasses.dataclass
class KubernetesClusterInfo:
romilbhardwaj marked this conversation as resolved.
Show resolved Hide resolved
cluster_name_on_cloud: str
cluster_name: str
user: str
status: status_lib.ClusterStatus
pods: List[Any]
launched_at: float
resources: 'resources_lib.Resources'
resources_str: str


def process_skypilot_pods(
pods: List[Any],
context: Optional[str] = None
) -> Tuple[List[KubernetesClusterInfo], List[KubernetesClusterInfo],
List[KubernetesClusterInfo]]:
"""Process SkyPilot pods on k8s to extract cluster and controller info.

Args:
pods: List of Kubernetes pod objects.
context: Kubernetes context name, used to detect GPU label formatter.

Returns:
A tuple containing:
- List of KubernetesClusterInfo with all cluster information.
- List of KubernetesClusterInfo with job controller information.
- List of KubernetesClusterInfo with serve controller information.
"""
# pylint: disable=import-outside-toplevel
from sky import resources as resources_lib
clusters: Dict[str, KubernetesClusterInfo] = {}
jobs_controllers: List[KubernetesClusterInfo] = []
serve_controllers: List[KubernetesClusterInfo] = []

for pod in pods:
cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster')
cluster_name = cluster_name_on_cloud.rsplit(
'-', 1
)[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4)
if cluster_name_on_cloud not in clusters:
# Parse the start time for the cluster
start_time = pod.status.start_time
if start_time is not None:
start_time = pod.status.start_time.timestamp()

# Parse resources
cpu_request = parse_cpu_or_gpu_resource(
pod.spec.containers[0].resources.requests.get('cpu', '0'))
memory_request = parse_memory_resource(
pod.spec.containers[0].resources.requests.get('memory', '0'),
unit='G')
gpu_count = parse_cpu_or_gpu_resource(
pod.spec.containers[0].resources.requests.get(
'nvidia.com/gpu', '0'))
gpu_name = None
if gpu_count > 0:
label_formatter, _ = (detect_gpu_label_formatter(context))
assert label_formatter is not None, (
'GPU label formatter cannot be None if there are pods '
f'requesting GPUs: {pod.metadata.name}')
gpu_label = label_formatter.get_label_key()
# Get GPU name from pod node selector
if pod.spec.node_selector is not None:
gpu_name = label_formatter.get_accelerator_from_label_value(
pod.spec.node_selector.get(gpu_label))

resources = resources_lib.Resources(
cloud=clouds.Kubernetes(),
cpus=int(cpu_request),
memory=int(memory_request),
accelerators=(f'{gpu_name}:{gpu_count}'
if gpu_count > 0 else None))
if pod.status.phase == 'Pending':
# If pod is pending, do not show it in the status
continue

cluster_info = KubernetesClusterInfo(
cluster_name_on_cloud=cluster_name_on_cloud,
cluster_name=cluster_name,
user=pod.metadata.labels.get('skypilot-user'),
status=status_lib.ClusterStatus.UP,
pods=[],
launched_at=start_time,
resources=resources,
resources_str='')
clusters[cluster_name_on_cloud] = cluster_info
# Check if cluster name is name of a controller
# Can't use controller_utils.Controllers.from_name(cluster_name)
# because hash is different across users
if 'sky-jobs-controller' in cluster_name_on_cloud:
jobs_controllers.append(cluster_info)
elif 'sky-serve-controller' in cluster_name_on_cloud:
serve_controllers.append(cluster_info)
else:
# Update start_time if this pod started earlier
pod_start_time = pod.status.start_time
if pod_start_time is not None:
pod_start_time = pod_start_time.timestamp()
if pod_start_time < clusters[cluster_name_on_cloud].launched_at:
clusters[cluster_name_on_cloud].launched_at = pod_start_time
clusters[cluster_name_on_cloud].pods.append(pod)
# Update resources_str in clusters:
for cluster in clusters.values():
num_pods = len(cluster.pods)
cluster.resources_str = f'{num_pods}x {cluster.resources}'
return list(clusters.values()), jobs_controllers, serve_controllers
Loading
Loading