Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cleaning up docs
  • Loading branch information
jameslamb committed Jul 20, 2020
commit 32ff9e37337b9ef523ce8d372f4930101032c9d9
90 changes: 68 additions & 22 deletions src/prefect/environments/storage/webhook.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# TODO
#
# * figure out how to work with systems where the ID isn't known
# until after build() is done
# * test with Google Drive
# * test with something custom on API Gateway
# * add to all the relevant docs

import cloudpickle
import os
import warnings
Expand All @@ -27,29 +19,41 @@

class WebHook(Storage):
"""
WebHook storage class. This class represents the Storage interface for
Flows stored and retrieved with HTTP requests.

This storage class takes in keyword arguments which describe how to
create the requests, including information on how to fill in headers
from environment variable or Prefect Cloud secrets.

**Note**: Flows registered with this Storage option will automatically be
labeled with `webhook-flow-storage`.

Args:
- build_kwargs (dict): Dictionary of keyword arguments to the
function from ``requests`` used to store the flow. Do not supply
function from `requests` used to store the flow. Do not supply
`"data"` to this argument, as it will be overwritten with the
flow's content when `.build()` is run.
- build_http_method (str): HTTP method identifying the type of request
to execute when storing the flow. For example, ``"POST"`` for
``requests.post()``.
to execute when storing the flow. For example, `"POST"` for
`requests.post()`.
- build_secret_config (dict): A dictionary describing how to set
request headers from environment variables or Prefect Cloud
secrets. See example for details on specifying this. This config
applies to tthe request issued by `.build()`, and wiill also be
used for `.get_flow()` unless you explicitly set
`get_flow_secret_config`.
- get_flow_kwargs (dict): Dictionary of keyword arguments to the
function from ``requests`` used to retrieve the flow.
function from `requests` used to retrieve the flow.
- get_flow_http_method (str): HTTP method identifying the type of
request to execute when storing the flow. For example, ``"GET"``
for ``requests.post()``.
request to execute when storing the flow. For example, `"GET"`
for `requests.post()`.
- get_flow_secret_config (dict): Similar to `build_secret_config`, but
used for the request in `.get_flow()`. By default, the config
passed to `build_secret_config` will be used for `.get_flow()`
as well. Pass a value to this argument to override that behavior.
as well. Pass a value to this argument only if you want to use a
different config for `.get_flow()` than the one used for
`.build()`.

Passing sensitive data in headers
---------------------------------
Expand All @@ -60,16 +64,16 @@ class WebHook(Storage):

This should be a dictionary whose keys are headers, and whose
values indicate whether to retrieve real values from environment
variables (``"type": "environment"``) or
Prefect secrets (``"type": "secret"``).
variables (`"type": "environment"`) or Prefect Cloud secrets
(`"type": "secret"`).

So, for example, to get an API key from an environment variable you
can do the following

```python
storage = Webhoook(
storage = Webhook(
build_kwargs={
"url": "some-random-service.place.thing",
"url": "some-service",
"headers" = {
"Content-Type" = "application/octet-stream"
}
Expand Down Expand Up @@ -194,7 +198,49 @@ def build(self) -> "WebHook":
to store the flow.

The response from this request is stored in `._build_responses`,
a dictionary keyed by flow name.
a dictionary keyed by flow name. If you are using a service where
all the details necessary to fetch a flow cannot be known until you've
stored it, you can do something like the following.

```python
import cloudpickle
import json
import os
import random
import requests

from prefect import task, Task, Flow
from prefect.environments.storage import WebHook

@task
def random_number():
return random.randint(0, 100)
with Flow("test-flow") as flow:
random_number()


flow.storage = WebHook(
build_kwargs={
"url": "some-service/upload",
"headers": {"Content-Type": "application/octet-stream"},
},
build_http_method="POST",
get_flow_kwargs={
"url": "some-service/download",
"headers": {"Accept": "application/octet-stream"},
},
get_flow_http_method="GET",
)

flow.storage.add_flow(flow)
res = flow.storage.build()

# get the ID from the response
flow_id = res._build_responses[flow.name].json()["id"]

# update storage
flow.storage.get_flow_kwargs["url"] = f"{GET_ROUTE}/{flow_id}"
```

Returns:
- Storage: a WebHook storage object
Expand All @@ -220,8 +266,8 @@ def build(self) -> "WebHook":

if "data" in build_kwargs.keys():
msg = (
"'data' found in build_kwargs. This value is overwritten with "
"the flow content and should not be set directly"
"'data' found in build_kwargs. This value is overwritten "
"with the flow content and should not be set directly"
)
self.logger.warning(msg)
warnings.warn(msg, RuntimeWarning)
Expand Down