Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
kubernetes-version: ["1.29.2"]
kubernetes-version: ["1.30.2"]
include:
- python-version: '3.10'
kubernetes-version: 1.28.7
kubernetes-version: 1.29.4
- python-version: '3.10'
kubernetes-version: 1.27.11
- python-version: '3.10'
kubernetes-version: 1.26.14
kubernetes-version: 1.28.9

env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Dask Kubernetes
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Python Support

.. image:: https://img.shields.io/badge/Kubernetes%20support-1.26%7C1.27%7C1.28%7C1.29-blue
.. image:: https://img.shields.io/badge/Kubernetes%20support-1.28%7C1.29%7C1.30-blue
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Kubernetes Support

Expand Down
57 changes: 12 additions & 45 deletions dask_kubernetes/operator/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

from typing import List

from kr8s.asyncio.objects import APIObject, Deployment, Pod, Service
from kr8s.asyncio.objects import Deployment, Pod, Service, new_class


class DaskCluster(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskclusters"
kind = "DaskCluster"
plural = "daskclusters"
singular = "daskcluster"
namespaced = True
class DaskCluster(new_class("DaskCluster", "kubernetes.dask.org/v1")):
scalable = True
scalable_spec = "worker.replicas"

async def worker_groups(self) -> List[DaskWorkerGroup]:
return await self.api.get(
DaskWorkerGroup.endpoint,
return await DaskWorkerGroup.list(
label_selector=f"dask.org/cluster-name={self.name}",
namespace=self.namespace,
)

async def scheduler_pod(self) -> Pod:
pods = []
while not pods:
pods = await self.api.get(
Pod.endpoint,
pods = await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -41,8 +33,7 @@ async def scheduler_pod(self) -> Pod:
async def scheduler_deployment(self) -> Deployment:
deployments = []
while not deployments:
deployments = await self.api.get(
Deployment.endpoint,
deployments = await Deployment.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -57,8 +48,7 @@ async def scheduler_deployment(self) -> Deployment:
async def scheduler_service(self) -> Service:
services = []
while not services:
services = await self.api.get(
Service.endpoint,
services = await Service.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -79,19 +69,12 @@ async def ready(self) -> bool:
)


class DaskWorkerGroup(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskworkergroups"
kind = "DaskWorkerGroup"
plural = "daskworkergroups"
singular = "daskworkergroup"
namespaced = True
class DaskWorkerGroup(new_class("DaskWorkerGroup", "kubernetes.dask.org/v1")):
scalable = True
scalable_spec = "worker.replicas"

async def pods(self) -> List[Pod]:
return await self.api.get(
Pod.endpoint,
return await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec.cluster}",
Expand All @@ -103,8 +86,7 @@ async def pods(self) -> List[Pod]:
)

async def deployments(self) -> List[Deployment]:
return await self.api.get(
Deployment.endpoint,
return await Deployment.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec.cluster}",
Expand All @@ -119,34 +101,19 @@ async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskAutoscaler(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskautoscalers"
kind = "DaskAutoscaler"
plural = "daskautoscalers"
singular = "daskautoscaler"
namespaced = True

class DaskAutoscaler(new_class("DaskAutoscaler", "kubernetes.dask.org/v1")):
async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskJob(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskjobs"
kind = "DaskJob"
plural = "daskjobs"
singular = "daskjob"
namespaced = True

class DaskJob(new_class("DaskJob", "kubernetes.dask.org/v1")):
async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.name, namespace=self.namespace)

async def pod(self) -> Pod:
pods = []
while not pods:
pods = await self.api.get(
Pod.endpoint,
pods = await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand Down
2 changes: 1 addition & 1 deletion docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Dask Kubernetes Operator
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Python Support

.. image:: https://img.shields.io/badge/Kubernetes%20support-1.26%7C1.27%7C1.28%7C1.29-blue
.. image:: https://img.shields.io/badge/Kubernetes%20support-1.28%7C1.29%7C1.30-blue
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Kubernetes Support

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies = [
"dask>=2022.08.1",
"distributed>=2022.08.1",
"kopf>=1.35.3",
"kr8s==0.14.*",
"kr8s==0.17.*",
"kubernetes-asyncio>=12.0.1",
"kubernetes>=12.0.1",
"pykube-ng>=22.9.0",
Expand Down