Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor environment build() method #398

Merged
merged 10 commits into from
Dec 12, 2018
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

- Fixed issue with `GraphQLResult` reprs - [#374](https://github.com/PrefectHQ/prefect/pull/374)
- `CronSchedule` produces expected results across daylight savings time transitions - [#375](https://github.com/PrefectHQ/prefect/pull/375)
- `utilities.serialization.Nested` properly respects `marshmallow.missing` values - [#398](https://github.com/PrefectHQ/prefect/pull/398)

### Breaking Changes

Expand All @@ -46,6 +47,7 @@
- Local secrets are now pulled from `secrets` in context instead of `_secrets` - [#382](https://github.com/PrefectHQ/prefect/pull/382)
- Remove Task and Flow descriptions, Flow project & version attributes - [#383](https://github.com/PrefectHQ/prefect/issues/383)
- Changed `Schedule` parameter from `on_or_after` to `after` - [#396](https://github.com/PrefectHQ/prefect/issues/396)
- Environments are immutable and return `dict` keys instead of `str`; some arguments for `ContainerEnvironment` are removed - [#398](https://github.com/PrefectHQ/prefect/pull/398)

## 0.3.3 <Badge text="alpha" type="warn"/>

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/cli/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def ids():
"""
Prints all the flows in the registry.
"""
output = {id: f.id for id, f in registry.REGISTRY.items()}
click.echo(json.dumps(output, sort_keys=True))
click.echo(json.dumps(list(registry.REGISTRY.keys()), sort_keys=True))


@flows.command()
Expand Down
15 changes: 10 additions & 5 deletions src/prefect/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,11 +981,16 @@ def serialize(self, build: bool = False) -> dict:

self.validate()

if build and self.environment:
self.environment = self.environment.build(self)

serialized = prefect.serialization.flow.FlowSchema().dump(self)

if build and self.environment:
environment_key = self.environment.build(self)
serialized.update(
prefect.serialization.flow.FlowSchema().dump(
{"environment_key": environment_key}
)
)

return serialized

def register(self, registry: dict = None) -> None:
Expand All @@ -1000,12 +1005,12 @@ def register(self, registry: dict = None) -> None:
)

@cache
def build_environment(self) -> bytes:
def build_environment(self) -> dict:
"""
Build the flow's environment.

Returns:
- bytes of a key that can be used to access the environment.
- dict: a key that can be used to recreate the environment.

Raises:
- ValueError: if no environment is specified in this flow
Expand Down
156 changes: 58 additions & 98 deletions src/prefect/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,26 @@
**Note:** Due to ongoing development this file is subject to large changes.
"""

import base64
import logging
import os
from pathlib import Path
import shlex
import subprocess
import tempfile
import textwrap
from typing import Optional
import uuid
from pathlib import Path
from typing import Optional

import docker
import toml
from cryptography.fernet import Fernet

import prefect
from prefect import config
from prefect.client import Secret
from prefect.serializers import Serializer

# from prefect.utilities.json import ObjectAttributesCodec, Serializable


class Environment:
"""
Expand All @@ -39,24 +40,25 @@ class Environment:
def __init__(self) -> None:
pass

def build(self, flow: "prefect.Flow") -> "prefect.environments.Environment":
def build(self, flow: "prefect.Flow") -> dict:
"""
Build the environment. Returns a key that must be passed to interact with the
Build the environment. Returns a JSON object that can be used to interact with the
environment.

Args:
- flow (prefect.Flow): the Flow to build the environment for

Returns:
- bytes: a key required for interacting with the environment
- dict: a JSON document that can be used to recreate the environment later.
"""
raise NotImplementedError()

def run(self, key: bytes, cli_cmd: str) -> Optional[bytes]:
"""Issue a CLI command to the environment.
def run(self, env_key: dict, cli_cmd: str) -> Optional[bytes]:
"""
Issue a CLI command to the environment.

Args:
- key (bytes): the environment key
- env_key (dict): the result of calling `self.build()`
- cli_cmd (str): the command to issue
"""
raise NotImplementedError()
Expand All @@ -70,77 +72,25 @@ class ContainerEnvironment(Environment):
and is subject to change.

Args:
- image (str): The image to pull that is used as a base for the Docker container
*Note*: An image that is provided must be able to handle `python` and `pip` commands
- name (str, optional): The name the image will take on the registry
- tag (str, optional): The tag for this container
- base_image (str): The image to pull that is used as a base for the Docker container
*Note*: Images must include Python 3.4+ and `pip`.
- registry_url (str, optional): The registry to push the image to
- python_dependencies (list, optional): The list of pip installable python packages
that will be installed on build of the Docker container
- secrets (list, optional): A list of secret value names to be loaded into the environment
- flow_id (str, optional): A consistent flow ID (generally set on build and not passed in)
"""

def __init__(
self,
image: str,
name: str = None,
tag: str = None,
base_image: str,
registry_url: str = None,
python_dependencies: list = None,
secrets: list = None,
flow_id: str = None,
) -> None:
self._image = image
self._name = name or str(uuid.uuid4())
self._tag = tag or str(uuid.uuid4())
self._registry_url = registry_url
self._python_dependencies = python_dependencies or []
self._secrets = secrets or []
self._flow_id = flow_id
self.last_container_id = None

super().__init__()

@property
def python_dependencies(self) -> list:
"""Get the specified Python dependencies"""
return self._python_dependencies

@property
def secrets(self) -> list:
"""Get the names of the secrets, no values"""
return self._secrets

@property
def image(self) -> str:
"""Get the container's base image"""
return self._image

@property
def name(self) -> str:
"""Get the name of the image"""
return self._name

@property
def tag(self) -> str:
"""Get the container's tag"""
return self._tag

@property
def registry_url(self) -> str:
"""Get the container's registry URL"""
return self._registry_url

@property
def flow_id(self) -> str:
"""Get the container's flow ID"""
return self._flow_id

@property
def client(self) -> "docker.client.DockerClient":
"""Get the environment's client"""
return docker.from_env()
self.base_image = base_image
self.registry_url = registry_url
self.python_dependencies = python_dependencies or []
self.secrets = secrets or []

def build(
self, flow: "prefect.Flow", push: bool = True
Expand All @@ -154,6 +104,10 @@ def build(
Returns:
- ContainerEnvironment: an instance of this environment
"""

image_name = str(uuid.uuid4())
image_tag = str(uuid.uuid4())

with tempfile.TemporaryDirectory() as tempdir:

# Write temp file of serialized registry to same location of Dockerfile
Expand All @@ -179,26 +133,27 @@ def build(
if not self.registry_url:
raise ValueError("Registry not specified.")

image_name = os.path.join(self.registry_url, self.name)
full_name = os.path.join(self.registry_url, image_name)

logging.info("Building the flow's container environment...")
client.images.build(
path=tempdir, tag="{}:{}".format(image_name, self.tag), forcerm=True
path=tempdir, tag="{}:{}".format(full_name, image_tag), forcerm=True
)

if push:
self.push(image_name, self.tag)
self.push(full_name, image_tag)

# Remove the image locally after being pushed
client.images.remove("{}:{}".format(image_name, self.tag))
client.images.remove("{}:{}".format(full_name, image_tag))

self._flow_id = flow.id
return self
return dict(name=image_name, tag=image_tag)

def run(self, cli_cmd: str) -> None:
def run(self, env_key: dict, cli_cmd: str = None) -> None:
"""Run a command in the Docker container

Args:
- env_key (dict): a JSON document containing details about container, as produced
by the `build()` method.
- cli_cmd (str, optional): An initial cli_cmd that will be executed on container run

Returns:
Expand All @@ -208,9 +163,8 @@ def run(self, cli_cmd: str) -> None:
client = docker.from_env()

running_container = client.containers.run(
self.tag, command=cli_cmd, detach=True
env_key["tag"], command=cli_cmd, detach=True
)
self.last_container_id = running_container.id

return running_container

Expand Down Expand Up @@ -238,7 +192,7 @@ def pull_image(self) -> None:
the environment variables.
"""
client = docker.from_env()
client.images.pull(self.image)
client.images.pull(self.base_image)

def create_dockerfile(self, directory: str = None) -> None:
"""Creates a dockerfile to use as the container.
Expand Down Expand Up @@ -276,7 +230,7 @@ def create_dockerfile(self, directory: str = None) -> None:

file_contents = textwrap.dedent(
"""\
FROM {image}
FROM {base_image}

RUN apt-get -qq -y update && apt-get -qq -y install --no-install-recommends --no-install-suggests git

Expand All @@ -296,7 +250,9 @@ def create_dockerfile(self, directory: str = None) -> None:
RUN git clone https://$PERSONAL_ACCESS_TOKEN@github.com/PrefectHQ/prefect.git
RUN pip install ./prefect
""".format(
image=self.image, pip_installs=pip_installs, env_vars=env_vars
base_image=self.base_image,
pip_installs=pip_installs,
env_vars=env_vars,
)
)

Expand Down Expand Up @@ -330,58 +286,62 @@ def serialized_registry_to_file(

class LocalEnvironment(Environment):
"""
An environment for running a flow locally.
An environment for running a flow locally. This class may be used for debugging and
testing.

Args:
- encryption_key (str, optional): a Fernet encryption key. One will be generated
- encryption_key (bytes, optional): a Fernet encryption key. One will be generated
automatically if None is passed.
"""

def __init__(self, encryption_key: str = None) -> None:
def __init__(self, encryption_key: bytes = None) -> None:
# the config value might be an empty string, since configs don't support None
if encryption_key is None:
encryption_key = Fernet.generate_key()
self.encryption_key = encryption_key
self.serialized = None

def build(self, flow: "prefect.Flow") -> "prefect.environments.LocalEnvironment":
def build(self, flow: "prefect.Flow") -> dict:
"""
Build the LocalEnvironment

Args:
- flow (Flow): The prefect Flow object to build the environment for

Returns:
- LocalEnvironment: An instance of this environment
- dict: a dictionary containing the serialized registry
"""
registry = {} # type: dict
flow.register(registry=registry)
serialized = prefect.core.registry.serialize_registry(
registry=registry, include_ids=[flow.id], encryption_key=self.encryption_key
)
self.serialized = serialized
return self
return {"serialized registry": base64.b64encode(serialized).decode()}

def run(self, key: bytes, cli_cmd: str) -> bytes:
def run(self, env_key: dict, cli_cmd: str) -> bytes:
"""
Run a command in the `LocalEnvironment`. This functions by writing a pickled
flow to temporary memory and then executing prefect CLI commands against it.

Args:
- key (bytes): The encrypted and pickled flow registry
- env_key (dict): a JSON document containing details about container, as produced
by the `build()` method.
- cli_cmd (str): The prefect CLI command to be run

Returns:
- bytes: the output of `subprocess.check_output` from the command run against the flow
"""
with tempfile.NamedTemporaryFile() as tmp:
with open(tmp.name, "wb") as f:
f.write(key)
f.write(base64.b64decode(env_key["serialized registry"]))

env = ['PREFECT__REGISTRY__STARTUP_REGISTRY_PATH="{}"'.format(tmp.name)]
env_vars = {
"PREFECT__REGISTRY__STARTUP_REGISTRY_PATH": tmp.name,
"PREFECT__REGISTRY__ENCRYPTION_KEY": self.encryption_key,
}

if self.encryption_key:
env.append(
'PREFECT__REGISTRY__ENCRYPTION_KEY="{}"'.format(self.encryption_key)
)
env = os.environ.copy()
env.update(env_vars)

return subprocess.check_output(
" ".join(env + [cli_cmd]), shell=True, stderr=subprocess.STDOUT
shlex.split(cli_cmd), stderr=subprocess.STDOUT, env=env
)
Loading