Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changes/pr3000.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
enhancement:
- "Add Webhook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)"

contributor:
- "[James Lamb](https://github.com/jameslamb)"
52 changes: 52 additions & 0 deletions docs/orchestration/execution/storage_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,58 @@ If you do not specify a `registry_url` for your Docker Storage then the image wi
Docker Storage uses the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html) to build the image and push to a registry. Make sure you have the Docker daemon running locally and you are configured to push to your desired container registry. Additionally make sure whichever platform Agent deploys the container also has permissions to pull from that same registry.
:::

## Webhook

[Webhook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services.

For example, the following code could be used to store flows in DropBox.

```python
from prefect import Flow
from prefect.environments.storage import Webhook

flow = Flow(
"dropbox-flow",
storage=Webhook(
build_request_kwargs={
"url": "https://content.dropboxapi.com/2/files/upload",
"headers": {
"Content-Type": "application/octet-stream",
"Dropbox-API-Arg": json.dumps(
{
"path": "/Apps/prefect-test-app/dropbox-flow.flow",
"mode": "overwrite",
"autorename": False,
"strict_conflict": True,
}
),
"Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
},
},
build_request_http_method="POST",
get_flow_request_kwargs={
"url": "https://content.dropboxapi.com/2/files/download",
"headers": {
"Accept": "application/octet-stream",
"Dropbox-API-Arg": json.dumps(
{"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
),
"Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
},
},
get_flow_request_http_method="POST",
)
)

flow.storage.build()
```

Template strings in `${}` are used to reference sensitive information. Given `${SOME_TOKEN}`, this storage object will first look in environment variable `SOME_TOKEN` and then fall back to [Prefect secrets](/core/concepts/secrets.html) `SOME_TOKEN`. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud.

::: tip Sensible Defaults
Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage.
:::

### Non-Docker Storage for Containerized Environments

Prefect allows for flows to be stored in cloud storage services and executed in containerized environments. This has the added benefit of rapidly deploying new versions of flows without having to rebuild images each time. To enable this functionality add an image name to the flow's Environment metadata.
Expand Down
6 changes: 4 additions & 2 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from prefect import config
from prefect.agent import Agent
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, Webhook
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult

Expand Down Expand Up @@ -91,6 +91,7 @@ def __init__(
"gcs-flow-storage",
"s3-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
]
for label in all_storage_labels:
if label not in self.labels:
Expand Down Expand Up @@ -127,7 +128,8 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:
)

if not isinstance(
StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub)
StorageSchema().load(flow_run.flow.storage),
(Local, Azure, GCS, S3, GitHub, Webhook),
):
self.logger.error(
"Storage for flow run {} is not a supported type.".format(flow_run.id)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/environments/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from prefect.environments.storage.gcs import GCS
from prefect.environments.storage.s3 import S3
from prefect.environments.storage.github import GitHub
from prefect.environments.storage.webhook import Webhook


def get_default_storage_class() -> type:
Expand Down
Loading