Expand file-based storage to all other storage types #2944

Merged
merged 22 commits into from
Jul 16, 2020
bb967ea
Add file-based Local storage
joshmeek Jul 10, 2020
b71c156
Expand file-based storage to cloud storage options
joshmeek Jul 10, 2020
66e4b87
Add file-based Docker storage
joshmeek Jul 10, 2020
01492b3
Fix Docker storage copy_flows build step
joshmeek Jul 10, 2020
9185f8b
Add some tests for file-based storage
joshmeek Jul 10, 2020
ca6efb5
Change stored_as_file kwarg to stored_as_script
joshmeek Jul 10, 2020
aed47da
Fix docstring line length for flake8
joshmeek Jul 10, 2020
d8d057c
Start storage doc updates
joshmeek Jul 13, 2020
24be7eb
Merge branch 'master' into storage_enhancements
joshmeek Jul 15, 2020
25119df
Remove storage doc restructure and add note to file-based idiom
joshmeek Jul 15, 2020
b0b1397
Add healthcheck for flow file import
joshmeek Jul 15, 2020
83d45c4
Add note of Docker file-based storage to idiom
joshmeek Jul 15, 2020
d55e4ac
Add changelog entry
joshmeek Jul 15, 2020
28db99a
Add healthcheck tests for flow script import
joshmeek Jul 15, 2020
2ffcdd2
Update docs/core/idioms/file-based.md
joshmeek Jul 15, 2020
b153e91
Update tests/environments/storage/test_azure_storage.py
joshmeek Jul 15, 2020
506c54f
Update docs/core/idioms/file-based.md
joshmeek Jul 15, 2020
3ddfba2
Update docs/core/idioms/file-based.md
joshmeek Jul 15, 2020
6ad78af
Merge branch 'master' into storage_enhancements
joshmeek Jul 15, 2020
e91a99f
Update docker healthcheck for multiple flow files
joshmeek Jul 15, 2020
5d9d48d
Update file-based idiom to specify flow files baked into base images
joshmeek Jul 15, 2020
c6c235a
Reformat idiom for new line length
joshmeek Jul 15, 2020
2 changes: 2 additions & 0 deletions changes/pr2944.yaml
@@ -0,0 +1,2 @@
enhancement:
- "All storage types now support file-based storage - [#2944](https://github.com/PrefectHQ/prefect/pull/2944)"
57 changes: 51 additions & 6 deletions docs/core/idioms/file-based.md
@@ -1,6 +1,16 @@
# Using file based flow storage

Prefect version `0.12.1` began to implement support for storing flows as paths to files. This means that flow code can change in between (or even during) runs without needing to be reregistered. As long as the structure of the flow itself does not change, only the task content, then a Prefect API backend will be able to execute the flow. This is a useful storage mechanism especially for testing, debugging, CI/CD processes, and more!
As of Prefect version `0.12.5` all storage options support storing flows as files. This means that flow
code can change in between (or even during) runs without needing to be reregistered. As long as the
structure of the flow itself does not change (only the task content does), a Prefect API backend will be
able to execute the flow. This is an especially useful storage mechanism for testing, debugging, CI/CD
processes, and more!

### Enable file storage

GitHub storage only supports files; however, the other storage options (Local, Docker, S3, etc.) store
flows both as pickles and as files. To switch to file-based storage and enable the workflow above, set
`stored_as_script=True` on the storage object.
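
The practical difference between the two modes can be sketched without Prefect at all: pickle-based storage freezes the flow's bytes at registration time, while file-based storage re-reads the source file at run time. The paths and names below are purely illustrative:

```python
import os
import pickle
import tempfile

def build_flow(greeting):
    # Stand-in for a flow object whose task content may change between runs
    return {"name": "my-flow", "greeting": greeting}

# Pickle-based storage: the bytes captured at registration are what every run sees
frozen = pickle.dumps(build_flow("hello"))

# File-based storage: each run re-reads the current contents of the file
path = os.path.join(tempfile.mkdtemp(), "my_flow.py")
with open(path, "w") as f:
    f.write("GREETING = 'hello'\n")
with open(path, "w") as f:  # the code changes between registration and the next run
    f.write("GREETING = 'bonjour'\n")

namespace = {}
with open(path) as f:
    exec(f.read(), namespace)

print(pickle.loads(frozen)["greeting"])  # hello (stale snapshot)
print(namespace["GREETING"])             # bonjour (fresh from file)
```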

### Example file based workflow

@@ -12,7 +22,9 @@ pip install 'prefect[github]'
```
:::

In this example we will walk through a potential workflow you may use when registering flows with [GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub repository with the following structure:
In this example we will walk through a potential workflow you may use when registering flows with
[GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub
repository with the following structure:

```
repo
@@ -52,7 +64,9 @@ Here's a breakdown of the three kwargs set on the `GitHub` storage:

- `repo`: the name of the repo that this code will live in
- `path`: the location of the flow file in the repo. This must be an exact match to the path of the file.
- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo.
- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which
is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed
it has the proper permissions to pull the file from the repo.

Push this code to the repository:

@@ -62,16 +76,47 @@ git commit -m 'Add my flow'
git push
```

Now that the file exists on the repo the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud).
Now that the file exists in the repo, the flow needs to be registered with a Prefect API backend (either
Core's server or Prefect Cloud).

```bash
prefect register flow -f flows/my_flow.py
Result check: OK
Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614
```

The flow is ready to run! Every time you need to change the code inside your flow's respective tasks all you need to do is commit that code to the same location in the repository and each subsequent run will use that code.
The flow is ready to run! Every time you need to change the code inside your flow's tasks, all you need
to do is commit that code to the same location in the repository; each subsequent run will use that
code.

::: warning Flow Structure
If you change any of the structure of your flow such as task names, rearrange task order, etc. then you will need to reregister that flow.
If you change any of the structure of your flow, such as renaming tasks or rearranging task order, then
you will need to reregister that flow.
:::

### File based Docker storage

```python
flow.storage = Docker(
path="my_flow.py",
files={"/source/of/my_flow.py": "my_flow.py"},
stored_as_script=True
)
```

To store flows as files in Docker storage, three kwargs need to be set if you are using Prefect's default
Docker storage build step:

- `path`: the path where the file is stored in the Docker image
- `files`: a dictionary mapping local file sources to path destinations in the image
- `stored_as_script`: boolean enabling file based storage

If your Docker storage uses an image that already has your flow files baked into it, then you only need
to specify the following:

```python
flow.storage = Docker(
path="/location/in/image/my_flow.py",
stored_as_script=True
)
```
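
Under the hood, the `files` mapping is what drives the `COPY` instructions in the generated Dockerfile. A minimal sketch of that rendering step, simplified from `create_dockerfile_object` (names illustrative):

```python
def render_copy_lines(files: dict) -> str:
    # Each local source file becomes one COPY instruction that places it
    # at its destination path inside the image
    return "\n".join(
        "COPY {source} {dest}".format(source=src, dest=dest)
        for src, dest in files.items()
    )

print(render_copy_lines({"/source/of/my_flow.py": "my_flow.py"}))
# COPY /source/of/my_flow.py my_flow.py
```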
3 changes: 3 additions & 0 deletions docs/orchestration/execution/storage_options.md
@@ -6,6 +6,9 @@ As of Prefect version `0.9.0` every storage option except for `Docker` will auto

Version `0.12.0` introduces a new way to store flows using the various cloud storage options (S3, GCS, Azure) and then in turn run them using Agents which orchestrate containerized environments. For more information see [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments).

Version `0.12.5` introduces file-based storage for all storage options. For more information see the
[Using file based flow storage idiom](/core/idioms/file-based.html).

## Local

[Local Storage](/api/latest/environments/storage.html#local) is the default `Storage` option for all flows. This stores the flow as bytes in the local filesystem which means it can only be run by a [local agent](/orchestration/agents/local.html) running on the same machine.
29 changes: 22 additions & 7 deletions src/prefect/environments/storage/_healthcheck.py
@@ -32,11 +32,7 @@ def system_check(python_version: str):
print("System Version check: OK")


def cloudpickle_deserialization_check(flow_file_paths: str):
flow_file_paths = ast.literal_eval(
flow_file_paths
) # convert string to list of strings

def cloudpickle_deserialization_check(flow_file_paths: list):
flows = []
for flow_file in flow_file_paths:
with open(flow_file, "rb") as f:
@@ -46,6 +42,17 @@ def cloudpickle_deserialization_check(flow_file_paths: str):
return flows


def import_flow_from_script_check(flow_file_paths: list):
from prefect.utilities.storage import extract_flow_from_file

flows = []
for flow_file_path in flow_file_paths:
flows.append(extract_flow_from_file(file_path=flow_file_path))

print("Flow import from script check: OK")
return flows
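
The script-import check above relies on executing the flow file and pulling a `Flow` object out of its namespace. The real implementation is `extract_flow_from_file` in `prefect.utilities.storage`; the snippet below is a stand-in sketch (the `Flow` class here is a dummy so the sketch runs without Prefect installed):

```python
class Flow:
    # Stand-in for prefect.core.flow.Flow so the sketch is self-contained
    def __init__(self, name):
        self.name = name

def extract_flow_from_source(source: str) -> Flow:
    # Execute the script in a scratch namespace and return the first Flow found
    namespace = {"Flow": Flow}
    exec(compile(source, "<flow_script>", "exec"), namespace)
    for obj in namespace.values():
        if isinstance(obj, Flow):
            return obj
    raise ValueError("No flow found in file.")

flow = extract_flow_from_source("flow = Flow('my-flow')")
print(flow.name)  # my-flow
```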


def result_check(flows: list):
for flow in flows:
if flow.result is not None:
@@ -114,11 +121,19 @@ def environment_dependency_check(flows: list):


if __name__ == "__main__":
flow_file_path, python_version = sys.argv[1:3]
flow_file_paths, python_version = sys.argv[1:3]

print("Beginning health checks...")

flow_file_paths = ast.literal_eval(flow_file_paths)

system_check(python_version)
flows = cloudpickle_deserialization_check(flow_file_path)

if any(".py" in file_path for file_path in flow_file_paths):
flows = import_flow_from_script_check(flow_file_paths)
else:
flows = cloudpickle_deserialization_check(flow_file_paths)

result_check(flows)
environment_dependency_check(flows)
print("All health checks passed.")
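
The entrypoint above receives the flow paths as a single string-encoded Python list (for example, from a `RUN` line in the generated Dockerfile). A minimal sketch of that parsing and dispatch logic, mirroring the branch in `_healthcheck.py` (function names are illustrative):

```python
import ast

def parse_flow_file_paths(raw: str) -> list:
    # The Dockerfile passes something like "['/opt/prefect/flows/a.prefect']"
    paths = ast.literal_eval(raw)
    if not isinstance(paths, list):
        raise ValueError("expected a string-encoded list of file paths")
    return paths

def choose_check(paths: list) -> str:
    # Any .py file means the flows are stored as scripts and must be
    # imported; otherwise they are cloudpickle payloads to deserialize
    if any(".py" in p for p in paths):
        return "script-import"
    return "cloudpickle"

paths = parse_flow_file_paths("['/opt/prefect/flows/my_flow.py']")
print(choose_check(paths))  # script-import
```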
21 changes: 18 additions & 3 deletions src/prefect/environments/storage/azure.py
@@ -7,6 +7,7 @@

from prefect.engine.results import AzureResult
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file

if TYPE_CHECKING:
from prefect.core.flow import Flow
@@ -31,6 +32,8 @@ class Azure(Storage):
`AZURE_STORAGE_CONNECTION_STRING` will be used
- blob_name (str, optional): a unique key to use for uploading this Flow to Azure. This
is only useful when storing a single Flow using this storage object.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
- **kwargs (Any, optional): any additional `Storage` initialization options
"""

@@ -39,6 +42,7 @@ def __init__(
container: str,
connection_string: str = None,
blob_name: str = None,
stored_as_script: bool = False,
**kwargs: Any
) -> None:
self.flows = dict() # type: Dict[str, str]
@@ -54,7 +58,7 @@ def __init__(
result = AzureResult(
connection_string=self.connection_string, container=container
)
super().__init__(result=result, **kwargs)
super().__init__(result=result, stored_as_script=stored_as_script, **kwargs)

@property
def default_labels(self) -> List[str]:
@@ -83,8 +87,12 @@ def get_flow(self, flow_location: str) -> "Flow":

self.logger.info("Downloading {} from {}".format(flow_location, self.container))

content = client.download_blob()
return cloudpickle.loads(content.content_as_bytes())
content = client.download_blob().content_as_bytes()

if self.stored_as_script:
return extract_flow_from_file(file_contents=content) # type: ignore

return cloudpickle.loads(content)

def add_flow(self, flow: "Flow") -> str:
"""
@@ -135,6 +143,13 @@ def build(self) -> "Storage":
"""
self.run_basic_healthchecks()

if self.stored_as_script:
if not self.blob_name:
raise ValueError(
"A `blob_name` must be provided to show where flow `.py` file is stored in Azure."
)
return self

for flow_name, flow in self._flows.items():
data = cloudpickle.dumps(flow)

4 changes: 4 additions & 0 deletions src/prefect/environments/storage/base.py
@@ -26,6 +26,8 @@ class Storage(metaclass=ABCMeta):
- add_default_labels (bool): If `True`, adds the storage specific default label (if
applicable) to the storage labels. Defaults to the value specified in the
configuration at `flows.defaults.storage.add_default_labels`.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
"""

def __init__(
@@ -34,9 +36,11 @@ def __init__(
secrets: List[str] = None,
labels: List[str] = None,
add_default_labels: bool = None,
stored_as_script: bool = False,
) -> None:
self.result = result
self.secrets = secrets or []
self.stored_as_script = stored_as_script
self._labels = labels or []
if add_default_labels is None:
self.add_default_labels = config.flows.defaults.storage.add_default_labels
42 changes: 31 additions & 11 deletions src/prefect/environments/storage/docker.py
@@ -17,6 +17,7 @@

import prefect
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file

if TYPE_CHECKING:
import docker
@@ -75,6 +76,10 @@ class Docker(Storage):
api.html#docker.api.build.BuildApiMixin.build)
- prefect_directory (str, optional): Path to the directory where prefect configuration/flows
should be stored inside the Docker image. Defaults to `/opt/prefect`.
- path (str, optional): a direct path to the location of the flow file in the Docker image
if `stored_as_script=True`.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
- **kwargs (Any, optional): any additional `Storage` initialization options

Raises:
@@ -99,6 +104,8 @@ def __init__(
tls_config: Union[bool, "docker.tls.TLSConfig"] = False,
build_kwargs: dict = None,
prefect_directory: str = "/opt/prefect",
path: str = None,
stored_as_script: bool = False,
**kwargs: Any,
) -> None:
self.registry_url = registry_url
@@ -112,6 +119,7 @@
self.python_dependencies.append("wheel")

self.prefect_directory = prefect_directory
self.path = path

self.env_vars = env_vars or {}
self.env_vars.setdefault(
@@ -175,7 +183,7 @@ def __init__(
"absolute paths only."
).format(", ".join(not_absolute))
)
super().__init__(**kwargs)
super().__init__(stored_as_script=stored_as_script, **kwargs)

def get_env_runner(self, flow_location: str) -> Callable[[Dict[str, str]], None]:
"""
@@ -234,7 +242,7 @@ def add_flow(self, flow: "prefect.core.flow.Flow") -> str:
flow.name
)
)
flow_path = "{}/flows/{}.prefect".format(
flow_path = self.path or "{}/flows/{}.prefect".format(
self.prefect_directory, slugify(flow.name)
)
self.flows[flow.name] = flow_path
@@ -252,6 +260,9 @@ def get_flow(self, flow_location: str) -> "prefect.core.flow.Flow":
Returns:
- Flow: the requested flow
"""
if self.stored_as_script:
return extract_flow_from_file(file_path=flow_location)

with open(flow_location, "rb") as f:
return cloudpickle.load(f)

@@ -438,15 +449,23 @@ def create_dockerfile_object(self, directory: str) -> str:

# Write all flows to file and load into the image
copy_flows = ""
for flow_name, flow_location in self.flows.items():
clean_name = slugify(flow_name)
flow_path = os.path.join(directory, "{}.flow".format(clean_name))
with open(flow_path, "wb") as f:
cloudpickle.dump(self._flows[flow_name], f)
copy_flows += "COPY {source} {dest}\n".format(
source=flow_path if self.dockerfile else "{}.flow".format(clean_name),
dest=flow_location,
)
if not self.stored_as_script:
for flow_name, flow_location in self.flows.items():
clean_name = slugify(flow_name)
flow_path = os.path.join(directory, "{}.flow".format(clean_name))
with open(flow_path, "wb") as f:
cloudpickle.dump(self._flows[flow_name], f)
copy_flows += "COPY {source} {dest}\n".format(
source=flow_path
if self.dockerfile
else "{}.flow".format(clean_name),
dest=flow_location,
)
else:
if not self.path:
raise ValueError(
"A `path` must be provided to show where flow `.py` file is stored in the image."
)

# Write all extra commands that should be run in the image
extra_commands = ""
Expand Down Expand Up @@ -498,6 +517,7 @@ def create_dockerfile_object(self, directory: str) -> str:
)

# append the line that runs the healthchecks
# skip over for now if storing flow as file
if not self.ignore_healthchecks:
file_contents += textwrap.dedent(
"""