From 541b1c11e81eb746b44115bf9a5771ed9849d29b Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sun, 19 Jul 2020 21:11:51 -0500 Subject: [PATCH 01/14] Add WebHook storage (fixes #2835) --- src/prefect/environments/storage/__init__.py | 1 + src/prefect/environments/storage/webhook.py | 270 ++++++++++++++++++ src/prefect/serialization/storage.py | 33 ++- .../storage/test_webhook_storage.py | 221 ++++++++++++++ tests/serialization/test_storage.py | 83 ++++++ 5 files changed, 607 insertions(+), 1 deletion(-) create mode 100644 src/prefect/environments/storage/webhook.py create mode 100644 tests/environments/storage/test_webhook_storage.py diff --git a/src/prefect/environments/storage/__init__.py b/src/prefect/environments/storage/__init__.py index 75ae45e951d6..f90ec5fc18c0 100644 --- a/src/prefect/environments/storage/__init__.py +++ b/src/prefect/environments/storage/__init__.py @@ -22,6 +22,7 @@ from prefect.environments.storage.gcs import GCS from prefect.environments.storage.s3 import S3 from prefect.environments.storage.github import GitHub +from prefect.environments.storage.webhook import WebHook def get_default_storage_class() -> type: diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py new file mode 100644 index 000000000000..300417123346 --- /dev/null +++ b/src/prefect/environments/storage/webhook.py @@ -0,0 +1,270 @@ +# TODO +# +# * figure out how to work with systems where the ID isn't known +# until after build() is done +# * test with Google Drive +# * test with something custom on API Gateway +# * test that creds work with Prefect Cloud secrets +# * test that creds work with environment variables +# * different secret_config for get_flow() vs. 
build() +# * add to all the relevant docs + +import cloudpickle +import os +import warnings + +from copy import deepcopy +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from requests import Session +from requests.adapters import HTTPAdapter +from requests.models import Response +from requests.packages.urllib3.util.retry import Retry + +from prefect.client import Secret +from prefect.environments.storage import Storage + +if TYPE_CHECKING: + from prefect.core.flow import Flow + + +class WebHook(Storage): + """ + Args: + - build_kwargs (dict): Dictionary of keyword arguments to the + function from ``requests`` used to store the flow. Do not supply + `"data"` to this argument, as it will be overwritten with the + flow's content when `.build()` is run. + - build_http_method (str): HTTP method identifying the type of request + to execute when storing the flow. For example, ``"POST"`` for + ``requests.post()``. + - build_secret_config (dict): A dictionary describing how to set + request headers from environment variables or Prefect Cloud + secrets. See example for details on specifying this. This config + applies to the request issued by `.build()`, and will also be + used for `.get_flow()` unless you explicitly set + `get_flow_secret_config`. + - get_flow_kwargs (dict): Dictionary of keyword arguments to the + function from ``requests`` used to retrieve the flow. + - get_flow_http_method (str): HTTP method identifying the type of + request to execute when storing the flow. For example, ``"GET"`` + for ``requests.post()``. + - get_flow_secret_config (dict): Similar to `build_secret_config`, but + used for the request in `.get_flow()`. By default, the config + passed to `build_secret_config` will be used for `.get_flow()` + as well. Pass a value to this argument to override that behavior.
+ + Passing sensitive data in headers + --------------------------------- + + For services which require authentication, use `secret_config` to pass + sensitive data like API keys without storing their values in this Storage + object. + + This should be a dictionary whose keys are headers, and whose + values indicate whether to retrieve real values from environment + variables (``"type": "environment"``) or + Prefect secrets (``"type": "secret"``). + + So, for example, to get an API key from an environment variable you + can do the following + + ```python + storage = WebHook( + build_kwargs={ + "url": "some-random-service.place.thing", + "headers": { + "Content-Type": "application/octet-stream" + } + }, + build_http_method="POST", + ... + ... + build_secret_config={ + "X-Api-Key": { + "value": "MY_COOL_ENV_VARIABLE", + "type": "environment" + } + } + ) + ``` + """ + + def __init__( + self, + build_kwargs: Dict[str, Any], + build_http_method: str, + get_flow_kwargs: Dict[str, Any], + get_flow_http_method: str, + build_secret_config: Optional[Dict[str, Any]] = None, + get_flow_secret_config: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + self.flows = dict() # type: Dict[str, str] + self._flows = dict() # type: Dict[str, "Flow"] + + # set up retry logic for the HTTP session used to issue requests + retry_logic = HTTPAdapter(max_retries=Retry(total=3)) + self._session = Session() + self._session.mount("http://", retry_logic) + self._session.mount("https://", retry_logic) + + self._method_to_function = { + "GET": self._session.get, + "PATCH": self._session.patch, + "POST": self._session.post, + "PUT": self._session.put, + } + + if build_http_method not in self._method_to_function.keys(): + msg = "HTTP method '{}' not recognized".format(build_http_method) + self.logger.fatal(msg) + raise RuntimeError(msg) + + if get_flow_http_method not in self._method_to_function.keys(): + msg = "HTTP method '{}' not recognized".format(get_flow_http_method)
+ self.logger.fatal(msg) + raise RuntimeError(msg) + + self.build_kwargs = build_kwargs + self.build_http_method = build_http_method + self.build_secret_config = build_secret_config or {} + + self.get_flow_kwargs = get_flow_kwargs + self.get_flow_http_method = get_flow_http_method + self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config + + self._build_responses: Optional[Dict[str, Response]] = None + + super().__init__(**kwargs) + + @property + def default_labels(self) -> List[str]: + return ["webhook-flow-storage"] + + def get_flow(self, flow_location: str = "placeholder") -> "Flow": + """ + Get the flow from storage. This method will call + `cloudpickle.loads()` on the binary content of the flow, so it + should only be called in an environment with all of the flow's + dependencies. + + Args: + - flow_location (str): This argument is included to comply with the + interface used by other storage objects, but it has no meaning + for `WebHook` storage, since `WebHook` only corresponds to a + single flow. Ignore it. + + Raises: + - requests.exceptions.HTTPError if getting the flow fails + """ + self.logger.info("Retrieving flow") + req_function = self._method_to_function[self.get_flow_http_method] + + get_flow_kwargs = deepcopy(self.get_flow_kwargs) + get_flow_kwargs["headers"] = self._render_headers( + headers=get_flow_kwargs.get("headers", {}), + secret_config=self.get_flow_secret_config, + ) + + response = req_function(**get_flow_kwargs) + response.raise_for_status() + + return cloudpickle.loads(response.content) + + def add_flow(self, flow: "Flow") -> str: + """ + Method for adding a flow to a `Storage` object's in-memory + storage. `.build()` will look here for flows. + + `WebHook` storage only supports a single flow per storage + object, so this method will overwrite any existing flows + stored in an instance.
+ + Args: + - flow (Flow): a Prefect Flow to add + + Returns: + - str: the name of the flow + """ + self.flows = {flow.name: flow.name} + self._flows = {flow.name: flow} + return flow.name + + def build(self) -> "WebHook": + """ + Build the WebHook storage object by issuing an HTTP request + to store the flow. + + The response from this request is stored in `._build_responses`, + a dictionary keyed by flow name. + + Returns: + - Storage: a WebHook storage object + + Raises: + - requests.exceptions.HTTPError if pushing the flow fails + """ + self.run_basic_healthchecks() + self._build_responses = {} + + for flow_name, flow in self._flows.items(): + self.logger.info("Uploading flow '{}'".format(flow_name)) + + data = cloudpickle.dumps(flow) + + req_function = self._method_to_function[self.build_http_method] + + build_kwargs = deepcopy(self.build_kwargs) + build_kwargs["headers"] = self._render_headers( + headers=build_kwargs.get("headers", {}), + secret_config=self.build_secret_config, + ) + + if "data" in build_kwargs.keys(): + msg = ( + "'data' found in build_kwargs. This value is overwritten with " + "the flow content and should not be set directly" + ) + self.logger.warning(msg) + warnings.warn(msg, RuntimeWarning) + build_kwargs["data"] = data + + response = req_function(**build_kwargs) + response.raise_for_status() + + self._build_responses[flow_name] = response + self.logger.info("Successfully uploaded flow '{}'".format(flow_name)) + + return self + + def __contains__(self, obj: Any) -> bool: + """ + Method for determining whether an object is + contained within this storage. + """ + if not isinstance(obj, str): + return False + return obj in self.flows + + @staticmethod + def _render_headers( + headers: Dict[str, Any], secret_config: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Given a dictionary of headers, add additional headers with values + resolved from environment variables or Prefect Cloud secrets.
+ + Args: + - headers (dict): A dictionary of headers. + - secret_config (dict): A secret config. See `help(WebHook)` for + details on how this should be constructed. + """ + out_headers = deepcopy(headers) + for header, details in secret_config.items(): + name = details["value"] + if details["type"] == "environment": + out_headers[header] = os.environ[name] + elif details["type"] == "secret": + out_headers[header] = Secret(name).get() + return out_headers diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index c79dd92a66ea..4a082653d504 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -2,7 +2,16 @@ from marshmallow import fields, post_load -from prefect.environments.storage import GCS, S3, Azure, Docker, Local, Storage, GitHub +from prefect.environments.storage import ( + GCS, + S3, + Azure, + Docker, + Local, + Storage, + GitHub, + WebHook, +) from prefect.utilities.serialization import JSONCompatible, ObjectSchema, OneOfSchema @@ -126,6 +135,27 @@ def create_object(self, data: dict, **kwargs: Any) -> GitHub: return base_obj +class WebHookSchema(ObjectSchema): + class Meta: + object_class = WebHook + + build_kwargs = fields.Dict(key=fields.Str, allow_none=False) + build_http_method = fields.String(allow_none=False) + get_flow_kwargs = fields.Dict(key=fields.Str, allow_none=False) + get_flow_http_method = fields.String(allow_none=False) + build_secret_config = fields.Dict(key=fields.Str, allow_none=False) + get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False) + flows = fields.Dict(key=fields.Str(), values=fields.Str()) + secrets = fields.List(fields.Str(), allow_none=True) + + @post_load + def create_object(self, data: dict, **kwargs: Any) -> GitHub: + flows = data.pop("flows", dict()) + base_obj = super().create_object(data) + base_obj.flows = flows + return base_obj + + class StorageSchema(OneOfSchema): """ Field that chooses between several nested schemas @@ 
-140,4 +170,5 @@ class StorageSchema(OneOfSchema): "Storage": BaseStorageSchema, "S3": S3Schema, "GitHub": GitHubSchema, + "WebHook": WebHookSchema, } diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py new file mode 100644 index 000000000000..339448a038cd --- /dev/null +++ b/tests/environments/storage/test_webhook_storage.py @@ -0,0 +1,221 @@ +import cloudpickle +import pytest +import random +import uuid + +from requests.exceptions import HTTPError +from typing import Any, Dict, Optional + +from prefect import task, Flow +from prefect.environments.storage import WebHook + + +@pytest.fixture +def sample_flow(): + @task + def random_number(): + return random.randint(0, 100) + + with Flow("test-flow") as flow: + random_number() + + return flow + + +class _MockResponse: + """ + This class is a minimal mock of `requests.models.Response`. + Other mocking functions created in the tests below use this + to mock responses from services. 
+ """ + + def __init__( + self, + status_code: int, + json: Optional[Dict[str, Any]] = None, + content: Optional[bytes] = None, + ): + self.status_code = status_code + self.json_content = json or {} + self.content = content + + def raise_for_status(self, *args, **kwargs) -> None: + if (self.status_code // 100) != 2: + raise HTTPError("test-error-message") + else: + RuntimeError("blegh") + + +def test_create_webhook_storage(): + build_kwargs = {"url": "https://content.dropboxapi.com/2/files/upload"} + get_flow_kwargs = {"url": "https://content.dropboxapi.com/2/files/download"} + storage = WebHook( + build_kwargs=build_kwargs, + build_http_method="PATCH", + get_flow_kwargs=get_flow_kwargs, + get_flow_http_method="GET", + ) + assert storage + assert storage.logger + assert storage.build_kwargs == build_kwargs + assert storage.build_http_method == "PATCH" + assert storage.build_secret_config == {} + assert storage.get_flow_kwargs == get_flow_kwargs + assert storage.get_flow_http_method == "GET" + assert storage.get_flow_secret_config == {} + assert storage.secrets == [] + assert storage.default_labels == ["webhook-flow-storage"] + + +def test_all_valid_http_verb_combinations_work(): + possible_verbs = ["GET", "PATCH", "POST", "PUT"] + for build_verb in possible_verbs: + for get_verb in possible_verbs: + storage = WebHook( + build_kwargs={"url": "whatever"}, + build_http_method=build_verb, + get_flow_kwargs={"url": "whatever"}, + get_flow_http_method=get_verb, + ) + assert storage.build_http_method == build_verb + assert storage.get_flow_http_method == get_verb + + +def test_webhook_fails_for_bad_build_http_method(): + with pytest.raises(RuntimeError, match="HTTP method 'PASTA' not recognized"): + WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="PASTA", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="POST", + ) + + +def test_webhook_fails_for_bad_get_flow_http_method(): + with 
pytest.raises(RuntimeError, match="HTTP method 'BET' not recognized"): + WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="BET", + ) + + +def test_add_flow_and_contains_work_as_expected(sample_flow): + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + assert sample_flow.name not in webhook + out = webhook.add_flow(sample_flow) + assert isinstance(out, str) + assert sample_flow.name in webhook + assert str(uuid.uuid4()) not in webhook + + +def test_webhook_build_works_with_no_arguments(sample_flow): + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + def _mock_successful_get(*args, **kwargs): + return _MockResponse( + status_code=200, json={}, content=cloudpickle.dumps(sample_flow) + ) + + def _mock_successful_post(*args, **kwargs): + return _MockResponse(status_code=200, json={"id": "abc"}) + + webhook._method_to_function = { + "GET": _mock_successful_get, + "POST": _mock_successful_post, + } + webhook.add_flow(sample_flow) + + res = webhook.build() + assert isinstance(res, WebHook) + + res = webhook.get_flow() + assert isinstance(res, Flow) + + +def test_webhook_raises_warning_if_data_in_build_kwargs(sample_flow): + webhook = WebHook( + build_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "data": cloudpickle.dumps(sample_flow), + }, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + def _mock_successful_get(*args, **kwargs): + return _MockResponse( + status_code=200, json={}, 
content=cloudpickle.dumps(sample_flow) + ) + + def _mock_successful_post(*args, **kwargs): + return _MockResponse(status_code=200, json={"id": "abc"}) + + webhook._method_to_function = { + "GET": _mock_successful_get, + "POST": _mock_successful_post, + } + webhook.add_flow(sample_flow) + + with pytest.warns( + RuntimeWarning, match="flow content and should not be set directly" + ): + res = webhook.build() + assert isinstance(res, WebHook) + + +def test_webhook_raises_error_on_build_failure(sample_flow): + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + def _mock_failed_post(*args, **kwargs): + return _MockResponse(status_code=404, json={"id": "abc"}) + + webhook._method_to_function = { + "POST": _mock_failed_post, + } + webhook.add_flow(sample_flow) + + with pytest.raises(HTTPError, match="test-error-message"): + webhook.build() + + +def test_webhook_raises_error_on_get_flow_failure(sample_flow): + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + def _mock_failed_get(*args, **kwargs): + return _MockResponse(status_code=500, json={}) + + def _mock_successful_post(*args, **kwargs): + return _MockResponse(status_code=200, json={"id": "abc"}) + + webhook._method_to_function = { + "GET": _mock_failed_get, + "POST": _mock_successful_post, + } + webhook.add_flow(sample_flow) + res = webhook.build() + + with pytest.raises(HTTPError, match="test-error-message"): + webhook.get_flow() diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index e4f5f97125e9..8aa85bfdddbf 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -1,3 +1,4 @@ +import json import os import 
socket import tempfile @@ -13,6 +14,7 @@ GCSSchema, LocalSchema, S3Schema, + WebHookSchema, ) @@ -298,3 +300,84 @@ def test_gcs_serialize_with_flows(): deserialized = GCSSchema().load(serialized) assert f.name in deserialized assert deserialized.secrets == ["CREDS"] + + +def test_webhook_full_serialize(): + test_file = "/Apps/test-app.flow" + content_type = "application/octet-stream" + base_url = "https://content.dropboxapi.com/2/files" + build_url = f"{base_url}/upload" + get_url = f"{base_url}/download" + + webhook = storage.WebHook( + build_kwargs={ + "url": build_url, + "headers": { + "Content-Type": content_type, + "Dropbox-API-Arg": json.dumps({"path": test_file}), + }, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": get_url, + "headers": { + "Content-Type": content_type, + "Dropbox-API-Arg": json.dumps({"path": test_file}), + }, + }, + get_flow_http_method="POST", + build_secret_config={ + "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} + }, + secrets=["CREDS"], + ) + f = prefect.Flow("test") + webhook.flows["test"] = "key" + + serialized = WebHookSchema().dump(webhook) + + assert serialized + assert serialized["__version__"] == prefect.__version__ + assert serialized["secrets"] == ["CREDS"] + assert serialized["build_kwargs"] == { + "url": build_url, + "headers": { + "Content-Type": content_type, + "Dropbox-API-Arg": json.dumps({"path": test_file}), + }, + } + assert serialized["build_http_method"] == "POST" + assert serialized["get_flow_kwargs"] == { + "url": get_url, + "headers": { + "Content-Type": content_type, + "Dropbox-API-Arg": json.dumps({"path": test_file}), + }, + } + assert serialized["get_flow_http_method"] == "POST" + assert serialized["build_secret_config"] == { + "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} + } + assert serialized["build_secret_config"] == serialized["get_flow_secret_config"] + + +def test_webhook_different_secret_configs(): + build_config = { + "Authorization": 
{"value": "WRITE_ONLY_TOKEN", "type": "environment"} + } + get_flow_config = {"Authorization": {"value": "READ_ONLY_TOKEN", "type": "secret"}} + webhook = storage.WebHook( + build_kwargs={}, + build_http_method="POST", + get_flow_kwargs={}, + get_flow_http_method="POST", + build_secret_config=build_config, + get_flow_secret_config=get_flow_config, + ) + f = prefect.Flow("test") + webhook.flows["test"] = "key" + + serialized = WebHookSchema().dump(webhook) + assert serialized["build_secret_config"] == build_config + assert serialized["get_flow_secret_config"] == get_flow_config + assert serialized["build_secret_config"] != serialized["get_flow_secret_config"] From 9bafd61c0da7d5d5b4854f69743a7addac172cb9 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sun, 19 Jul 2020 23:47:59 -0500 Subject: [PATCH 02/14] update LocalAgent tests --- .../execution/storage_options.md | 53 ++++++++++++++ src/prefect/agent/local/agent.py | 6 +- src/prefect/environments/storage/webhook.py | 10 +-- tests/agent/test_local_agent.py | 32 +++++++- .../storage/test_webhook_storage.py | 73 ++++++++++++++++++- tests/serialization/test_storage.py | 4 +- 6 files changed, 167 insertions(+), 11 deletions(-) diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index a851323df379..dd9ec2254f11 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -140,6 +140,59 @@ If you do not specify a `registry_url` for your Docker Storage then the image wi Docker Storage uses the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html) to build the image and push to a registry. Make sure you have the Docker daemon running locally and you are configured to push to your desired container registry. Additionally make sure whichever platform Agent deploys the container also has permissions to pull from that same registry. 
::: +## WebHook + +[WebHook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services. + +For example, the following code could be used to store flows in Dropbox. + +```python +from prefect import Flow +from prefect.environments.storage import WebHook + +flow = Flow( + "dropbox-flow", + storage=WebHook( + build_kwargs={ + "url": "https://content.dropboxapi.com/2/files/upload", + "headers": { + "Content-Type": "application/octet-stream", + "Dropbox-API-Arg": json.dumps( + { + "path": "/Apps/prefect-test-app/dropbox-flow.flow", + "mode": "overwrite", + "autorename": False, + "strict_conflict": True, + } + ), + }, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": "https://content.dropboxapi.com/2/files/download", + "headers": { + "Accept": "application/octet-stream", + "Dropbox-API-Arg": json.dumps( + {"path": "/Apps/prefect-test-app/dropbox-flow.flow"} + ), + }, + }, + get_flow_http_method="POST", + build_secret_config={ + "Authorization": {"name": "DBOX_OAUTH2_TOKEN", "type": "environment"} + }, + ) +) + +flow.storage.build() +``` + +The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html). + +::: tip Sensible Defaults +Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `WebHook` storage.
+::: + ### Non-Docker Storage for Containerized Environments Prefect allows for flows to be stored in cloud storage services and executed in containerized environments. This has the added benefit of rapidly deploying new versions of flows without having to rebuild images each time. To enable this functionality add an image name to the flow's Environment metadata. diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index 30447b28d707..a880c3a2280a 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -6,7 +6,7 @@ from prefect import config from prefect.agent import Agent -from prefect.environments.storage import GCS, S3, Azure, Local, GitHub +from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, WebHook from prefect.serialization.storage import StorageSchema from prefect.utilities.graphql import GraphQLResult @@ -91,6 +91,7 @@ def __init__( "gcs-flow-storage", "s3-flow-storage", "github-flow-storage", + "webhook-flow-storage", ] ) @@ -125,7 +126,8 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str: ) if not isinstance( - StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub) + StorageSchema().load(flow_run.flow.storage), + (Local, Azure, GCS, S3, GitHub, WebHook), ): self.logger.error( "Storage for flow run {} is not a supported type.".format(flow_run.id) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 300417123346..b98a5bd0e21f 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -4,9 +4,6 @@ # until after build() is done # * test with Google Drive # * test with something custom on API Gateway -# * test that creds work with Prefect Cloud secrets -# * test that creds work with environment variables -# * different secret_config for get_flow() vs. 
build() # * add to all the relevant docs import cloudpickle @@ -82,7 +79,7 @@ class WebHook(Storage): ... build_secret_config={ "X-Api-Key": { - "value": "MY_COOL_ENV_VARIABLE", + "name": "MY_COOL_ENV_VARIABLE", "type": "environment" } } @@ -259,10 +256,13 @@ def _render_headers( - headers (dict): A dictionary of headers. - secret_config (dict): A secret config. See `help(WebHook)` for details on how this should be constructed. + Raises: + - KeyError if referencing an environment variable that does not exist + - ValueError if referencing a Secret that does not exist """ out_headers = deepcopy(headers) for header, details in secret_config.items(): - name = details["value"] + name = details["name"] if details["type"] == "environment": out_headers[header] = os.environ[name] elif details["type"] == "secret": diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index fd1c2e5fbf1b..a7254d0b3106 100644 --- a/tests/agent/test_local_agent.py +++ b/tests/agent/test_local_agent.py @@ -7,7 +7,7 @@ from testfixtures import compare, LogCapture from prefect.agent.local import LocalAgent -from prefect.environments.storage import Docker, Local, Azure, GCS, S3 +from prefect.environments.storage import Docker, Local, Azure, GCS, S3, WebHook from prefect.utilities.configuration import set_temporary_config from prefect.utilities.graphql import GraphQLResult @@ -21,6 +21,7 @@ def test_local_agent_init(runner_token): "s3-flow-storage", "gcs-flow-storage", "github-flow-storage", + "webhook-flow-storage", } assert agent.name == "agent" @@ -46,6 +47,7 @@ def test_local_agent_config_options(runner_token): "s3-flow-storage", "gcs-flow-storage", "github-flow-storage", + "webhook-flow-storage", "test_label", } @@ -71,6 +73,7 @@ def test_local_agent_config_options_hostname(runner_token): "s3-flow-storage", "gcs-flow-storage", "github-flow-storage", + "webhook-flow-storage", } @@ -140,6 +143,7 @@ def test_populate_env_vars(runner_token): "gcs-flow-storage", 
"s3-flow-storage", "github-flow-storage", + "webhook-flow-storage", ] ), "PREFECT__CONTEXT__FLOW_RUN_ID": "id", @@ -180,6 +184,7 @@ def test_populate_env_vars_includes_agent_labels(runner_token): "gcs-flow-storage", "s3-flow-storage", "github-flow-storage", + "webhook-flow-storage", ] ), "PREFECT__CONTEXT__FLOW_RUN_ID": "id", @@ -278,6 +283,31 @@ def test_local_agent_deploy_processes_azure_storage(monkeypatch, runner_token): assert len(agent.processes) == 1 +def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token): + + popen = MagicMock() + monkeypatch.setattr("prefect.agent.local.agent.Popen", popen) + + agent = LocalAgent() + webhook = WebHook( + build_kwargs={"url": "test-service/upload"}, + build_http_method="POST", + get_flow_kwargs={"url": "test-service/download"}, + get_flow_http_method="GET", + ) + agent.deploy_flow( + flow_run=GraphQLResult( + { + "flow": GraphQLResult({"storage": webhook.serialize(), "id": "foo"}), + "id": "id", + } + ) + ) + + assert popen.called + assert len(agent.processes) == 1 + + def test_local_agent_deploy_storage_raises_not_supported_storage( monkeypatch, runner_token ): diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index 339448a038cd..375000bf0f94 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -6,6 +6,7 @@ from requests.exceptions import HTTPError from typing import Any, Dict, Optional +from prefect import context from prefect import task, Flow from prefect.environments.storage import WebHook @@ -215,7 +216,77 @@ def _mock_successful_post(*args, **kwargs): "POST": _mock_successful_post, } webhook.add_flow(sample_flow) - res = webhook.build() + webhook.build() with pytest.raises(HTTPError, match="test-error-message"): webhook.get_flow() + + +def test_render_headers_gets_env_variables(monkeypatch): + some_cred = str(uuid.uuid4()) + another_secret = 
str(uuid.uuid4()) + monkeypatch.setenv("SOME_CRED", some_cred) + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + build_secret_config={ + "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, + "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + }, + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + # set a local secret + context.setdefault("secrets", {}) + context.secrets["ANOTHER_SECRET"] = another_secret + + initial_headers = {"X-Api-Key": "abc"} + new_headers = webhook._render_headers( + headers=initial_headers, secret_config=webhook.build_secret_config + ) + + # _render_headers should not have side effects + assert initial_headers == {"X-Api-Key": "abc"} + + # env variables and secrets should have been filled in + assert new_headers["X-Api-Key"] == some_cred + assert new_headers["X-Custom-Key"] == another_secret + + +def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch): + monkeypatch.delenv("SOME_CRED", raising=False) + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + build_secret_config={ + "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, + }, + get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + with pytest.raises(KeyError, match="SOME_CRED"): + initial_headers = {"X-Api-Key": "abc"} + webhook._render_headers( + headers=initial_headers, secret_config=webhook.build_secret_config + ) + + +def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch): + monkeypatch.delenv("ANOTHER_SECRET", raising=False) + webhook = WebHook( + build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_http_method="POST", + build_secret_config={ + "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + }, + get_flow_kwargs={"url": 
"https://content.dropboxapi.com/2/files"}, + get_flow_http_method="GET", + ) + + with pytest.raises(ValueError, match='Local Secret "ANOTHER_SECRET" was not found'): + initial_headers = {"X-Api-Key": "abc"} + webhook._render_headers( + headers=initial_headers, secret_config=webhook.build_secret_config + ) diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index 8aa85bfdddbf..b1539b15f92c 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -321,7 +321,7 @@ def test_webhook_full_serialize(): get_flow_kwargs={ "url": get_url, "headers": { - "Content-Type": content_type, + "Accept": content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, }, @@ -350,7 +350,7 @@ def test_webhook_full_serialize(): assert serialized["get_flow_kwargs"] == { "url": get_url, "headers": { - "Content-Type": content_type, + "Accept": content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, } From 32ff9e37337b9ef523ce8d372f4930101032c9d9 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 20 Jul 2020 00:12:15 -0500 Subject: [PATCH 03/14] cleaning up docs --- src/prefect/environments/storage/webhook.py | 90 ++++++++++++++++----- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index b98a5bd0e21f..76d889b5bac5 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -1,11 +1,3 @@ -# TODO -# -# * figure out how to work with systems where the ID isn't known -# until after build() is done -# * test with Google Drive -# * test with something custom on API Gateway -# * add to all the relevant docs - import cloudpickle import os import warnings @@ -27,14 +19,24 @@ class WebHook(Storage): """ + WebHook storage class. This class represents the Storage interface for + Flows stored and retrieved with HTTP requests. 
+ + This storage class takes in keyword arguments which describe how to + create the requests, including information on how to fill in headers + from environment variable or Prefect Cloud secrets. + + **Note**: Flows registered with this Storage option will automatically be + labeled with `webhook-flow-storage`. + Args: - build_kwargs (dict): Dictionary of keyword arguments to the - function from ``requests`` used to store the flow. Do not supply + function from `requests` used to store the flow. Do not supply `"data"` to this argument, as it will be overwritten with the flow's content when `.build()` is run. - build_http_method (str): HTTP method identifying the type of request - to execute when storing the flow. For example, ``"POST"`` for - ``requests.post()``. + to execute when storing the flow. For example, `"POST"` for + `requests.post()`. - build_secret_config (dict): A dictionary describing how to set request headers from environment variables or Prefect Cloud secrets. See example for details on specifying this. This config @@ -42,14 +44,16 @@ class WebHook(Storage): used for `.get_flow()` unless you explicitly set `get_flow_secret_config`. - get_flow_kwargs (dict): Dictionary of keyword arguments to the - function from ``requests`` used to retrieve the flow. + function from `requests` used to retrieve the flow. - get_flow_http_method (str): HTTP method identifying the type of - request to execute when storing the flow. For example, ``"GET"`` - for ``requests.post()``. + request to execute when storing the flow. For example, `"GET"` + for `requests.post()`. - get_flow_secret_config (dict): Similar to `build_secret_config`, but used for the request in `.get_flow()`. By default, the config passed to `build_secret_config` will be used for `.get_flow()` - as well. Pass a value to this argument to override that behavior. + as well. Pass a value to this argument only if you want to use a + different config for `.get_flow()` than the one used for + `.build()`. 
Passing sensitive data in headers --------------------------------- @@ -60,16 +64,16 @@ class WebHook(Storage): This should be a dictionary whose keys are headers, and whose values indicate whether to retrieve real values from environment - variables (``"type": "environment"``) or - Prefect secrets (``"type": "secret"``). + variables (`"type": "environment"`) or Prefect Cloud secrets + (`"type": "secret"`). So, for example, to get an API key from an environment variable you can do the following ```python - storage = Webhoook( + storage = Webhook( build_kwargs={ - "url": "some-random-service.place.thing", + "url": "some-service", "headers" = { "Content-Type" = "application/octet-stream" } @@ -194,7 +198,49 @@ def build(self) -> "WebHook": to store the flow. The response from this request is stored in `._build_responses`, - a dictionary keyed by flow name. + a dictionary keyed by flow name. If you are using a service where + all the details necessary to fetch a flow cannot be known until you've + stored it, you can do something like the following. 
+ + ```python + import cloudpickle + import json + import os + import random + import requests + + from prefect import task, Task, Flow + from prefect.environments.storage import WebHook + + @task + def random_number(): + return random.randint(0, 100) + with Flow("test-flow") as flow: + random_number() + + + flow.storage = WebHook( + build_kwargs={ + "url": "some-service/upload", + "headers": {"Content-Type": "application/octet-stream"}, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": "some-service/download", + "headers": {"Accept": "application/octet-stream"}, + }, + get_flow_http_method="GET", + ) + + flow.storage.add_flow(flow) + res = flow.storage.build() + + # get the ID from the response + flow_id = res._build_responses[flow.name].json()["id"] + + # update storage + flow.storage.get_flow_kwargs["url"] = f"{GET_ROUTE}/{flow_id}" + ``` Returns: - Storage: a WebHook storage object @@ -220,8 +266,8 @@ def build(self) -> "WebHook": if "data" in build_kwargs.keys(): msg = ( - "'data' found in build_kwargs. This value is overwritten with " - "the flow content and should not be set directly" + "'data' found in build_kwargs. 
This value is overwritten " + "with the flow content and should not be set directly" ) self.logger.warning(msg) warnings.warn(msg, RuntimeWarning) From 8ca588df58d85afd23cd628eb7a3016552c074b8 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 20 Jul 2020 00:44:54 -0500 Subject: [PATCH 04/14] fix typehint and add changes entry --- changes/pr3000.yaml | 5 +++++ src/prefect/serialization/storage.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 changes/pr3000.yaml diff --git a/changes/pr3000.yaml b/changes/pr3000.yaml new file mode 100644 index 000000000000..adb85bb7bda2 --- /dev/null +++ b/changes/pr3000.yaml @@ -0,0 +1,5 @@ +enhancement: + - "Add WebHook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)" + +contributor: + - "[James Lamb](https://github.com/jameslamb)" diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 4a082653d504..35ce09e04791 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -149,7 +149,7 @@ class Meta: secrets = fields.List(fields.Str(), allow_none=True) @post_load - def create_object(self, data: dict, **kwargs: Any) -> GitHub: + def create_object(self, data: dict, **kwargs: Any) -> WebHook: flows = data.pop("flows", dict()) base_obj = super().create_object(data) base_obj.flows = flows From b4f5537e0d390eef57f4b72eafeb28bad6896173 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 20 Jul 2020 01:18:09 -0500 Subject: [PATCH 05/14] tell mypy to trust me --- src/prefect/environments/storage/webhook.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 76d889b5bac5..3eec29c23d39 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -168,7 +168,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": 
secret_config=self.get_flow_secret_config, ) - response = req_function(**get_flow_kwargs) + response = req_function(**get_flow_kwargs) # type: ignore response.raise_for_status() return cloudpickle.loads(response.content) @@ -273,7 +273,7 @@ def random_number(): warnings.warn(msg, RuntimeWarning) build_kwargs["data"] = data - response = req_function(**build_kwargs) + response = req_function(**build_kwargs) # type: ignore response.raise_for_status() self._build_responses[flow_name] = response From e98b62f7edff43f067bcf70f64906ba504fd4a9f Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 20 Jul 2020 01:21:52 -0500 Subject: [PATCH 06/14] add test for uncovered line --- tests/environments/storage/test_webhook_storage.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index 375000bf0f94..e0e524d9c21c 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -115,6 +115,9 @@ def test_add_flow_and_contains_work_as_expected(sample_flow): assert sample_flow.name in webhook assert str(uuid.uuid4()) not in webhook + # should return False if input is not a string + assert sample_flow not in webhook + def test_webhook_build_works_with_no_arguments(sample_flow): webhook = WebHook( From 1a3b94d91a88a2d3968010600fe7ff5a05f28a7c Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 20 Jul 2020 01:26:22 -0500 Subject: [PATCH 07/14] fix typos --- docs/orchestration/execution/storage_options.md | 2 +- src/prefect/environments/storage/webhook.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index dd9ec2254f11..e3951dc37514 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -152,7 +152,7 @@ from prefect.environments.storage 
import WebHook flow = Flow( "dropbox-flow", - storage=WebHoook( + storage=WebHook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files/upload", "headers": { diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 3eec29c23d39..3aa9b2d74c8f 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -40,7 +40,7 @@ class WebHook(Storage): - build_secret_config (dict): A dictionary describing how to set request headers from environment variables or Prefect Cloud secrets. See example for details on specifying this. This config - applies to tthe request issued by `.build()`, and wiill also be + applies to the request issued by `.build()`, and will also be used for `.get_flow()` unless you explicitly set `get_flow_secret_config`. - get_flow_kwargs (dict): Dictionary of keyword arguments to the @@ -147,7 +147,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": """ Get the flow from storage. This method will call `cloudpickle.loads()` on the binary content of the flow, so it - shuould only be called in an environment with all of the flow's + should only be called in an environment with all of the flow's dependencies. Args: @@ -296,7 +296,7 @@ def _render_headers( ) -> Dict[str, Any]: """ Given a dictionary of headers, add additional headers with values - resolved froom environment variables or Prefect Cloud secrets. + resolved from environment variables or Prefect Cloud secrets. Args: - headers (dict): A dictionary of headers. 
From 59bcc06dbf5e8db1fae1154d9b838ef20804f8b8 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 27 Jul 2020 00:40:21 -0500 Subject: [PATCH 08/14] add support for stored_as_script --- src/prefect/environments/storage/webhook.py | 46 ++++++++- src/prefect/serialization/storage.py | 1 + .../storage/test_webhook_storage.py | 99 +++++++++++++++++++ tests/serialization/test_storage.py | 1 + 4 files changed, 146 insertions(+), 1 deletion(-) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 3aa9b2d74c8f..207d6a2a59b5 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -12,6 +12,7 @@ from prefect.client import Secret from prefect.environments.storage import Storage +from prefect.utilities.storage import extract_flow_from_file if TYPE_CHECKING: from prefect.core.flow import Flow @@ -54,6 +55,18 @@ class WebHook(Storage): as well. Pass a value to this argument only if you want to use a different config for `.get_flow()` than the one used for `.build()`. + - stored_as_script (bool, optional): boolean for specifying if the + flow has been stored as a `.py` file. Defaults to `False`. + - flow_script_path (str, optional): path to a local `.py` file that + defines the flow. You must pass a value to this argument if + `stored_as_script` is `True`. This script's content will be read + into a string and attached to the request in `build()` as UTF-8 + encoded binary data. Similarly, `.get_flow()` expects that the + script's contents will be returned as binary data. This path will + not be sent to Prefect Cloud and is only needed when running + `.build()`. 
+ - **kwargs (Any, optional): any additional `Storage` initialization + options Passing sensitive data in headers --------------------------------- @@ -99,6 +112,8 @@ def __init__( get_flow_http_method: str, build_secret_config: Optional[Dict[str, Any]] = None, get_flow_secret_config: Optional[Dict[str, Any]] = None, + stored_as_script: bool = False, + flow_script_path: Optional[str] = None, **kwargs: Any, ) -> None: self.flows = dict() # type: Dict[str, str] @@ -127,6 +142,9 @@ def __init__( self.logger.fatal(msg) raise RuntimeError(msg) + self.stored_as_script = stored_as_script + self.flow_script_path = flow_script_path + self.build_kwargs = build_kwargs self.build_http_method = build_http_method self.build_secret_config = build_secret_config or {} @@ -137,7 +155,7 @@ def __init__( self._build_responses: Optional[Dict[str, Response]] = None - super().__init__(**kwargs) + super().__init__(stored_as_script=stored_as_script, **kwargs) @property def default_labels(self) -> List[str]: @@ -171,6 +189,10 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": response = req_function(**get_flow_kwargs) # type: ignore response.raise_for_status() + if self.stored_as_script: + flow_script_content = response.content.decode("utf-8") + return extract_flow_from_file(file_contents=flow_script_content) # type: ignore + return cloudpickle.loads(response.content) def add_flow(self, flow: "Flow") -> str: @@ -197,6 +219,10 @@ def build(self) -> "WebHook": Build the WebHook storage object by issuing an HTTP request to store the flow. + If `self.stored_as_script` is `True`, this method + will read in the contents of `self.flow_script_path`, convert it to + byes, and attach it to the request as `data`. + The response from this request is stored in `._build_responses`, a dictionary keyed by flow name. 
If you are using a service where all the details necessary to fetch a flow cannot be known until you've @@ -255,6 +281,24 @@ def random_number(): self.logger.info("Uploading flow '{}'".format(flow_name)) data = cloudpickle.dumps(flow) + if self.stored_as_script: + + # these checks are here in build() instead of the constructor + # so that serialization and deserialization of flows does not fail + if not self.flow_script_path: + msg = "flow_script_path must be provided if stored_as_script=True" + self.logger.fatal(msg) + raise RuntimeError(msg) + + if not os.path.isfile(self.flow_script_path): + msg = "file '{}' passed to flow_script_path does not exist".format( + self.flow_script_path + ) + self.logger.fatal(msg) + raise RuntimeError(msg) + + with open(self.flow_script_path, "r") as f: + data = f.read().encode("utf-8") req_function = self._method_to_function[self.build_http_method] diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 35ce09e04791..0bb3425093cb 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -145,6 +145,7 @@ class Meta: get_flow_http_method = fields.String(allow_none=False) build_secret_config = fields.Dict(key=fields.Str, allow_none=False) get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False) + stored_as_script = fields.Bool(allow_none=True) flows = fields.Dict(key=fields.Str(), values=fields.Str()) secrets = fields.List(fields.Str(), allow_none=True) diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index e0e524d9c21c..92a0088d2dca 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -1,4 +1,5 @@ import cloudpickle +import os import pytest import random import uuid @@ -66,6 +67,7 @@ def test_create_webhook_storage(): assert storage.get_flow_secret_config == {} assert storage.secrets == [] assert 
storage.default_labels == ["webhook-flow-storage"] + assert storage.stored_as_script is False def test_all_valid_http_verb_combinations_work(): @@ -293,3 +295,100 @@ def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch) webhook._render_headers( headers=initial_headers, secret_config=webhook.build_secret_config ) + + +def test_webhook_works_with_file_storage(sample_flow, tmpdir): + + script_file = os.path.join(tmpdir, "{}.py".format(str(uuid.uuid4()))) + script_contents = """from prefect import Flow\nf=Flow('test-flow')""" + with open(script_file, "w") as f: + f.write(script_contents) + + webhook = WebHook( + build_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Content-Type": "application/octet-stream"}, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Accept": "application/octet-stream"}, + }, + get_flow_http_method="GET", + stored_as_script=True, + flow_script_path=script_file, + ) + + def _mock_successful_get(*args, **kwargs): + file_contents = """from prefect import Flow\nf=Flow('test-flow')""" + return _MockResponse( + status_code=200, json={}, content=file_contents.encode("utf-8") + ) + + def _mock_successful_post(*args, **kwargs): + return _MockResponse(status_code=200, json={"id": "abc"}) + + webhook._method_to_function = { + "GET": _mock_successful_get, + "POST": _mock_successful_post, + } + webhook.add_flow(sample_flow) + + res = webhook.build() + assert isinstance(res, WebHook) + + res = webhook.get_flow() + assert isinstance(res, Flow) + assert res.name == "test-flow" + + +def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flow): + + webhook = WebHook( + build_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Content-Type": "application/octet-stream"}, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Accept": 
"application/octet-stream"}, + }, + get_flow_http_method="GET", + stored_as_script=True, + ) + + webhook.add_flow(sample_flow) + + error_msg = "flow_script_path must be provided if stored_as_script=True" + with pytest.raises(RuntimeError, match=error_msg): + res = webhook.build() + assert isinstance(res, WebHook) + + +def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist( + sample_flow, +): + + nonexistent_file = "{}.py".format(str(uuid.uuid4())) + webhook = WebHook( + build_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Content-Type": "application/octet-stream"}, + }, + build_http_method="POST", + get_flow_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"Accept": "application/octet-stream"}, + }, + get_flow_http_method="GET", + stored_as_script=True, + flow_script_path=nonexistent_file, + ) + + webhook.add_flow(sample_flow) + + error_msg = "passed to flow_script_path does not exist" + with pytest.raises(RuntimeError, match=error_msg): + res = webhook.build() + assert isinstance(res, WebHook) diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index b1539b15f92c..f7f3e266da0e 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -359,6 +359,7 @@ def test_webhook_full_serialize(): "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} } assert serialized["build_secret_config"] == serialized["get_flow_secret_config"] + assert serialized["stored_as_script"] is False def test_webhook_different_secret_configs(): From 8c0e32dca59f926a2dfcbd977284f0e0d1c5590f Mon Sep 17 00:00:00 2001 From: James Lamb Date: Mon, 27 Jul 2020 00:54:20 -0500 Subject: [PATCH 09/14] WebHook -> Webhook --- changes/pr3000.yaml | 2 +- .../execution/storage_options.md | 10 ++--- src/prefect/agent/local/agent.py | 4 +- src/prefect/environments/storage/__init__.py | 2 +- src/prefect/environments/storage/webhook.py | 20 ++++----- 
src/prefect/serialization/storage.py | 10 ++--- tests/agent/test_local_agent.py | 4 +- .../storage/test_webhook_storage.py | 42 +++++++++---------- tests/serialization/test_storage.py | 10 ++--- 9 files changed, 52 insertions(+), 52 deletions(-) diff --git a/changes/pr3000.yaml b/changes/pr3000.yaml index adb85bb7bda2..29f930bc57d9 100644 --- a/changes/pr3000.yaml +++ b/changes/pr3000.yaml @@ -1,5 +1,5 @@ enhancement: - - "Add WebHook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)" + - "Add Webhook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)" contributor: - "[James Lamb](https://github.com/jameslamb)" diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index 7da0e9a96ee0..32db0f40e8bf 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -140,19 +140,19 @@ If you do not specify a `registry_url` for your Docker Storage then the image wi Docker Storage uses the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html) to build the image and push to a registry. Make sure you have the Docker daemon running locally and you are configured to push to your desired container registry. Additionally make sure whichever platform Agent deploys the container also has permissions to pull from that same registry. ::: -## WebHook +## Webhook -[WebHook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services. +[Webhook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. 
This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services. For example, the following code could be used to store flows in DropBox. ```python from prefect import Flow -from prefect.environments.storage import WebHook +from prefect.environments.storage import Webhook flow = Flow( "dropbox-flow", - storage=WebHook( + storage=Webhook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files/upload", "headers": { @@ -190,7 +190,7 @@ flow.storage.build() The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html). ::: tip Sensible Defaults -Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `WebHook` storage. +Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage. 
::: ### Non-Docker Storage for Containerized Environments diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index a880c3a2280a..10ddd43624d0 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -6,7 +6,7 @@ from prefect import config from prefect.agent import Agent -from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, WebHook +from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, Webhook from prefect.serialization.storage import StorageSchema from prefect.utilities.graphql import GraphQLResult @@ -127,7 +127,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str: if not isinstance( StorageSchema().load(flow_run.flow.storage), - (Local, Azure, GCS, S3, GitHub, WebHook), + (Local, Azure, GCS, S3, GitHub, Webhook), ): self.logger.error( "Storage for flow run {} is not a supported type.".format(flow_run.id) diff --git a/src/prefect/environments/storage/__init__.py b/src/prefect/environments/storage/__init__.py index f90ec5fc18c0..07da3c3d0f16 100644 --- a/src/prefect/environments/storage/__init__.py +++ b/src/prefect/environments/storage/__init__.py @@ -22,7 +22,7 @@ from prefect.environments.storage.gcs import GCS from prefect.environments.storage.s3 import S3 from prefect.environments.storage.github import GitHub -from prefect.environments.storage.webhook import WebHook +from prefect.environments.storage.webhook import Webhook def get_default_storage_class() -> type: diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 207d6a2a59b5..71b4a2e501e7 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -18,9 +18,9 @@ from prefect.core.flow import Flow -class WebHook(Storage): +class Webhook(Storage): """ - WebHook storage class. This class represents the Storage interface for + Webhook storage class. 
This class represents the Storage interface for Flows stored and retrieved with HTTP requests. This storage class takes in keyword arguments which describe how to @@ -171,7 +171,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": Args: - flow_location (str): This argument is included to comply with the interface used by other storage objects, but it has no meaning - for `WebHook` storage, since `WebHook` only corresponds to a + for `Webhook` storage, since `Webhook` only corresponds to a single flow. Ignore it. Raises: @@ -200,7 +200,7 @@ def add_flow(self, flow: "Flow") -> str: Method for adding a flow to a `Storage` object's in-memory storage. `.build()` will look here for flows. - `WebHook` storage only supports a single flow per storage + `Webhook` storage only supports a single flow per storage object, so this method will overwrite any existing flows stored in an instance. @@ -214,9 +214,9 @@ def add_flow(self, flow: "Flow") -> str: self._flows = {flow.name: flow} return flow.name - def build(self) -> "WebHook": + def build(self) -> "Webhook": """ - Build the WebHook storage object by issuing an HTTP request + Build the Webhook storage object by issuing an HTTP request to store the flow. 
If `self.stored_as_script` is `True`, this method @@ -236,7 +236,7 @@ def build(self) -> "WebHook": import requests from prefect import task, Task, Flow - from prefect.environments.storage import WebHook + from prefect.environments.storage import Webhook @task def random_number(): @@ -245,7 +245,7 @@ def random_number(): random_number() - flow.storage = WebHook( + flow.storage = Webhook( build_kwargs={ "url": "some-service/upload", "headers": {"Content-Type": "application/octet-stream"}, @@ -269,7 +269,7 @@ def random_number(): ``` Returns: - - Storage: a WebHook storage object + - Storage: a Webhook storage object Raises: - requests.exceptions.HTTPError if pushing the flow fails @@ -344,7 +344,7 @@ def _render_headers( Args: - headers (dict): A dictionary of headers. - - secret_config (dict): A secret config. See `help(WebHook)` for + - secret_config (dict): A secret config. See `help(Webhook)` for details on how this should be constructed. Raises: - KeyError if referencing an environment variable that does not exist diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 0bb3425093cb..55a0714538cc 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -10,7 +10,7 @@ Local, Storage, GitHub, - WebHook, + Webhook, ) from prefect.utilities.serialization import JSONCompatible, ObjectSchema, OneOfSchema @@ -135,9 +135,9 @@ def create_object(self, data: dict, **kwargs: Any) -> GitHub: return base_obj -class WebHookSchema(ObjectSchema): +class WebhookSchema(ObjectSchema): class Meta: - object_class = WebHook + object_class = Webhook build_kwargs = fields.Dict(key=fields.Str, allow_none=False) build_http_method = fields.String(allow_none=False) @@ -150,7 +150,7 @@ class Meta: secrets = fields.List(fields.Str(), allow_none=True) @post_load - def create_object(self, data: dict, **kwargs: Any) -> WebHook: + def create_object(self, data: dict, **kwargs: Any) -> Webhook: flows = data.pop("flows", 
dict()) base_obj = super().create_object(data) base_obj.flows = flows @@ -171,5 +171,5 @@ class StorageSchema(OneOfSchema): "Storage": BaseStorageSchema, "S3": S3Schema, "GitHub": GitHubSchema, - "WebHook": WebHookSchema, + "Webhook": WebhookSchema, } diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index a7254d0b3106..9a73baddc844 100644 --- a/tests/agent/test_local_agent.py +++ b/tests/agent/test_local_agent.py @@ -7,7 +7,7 @@ from testfixtures import compare, LogCapture from prefect.agent.local import LocalAgent -from prefect.environments.storage import Docker, Local, Azure, GCS, S3, WebHook +from prefect.environments.storage import Docker, Local, Azure, GCS, S3, Webhook from prefect.utilities.configuration import set_temporary_config from prefect.utilities.graphql import GraphQLResult @@ -289,7 +289,7 @@ def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token) monkeypatch.setattr("prefect.agent.local.agent.Popen", popen) agent = LocalAgent() - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "test-service/upload"}, build_http_method="POST", get_flow_kwargs={"url": "test-service/download"}, diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index 92a0088d2dca..df480671df5c 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -9,7 +9,7 @@ from prefect import context from prefect import task, Flow -from prefect.environments.storage import WebHook +from prefect.environments.storage import Webhook @pytest.fixture @@ -51,7 +51,7 @@ def raise_for_status(self, *args, **kwargs) -> None: def test_create_webhook_storage(): build_kwargs = {"url": "https://content.dropboxapi.com/2/files/upload"} get_flow_kwargs = {"url": "https://content.dropboxapi.com/2/files/download"} - storage = WebHook( + storage = Webhook( build_kwargs=build_kwargs, build_http_method="PATCH", 
get_flow_kwargs=get_flow_kwargs, @@ -74,7 +74,7 @@ def test_all_valid_http_verb_combinations_work(): possible_verbs = ["GET", "PATCH", "POST", "PUT"] for build_verb in possible_verbs: for get_verb in possible_verbs: - storage = WebHook( + storage = Webhook( build_kwargs={"url": "whatever"}, build_http_method=build_verb, get_flow_kwargs={"url": "whatever"}, @@ -86,7 +86,7 @@ def test_all_valid_http_verb_combinations_work(): def test_webhook_fails_for_bad_build_http_method(): with pytest.raises(RuntimeError, match="HTTP method 'PASTA' not recognized"): - WebHook( + Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="PASTA", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -96,7 +96,7 @@ def test_webhook_fails_for_bad_build_http_method(): def test_webhook_fails_for_bad_get_flow_http_method(): with pytest.raises(RuntimeError, match="HTTP method 'BET' not recognized"): - WebHook( + Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -105,7 +105,7 @@ def test_webhook_fails_for_bad_get_flow_http_method(): def test_add_flow_and_contains_work_as_expected(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -122,7 +122,7 @@ def test_add_flow_and_contains_work_as_expected(sample_flow): def test_webhook_build_works_with_no_arguments(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -144,14 +144,14 @@ def _mock_successful_post(*args, **kwargs): webhook.add_flow(sample_flow) res = webhook.build() - assert isinstance(res, WebHook) + assert isinstance(res, Webhook) res = 
webhook.get_flow() assert isinstance(res, Flow) def test_webhook_raises_warning_if_data_in_build_kwargs(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files", "data": cloudpickle.dumps(sample_flow), @@ -179,11 +179,11 @@ def _mock_successful_post(*args, **kwargs): RuntimeWarning, match="flow content and should not be set directly" ): res = webhook.build() - assert isinstance(res, WebHook) + assert isinstance(res, Webhook) def test_webhook_raises_error_on_build_failure(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -203,7 +203,7 @@ def _mock_failed_post(*args, **kwargs): def test_webhook_raises_error_on_get_flow_failure(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, @@ -231,7 +231,7 @@ def test_render_headers_gets_env_variables(monkeypatch): some_cred = str(uuid.uuid4()) another_secret = str(uuid.uuid4()) monkeypatch.setenv("SOME_CRED", some_cred) - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", build_secret_config={ @@ -261,7 +261,7 @@ def test_render_headers_gets_env_variables(monkeypatch): def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch): monkeypatch.delenv("SOME_CRED", raising=False) - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", build_secret_config={ @@ -280,7 +280,7 @@ def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch): monkeypatch.delenv("ANOTHER_SECRET", 
raising=False) - webhook = WebHook( + webhook = Webhook( build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, build_http_method="POST", build_secret_config={ @@ -304,7 +304,7 @@ def test_webhook_works_with_file_storage(sample_flow, tmpdir): with open(script_file, "w") as f: f.write(script_contents) - webhook = WebHook( + webhook = Webhook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, @@ -335,7 +335,7 @@ def _mock_successful_post(*args, **kwargs): webhook.add_flow(sample_flow) res = webhook.build() - assert isinstance(res, WebHook) + assert isinstance(res, Webhook) res = webhook.get_flow() assert isinstance(res, Flow) @@ -344,7 +344,7 @@ def _mock_successful_post(*args, **kwargs): def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flow): - webhook = WebHook( + webhook = Webhook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, @@ -363,7 +363,7 @@ def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flo error_msg = "flow_script_path must be provided if stored_as_script=True" with pytest.raises(RuntimeError, match=error_msg): res = webhook.build() - assert isinstance(res, WebHook) + assert isinstance(res, Webhook) def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist( @@ -371,7 +371,7 @@ def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist( ): nonexistent_file = "{}.py".format(str(uuid.uuid4())) - webhook = WebHook( + webhook = Webhook( build_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, @@ -391,4 +391,4 @@ def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist( error_msg = "passed to flow_script_path does not exist" with pytest.raises(RuntimeError, match=error_msg): res = webhook.build() - assert isinstance(res, WebHook) + 
assert isinstance(res, Webhook) diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index f7f3e266da0e..6fc3d1c43a19 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -14,7 +14,7 @@ GCSSchema, LocalSchema, S3Schema, - WebHookSchema, + WebhookSchema, ) @@ -309,7 +309,7 @@ def test_webhook_full_serialize(): build_url = f"{base_url}/upload" get_url = f"{base_url}/download" - webhook = storage.WebHook( + webhook = storage.Webhook( build_kwargs={ "url": build_url, "headers": { @@ -334,7 +334,7 @@ def test_webhook_full_serialize(): f = prefect.Flow("test") webhook.flows["test"] = "key" - serialized = WebHookSchema().dump(webhook) + serialized = WebhookSchema().dump(webhook) assert serialized assert serialized["__version__"] == prefect.__version__ @@ -367,7 +367,7 @@ def test_webhook_different_secret_configs(): "Authorization": {"value": "WRITE_ONLY_TOKEN", "type": "environment"} } get_flow_config = {"Authorization": {"value": "READ_ONLY_TOKEN", "type": "secret"}} - webhook = storage.WebHook( + webhook = storage.Webhook( build_kwargs={}, build_http_method="POST", get_flow_kwargs={}, @@ -378,7 +378,7 @@ def test_webhook_different_secret_configs(): f = prefect.Flow("test") webhook.flows["test"] = "key" - serialized = WebHookSchema().dump(webhook) + serialized = WebhookSchema().dump(webhook) assert serialized["build_secret_config"] == build_config assert serialized["get_flow_secret_config"] == get_flow_config assert serialized["build_secret_config"] != serialized["get_flow_secret_config"] From 495372f8eb40a8dc9eef2022e4aa5fbade718f4d Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 28 Jul 2020 15:44:37 -0500 Subject: [PATCH 10/14] fix tests, merge master --- tests/agent/test_local_agent.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index c3fb6502c275..804a4d42c2d7 100644 --- a/tests/agent/test_local_agent.py +++ 
b/tests/agent/test_local_agent.py @@ -35,6 +35,7 @@ def test_local_agent_deduplicates_labels(runner_token): "s3-flow-storage", "gcs-flow-storage", "github-flow-storage", + "webhook-flow-storage", } assert len(agent.labels) == len(set(agent.labels)) From 0add059a315f0e90083da1f65555b7c533defc92 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 31 Jul 2020 23:49:44 -0500 Subject: [PATCH 11/14] add _request to keyword args --- .../execution/storage_options.md | 8 +- src/prefect/environments/storage/webhook.py | 72 ++++----- src/prefect/serialization/storage.py | 8 +- tests/agent/test_local_agent.py | 8 +- .../storage/test_webhook_storage.py | 142 +++++++++--------- tests/serialization/test_storage.py | 24 +-- 6 files changed, 131 insertions(+), 131 deletions(-) diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index 32db0f40e8bf..daa4ebe52d0f 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -153,7 +153,7 @@ from prefect.environments.storage import Webhook flow = Flow( "dropbox-flow", storage=Webhook( - build_kwargs={ + build_request_kwargs={ "url": "https://content.dropboxapi.com/2/files/upload", "headers": { "Content-Type": "application/octet-stream", @@ -167,8 +167,8 @@ flow = Flow( ), }, }, - build_http_method="POST", - get_flow_kwargs={ + build_request_http_method="POST", + get_flow_request_kwargs={ "url": "https://content.dropboxapi.com/2/files/download", "headers": { "Accept": "application/octet-stream", @@ -177,7 +177,7 @@ flow = Flow( ), }, }, - get_flow_http_method="POST", + get_flow_request_http_method="POST", build_secret_config={ "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} }, diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 71b4a2e501e7..5590e15f8a13 100644 --- a/src/prefect/environments/storage/webhook.py +++ 
b/src/prefect/environments/storage/webhook.py @@ -31,11 +31,11 @@ class Webhook(Storage): labeled with `webhook-flow-storage`. Args: - - build_kwargs (dict): Dictionary of keyword arguments to the + - build_request_kwargs (dict): Dictionary of keyword arguments to the function from `requests` used to store the flow. Do not supply `"data"` to this argument, as it will be overwritten with the flow's content when `.build()` is run. - - build_http_method (str): HTTP method identifying the type of request + - build_request_http_method (str): HTTP method identifying the type of request to execute when storing the flow. For example, `"POST"` for `requests.post()`. - build_secret_config (dict): A dictionary describing how to set @@ -44,9 +44,9 @@ class Webhook(Storage): applies to the request issued by `.build()`, and will also be used for `.get_flow()` unless you explicitly set `get_flow_secret_config`. - - get_flow_kwargs (dict): Dictionary of keyword arguments to the + - get_flow_request_kwargs (dict): Dictionary of keyword arguments to the function from `requests` used to retrieve the flow. - - get_flow_http_method (str): HTTP method identifying the type of + - get_flow_request_http_method (str): HTTP method identifying the type of request to execute when storing the flow. For example, `"GET"` for `requests.post()`. - get_flow_secret_config (dict): Similar to `build_secret_config`, but @@ -85,13 +85,13 @@ class Webhook(Storage): ```python storage = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "some-service", "headers" = { "Content-Type" = "application/octet-stream" } }, - build_http_method="POST", + build_request_http_method="POST", ... ... 
build_secret_config={ @@ -106,10 +106,10 @@ class Webhook(Storage): def __init__( self, - build_kwargs: Dict[str, Any], - build_http_method: str, - get_flow_kwargs: Dict[str, Any], - get_flow_http_method: str, + build_request_kwargs: Dict[str, Any], + build_request_http_method: str, + get_flow_request_kwargs: Dict[str, Any], + get_flow_request_http_method: str, build_secret_config: Optional[Dict[str, Any]] = None, get_flow_secret_config: Optional[Dict[str, Any]] = None, stored_as_script: bool = False, @@ -132,25 +132,25 @@ def __init__( "PUT": self._session.put, } - if build_http_method not in self._method_to_function.keys(): - msg = "HTTP method '{}' not recognized".format(build_http_method) + if build_request_http_method not in self._method_to_function.keys(): + msg = "HTTP method '{}' not recognized".format(build_request_http_method) self.logger.fatal(msg) raise RuntimeError(msg) - if get_flow_http_method not in self._method_to_function.keys(): - msg = "HTTP method '{}' not recognized".format(get_flow_http_method) + if get_flow_request_http_method not in self._method_to_function.keys(): + msg = "HTTP method '{}' not recognized".format(get_flow_request_http_method) self.logger.fatal(msg) raise RuntimeError(msg) self.stored_as_script = stored_as_script self.flow_script_path = flow_script_path - self.build_kwargs = build_kwargs - self.build_http_method = build_http_method + self.build_request_kwargs = build_request_kwargs + self.build_request_http_method = build_request_http_method self.build_secret_config = build_secret_config or {} - self.get_flow_kwargs = get_flow_kwargs - self.get_flow_http_method = get_flow_http_method + self.get_flow_request_kwargs = get_flow_request_kwargs + self.get_flow_request_http_method = get_flow_request_http_method self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config self._build_responses: Optional[Dict[str, Response]] = None @@ -178,15 +178,15 @@ def get_flow(self, flow_location: str = "placeholder") -> 
"Flow": - requests.exceptions.HTTPError if getting the flow fails """ self.logger.info("Retrieving flow") - req_function = self._method_to_function[self.get_flow_http_method] + req_function = self._method_to_function[self.get_flow_request_http_method] - get_flow_kwargs = deepcopy(self.get_flow_kwargs) - get_flow_kwargs["headers"] = self._render_headers( - headers=get_flow_kwargs.get("headers", {}), + get_flow_request_kwargs = deepcopy(self.get_flow_request_kwargs) + get_flow_request_kwargs["headers"] = self._render_headers( + headers=get_flow_request_kwargs.get("headers", {}), secret_config=self.get_flow_secret_config, ) - response = req_function(**get_flow_kwargs) # type: ignore + response = req_function(**get_flow_request_kwargs) # type: ignore response.raise_for_status() if self.stored_as_script: @@ -246,16 +246,16 @@ def random_number(): flow.storage = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "some-service/upload", "headers": {"Content-Type": "application/octet-stream"}, }, - build_http_method="POST", - get_flow_kwargs={ + build_request_http_method="POST", + get_flow_request_kwargs={ "url": "some-service/download", "headers": {"Accept": "application/octet-stream"}, }, - get_flow_http_method="GET", + get_flow_request_http_method="GET", ) flow.storage.add_flow(flow) @@ -265,7 +265,7 @@ def random_number(): flow_id = res._build_responses[flow.name].json()["id"] # update storage - flow.storage.get_flow_kwargs["url"] = f"{GET_ROUTE}/{flow_id}" + flow.storage.get_flow_request_kwargs["url"] = f"{GET_ROUTE}/{flow_id}" ``` Returns: @@ -300,24 +300,24 @@ def random_number(): with open(self.flow_script_path, "r") as f: data = f.read().encode("utf-8") - req_function = self._method_to_function[self.build_http_method] + req_function = self._method_to_function[self.build_request_http_method] - build_kwargs = deepcopy(self.build_kwargs) - build_kwargs["headers"] = self._render_headers( - headers=build_kwargs.get("headers", {}), + build_request_kwargs = 
deepcopy(self.build_request_kwargs) + build_request_kwargs["headers"] = self._render_headers( + headers=build_request_kwargs.get("headers", {}), secret_config=self.build_secret_config, ) - if "data" in build_kwargs.keys(): + if "data" in build_request_kwargs.keys(): msg = ( - "'data' found in build_kwargs. This value is overwritten " + "'data' found in build_request_kwargs. This value is overwritten " "with the flow content and should not be set directly" ) self.logger.warning(msg) warnings.warn(msg, RuntimeWarning) - build_kwargs["data"] = data + build_request_kwargs["data"] = data - response = req_function(**build_kwargs) # type: ignore + response = req_function(**build_request_kwargs) # type: ignore response.raise_for_status() self._build_responses[flow_name] = response diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 55a0714538cc..1c1d812bf376 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -139,10 +139,10 @@ class WebhookSchema(ObjectSchema): class Meta: object_class = Webhook - build_kwargs = fields.Dict(key=fields.Str, allow_none=False) - build_http_method = fields.String(allow_none=False) - get_flow_kwargs = fields.Dict(key=fields.Str, allow_none=False) - get_flow_http_method = fields.String(allow_none=False) + build_request_kwargs = fields.Dict(key=fields.Str, allow_none=False) + build_request_http_method = fields.String(allow_none=False) + get_flow_request_kwargs = fields.Dict(key=fields.Str, allow_none=False) + get_flow_request_http_method = fields.String(allow_none=False) build_secret_config = fields.Dict(key=fields.Str, allow_none=False) get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False) stored_as_script = fields.Bool(allow_none=True) diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index 804a4d42c2d7..90192f36f536 100644 --- a/tests/agent/test_local_agent.py +++ b/tests/agent/test_local_agent.py @@ -304,10 
+304,10 @@ def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token) agent = LocalAgent() webhook = Webhook( - build_kwargs={"url": "test-service/upload"}, - build_http_method="POST", - get_flow_kwargs={"url": "test-service/download"}, - get_flow_http_method="GET", + build_request_kwargs={"url": "test-service/upload"}, + build_request_http_method="POST", + get_flow_request_kwargs={"url": "test-service/download"}, + get_flow_request_http_method="GET", ) agent.deploy_flow( flow_run=GraphQLResult( diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index df480671df5c..051444fc35aa 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -49,21 +49,21 @@ def raise_for_status(self, *args, **kwargs) -> None: def test_create_webhook_storage(): - build_kwargs = {"url": "https://content.dropboxapi.com/2/files/upload"} - get_flow_kwargs = {"url": "https://content.dropboxapi.com/2/files/download"} + build_request_kwargs = {"url": "https://content.dropboxapi.com/2/files/upload"} + get_flow_request_kwargs = {"url": "https://content.dropboxapi.com/2/files/download"} storage = Webhook( - build_kwargs=build_kwargs, - build_http_method="PATCH", - get_flow_kwargs=get_flow_kwargs, - get_flow_http_method="GET", + build_request_kwargs=build_request_kwargs, + build_request_http_method="PATCH", + get_flow_request_kwargs=get_flow_request_kwargs, + get_flow_request_http_method="GET", ) assert storage assert storage.logger - assert storage.build_kwargs == build_kwargs - assert storage.build_http_method == "PATCH" + assert storage.build_request_kwargs == build_request_kwargs + assert storage.build_request_http_method == "PATCH" assert storage.build_secret_config == {} - assert storage.get_flow_kwargs == get_flow_kwargs - assert storage.get_flow_http_method == "GET" + assert storage.get_flow_request_kwargs == get_flow_request_kwargs + 
assert storage.get_flow_request_http_method == "GET" assert storage.get_flow_secret_config == {} assert storage.secrets == [] assert storage.default_labels == ["webhook-flow-storage"] @@ -75,41 +75,41 @@ def test_all_valid_http_verb_combinations_work(): for build_verb in possible_verbs: for get_verb in possible_verbs: storage = Webhook( - build_kwargs={"url": "whatever"}, - build_http_method=build_verb, - get_flow_kwargs={"url": "whatever"}, - get_flow_http_method=get_verb, + build_request_kwargs={"url": "whatever"}, + build_request_http_method=build_verb, + get_flow_request_kwargs={"url": "whatever"}, + get_flow_request_http_method=get_verb, ) - assert storage.build_http_method == build_verb - assert storage.get_flow_http_method == get_verb + assert storage.build_request_http_method == build_verb + assert storage.get_flow_request_http_method == get_verb -def test_webhook_fails_for_bad_build_http_method(): +def test_webhook_fails_for_bad_build_request_http_method(): with pytest.raises(RuntimeError, match="HTTP method 'PASTA' not recognized"): Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="PASTA", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="POST", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="PASTA", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="POST", ) -def test_webhook_fails_for_bad_get_flow_http_method(): +def test_webhook_fails_for_bad_get_flow_request_http_method(): with pytest.raises(RuntimeError, match="HTTP method 'BET' not recognized"): Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="BET", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", 
+ get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="BET", ) def test_add_flow_and_contains_work_as_expected(sample_flow): webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) assert sample_flow.name not in webhook out = webhook.add_flow(sample_flow) @@ -123,10 +123,10 @@ def test_add_flow_and_contains_work_as_expected(sample_flow): def test_webhook_build_works_with_no_arguments(sample_flow): webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) def _mock_successful_get(*args, **kwargs): @@ -150,15 +150,15 @@ def _mock_successful_post(*args, **kwargs): assert isinstance(res, Flow) -def test_webhook_raises_warning_if_data_in_build_kwargs(sample_flow): +def test_webhook_raises_warning_if_data_in_build_request_kwargs(sample_flow): webhook = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "data": cloudpickle.dumps(sample_flow), }, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + build_request_http_method="POST", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) 
def _mock_successful_get(*args, **kwargs): @@ -184,10 +184,10 @@ def _mock_successful_post(*args, **kwargs): def test_webhook_raises_error_on_build_failure(sample_flow): webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) def _mock_failed_post(*args, **kwargs): @@ -204,10 +204,10 @@ def _mock_failed_post(*args, **kwargs): def test_webhook_raises_error_on_get_flow_failure(sample_flow): webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) def _mock_failed_get(*args, **kwargs): @@ -232,14 +232,14 @@ def test_render_headers_gets_env_variables(monkeypatch): another_secret = str(uuid.uuid4()) monkeypatch.setenv("SOME_CRED", some_cred) webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", build_secret_config={ "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, }, - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) # 
set a local secret @@ -262,13 +262,13 @@ def test_render_headers_gets_env_variables(monkeypatch): def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch): monkeypatch.delenv("SOME_CRED", raising=False) webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", build_secret_config={ "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, }, - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) with pytest.raises(KeyError, match="SOME_CRED"): @@ -281,13 +281,13 @@ def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch): monkeypatch.delenv("ANOTHER_SECRET", raising=False) webhook = Webhook( - build_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_http_method="POST", + build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + build_request_http_method="POST", build_secret_config={ "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, }, - get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - get_flow_http_method="GET", + get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, + get_flow_request_http_method="GET", ) with pytest.raises(ValueError, match='Local Secret "ANOTHER_SECRET" was not found'): @@ -305,16 +305,16 @@ def test_webhook_works_with_file_storage(sample_flow, tmpdir): f.write(script_contents) webhook = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, }, - build_http_method="POST", - get_flow_kwargs={ + 
build_request_http_method="POST", + get_flow_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Accept": "application/octet-stream"}, }, - get_flow_http_method="GET", + get_flow_request_http_method="GET", stored_as_script=True, flow_script_path=script_file, ) @@ -345,16 +345,16 @@ def _mock_successful_post(*args, **kwargs): def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flow): webhook = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, }, - build_http_method="POST", - get_flow_kwargs={ + build_request_http_method="POST", + get_flow_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Accept": "application/octet-stream"}, }, - get_flow_http_method="GET", + get_flow_request_http_method="GET", stored_as_script=True, ) @@ -372,16 +372,16 @@ def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist( nonexistent_file = "{}.py".format(str(uuid.uuid4())) webhook = Webhook( - build_kwargs={ + build_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Content-Type": "application/octet-stream"}, }, - build_http_method="POST", - get_flow_kwargs={ + build_request_http_method="POST", + get_flow_request_kwargs={ "url": "https://content.dropboxapi.com/2/files", "headers": {"Accept": "application/octet-stream"}, }, - get_flow_http_method="GET", + get_flow_request_http_method="GET", stored_as_script=True, flow_script_path=nonexistent_file, ) diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index 6fc3d1c43a19..9d98a7cbd7b6 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -310,22 +310,22 @@ def test_webhook_full_serialize(): get_url = f"{base_url}/download" webhook = storage.Webhook( - build_kwargs={ + build_request_kwargs={ "url": build_url, "headers": { "Content-Type": 
content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, }, - build_http_method="POST", - get_flow_kwargs={ + build_request_http_method="POST", + get_flow_request_kwargs={ "url": get_url, "headers": { "Accept": content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, }, - get_flow_http_method="POST", + get_flow_request_http_method="POST", build_secret_config={ "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} }, @@ -339,22 +339,22 @@ def test_webhook_full_serialize(): assert serialized assert serialized["__version__"] == prefect.__version__ assert serialized["secrets"] == ["CREDS"] - assert serialized["build_kwargs"] == { + assert serialized["build_request_kwargs"] == { "url": build_url, "headers": { "Content-Type": content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, } - assert serialized["build_http_method"] == "POST" - assert serialized["get_flow_kwargs"] == { + assert serialized["build_request_http_method"] == "POST" + assert serialized["get_flow_request_kwargs"] == { "url": get_url, "headers": { "Accept": content_type, "Dropbox-API-Arg": json.dumps({"path": test_file}), }, } - assert serialized["get_flow_http_method"] == "POST" + assert serialized["get_flow_request_http_method"] == "POST" assert serialized["build_secret_config"] == { "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} } @@ -368,10 +368,10 @@ def test_webhook_different_secret_configs(): } get_flow_config = {"Authorization": {"value": "READ_ONLY_TOKEN", "type": "secret"}} webhook = storage.Webhook( - build_kwargs={}, - build_http_method="POST", - get_flow_kwargs={}, - get_flow_http_method="POST", + build_request_kwargs={}, + build_request_http_method="POST", + get_flow_request_kwargs={}, + get_flow_request_http_method="POST", build_secret_config=build_config, get_flow_secret_config=get_flow_config, ) From 937dd84700ba5799319da2637a593d4a61cbbf64 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sat, 1 Aug 2020 19:59:45 
-0500 Subject: [PATCH 12/14] replace secret_config with templating --- .../execution/storage_options.md | 7 +- src/prefect/environments/storage/webhook.py | 206 +++++++++++------- src/prefect/serialization/storage.py | 2 - .../storage/test_webhook_storage.py | 97 ++++++--- tests/serialization/test_storage.py | 29 --- 5 files changed, 189 insertions(+), 152 deletions(-) diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index daa4ebe52d0f..e8e27b9dc1fc 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -165,6 +165,7 @@ flow = Flow( "strict_conflict": True, } ), + "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}" }, }, build_request_http_method="POST", @@ -175,19 +176,17 @@ flow = Flow( "Dropbox-API-Arg": json.dumps( {"path": "/Apps/prefect-test-app/dropbox-flow.flow"} ), + "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}" }, }, get_flow_request_http_method="POST", - build_secret_config={ - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - }, ) ) flow.storage.build() ``` -The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html). +Template strings in `${}` are used to reference sensitive information. Given `${SOME_TOKEN}`, this storage object will first look in environment variable `SOME_TOKEN` and then fall back to [Prefect secrets](/core/concepts/secrets.html) `SOME_TOKEN`. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. 
::: tip Sensible Defaults Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage. diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 5590e15f8a13..1c650e4ddcdf 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -1,9 +1,11 @@ import cloudpickle import os +import string import warnings +from collections.abc import Mapping from copy import deepcopy -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional from requests import Session from requests.adapters import HTTPAdapter @@ -18,14 +20,88 @@ from prefect.core.flow import Flow +class _SecretMapping(Mapping): + """ + Magic mapping. When `__getitem__()` is called + to look for the value of a `key`, this mapping will + try to resolve it from the environment and, if not found + there, from `prefect` secrets. + + Raises: + - RuntimeError if the key you're looking for isn't found + """ + + def __getitem__(self, key: str) -> str: + out = os.getenv(key, None) + if out is None: + try: + out = Secret(key).get() + except ValueError: + msg = ( + "Template value '{}' does not refer to an " + "environment variable or Prefect secret." + ) + raise RuntimeError(msg.format(key)) + return out + + def __iter__(self) -> Iterable: + return iter([]) + + def __len__(self) -> int: + return 0 + + +_mapping = _SecretMapping() + + +def _render_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Replace string elements in a dictionary with environment variables + or Prefect secrets. + + This method will look through all string values (not keys) of a dictionary, + recursively, for template string. 
When it encounters a string like + `${abc}`, it will first try to replace it with the value of environment + variable `abc`. If environment variable `abc` is not defined, it will try + to replace it with `prefect` secret `abc`. If that is also not found, + raises a `RuntimeError`. + + Args: + - input_dict (Dict[str, Any]): A dictionary that may contain + template strings. + + Returns: + - A dictionary with template strings replaced by the literal values of + environment variables or Prefect secrets + + Raises: + - RuntimeError if any template value cannot be found in the environment + or `prefect` secrets + """ + input_dict = deepcopy(input_dict) + output_dict = {} + + for key, value in input_dict.items(): + if isinstance(value, str): + new_value = string.Template(value).substitute(_mapping) + output_dict[key] = new_value + elif isinstance(value, dict): + output_dict[key] = _render_dict(value) + else: + output_dict[key] = value + + return output_dict + + class Webhook(Storage): """ Webhook storage class. This class represents the Storage interface for Flows stored and retrieved with HTTP requests. This storage class takes in keyword arguments which describe how to - create the requests, including information on how to fill in headers - from environment variable or Prefect Cloud secrets. + create the requests. These arguments' values can contain template + strings which will be filled in dynamically from environment variables + or Prefect secrets. **Note**: Flows registered with this Storage option will automatically be labeled with `webhook-flow-storage`. @@ -35,26 +111,14 @@ class Webhook(Storage): function from `requests` used to store the flow. Do not supply `"data"` to this argument, as it will be overwritten with the flow's content when `.build()` is run. - - build_request_http_method (str): HTTP method identifying the type of request - to execute when storing the flow.
For example, `"POST"` for + - build_request_http_method (str): HTTP method identifying the type of + request to execute when storing the flow. For example, `"POST"` for `requests.post()`. - - build_secret_config (dict): A dictionary describing how to set - request headers from environment variables or Prefect Cloud - secrets. See example for details on specifying this. This config - applies to the request issued by `.build()`, and will also be - used for `.get_flow()` unless you explicitly set - `get_flow_secret_config`. - - get_flow_request_kwargs (dict): Dictionary of keyword arguments to the - function from `requests` used to retrieve the flow. - - get_flow_request_http_method (str): HTTP method identifying the type of - request to execute when storing the flow. For example, `"GET"` + - get_flow_request_kwargs (dict): Dictionary of keyword arguments to + the function from `requests` used to retrieve the flow. + - get_flow_request_http_method (str): HTTP method identifying the type + of request to execute when retrieving the flow. For example, `"GET"` for `requests.get()`. - - get_flow_secret_config (dict): Similar to `build_secret_config`, but - used for the request in `.get_flow()`. By default, the config - passed to `build_secret_config` will be used for `.get_flow()` - as well. Pass a value to this argument only if you want to use a - different config for `.get_flow()` than the one used for - `.build()`. - stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a `.py` file. Defaults to `False`. - flow_script_path (str, optional): path to a local `.py` file that @@ -68,17 +132,24 @@ class Webhook(Storage): - **kwargs (Any, optional): any additional `Storage` initialization options - Passing sensitive data in headers - --------------------------------- + Including Sensitive Data + ------------------------ + + It is common for requests used with this storage to need access to + sensitive information.
- For services which require authentication, use `secret_config` to pass - sensitive data like API keys without storing their values in this Storage - object. + For example: - This should be a dictionary whose keys are headers, and whose - values indicate whether to retrieve real values from environment - variables (`"type": "environment"`) or Prefect Cloud secrets - (`"type": "secret"`). + - auth tokens passed in headers like `X-Api-Key` or `Authorization` + - auth information passed in the URL as query parameters + + `Webhook` storage supports the inclusion of such sensitive information + with templating. Any of the string values passed to + `build_request_kwargs` or `get_flow_request_kwargs` can include + template strings like `${SOME_VARIABLE}`. When `.build()` or `.get_flow()` + is run, such values will be replaced with the value of environment + variables or, when no matching environment variable is found, Prefect + Secrets. So, for example, to get an API key from an environment variable you can do the following @@ -86,20 +157,29 @@ class Webhook(Storage): ```python storage = Webhook( build_request_kwargs={ - "url": "some-service", + "url": "some-service/upload", "headers": { - "Content-Type" = "application/octet-stream" + "Content-Type": "application/octet-stream", + "X-Api-Key": "${MY_COOL_ENV_VARIABLE}" } }, build_request_http_method="POST", - ... - ... - build_secret_config={ - "X-Api-Key": { - "name": "MY_COOL_ENV_VARIABLE", - "type": "environment" + ) + ``` + + You can also take advantage of this templating when only part + of a string needs to be replaced.
+ + ```python + storage = Webhook( + get_flow_request_kwargs={ + "url": "some-service/download", + "headers": { + "Accept": "application/octet-stream", + "Authorization": "Bearer ${MY_COOL_ENV_VARIABLE}" } - } + }, + get_flow_request_http_method="POST", ) ``` """ @@ -110,8 +190,6 @@ def __init__( build_request_http_method: str, get_flow_request_kwargs: Dict[str, Any], get_flow_request_http_method: str, - build_secret_config: Optional[Dict[str, Any]] = None, - get_flow_secret_config: Optional[Dict[str, Any]] = None, stored_as_script: bool = False, flow_script_path: Optional[str] = None, **kwargs: Any, @@ -147,11 +225,9 @@ def __init__( self.build_request_kwargs = build_request_kwargs self.build_request_http_method = build_request_http_method - self.build_secret_config = build_secret_config or {} self.get_flow_request_kwargs = get_flow_request_kwargs self.get_flow_request_http_method = get_flow_request_http_method - self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config self._build_responses: Optional[Dict[str, Response]] = None @@ -180,11 +256,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": self.logger.info("Retrieving flow") req_function = self._method_to_function[self.get_flow_request_http_method] - get_flow_request_kwargs = deepcopy(self.get_flow_request_kwargs) - get_flow_request_kwargs["headers"] = self._render_headers( - headers=get_flow_request_kwargs.get("headers", {}), - secret_config=self.get_flow_secret_config, - ) + get_flow_request_kwargs = _render_dict(self.get_flow_request_kwargs) response = req_function(**get_flow_request_kwargs) # type: ignore response.raise_for_status() @@ -284,7 +356,7 @@ def random_number(): if self.stored_as_script: # these checks are here in build() instead of the constructor - # so that serialization and deserialization of flows does not fail + # so that serialization and deserialization of flows does not fail if not self.flow_script_path: msg = "flow_script_path must be provided
if stored_as_script=True" self.logger.fatal(msg) @@ -302,16 +374,13 @@ def random_number(): req_function = self._method_to_function[self.build_request_http_method] - build_request_kwargs = deepcopy(self.build_request_kwargs) - build_request_kwargs["headers"] = self._render_headers( - headers=build_request_kwargs.get("headers", {}), - secret_config=self.build_secret_config, - ) + build_request_kwargs = _render_dict(self.build_request_kwargs) if "data" in build_request_kwargs.keys(): msg = ( - "'data' found in build_request_kwargs. This value is overwritten " - "with the flow content and should not be set directly" + "'data' found in build_request_kwargs. This value is " + "overwritten with the flow content and should not " + "be set directly" ) self.logger.warning(msg) warnings.warn(msg, RuntimeWarning) @@ -333,28 +402,3 @@ def __contains__(self, obj: Any) -> bool: if not isinstance(obj, str): return False return obj in self.flows - - @staticmethod - def _render_headers( - headers: Dict[str, Any], secret_config: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Given a dictionary of headers, add additional headers with values - resolved from environment variables or Prefect Cloud secrets. - - Args: - - headers (dict): A dictionary of headers. - - secret_config (dict): A secret config. See `help(Webhook)` for - details on how this should be constructed. 
- Raises: - - KeyError if referencing an environment variable that does not exist - - ValueError if referencing a Secret that does not exist - """ - out_headers = deepcopy(headers) - for header, details in secret_config.items(): - name = details["name"] - if details["type"] == "environment": - out_headers[header] = os.environ[name] - elif details["type"] == "secret": - out_headers[header] = Secret(name).get() - return out_headers diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 1c1d812bf376..9990a6b5975a 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -143,8 +143,6 @@ class Meta: build_request_http_method = fields.String(allow_none=False) get_flow_request_kwargs = fields.Dict(key=fields.Str, allow_none=False) get_flow_request_http_method = fields.String(allow_none=False) - build_secret_config = fields.Dict(key=fields.Str, allow_none=False) - get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False) stored_as_script = fields.Bool(allow_none=True) flows = fields.Dict(key=fields.Str(), values=fields.Str()) secrets = fields.List(fields.Str(), allow_none=True) diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index 051444fc35aa..b44b1cb90a46 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -10,6 +10,7 @@ from prefect import context from prefect import task, Flow from prefect.environments.storage import Webhook +from prefect.environments.storage.webhook import _render_dict @pytest.fixture @@ -61,10 +62,8 @@ def test_create_webhook_storage(): assert storage.logger assert storage.build_request_kwargs == build_request_kwargs assert storage.build_request_http_method == "PATCH" - assert storage.build_secret_config == {} assert storage.get_flow_request_kwargs == get_flow_request_kwargs assert storage.get_flow_request_http_method == 
"GET" - assert storage.get_flow_secret_config == {} assert storage.secrets == [] assert storage.default_labels == ["webhook-flow-storage"] assert storage.stored_as_script is False @@ -227,17 +226,19 @@ def _mock_successful_post(*args, **kwargs): webhook.get_flow() -def test_render_headers_gets_env_variables(monkeypatch): +def test_render_dict_gets_env_variables(monkeypatch): some_cred = str(uuid.uuid4()) another_secret = str(uuid.uuid4()) monkeypatch.setenv("SOME_CRED", some_cred) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, - "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": { + "X-Api-Key": "${SOME_CRED}", + "X-Custom-Key": "${ANOTHER_SECRET}", + }, }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) @@ -246,55 +247,79 @@ def test_render_headers_gets_env_variables(monkeypatch): context.setdefault("secrets", {}) context.secrets["ANOTHER_SECRET"] = another_secret - initial_headers = {"X-Api-Key": "abc"} - new_headers = webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + new_kwargs = _render_dict(webhook.build_request_kwargs) - # _render_headers should not have side effects - assert initial_headers == {"X-Api-Key": "abc"} + # _render_dict should not have side effects + assert webhook.build_request_kwargs == { + "url": "https://content.dropboxapi.com/2/files", + "headers": {"X-Api-Key": "${SOME_CRED}", "X-Custom-Key": "${ANOTHER_SECRET}"}, + } # env variables and secrets should have been filled in - assert new_headers["X-Api-Key"] == some_cred - assert new_headers["X-Custom-Key"] == another_secret + assert new_kwargs["headers"]["X-Api-Key"] == some_cred + 
assert new_kwargs["headers"]["X-Custom-Key"] == another_secret -def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch): +def test_render_dict_raises_expected_exception_on_missing_env_var(monkeypatch): monkeypatch.delenv("SOME_CRED", raising=False) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"X-Api-Key": "Token ${SOME_CRED}"}, }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) - with pytest.raises(KeyError, match="SOME_CRED"): - initial_headers = {"X-Api-Key": "abc"} - webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + with pytest.raises(RuntimeError, match="SOME_CRED"): + _render_dict(webhook.build_request_kwargs) -def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch): +def test_render_dict_raises_expected_exception_on_missing_secret(monkeypatch): monkeypatch.delenv("ANOTHER_SECRET", raising=False) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files?key=${ANOTHER_SECRET}" }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) - with pytest.raises(ValueError, match='Local Secret "ANOTHER_SECRET" was not found'): - initial_headers = {"X-Api-Key": "abc"} - webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + with pytest.raises( + 
RuntimeError, match="does not refer to an environment variable or" + ): + _render_dict(webhook.build_request_kwargs) + + +def test_templating_works_with_env_variable_top_level(monkeypatch): + monkeypatch.setenv("JAVA_HOME", "abc") + x = {"a": "coffee_place: ${JAVA_HOME}"} + out = _render_dict(x) + assert out == {"a": "coffee_place: abc"} + assert x == {"a": "coffee_place: ${JAVA_HOME}"} + + +def test_templating_works_with_env_variable_recursively(monkeypatch): + monkeypatch.setenv("USER", "leia") + x = {"a": {"b": {"c": "Bearer: ${USER}"}}} + out = _render_dict(x) + assert out == {"a": {"b": {"c": "Bearer: leia"}}} + assert x == {"a": {"b": {"c": "Bearer: ${USER}"}}} + + +def templating_works_if_nothing_to_template(): + x = {"thing": 4, "stuff": [5, 6, 7], "big": {"if": True}} + out = _render_dict(x) + assert out == x + assert x == {"thing": 4, "stuff": [5, 6, 7], "big": {"if": True}} + + +def templating_works_with_embedded_json_strings(): + x = {"headers": {"dropbox-args": '{"USER"}'}} + out = _render_dict(x) + assert out == x def test_webhook_works_with_file_storage(sample_flow, tmpdir): diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index 9d98a7cbd7b6..c472238f4792 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -326,9 +326,6 @@ def test_webhook_full_serialize(): }, }, get_flow_request_http_method="POST", - build_secret_config={ - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - }, secrets=["CREDS"], ) f = prefect.Flow("test") @@ -355,30 +352,4 @@ def test_webhook_full_serialize(): }, } assert serialized["get_flow_request_http_method"] == "POST" - assert serialized["build_secret_config"] == { - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - } - assert serialized["build_secret_config"] == serialized["get_flow_secret_config"] assert serialized["stored_as_script"] is False - - -def test_webhook_different_secret_configs(): - 
build_config = { - "Authorization": {"value": "WRITE_ONLY_TOKEN", "type": "environment"} - } - get_flow_config = {"Authorization": {"value": "READ_ONLY_TOKEN", "type": "secret"}} - webhook = storage.Webhook( - build_request_kwargs={}, - build_request_http_method="POST", - get_flow_request_kwargs={}, - get_flow_request_http_method="POST", - build_secret_config=build_config, - get_flow_secret_config=get_flow_config, - ) - f = prefect.Flow("test") - webhook.flows["test"] = "key" - - serialized = WebhookSchema().dump(webhook) - assert serialized["build_secret_config"] == build_config - assert serialized["get_flow_secret_config"] == get_flow_config - assert serialized["build_secret_config"] != serialized["get_flow_secret_config"] From a2d40a10b622247d7203f7086f1e634fb1951e01 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Sat, 1 Aug 2020 21:40:09 -0500 Subject: [PATCH 13/14] linting --- src/prefect/environments/storage/webhook.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 1c650e4ddcdf..f4e06e3f269d 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -5,7 +5,7 @@ from collections.abc import Mapping from copy import deepcopy -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional from requests import Session from requests.adapters import HTTPAdapter @@ -32,7 +32,7 @@ class _SecretMapping(Mapping): """ def __getitem__(self, key: str) -> str: - out = os.getenv(key, None) + out = os.getenv(key, None) # type: ignore if out is None: try: out = Secret(key).get() @@ -42,9 +42,9 @@ def __getitem__(self, key: str) -> str: "environment variable or Prefect secret." 
) raise RuntimeError(msg.format(key)) - return out + return out # type: ignore - def __iter__(self) -> Iterable: + def __iter__(self) -> Iterator[Any]: return iter([]) def __len__(self) -> int: @@ -86,7 +86,7 @@ def _render_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]: new_value = string.Template(value).substitute(_mapping) output_dict[key] = new_value elif isinstance(value, dict): - output_dict[key] = _render_dict(value) + output_dict[key] = _render_dict(value) # type: ignore else: output_dict[key] = value From 921e387d26e624b248de962e3fda9b5a54f1db91 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Tue, 4 Aug 2020 11:58:51 -0500 Subject: [PATCH 14/14] remove deepcopy --- src/prefect/environments/storage/webhook.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index f4e06e3f269d..5aff2eb2ea76 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -4,7 +4,6 @@ import warnings from collections.abc import Mapping -from copy import deepcopy from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional from requests import Session @@ -78,7 +77,6 @@ def _render_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]: - RuntimeError if any template value cannot be found in the environment or `prefect` secrets """ - input_dict = deepcopy(input_dict) output_dict = {} for key, value in input_dict.items():