Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
add _request to keyword args
jameslamb committed Aug 1, 2020
commit 0add059a315f0e90083da1f65555b7c533defc92
8 changes: 4 additions & 4 deletions docs/orchestration/execution/storage_options.md
Expand Up @@ -153,7 +153,7 @@ from prefect.environments.storage import Webhook
flow = Flow(
"dropbox-flow",
storage=Webhook(
- build_kwargs={
+ build_request_kwargs={
"url": "https://content.dropboxapi.com/2/files/upload",
"headers": {
"Content-Type": "application/octet-stream",
Expand All @@ -167,8 +167,8 @@ flow = Flow(
),
},
},
- build_http_method="POST",
- get_flow_kwargs={
+ build_request_http_method="POST",
+ get_flow_request_kwargs={
"url": "https://content.dropboxapi.com/2/files/download",
"headers": {
"Accept": "application/octet-stream",
Expand All @@ -177,7 +177,7 @@ flow = Flow(
),
},
},
- get_flow_http_method="POST",
+ get_flow_request_http_method="POST",
build_secret_config={

@jcrist jcrist Jul 29, 2020

I find this kwarg a bit confusing and limiting. It only works on headers? What if you need to set a query parameter or part of the body? What if you need to modify the secret before transmitting (e.g. adding a Bearer prefix)? I wonder if we might instead support templating in the (recursive) values in get_flow_kwargs/build_kwargs. The semantics might be:

  • Recursively search the values (not keys) of dicts passed to build_kwargs and get_flow_kwargs for strings
  • Replace any template strings found in those values, using environment variables first and falling back to secrets.
storage = Webhook(
    build_kwargs={
        "url": "https://content.dropboxapi.com/2/files/upload",
        "headers": {
            "Content-Type": "application/octet-stream",
            "Dropbox-API-Arg": json.dumps(
                {
                    "path": "/Apps/prefect-test-app/dropbox-flow.flow",
                    "mode": "overwrite",
                    "autorename": False,
                    "strict_conflict": True,
                }
            ),
            "Authorization": "${DBOX_OAUTH2_TOKEN}",
        },
    },
    build_http_method="POST",
    get_flow_kwargs={
        "url": "https://content.dropboxapi.com/2/files/download",
        "headers": {
            "Accept": "application/octet-stream",
            "Dropbox-API-Arg": json.dumps(
                {"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
            ),
        },
    },
    get_flow_http_method="POST",
)

One way of doing this would be to make use of string.Template and a magic mapping to handle dynamically looking up fields. We might want to change the regex to drop the $ prefix to make it similar to str.format (or maybe not? not sure what's clearer), but this works. (Note that str.format converts the mapping to a dict before formatting, so we can't use that to dynamically load secrets/environment variables, unfortunately.)

In [12]: import string

In [13]: from collections.abc import Mapping

In [14]: class Magic(Mapping):
    ...:     def __getitem__(self, key):
    ...:         print("Could lookup environment variable or secret here")
    ...:         return "hello-world"
    ...:     def __iter__(self):
    ...:         return iter([])
    ...:     def __len__(self):
    ...:         return 0
    ...:

In [15]: magic_map = Magic()

In [16]: template = string.Template("Bearer ${token}")

In [17]: template.substitute(magic_map)
Could lookup environment variable or secret here
Out[17]: 'Bearer hello-world'

Could also use the regex module directly, which might be simpler 🤷.
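
For illustration, the semantics proposed in this comment (recursively search dict values, look in environment variables first, then fall back to secrets) could be sketched roughly as follows. This is not the code that landed in the PR: `EnvThenSecrets`, `render`, the plain-dict `secrets` store, and the `WEBHOOK_DEMO_*` names are all hypothetical and assumed only for this sketch.

```python
import os
import string
from collections.abc import Mapping
from typing import Any, Dict


class EnvThenSecrets(Mapping):
    """Resolve a name from os.environ first, then from a fallback dict.

    `secrets` is a plain dict standing in for a real secret store
    (an assumption made for this sketch).
    """

    def __init__(self, secrets: Dict[str, str]):
        self._secrets = secrets

    def __getitem__(self, key: str) -> str:
        if key in os.environ:
            return os.environ[key]
        return self._secrets[key]  # KeyError if absent everywhere

    def __iter__(self):
        return iter([])

    def __len__(self) -> int:
        return 0


def render(value: Any, lookup: Mapping) -> Any:
    """Recursively render template strings in dict values (never keys)."""
    if isinstance(value, str):
        return string.Template(value).substitute(lookup)
    if isinstance(value, dict):
        return {k: render(v, lookup) for k, v in value.items()}
    return value  # non-string, non-dict values pass through untouched


# Hypothetical names, used only for this demonstration.
os.environ["WEBHOOK_DEMO_TOKEN"] = "abc"
lookup = EnvThenSecrets(secrets={"WEBHOOK_DEMO_PATH": "/flows/demo.flow"})
kwargs = {
    "url": "https://example.com/upload",
    "headers": {
        "Authorization": "Bearer ${WEBHOOK_DEMO_TOKEN}",
        "X-Path": "${WEBHOOK_DEMO_PATH}",
    },
}
rendered = render(kwargs, lookup)
```

Strings without a `$` placeholder, such as the URL here, come back unchanged, so existing kwargs would not need to be rewritten to opt in.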

I also find the get_flow and build prefixes on these kwargs a bit off. I know they correspond to the requests for the build and get_flow methods, but without the word request in there build_kwargs looks like kwargs to pass to build to me. Feels too tied to the interface method names and not tied to what the requests are actually doing (storing and loading bytes). Perhaps?

store_request_kwargs=...,
store_request_method=...,
load_request_kwargs=...,
load_request_method=...,

I'd use put and get, except those conflict with the http methods. Not a strong opinion, just a thought.

Author

I thought this was sufficiently complex that I shouldn't make it part of the first pass, but if you think it's necessary I'm happy to add it!

I'm a little worried about free-form templating everything though...that's going to be a problem if you have JSON jammed in a string, like the DropBox API requires (https://github.com/jameslamb/webhook-storage-integration-tests/blob/3bc93bf2ce4b9a0539306045f2f6a82bc3325c53/test-dropbox.py#L47). That opens you up to needing to know how to escape the right }, which doesn't sound fun.

Maybe it would be simpler to, instead of templating individual string fields, just allow people to replace the entire value of any entry in build_kwargs or get_flow_kwargs with the value of an environment variable / secret?

> what if you need to modify the secret before transmitting (e.g. adding a Bearer prefix)

I did think about this specific case. If your API token's literal value is abc, there's no reason you couldn't put Bearer abc into an environment variable / secret, right? Without needing to have any code run to add Bearer to the front.

It feels non-intuitive to me to require storing a full authorization header/url/etc... in a secret to make proper use of it. If we keep the $ prefix requirement that string.Template uses, that (I believe) avoids the issue of accidentally templating things that just happen to contain {} characters, since they won't match the regex.
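
A small check of that claim, as a sketch using only the standard library: with the `$` prefix in place, `string.Template` leaves bare braces alone, so a JSON-encoded header value like the Dropbox one passes through untouched. The `TOKEN` placeholder name is made up for the example.

```python
import json
import string

# A header value that is itself JSON, as the Dropbox example requires.
api_arg = json.dumps({"path": "/Apps/prefect-test-app/dropbox-flow.flow"})

# No "$" appears in the JSON, so substitution returns it unchanged.
unchanged = string.Template(api_arg).substitute({})

# Only "$"-prefixed placeholders are replaced; the bare braces survive.
mixed = string.Template('{"token": "${TOKEN}"}').substitute({"TOKEN": "abc"})

# A literal dollar sign is the one thing that still needs escaping, as "$$".
escaped = string.Template("cost: $$5").substitute({})
```

The remaining caveat is a literal `$` in a value, which `substitute` would reject unless it is doubled.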

Author

ooooo ook! I like that a lot. I'll add that to this PR.

As for the names, I feel that there's value in coupling to the method names actually. build() and get_flow() are important to understand when using a Storage object, I think, and I'd rather couple to those than invent another thing people have to reason about. But I do like adding _request to make that clearer. How do you feel about build_request_* and get_flow_request_*?

> But I do like adding _request to make that clearer.

Makes sense to me!

Author

Ok I just attempted the templating thing. Awesome suggestion, I like this a lot better than the secret_config approach.

Commit: 937dd84

I also updated my integration tests and ran them to be sure it's working as expected: https://github.com/jameslamb/webhook-storage-integration-tests.

Note for reviewers

I think it could be valuable to offer more explicit control over whether environment variables or Prefect secrets are used, to avoid issues caused by conflicting names.

I think that could be done in a backwards-compatible way in the future, by adding a render_preferences argument that is like {"SOME_VARIABLE": "environment"}, which changes the behavior for rendering ${SOME_VARIABLE} from "env --> secret --> error-if-absent" to "env --> error-if-absent".

I thought that complexity wasn't worth it for a first pass, but I'd be happy to add something like it if reviewers think it's a good idea.
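
To make the idea concrete, here is a rough sketch of how such a lookup might behave. `render_preferences` is only proposed in this note, not implemented in the PR, and `lookup`, the plain-dict `secrets` store, and the `WEBHOOK_DEMO_*` names are hypothetical.

```python
import os
from typing import Dict, Optional


def lookup(
    name: str,
    secrets: Dict[str, str],
    render_preferences: Optional[Dict[str, str]] = None,
) -> str:
    """Resolve `name` from the environment, then (unless restricted) from `secrets`.

    A preference of "environment" for `name` disables the secret fallback,
    turning "env --> secret --> error" into "env --> error".
    """
    preference = (render_preferences or {}).get(name)
    if name in os.environ:
        return os.environ[name]
    if preference != "environment" and name in secrets:
        return secrets[name]
    raise KeyError(name)


# Hypothetical names, used only for this demonstration.
os.environ.pop("WEBHOOK_DEMO_SECRET_ONLY", None)
secrets = {"WEBHOOK_DEMO_SECRET_ONLY": "s3cret"}

# Default behavior: the secret is used because the variable is not set.
from_secret = lookup("WEBHOOK_DEMO_SECRET_ONLY", secrets)

# Restricted to the environment: the same name now fails loudly.
try:
    lookup(
        "WEBHOOK_DEMO_SECRET_ONLY",
        secrets,
        render_preferences={"WEBHOOK_DEMO_SECRET_ONLY": "environment"},
    )
    restricted_failed = False
except KeyError:
    restricted_failed = True
```

Because the argument defaults to `None`, callers that never pass it would keep the current behavior, which is what would make the change backwards-compatible.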

"Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"}
},
72 changes: 36 additions & 36 deletions src/prefect/environments/storage/webhook.py
Expand Up @@ -31,11 +31,11 @@ class Webhook(Storage):
labeled with `webhook-flow-storage`.

Args:
- - build_kwargs (dict): Dictionary of keyword arguments to the
+ - build_request_kwargs (dict): Dictionary of keyword arguments to the
function from `requests` used to store the flow. Do not supply
`"data"` to this argument, as it will be overwritten with the
flow's content when `.build()` is run.
- - build_http_method (str): HTTP method identifying the type of request
+ - build_request_http_method (str): HTTP method identifying the type of request
to execute when storing the flow. For example, `"POST"` for
`requests.post()`.
- build_secret_config (dict): A dictionary describing how to set
Expand All @@ -44,9 +44,9 @@ class Webhook(Storage):
applies to the request issued by `.build()`, and will also be
used for `.get_flow()` unless you explicitly set
`get_flow_secret_config`.
- - get_flow_kwargs (dict): Dictionary of keyword arguments to the
+ - get_flow_request_kwargs (dict): Dictionary of keyword arguments to the
function from `requests` used to retrieve the flow.
- - get_flow_http_method (str): HTTP method identifying the type of
+ - get_flow_request_http_method (str): HTTP method identifying the type of
request to execute when retrieving the flow. For example, `"GET"`
for `requests.get()`.
- get_flow_secret_config (dict): Similar to `build_secret_config`, but
Expand Down Expand Up @@ -85,13 +85,13 @@ class Webhook(Storage):

```python
storage = Webhook(
- build_kwargs={
+ build_request_kwargs={
"url": "some-service",
"headers": {
"Content-Type": "application/octet-stream"
}
},
- build_http_method="POST",
+ build_request_http_method="POST",
...
...
build_secret_config={
Expand All @@ -106,10 +106,10 @@ class Webhook(Storage):

def __init__(
self,
- build_kwargs: Dict[str, Any],
- build_http_method: str,
- get_flow_kwargs: Dict[str, Any],
- get_flow_http_method: str,
+ build_request_kwargs: Dict[str, Any],
+ build_request_http_method: str,
+ get_flow_request_kwargs: Dict[str, Any],
+ get_flow_request_http_method: str,
build_secret_config: Optional[Dict[str, Any]] = None,
get_flow_secret_config: Optional[Dict[str, Any]] = None,
stored_as_script: bool = False,
Expand All @@ -132,25 +132,25 @@ def __init__(
"PUT": self._session.put,
}

- if build_http_method not in self._method_to_function.keys():
- msg = "HTTP method '{}' not recognized".format(build_http_method)
+ if build_request_http_method not in self._method_to_function.keys():
+ msg = "HTTP method '{}' not recognized".format(build_request_http_method)
self.logger.fatal(msg)
raise RuntimeError(msg)

- if get_flow_http_method not in self._method_to_function.keys():
- msg = "HTTP method '{}' not recognized".format(get_flow_http_method)
+ if get_flow_request_http_method not in self._method_to_function.keys():
+ msg = "HTTP method '{}' not recognized".format(get_flow_request_http_method)
self.logger.fatal(msg)
raise RuntimeError(msg)

self.stored_as_script = stored_as_script
self.flow_script_path = flow_script_path

- self.build_kwargs = build_kwargs
- self.build_http_method = build_http_method
+ self.build_request_kwargs = build_request_kwargs
+ self.build_request_http_method = build_request_http_method
self.build_secret_config = build_secret_config or {}

- self.get_flow_kwargs = get_flow_kwargs
- self.get_flow_http_method = get_flow_http_method
+ self.get_flow_request_kwargs = get_flow_request_kwargs
+ self.get_flow_request_http_method = get_flow_request_http_method
self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config

self._build_responses: Optional[Dict[str, Response]] = None
Expand Down Expand Up @@ -178,15 +178,15 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow":
- requests.exceptions.HTTPError if getting the flow fails
"""
self.logger.info("Retrieving flow")
- req_function = self._method_to_function[self.get_flow_http_method]
+ req_function = self._method_to_function[self.get_flow_request_http_method]

- get_flow_kwargs = deepcopy(self.get_flow_kwargs)
- get_flow_kwargs["headers"] = self._render_headers(
- headers=get_flow_kwargs.get("headers", {}),
+ get_flow_request_kwargs = deepcopy(self.get_flow_request_kwargs)
+ get_flow_request_kwargs["headers"] = self._render_headers(
+ headers=get_flow_request_kwargs.get("headers", {}),
secret_config=self.get_flow_secret_config,
)

- response = req_function(**get_flow_kwargs) # type: ignore
+ response = req_function(**get_flow_request_kwargs) # type: ignore
response.raise_for_status()

if self.stored_as_script:
Expand Down Expand Up @@ -246,16 +246,16 @@ def random_number():


flow.storage = Webhook(
- build_kwargs={
+ build_request_kwargs={
"url": "some-service/upload",
"headers": {"Content-Type": "application/octet-stream"},
},
- build_http_method="POST",
- get_flow_kwargs={
+ build_request_http_method="POST",
+ get_flow_request_kwargs={
"url": "some-service/download",
"headers": {"Accept": "application/octet-stream"},
},
- get_flow_http_method="GET",
+ get_flow_request_http_method="GET",
)

flow.storage.add_flow(flow)
Expand All @@ -265,7 +265,7 @@ def random_number():
flow_id = res._build_responses[flow.name].json()["id"]

# update storage
- flow.storage.get_flow_kwargs["url"] = f"{GET_ROUTE}/{flow_id}"
+ flow.storage.get_flow_request_kwargs["url"] = f"{GET_ROUTE}/{flow_id}"
```

Returns:
Expand Down Expand Up @@ -300,24 +300,24 @@ def random_number():
with open(self.flow_script_path, "r") as f:
data = f.read().encode("utf-8")

- req_function = self._method_to_function[self.build_http_method]
+ req_function = self._method_to_function[self.build_request_http_method]

- build_kwargs = deepcopy(self.build_kwargs)
- build_kwargs["headers"] = self._render_headers(
- headers=build_kwargs.get("headers", {}),
+ build_request_kwargs = deepcopy(self.build_request_kwargs)
+ build_request_kwargs["headers"] = self._render_headers(
+ headers=build_request_kwargs.get("headers", {}),
secret_config=self.build_secret_config,
)

- if "data" in build_kwargs.keys():
+ if "data" in build_request_kwargs.keys():
msg = (
- "'data' found in build_kwargs. This value is overwritten "
+ "'data' found in build_request_kwargs. This value is overwritten "
"with the flow content and should not be set directly"
)
self.logger.warning(msg)
warnings.warn(msg, RuntimeWarning)
- build_kwargs["data"] = data
+ build_request_kwargs["data"] = data

- response = req_function(**build_kwargs) # type: ignore
+ response = req_function(**build_request_kwargs) # type: ignore
response.raise_for_status()

self._build_responses[flow_name] = response
8 changes: 4 additions & 4 deletions src/prefect/serialization/storage.py
Expand Up @@ -139,10 +139,10 @@ class WebhookSchema(ObjectSchema):
class Meta:
object_class = Webhook

- build_kwargs = fields.Dict(key=fields.Str, allow_none=False)
- build_http_method = fields.String(allow_none=False)
- get_flow_kwargs = fields.Dict(key=fields.Str, allow_none=False)
- get_flow_http_method = fields.String(allow_none=False)
+ build_request_kwargs = fields.Dict(key=fields.Str, allow_none=False)
+ build_request_http_method = fields.String(allow_none=False)
+ get_flow_request_kwargs = fields.Dict(key=fields.Str, allow_none=False)
+ get_flow_request_http_method = fields.String(allow_none=False)
build_secret_config = fields.Dict(key=fields.Str, allow_none=False)
get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False)
stored_as_script = fields.Bool(allow_none=True)
8 changes: 4 additions & 4 deletions tests/agent/test_local_agent.py
Expand Up @@ -304,10 +304,10 @@ def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token)

agent = LocalAgent()
webhook = Webhook(
- build_kwargs={"url": "test-service/upload"},
- build_http_method="POST",
- get_flow_kwargs={"url": "test-service/download"},
- get_flow_http_method="GET",
+ build_request_kwargs={"url": "test-service/upload"},
+ build_request_http_method="POST",
+ get_flow_request_kwargs={"url": "test-service/download"},
+ get_flow_request_http_method="GET",
)
agent.deploy_flow(
flow_run=GraphQLResult(