DaskKubernetesEnvironment #1338

Merged: 9 commits, Aug 9, 2019
Changes from 7 commits
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -10,7 +10,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Enhancements

- None
- Allow for min and max workers to be specified in `DaskKubernetesEnvironment` - [#1338](https://github.com/PrefectHQ/prefect/pulls/1338)

### Task Library

@@ -26,7 +26,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Breaking Changes

- None
- Rename `CloudEnvironment` to `DaskKubernetesEnvironment` - [#1250](https://github.com/PrefectHQ/prefect/issues/1250)

### Contributors

2 changes: 1 addition & 1 deletion docs/cloud/cloud_concepts/secrets.md
@@ -29,7 +29,7 @@ KEY = VALUE
with however many key / value pairs you'd like.

::: tip You don't have to store raw values in your config
Prefect will interpolate certain values from your OS environment, so you can specify values from environment variables via `"$ENV_VAR"`.
Prefect will interpolate certain values from your OS environment, so you can specify values from environment variables via `"$ENV_VAR"`. Note that secrets set this way will always result in lowercase names.
:::
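For illustration only (not part of this diff), here is a minimal sketch of how the interpolation described in the tip above might be used end to end. The environment variable name `MY_SECRET_VALUE` and the config entry are assumptions, and it presumes local secrets are enabled via `use_local_secrets`.

```python
# Sketch only: assumes ~/.prefect/config.toml contains
#
#     [context.secrets]
#     MY_KEY = "$MY_SECRET_VALUE"
#
# and that local secrets are enabled (cloud.use_local_secrets = true).
import os

# Normally exported in the shell before Prefect starts; set here so the example is runnable.
os.environ["MY_SECRET_VALUE"] = "super-secret"

from prefect.client import Secret

# Per the note added above, secrets interpolated from environment variables
# end up with lowercase names, so the value is read back under "my_key".
print(Secret("my_key").get())
```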

### Cloud Execution
4 changes: 2 additions & 2 deletions docs/outline.toml
@@ -24,7 +24,7 @@ classes = ["Client"]

[pages.client.secrets]
title = "Secrets"
module = "prefect.client"
module = "prefect.client.secrets"
classes = ["Secret"]

[pages.schedules.schedules]
@@ -168,7 +168,7 @@ classes = ["Docker", "Local", "Memory", "Bytes"]
[pages.environments.execution]
title = "Execution Environments"
module = "prefect.environments.execution"
classes = ["LocalEnvironment", "RemoteEnvironment"]
classes = ["DaskKubernetesEnvironment", "LocalEnvironment", "RemoteEnvironment"]

[pages.tasks.control_flow]
title = "Control Flow Tasks"
33 changes: 33 additions & 0 deletions src/prefect/client/secrets.py
@@ -1,3 +1,36 @@
"""
A Secret is a serializable object used to represent a secret key & value.

The value of the `Secret` is not set upon initialization and instead is set
either in `prefect.context` or on the server, with behavior dependent on the value
of the `use_local_secrets` flag in your Prefect configuration file.

To set a Secret in Prefect Cloud, you can use `prefect.Client.set_secret`, or set it directly via GraphQL:

```graphql
mutation {
setSecret(input: { name: "KEY", value: "VALUE" }) {
success
}
}
```

To set a _local_ Secret, either place the value in your user configuration file (located at `~/.prefect/config.toml`):

```
[context.secrets]
MY_KEY = "MY_VALUE"
```

or directly in context:

```python
import prefect

prefect.context.secrets["MY_KEY"] = "MY_VALUE"
```
"""

import json
import os
from typing import Any, Optional
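For reference, a hedged sketch (not included in this diff) of the two approaches the docstring above describes. `Client.set_secret` is named in the docstring, but the exact keyword arguments shown here are an assumption.

```python
import prefect
from prefect import Client

# Cloud: roughly equivalent to the setSecret GraphQL mutation shown above
# (assumes an authenticated Client / valid auth token).
Client().set_secret(name="KEY", value="VALUE")

# Local: place the value directly in context, as the docstring shows.
prefect.context.secrets["MY_KEY"] = "MY_VALUE"
```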
2 changes: 1 addition & 1 deletion src/prefect/environments/__init__.py
@@ -1,6 +1,6 @@
from prefect.environments.execution import (
Environment,
DaskKubernetesEnvironment,
LocalEnvironment,
RemoteEnvironment,
)
from prefect.environments.execution.cloud import CloudEnvironment
1 change: 1 addition & 0 deletions src/prefect/environments/execution/__init__.py
@@ -5,5 +5,6 @@
appropriate choice of executor.
"""
from prefect.environments.execution.base import Environment
from prefect.environments.execution.dask import DaskKubernetesEnvironment
from prefect.environments.execution.local import LocalEnvironment
from prefect.environments.execution.remote import RemoteEnvironment
1 change: 0 additions & 1 deletion src/prefect/environments/execution/cloud/__init__.py

This file was deleted.

1 change: 1 addition & 0 deletions src/prefect/environments/execution/dask/__init__.py
@@ -0,0 +1 @@
from prefect.environments.execution.dask.k8s import DaskKubernetesEnvironment
src/prefect/environments/execution/cloud/environment.py → src/prefect/environments/execution/dask/k8s.py (file renamed)
@@ -18,12 +18,10 @@
from prefect.utilities import logging


class CloudEnvironment(Environment):
class DaskKubernetesEnvironment(Environment):
"""
CloudEnvironment is an environment which deploys your flow (stored in a Docker image)
on Kubernetes and it uses the Prefect dask executor by dynamically spawning workers as pods.

*Note*: This environment is not currently customizable. This may be subject to change.
DaskKubernetesEnvironment is an environment which deploys your flow (stored in a Docker image)
on Kubernetes by spinning up a temporary Dask Cluster and using the Prefect `DaskExecutor` on this cluster.

If pulling from a private docker registry, `setup` will ensure the appropriate
kubernetes secret exists; `execute` creates a single job that has the role
@@ -32,6 +30,8 @@ class CloudEnvironment(Environment):
set with a UUID so resources can be cleaned up independently of other deployments.

Args:
- min_workers (int, optional): the minimum allowed number of Dask worker pods; defaults to 1
- max_workers (int, optional): the maximum allowed number of Dask worker pods; defaults to 2
- private_registry (bool, optional): a boolean specifying whether your Flow's Docker container will be in a private
Docker registry; if so, requires a Prefect Secret containing your docker credentials to be set.
Defaults to `False`.
@@ -41,8 +41,14 @@ class CloudEnvironment(Environment):
"""

def __init__(
self, private_registry: bool = False, docker_secret: str = None
self,
min_workers: int = 1,
max_workers: int = 2,
private_registry: bool = False,
docker_secret: str = None,
) -> None:
self.min_workers = min_workers
self.max_workers = max_workers
self.identifier_label = str(uuid.uuid4())
self.private_registry = private_registry
if self.private_registry:
@@ -200,7 +206,7 @@ def run_flow(self) -> None:
cluster = KubeCluster.from_dict(
worker_pod, namespace=prefect.context.get("namespace")
)
cluster.adapt(minimum=1, maximum=1)
cluster.adapt(minimum=self.min_workers, maximum=self.max_workers)

# Load serialized flow from file and run it with a DaskExecutor
with open(
@@ -214,7 +220,6 @@ def run_flow(self) -> None:
executor = DaskExecutor(address=cluster.scheduler_address)
runner_cls = get_default_flow_runner_class()
runner_cls(flow=flow).run(executor=executor)
sys.exit(0) # attempt to force resource cleanup
except Exception as exc:
self.logger.error("Unexpected error raised during flow run: {}".format(exc))
raise exc
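To show how the new arguments are meant to be used, here is a hedged sketch (not part of the PR) of attaching the renamed environment to a flow. The task, flow name, and registry/image names are placeholders, and the exact `Flow`/`Docker` storage wiring may differ slightly in this Prefect version.

```python
from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker


@task
def say_hello():
    print("hello from a Dask worker pod")


with Flow("dask-k8s-example") as flow:
    say_hello()

# Let the temporary Dask cluster scale between 2 and 5 worker pods.
flow.environment = DaskKubernetesEnvironment(min_workers=2, max_workers=5)

# Hypothetical registry and image name; the image must be pushed somewhere
# the Kubernetes cluster can pull from.
flow.storage = Docker(registry_url="gcr.io/my-project", image_name="dask-k8s-example")
```

With `cluster.adapt(minimum=self.min_workers, maximum=self.max_workers)` in `run_flow` above, these two arguments bound how far the adaptive Dask cluster can scale during the flow run.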
8 changes: 4 additions & 4 deletions src/prefect/serialization/environment.py
@@ -1,7 +1,7 @@
from marshmallow import fields

from prefect.environments import (
CloudEnvironment,
DaskKubernetesEnvironment,
Environment,
LocalEnvironment,
RemoteEnvironment,
@@ -19,9 +19,9 @@ class Meta:
object_class = LocalEnvironment


class CloudEnvironmentSchema(ObjectSchema):
class DaskKubernetesEnvironmentSchema(ObjectSchema):
class Meta:
object_class = CloudEnvironment
object_class = DaskKubernetesEnvironment

docker_secret = fields.String(allow_none=True)
private_registry = fields.Boolean(allow_none=False)
@@ -42,7 +42,7 @@ class EnvironmentSchema(OneOfSchema):

# map class name to schema
type_schemas = {
"CloudEnvironment": CloudEnvironmentSchema,
"DaskKubernetesEnvironment": DaskKubernetesEnvironmentSchema,
"Environment": BaseEnvironmentSchema,
"LocalEnvironment": LocalEnvironmentSchema,
"RemoteEnvironment": RemoteEnvironmentSchema,
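A hedged sketch (not in this diff) of how the renamed schema is selected through `EnvironmentSchema`. It assumes marshmallow 3 style `dump`/`load`, and note that only `docker_secret` and `private_registry` are serialized here, since `min_workers`/`max_workers` are not added to the schema in these commits.

```python
from prefect.environments import DaskKubernetesEnvironment
from prefect.serialization.environment import EnvironmentSchema

schema = EnvironmentSchema()

serialized = schema.dump(DaskKubernetesEnvironment(private_registry=True))
# OneOfSchema records the class name so the matching schema is chosen on load.
assert serialized["type"] == "DaskKubernetesEnvironment"

env = schema.load(serialized)
assert isinstance(env, DaskKubernetesEnvironment)
assert env.private_registry is True
```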
Tests for `DaskKubernetesEnvironment` (test module renamed in this PR)
@@ -9,31 +9,31 @@
import yaml

import prefect
from prefect.environments import CloudEnvironment
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker, Memory
from prefect.utilities.configuration import set_temporary_config


def test_create_cloud_environment():
environment = CloudEnvironment()
def test_create_dask_environment():
environment = DaskKubernetesEnvironment()
assert environment
assert environment.private_registry is False
assert environment.docker_secret is None


def test_create_cloud_environment_identifier_label():
environment = CloudEnvironment()
def test_create_dask_environment_identifier_label():
environment = DaskKubernetesEnvironment()
assert environment.identifier_label


def test_setup_cloud_environment_passes():
environment = CloudEnvironment()
def test_setup_dask_environment_passes():
environment = DaskKubernetesEnvironment()
environment.setup(storage=Docker())
assert environment


def test_setup_doesnt_pass_if_private_registry(monkeypatch):
environment = CloudEnvironment(private_registry=True)
environment = DaskKubernetesEnvironment(private_registry=True)
assert environment.docker_secret == "DOCKER_REGISTRY_CREDENTIALS"

config = MagicMock()

create_secret = MagicMock()
monkeypatch.setattr(
"prefect.environments.CloudEnvironment._create_namespaced_secret", create_secret
"prefect.environments.DaskKubernetesEnvironment._create_namespaced_secret",
create_secret,
)
with set_temporary_config({"cloud.auth_token": "test"}):
environment.setup(storage=Docker())


def test_create_secret_isnt_called_if_exists(monkeypatch):
environment = CloudEnvironment(private_registry=True)
environment = DaskKubernetesEnvironment(private_registry=True)

config = MagicMock()
monkeypatch.setattr("kubernetes.config", config)

create_secret = MagicMock()
monkeypatch.setattr(
"prefect.environments.CloudEnvironment._create_namespaced_secret", create_secret
"prefect.environments.DaskKubernetesEnvironment._create_namespaced_secret",
create_secret,
)
with set_temporary_config({"cloud.auth_token": "test"}):
with prefect.context(namespace="foo"):


def test_execute_improper_storage():
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()
with pytest.raises(TypeError):
environment.execute(storage=Memory(), flow_location="")


def test_execute_storage_missing_fields():
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()
with pytest.raises(ValueError):
environment.execute(storage=Docker(), flow_location="")


def test_execute(monkeypatch):
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()
storage = Docker(registry_url="test1", image_name="test2", image_tag="test3")

create_flow_run = MagicMock()
monkeypatch.setattr(
"prefect.environments.CloudEnvironment.create_flow_run_job", create_flow_run
"prefect.environments.DaskKubernetesEnvironment.create_flow_run_job",
create_flow_run,
)

environment.execute(storage=storage, flow_location="")


def test_create_flow_run_job(monkeypatch):
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()

config = MagicMock()
monkeypatch.setattr("kubernetes.config", config)


def test_create_flow_run_job_fails_outside_cluster():
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()

with pytest.raises(EnvironmentError):
with set_temporary_config({"cloud.auth_token": "test"}):


def test_run_flow(monkeypatch):
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()

flow_runner = MagicMock()
monkeypatch.setattr(
"prefect.engine.get_default_flow_runner_class",
MagicMock(return_value=flow_runner),
)
monkeypatch.setattr(
"prefect.environments.execution.cloud.environment.sys.exit", MagicMock()
)

kube_cluster = MagicMock()
monkeypatch.setattr("dask_kubernetes.KubeCluster", kube_cluster)


def test_populate_job_yaml():
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()

file_path = os.path.dirname(
prefect.environments.execution.cloud.environment.__file__
)
file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__)

with open(path.join(file_path, "job.yaml")) as job_file:
job = yaml.safe_load(job_file)


def test_populate_worker_pod_yaml():
environment = CloudEnvironment()
environment = DaskKubernetesEnvironment()

file_path = os.path.dirname(
prefect.environments.execution.cloud.environment.__file__
)
file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__)

with open(path.join(file_path, "worker_pod.yaml")) as pod_file:
pod = yaml.safe_load(pod_file)


def test_populate_worker_pod_yaml_with_private_registry():
environment = CloudEnvironment(private_registry=True)
environment = DaskKubernetesEnvironment(private_registry=True)

file_path = os.path.dirname(
prefect.environments.execution.cloud.environment.__file__
)
file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__)

with open(path.join(file_path, "worker_pod.yaml")) as pod_file:
pod = yaml.safe_load(pod_file)