Merge pull request #2944 from PrefectHQ/storage_enhancements
Expand file-based storage to all other storage types
jcrist authored Jul 16, 2020
2 parents 999724e + c6c235a commit 6e18531
Showing 17 changed files with 419 additions and 43 deletions.
2 changes: 2 additions & 0 deletions changes/pr2944.yaml
@@ -0,0 +1,2 @@
enhancement:
- "All storage types now support file-based storage - [#2944](https://github.com/PrefectHQ/prefect/pull/2944)"
57 changes: 51 additions & 6 deletions docs/core/idioms/file-based.md
@@ -1,6 +1,16 @@
# Using file based flow storage

-Prefect version `0.12.1` began to implement support for storing flows as paths to files. This means that flow code can change in between (or even during) runs without needing to be reregistered. As long as the structure of the flow itself does not change, only the task content, then a Prefect API backend will be able to execute the flow. This is a useful storage mechanism especially for testing, debugging, CI/CD processes, and more!
As of Prefect version `0.12.5` all storage options support storing flows as files. This means that flow
code can change in between (or even during) runs without needing to be reregistered. As long as only the
task content changes and the structure of the flow itself does not, a Prefect API backend will be able
to execute the flow. This is an especially useful storage mechanism for testing, debugging, CI/CD
processes, and more!

### Enable file storage

GitHub storage only supports file-based flows; however, the other storage options (Local, Docker, S3,
etc.) support storing flows both as pickles and as files. To switch to file-based storage and enable
the workflow above, set `stored_as_script=True` on the storage object.
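The practical difference between the two modes can be sketched with plain-Python stand-ins. This is illustrative only and does not use the Prefect API: `GREETING` plays the role of the flow, `load_from_script` mimics loading a flow from a `.py` file, and the pickle snapshot mimics pickle-based storage.

```python
import os
import pickle
import tempfile

# Two versions of the same "flow" script, before and after an edit
SCRIPT_V1 = "GREETING = 'hello'\n"
SCRIPT_V2 = "GREETING = 'goodbye'\n"


def load_from_script(path):
    # Script-based storage: the file is executed fresh on every load
    namespace = {}
    with open(path) as f:
        exec(f.read(), namespace)
    return namespace["GREETING"]


tmp = tempfile.mkdtemp()
path = os.path.join(tmp, "my_flow.py")
with open(path, "w") as f:
    f.write(SCRIPT_V1)

# Pickle-based storage: a frozen snapshot taken at "registration" time
snapshot = pickle.dumps(load_from_script(path))

# Edit the script in place, as you would push a new commit
with open(path, "w") as f:
    f.write(SCRIPT_V2)

print(pickle.loads(snapshot))    # the pickle still holds the old value
print(load_from_script(path))    # the script load picks up the edit
```

The second load reflects the edit without any re-registration step, which is the behavior `stored_as_script=True` enables for real flows.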

### Example file based workflow

@@ -12,7 +22,9 @@ pip install 'prefect[github]'
```
:::

-In this example we will walk through a potential workflow you may use when registering flows with [GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub repository with the following structure:
In this example we will walk through a potential workflow you may use when registering flows with
[GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub
repository with the following structure:

```
repo
@@ -52,7 +64,9 @@ Here's a breakdown of the three kwargs set on the `GitHub` storage:

- `repo`: the name of the repo that this code will live in
- `path`: the location of the flow file in the repo. This must be an exact match to the path of the file.
-- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo.
- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which
  is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed
  it has the proper permissions to pull the file from the repo.

Push this code to the repository:

@@ -62,16 +76,47 @@ git commit -m 'Add my flow'
git push
```

-Now that the file exists on the repo the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud).
Now that the file exists in the repo, the flow needs to be registered with a Prefect API backend (either
Core's server or Prefect Cloud).

```bash
prefect register flow -f flows/my_flow.py
Result check: OK
Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614
```

-The flow is ready to run! Every time you need to change the code inside your flow's respective tasks all you need to do is commit that code to the same location in the repository and each subsequent run will use that code.
The flow is ready to run! Every time you need to change the code inside your flow's tasks, all you need
to do is commit that code to the same location in the repository, and each subsequent run will use that
code.

::: warning Flow Structure
-If you change any of the structure of your flow such as task names, rearrange task order, etc. then you will need to reregister that flow.
If you change any of the structure of your flow, such as renaming tasks or rearranging task order, you
will need to reregister that flow.
:::

### File based Docker storage

```python
flow.storage = Docker(
    path="my_flow.py",
    files={"/source/of/my_flow.py": "my_flow.py"},
    stored_as_script=True
)
```

To store flows as files in Docker storage, three kwargs need to be set if you are using Prefect's default
Docker storage build step:

- `path`: the path where the flow file is stored in the Docker image
- `files`: a dictionary mapping local file sources to path destinations in the image
- `stored_as_script`: a boolean enabling file-based storage

If your Docker storage uses an image that already has your flow files added into it, then you only
need to specify the following:

```python
flow.storage = Docker(
    path="/location/in/image/my_flow.py",
    stored_as_script=True
)
```
3 changes: 3 additions & 0 deletions docs/orchestration/execution/storage_options.md
@@ -6,6 +6,9 @@ As of Prefect version `0.9.0` every storage option except for `Docker` will auto

Version `0.12.0` introduces a new way to store flows using the various cloud storage options (S3, GCS, Azure) and then in turn run them using Agents which orchestrate containerized environments. For more information see [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments).

Version `0.12.5` introduces file-based storage for all storage options. For more information see the
[Using file based flow storage idiom](/core/idioms/file-based.html).

## Local

[Local Storage](/api/latest/environments/storage.html#local) is the default `Storage` option for all flows. This stores the flow as bytes in the local filesystem which means it can only be run by a [local agent](/orchestration/agents/local.html) running on the same machine.
29 changes: 22 additions & 7 deletions src/prefect/environments/storage/_healthcheck.py
@@ -32,11 +32,7 @@ def system_check(python_version: str):
    print("System Version check: OK")


-def cloudpickle_deserialization_check(flow_file_paths: str):
-    flow_file_paths = ast.literal_eval(
-        flow_file_paths
-    )  # convert string to list of strings
-
def cloudpickle_deserialization_check(flow_file_paths: list):
    flows = []
    for flow_file in flow_file_paths:
        with open(flow_file, "rb") as f:
@@ -46,6 +42,17 @@ def cloudpickle_deserialization_check(flow_file_paths: str):
    return flows


def import_flow_from_script_check(flow_file_paths: list):
    from prefect.utilities.storage import extract_flow_from_file

    flows = []
    for flow_file_path in flow_file_paths:
        flows.append(extract_flow_from_file(file_path=flow_file_path))

    print("Flow import from script check: OK")
    return flows


def result_check(flows: list):
    for flow in flows:
        if flow.result is not None:
@@ -114,11 +121,19 @@ def environment_dependency_check(flows: list):


if __name__ == "__main__":
-    flow_file_path, python_version = sys.argv[1:3]
    flow_file_paths, python_version = sys.argv[1:3]

    print("Beginning health checks...")

    flow_file_paths = ast.literal_eval(flow_file_paths)

    system_check(python_version)
-    flows = cloudpickle_deserialization_check(flow_file_path)

    if any(".py" in file_path for file_path in flow_file_paths):
        flows = import_flow_from_script_check(flow_file_paths)
    else:
        flows = cloudpickle_deserialization_check(flow_file_paths)

    result_check(flows)
    environment_dependency_check(flows)
    print("All health checks passed.")
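The dispatch added to the `__main__` block above can be sketched in isolation. The two checkers are replaced with stand-ins here so the example is self-contained; as in the real healthcheck, the path list arrives serialized as a string on `argv` and is parsed with `ast.literal_eval`, and any `.py` path routes all flows through the script importer rather than cloudpickle.

```python
import ast


def import_flow_from_script_check(paths):
    # Stand-in for the real checker, which imports each flow from its .py file
    return ["script:" + p for p in paths]


def cloudpickle_deserialization_check(paths):
    # Stand-in for the real checker, which cloudpickle-loads each flow file
    return ["pickle:" + p for p in paths]


def run_checks(raw_argv_value):
    # The build step passes the list serialized as a string, e.g. "['a.py']"
    flow_file_paths = ast.literal_eval(raw_argv_value)
    if any(".py" in file_path for file_path in flow_file_paths):
        return import_flow_from_script_check(flow_file_paths)
    return cloudpickle_deserialization_check(flow_file_paths)


print(run_checks("['flows/my_flow.py']"))
print(run_checks("['/opt/prefect/flows/my-flow.prefect']"))
```

`ast.literal_eval` is used rather than `eval` so that only Python literals (here, a list of strings) are accepted from the command line.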
21 changes: 18 additions & 3 deletions src/prefect/environments/storage/azure.py
@@ -7,6 +7,7 @@

from prefect.engine.results import AzureResult
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file

if TYPE_CHECKING:
from prefect.core.flow import Flow
@@ -31,6 +32,8 @@ class Azure(Storage):
`AZURE_STORAGE_CONNECTION_STRING` will be used
- blob_name (str, optional): a unique key to use for uploading this Flow to Azure. This
is only useful when storing a single Flow using this storage object.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
- **kwargs (Any, optional): any additional `Storage` initialization options
"""

@@ -39,6 +42,7 @@ def __init__(
        container: str,
        connection_string: str = None,
        blob_name: str = None,
        stored_as_script: bool = False,
        **kwargs: Any
    ) -> None:
        self.flows = dict()  # type: Dict[str, str]
@@ -54,7 +58,7 @@ def __init__(
        result = AzureResult(
            connection_string=self.connection_string, container=container
        )
-        super().__init__(result=result, **kwargs)
        super().__init__(result=result, stored_as_script=stored_as_script, **kwargs)

@property
def default_labels(self) -> List[str]:
@@ -83,8 +87,12 @@ def get_flow(self, flow_location: str) -> "Flow":

        self.logger.info("Downloading {} from {}".format(flow_location, self.container))

-        content = client.download_blob()
-        return cloudpickle.loads(content.content_as_bytes())
        content = client.download_blob().content_as_bytes()

        if self.stored_as_script:
            return extract_flow_from_file(file_contents=content)  # type: ignore

        return cloudpickle.loads(content)

def add_flow(self, flow: "Flow") -> str:
"""
@@ -135,6 +143,13 @@ def build(self) -> "Storage":
        """
        self.run_basic_healthchecks()

        if self.stored_as_script:
            if not self.blob_name:
                raise ValueError(
                    "A `blob_name` must be provided to show where flow `.py` file is stored in Azure."
                )
            return self

        for flow_name, flow in self._flows.items():
            data = cloudpickle.dumps(flow)
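The guard added to `Azure.build` above can be sketched as a standalone function. Names and return values here are illustrative stand-ins, not the Prefect API: when flows are stored as script files nothing is uploaded at build time, so a `blob_name` pointing at the existing `.py` blob becomes mandatory.

```python
def check_build_args(stored_as_script, blob_name=None):
    # Mirror of the branch in Azure.build: script storage skips the upload,
    # so the caller must say where the .py file already lives in Azure.
    if stored_as_script and not blob_name:
        raise ValueError(
            "A `blob_name` must be provided to show where flow `.py` file is stored in Azure."
        )
    # Return the (illustrative) upload mode the build step would take
    return "skip-upload" if stored_as_script else "pickle-upload"


print(check_build_args(True, blob_name="flows/my_flow.py"))  # skip-upload
print(check_build_args(False))                               # pickle-upload
```

Calling `check_build_args(True)` with no `blob_name` raises `ValueError`, matching the behavior of the real `build` method.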

4 changes: 4 additions & 0 deletions src/prefect/environments/storage/base.py
@@ -26,6 +26,8 @@ class Storage(metaclass=ABCMeta):
- add_default_labels (bool): If `True`, adds the storage specific default label (if
applicable) to the storage labels. Defaults to the value specified in the
configuration at `flows.defaults.storage.add_default_labels`.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
"""

def __init__(
@@ -34,9 +36,11 @@ def __init__(
        secrets: List[str] = None,
        labels: List[str] = None,
        add_default_labels: bool = None,
        stored_as_script: bool = False,
    ) -> None:
        self.result = result
        self.secrets = secrets or []
        self.stored_as_script = stored_as_script
        self._labels = labels or []
        if add_default_labels is None:
            self.add_default_labels = config.flows.defaults.storage.add_default_labels
42 changes: 31 additions & 11 deletions src/prefect/environments/storage/docker.py
@@ -17,6 +17,7 @@

import prefect
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file

if TYPE_CHECKING:
import docker
@@ -75,6 +76,10 @@ class Docker(Storage):
api.html#docker.api.build.BuildApiMixin.build)
- prefect_directory (str, optional): Path to the directory where prefect configuration/flows
should be stored inside the Docker image. Defaults to `/opt/prefect`.
- path (str, optional): a direct path to the location of the flow file in the Docker image
if `stored_as_script=True`.
- stored_as_script (bool, optional): boolean for specifying if the flow has been stored
as a `.py` file. Defaults to `False`
- **kwargs (Any, optional): any additional `Storage` initialization options
Raises:
@@ -99,6 +104,8 @@ def __init__(
        tls_config: Union[bool, "docker.tls.TLSConfig"] = False,
        build_kwargs: dict = None,
        prefect_directory: str = "/opt/prefect",
        path: str = None,
        stored_as_script: bool = False,
        **kwargs: Any,
    ) -> None:
        self.registry_url = registry_url
@@ -112,6 +119,7 @@ def __init__(
        self.python_dependencies.append("wheel")

        self.prefect_directory = prefect_directory
        self.path = path

        self.env_vars = env_vars or {}
        self.env_vars.setdefault(
@@ -175,7 +183,7 @@ def __init__(
"absolute paths only."
).format(", ".join(not_absolute))
)
-        super().__init__(**kwargs)
        super().__init__(stored_as_script=stored_as_script, **kwargs)

def get_env_runner(self, flow_location: str) -> Callable[[Dict[str, str]], None]:
"""
Expand Down Expand Up @@ -234,7 +242,7 @@ def add_flow(self, flow: "prefect.core.flow.Flow") -> str:
                    flow.name
                )
            )
-        flow_path = "{}/flows/{}.prefect".format(
        flow_path = self.path or "{}/flows/{}.prefect".format(
            self.prefect_directory, slugify(flow.name)
        )
        self.flows[flow.name] = flow_path
@@ -252,6 +260,9 @@ def get_flow(self, flow_location: str) -> "prefect.core.flow.Flow":
Returns:
- Flow: the requested flow
        """
        if self.stored_as_script:
            return extract_flow_from_file(file_path=flow_location)

        with open(flow_location, "rb") as f:
            return cloudpickle.load(f)

@@ -438,15 +449,23 @@ def create_dockerfile_object(self, directory: str) -> str:

        # Write all flows to file and load into the image
        copy_flows = ""
-        for flow_name, flow_location in self.flows.items():
-            clean_name = slugify(flow_name)
-            flow_path = os.path.join(directory, "{}.flow".format(clean_name))
-            with open(flow_path, "wb") as f:
-                cloudpickle.dump(self._flows[flow_name], f)
-            copy_flows += "COPY {source} {dest}\n".format(
-                source=flow_path if self.dockerfile else "{}.flow".format(clean_name),
-                dest=flow_location,
-            )
        if not self.stored_as_script:
            for flow_name, flow_location in self.flows.items():
                clean_name = slugify(flow_name)
                flow_path = os.path.join(directory, "{}.flow".format(clean_name))
                with open(flow_path, "wb") as f:
                    cloudpickle.dump(self._flows[flow_name], f)
                copy_flows += "COPY {source} {dest}\n".format(
                    source=flow_path
                    if self.dockerfile
                    else "{}.flow".format(clean_name),
                    dest=flow_location,
                )
        else:
            if not self.path:
                raise ValueError(
                    "A `path` must be provided to show where flow `.py` file is stored in the image."
                )

# Write all extra commands that should be run in the image
extra_commands = ""
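The branching added to `create_dockerfile_object` can be sketched with a stdlib-only stand-in. The real method writes pickled flow files to disk and uses the `slugify` package; both are approximated here, and the function names are illustrative.

```python
def slugify(name):
    # Crude stand-in for the slugify package, good enough for this sketch
    return name.lower().replace(" ", "-")


def copy_flow_lines(flows, stored_as_script, path=None):
    # Mirror of the branch above: pickle mode emits one COPY per flow,
    # script mode emits nothing but requires `path` to point into the image.
    if stored_as_script:
        if not path:
            raise ValueError(
                "A `path` must be provided to show where flow `.py` file is stored in the image."
            )
        return ""  # the .py file is assumed to already be in the image
    lines = ""
    for flow_name, flow_location in flows.items():
        lines += "COPY {source} {dest}\n".format(
            source="{}.flow".format(slugify(flow_name)),
            dest=flow_location,
        )
    return lines


print(copy_flow_lines({"My Flow": "/opt/prefect/flows/my-flow.prefect"}, False))
```

In script mode no flow bytes are baked into the image at build time, which is also why the healthcheck step is skipped for file-based Docker storage.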
@@ -498,6 +517,7 @@ def create_dockerfile_object(self, directory: str) -> str:
)

        # append the line that runs the healthchecks
        # skip over for now if storing flow as file
        if not self.ignore_healthchecks:
            file_contents += textwrap.dedent(
                """