Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create utility function for getting Kubernetes client #2845

Merged
merged 8 commits into from
Jun 24, 2020
5 changes: 5 additions & 0 deletions changes/pr2845.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
enhancement:
- "Create utility function for getting Kubernetes client - [#2845](https://github.com/PrefectHQ/prefect/pull/2845)"

contributor:
- "[Paweł Cieśliński](https://github.com/pcieslinski)"
133 changes: 33 additions & 100 deletions src/prefect/tasks/kubernetes/deployment.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from typing import Any
from typing import Any, cast

from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException
from kubernetes import client

from prefect import Task
from prefect.client import Secret
from prefect.utilities.tasks import defaults_from_attrs
from prefect.utilities.kubernetes import get_kubernetes_client


class CreateNamespacedDeployment(Task):
Expand All @@ -19,7 +18,7 @@ class CreateNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -91,21 +90,10 @@ def run(
"A dictionary representing a ExtensionsV1beta1Deployment must be provided."
)

kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

body = {**self.body, **(body or {})}
kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}
Expand All @@ -126,7 +114,7 @@ class DeleteNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -198,21 +186,10 @@ def run(
if not deployment_name:
raise ValueError("The name of a Kubernetes deployment must be provided.")

kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}
delete_option_kwargs = delete_option_kwargs or {}
Expand All @@ -236,7 +213,7 @@ class ListNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -295,21 +272,10 @@ def run(
- ExtensionsV1beta1DeploymentList: a Kubernetes ExtensionsV1beta1DeploymentList
of the deployments which are found
"""
kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}

Expand All @@ -327,7 +293,7 @@ class PatchNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -412,21 +378,10 @@ def run(
if not deployment_name:
raise ValueError("The name of a Kubernetes deployment must be provided.")

kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

body = {**self.body, **(body or {})}
kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}
Expand All @@ -447,7 +402,7 @@ class ReadNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -519,21 +474,10 @@ def run(
if not deployment_name:
raise ValueError("The name of a Kubernetes deployment must be provided.")

kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}

Expand All @@ -553,7 +497,7 @@ class ReplaceNamespacedDeployment(Task):

1. Attempt to use a Prefect Secret that contains a Kubernetes API Key. If
`kubernetes_api_key_secret` = `None` then it will attempt the next two connection
mathods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
methods. By default the value is `KUBERNETES_API_KEY` so providing `None` acts as
an override for the remote connection.
2. Attempt in-cluster connection (will only work when running on a Pod in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
Expand Down Expand Up @@ -638,21 +582,10 @@ def run(
if not deployment_name:
raise ValueError("The name of a Kubernetes deployment must be provided.")

kubernetes_api_key = None
if kubernetes_api_key_secret:
kubernetes_api_key = Secret(kubernetes_api_key_secret).get()

if kubernetes_api_key:
configuration = client.Configuration()
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.ExtensionsV1beta1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

api_client = client.ExtensionsV1beta1Api()
api_client = cast(
client.ExtensionsV1beta1Api,
get_kubernetes_client("deployment", kubernetes_api_key_secret),
)

body = {**self.body, **(body or {})}
kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}
Expand Down
Loading