Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update LocalAgent tests
  • Loading branch information
jameslamb committed Jul 20, 2020
commit 9bafd61c0da7d5d5b4854f69743a7addac172cb9
53 changes: 53 additions & 0 deletions docs/orchestration/execution/storage_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,59 @@ If you do not specify a `registry_url` for your Docker Storage then the image wi
Docker Storage uses the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html) to build the image and push to a registry. Make sure you have the Docker daemon running locally and you are configured to push to your desired container registry. Additionally make sure whichever platform Agent deploys the container also has permissions to pull from that same registry.
:::

## WebHook

[WebHook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services.

For example, the following code could be used to store flows in DropBox.

```python
from prefect import Flow
from prefect.environments.storage import WebHook

flow = Flow(
"dropbox-flow",
storage=WebHoook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files/upload",
"headers": {
"Content-Type": "application/octet-stream",
"Dropbox-API-Arg": json.dumps(
{
"path": "/Apps/prefect-test-app/dropbox-flow.flow",
"mode": "overwrite",
"autorename": False,
"strict_conflict": True,
}
),
},
},
build_http_method="POST",
get_flow_kwargs={
"url": "https://content.dropboxapi.com/2/files/download",
"headers": {
"Accept": "application/octet-stream",
"Dropbox-API-Arg": json.dumps(
{"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
),
},
},
get_flow_http_method="POST",
build_secret_config={
Copy link

@jcrist jcrist Jul 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this kwarg a bit confusing and limiting (it only works on headers? what if you need to set a query parameter or part of the body? what if you need to modify the secret before transmitting (e.g. adding a Bearer prefix)). I wonder if we might instead support templating in the (recursive) values in get_flow_kwargs/build_flow_kwargs. The semantics might be:

  • Recursively search the values (not keys) of dicts passed to build_kwargs and get_flow_kwargs for strings
  • Replace any template strings found in those values, using environment variables first and falling back to secrets.
storage = Webhook(
    build_kwargs={
        "url": "https://content.dropboxapi.com/2/files/upload",
        "headers": {
            "Content-Type": "application/octet-stream",
            "Dropbox-API-Arg": json.dumps(
                {
                    "path": "/Apps/prefect-test-app/dropbox-flow.flow",
                    "mode": "overwrite",
                    "autorename": False,
                    "strict_conflict": True,
                }
            ),
        },
        "Authorization": "${DBOX_OAUTH2_TOKEN}",
    },
    build_http_method="POST",
    get_flow_kwargs={
        "url": "https://content.dropboxapi.com/2/files/download",
        "headers": {
            "Accept": "application/octet-stream",
            "Dropbox-API-Arg": json.dumps(
                {"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
            ),
        },
    },
    get_flow_http_method="POST",
)

One way of doing this would be to make use of string.Template and a magic mapping to handle dynamically looking up fields. We'd might want to change the regex to drop the $ prefix to make it similar to str.format not (or maybe not? not sure what's clearer) but this works. (note that str.format converts the mapping to a dict before formatting, so we can't use that to dynamically load secrets/environment variables unfortunately).

In [13]: from collections.abc import Mapping

In [14]: class Magic(Mapping):
    ...:     def __getitem__(self, key):
    ...:         print("Could lookup environment variable or secret here")
    ...:         return "hello-world"
    ...:     def __iter__(self):
    ...:         return iter([])
    ...:     def __len__(self):
    ...:         return 0
    ...:

In [15]: magic_map = Magic()

In [16]: template = string.Template("Bearer ${token}")

In [17]: template.substitute(magic_map)
Could lookup environment variable or secret here
Out[17]: 'Bearer hello-world'

Could also use the regex module directly, which might be simpler 🤷.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also find the get_flow and build prefixes on these kwargs a bit off. I know they correspond to the requests for the build and get_flow methods, but without the word request in there build_kwargs looks like kwargs to pass to build to me. Feels too tied to the interface method names and not tied to what the requests are actually doing (storing and loading bytes). Perhaps?

store_request_kwargs=...,
store_request_method=...,
load_request_kwargs=...,
load_request_method=...,

I'd use put and get, except those conflict with the http methods. Not a strong opinion, just a thought.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was sufficiently complex that I shouldn't make it part of the first pass, but if you think it's necessary I'm happy to add it!

I'm a little worried about free-form templating everything though...that's going to be a problem if you have JSON jammed in a string, like the DropBox API requires (https://github.com/jameslamb/webhook-storage-integration-tests/blob/3bc93bf2ce4b9a0539306045f2f6a82bc3325c53/test-dropbox.py#L47). That opens you up to needing to know how to escape the right }, which doesn't sound fun.

maybe it would be simpler to, instead of templating individual string fields, just allow people to replace the entire value of any build_kwarg or get_flow_kwarg with the value of an environment variable / secret?

what if you need to modify the secret before transmitting (e.g. adding a Bearer prefix))

I did think about this specific case. If your API token's literal value is abc, there's no reason you couldn't put Bearer abc into an environment variable / secret, right? Without needing to have any code run to add Bearer to the front.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels non-intuitive to me to require storing a full authorization header/url/etc... in a secret to make proper use of it. If we keep the $ prefix requirement that string.Template uses, that (I believe) avoids the issue of accidentally templating things that just happen to contain {} characters, since they won't match the regex.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooooo ook! I like that a lot. I'll add that to this PR.

As for the names, I feel that there's value in coupling to the method names actually. build() and get_flow() are important to understand when using a Storage object, I think, and I'd rather couple to those than invent another thing people have to reason about. But I do like adding _request to make that clearer. How do you feel about build_request_*and get_flow_request_*?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I do like adding _request to make that clearer.

Makes sense to me!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I just attempted the templating thing. Awesome suggestion, I like this a lot better than the secret_config approach.

Commit: 937dd84

I also updated my integration tests and ran them to be sure it's working as expected: https://github.com/jameslamb/webhook-storage-integration-tests.

Note for reviewers

I think it could be valuable to offer more explicit control over whether environment variables or Prefect secrets are used, to avoid issues caused by conflicting names.

I think that could be done in a backwards-compatible way in the future, by adding a render_preferences argument that is like {"SOME_VARIABLE": "environment"}, which changes the behavior for rendering ${SOME_VARIABLE} from "env --> secret --> error-if-absent" to "env --> error-if-absent".

I thought that complexity wasn't worth it for a first pass, but I'd be happy to add something like it if reviewers think it's a good idea.

"Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"}
},
)
)

flow.storage.build()
```

The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html).

::: tip Sensible Defaults
Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `WebHook` storage.
:::

### Non-Docker Storage for Containerized Environments

Prefect allows for flows to be stored in cloud storage services and executed in containerized environments. This has the added benefit of rapidly deploying new versions of flows without having to rebuild images each time. To enable this functionality add an image name to the flow's Environment metadata.
Expand Down
6 changes: 4 additions & 2 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from prefect import config
from prefect.agent import Agent
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, WebHook
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult

Expand Down Expand Up @@ -91,6 +91,7 @@ def __init__(
"gcs-flow-storage",
"s3-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
]
)

Expand Down Expand Up @@ -125,7 +126,8 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:
)

if not isinstance(
StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub)
StorageSchema().load(flow_run.flow.storage),
(Local, Azure, GCS, S3, GitHub, WebHook),
):
self.logger.error(
"Storage for flow run {} is not a supported type.".format(flow_run.id)
Expand Down
10 changes: 5 additions & 5 deletions src/prefect/environments/storage/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
# until after build() is done
# * test with Google Drive
# * test with something custom on API Gateway
# * test that creds work with Prefect Cloud secrets
# * test that creds work with environment variables
# * different secret_config for get_flow() vs. build()
# * add to all the relevant docs

import cloudpickle
Expand Down Expand Up @@ -82,7 +79,7 @@ class WebHook(Storage):
...
build_secret_config={
"X-Api-Key": {
"value": "MY_COOL_ENV_VARIABLE",
"name": "MY_COOL_ENV_VARIABLE",
"type": "environment"
}
}
Expand Down Expand Up @@ -259,10 +256,13 @@ def _render_headers(
- headers (dict): A dictionary of headers.
- secret_config (dict): A secret config. See `help(WebHook)` for
details on how this should be constructed.
Raises:
- KeyError if referencing an environment variable that does not exist
- ValueError if referencing a Secret that does not exist
"""
out_headers = deepcopy(headers)
for header, details in secret_config.items():
name = details["value"]
name = details["name"]
if details["type"] == "environment":
out_headers[header] = os.environ[name]
elif details["type"] == "secret":
Expand Down
32 changes: 31 additions & 1 deletion tests/agent/test_local_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from testfixtures import compare, LogCapture

from prefect.agent.local import LocalAgent
from prefect.environments.storage import Docker, Local, Azure, GCS, S3
from prefect.environments.storage import Docker, Local, Azure, GCS, S3, WebHook
from prefect.utilities.configuration import set_temporary_config
from prefect.utilities.graphql import GraphQLResult

Expand All @@ -21,6 +21,7 @@ def test_local_agent_init(runner_token):
"s3-flow-storage",
"gcs-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
}
assert agent.name == "agent"

Expand All @@ -46,6 +47,7 @@ def test_local_agent_config_options(runner_token):
"s3-flow-storage",
"gcs-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
"test_label",
}

Expand All @@ -71,6 +73,7 @@ def test_local_agent_config_options_hostname(runner_token):
"s3-flow-storage",
"gcs-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
}


Expand Down Expand Up @@ -140,6 +143,7 @@ def test_populate_env_vars(runner_token):
"gcs-flow-storage",
"s3-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
]
),
"PREFECT__CONTEXT__FLOW_RUN_ID": "id",
Expand Down Expand Up @@ -180,6 +184,7 @@ def test_populate_env_vars_includes_agent_labels(runner_token):
"gcs-flow-storage",
"s3-flow-storage",
"github-flow-storage",
"webhook-flow-storage",
]
),
"PREFECT__CONTEXT__FLOW_RUN_ID": "id",
Expand Down Expand Up @@ -278,6 +283,31 @@ def test_local_agent_deploy_processes_azure_storage(monkeypatch, runner_token):
assert len(agent.processes) == 1


def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token):

popen = MagicMock()
monkeypatch.setattr("prefect.agent.local.agent.Popen", popen)

agent = LocalAgent()
webhook = WebHook(
build_kwargs={"url": "test-service/upload"},
build_http_method="POST",
get_flow_kwargs={"url": "test-service/download"},
get_flow_http_method="GET",
)
agent.deploy_flow(
flow_run=GraphQLResult(
{
"flow": GraphQLResult({"storage": webhook.serialize(), "id": "foo"}),
"id": "id",
}
)
)

assert popen.called
assert len(agent.processes) == 1


def test_local_agent_deploy_storage_raises_not_supported_storage(
monkeypatch, runner_token
):
Expand Down
73 changes: 72 additions & 1 deletion tests/environments/storage/test_webhook_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from requests.exceptions import HTTPError
from typing import Any, Dict, Optional

from prefect import context
from prefect import task, Flow
from prefect.environments.storage import WebHook

Expand Down Expand Up @@ -215,7 +216,77 @@ def _mock_successful_post(*args, **kwargs):
"POST": _mock_successful_post,
}
webhook.add_flow(sample_flow)
res = webhook.build()
webhook.build()

with pytest.raises(HTTPError, match="test-error-message"):
webhook.get_flow()


def test_render_headers_gets_env_variables(monkeypatch):
some_cred = str(uuid.uuid4())
another_secret = str(uuid.uuid4())
monkeypatch.setenv("SOME_CRED", some_cred)
webhook = WebHook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
"X-Api-Key": {"name": "SOME_CRED", "type": "environment"},
"X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"},
},
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
get_flow_http_method="GET",
)

# set a local secret
context.setdefault("secrets", {})
context.secrets["ANOTHER_SECRET"] = another_secret

initial_headers = {"X-Api-Key": "abc"}
new_headers = webhook._render_headers(
headers=initial_headers, secret_config=webhook.build_secret_config
)

# _render_headers should not have side effects
assert initial_headers == {"X-Api-Key": "abc"}

# env variables and secrets should have been filled in
assert new_headers["X-Api-Key"] == some_cred
assert new_headers["X-Custom-Key"] == another_secret


def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch):
monkeypatch.delenv("SOME_CRED", raising=False)
webhook = WebHook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
"X-Api-Key": {"name": "SOME_CRED", "type": "environment"},
},
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
get_flow_http_method="GET",
)

with pytest.raises(KeyError, match="SOME_CRED"):
initial_headers = {"X-Api-Key": "abc"}
webhook._render_headers(
headers=initial_headers, secret_config=webhook.build_secret_config
)


def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch):
monkeypatch.delenv("ANOTHER_SECRET", raising=False)
webhook = WebHook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
"X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"},
},
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
get_flow_http_method="GET",
)

with pytest.raises(ValueError, match='Local Secret "ANOTHER_SECRET" was not found'):
initial_headers = {"X-Api-Key": "abc"}
webhook._render_headers(
headers=initial_headers, secret_config=webhook.build_secret_config
)
4 changes: 2 additions & 2 deletions tests/serialization/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def test_webhook_full_serialize():
get_flow_kwargs={
"url": get_url,
"headers": {
"Content-Type": content_type,
"Accept": content_type,
"Dropbox-API-Arg": json.dumps({"path": test_file}),
},
},
Expand Down Expand Up @@ -350,7 +350,7 @@ def test_webhook_full_serialize():
assert serialized["get_flow_kwargs"] == {
"url": get_url,
"headers": {
"Content-Type": content_type,
"Accept": content_type,
"Dropbox-API-Arg": json.dumps({"path": test_file}),
},
}
Expand Down