diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index daa4ebe52d0f..e8e27b9dc1fc 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -165,6 +165,7 @@ flow = Flow( "strict_conflict": True, } ), + "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}" }, }, build_request_http_method="POST", @@ -175,19 +176,17 @@ flow = Flow( "Dropbox-API-Arg": json.dumps( {"path": "/Apps/prefect-test-app/dropbox-flow.flow"} ), + "Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}" }, }, get_flow_request_http_method="POST", - build_secret_config={ - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - }, ) ) flow.storage.build() ``` -The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html). +Template strings in `${}` are used to reference sensitive information. Given `${SOME_TOKEN}`, this storage object will first look in environment variable `SOME_TOKEN` and then fall back to [Prefect secrets](/core/concepts/secrets.html) `SOME_TOKEN`. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. ::: tip Sensible Defaults Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage. 
diff --git a/src/prefect/environments/storage/webhook.py b/src/prefect/environments/storage/webhook.py index 5590e15f8a13..1c650e4ddcdf 100644 --- a/src/prefect/environments/storage/webhook.py +++ b/src/prefect/environments/storage/webhook.py @@ -1,9 +1,11 @@ import cloudpickle import os +import string import warnings +from collections.abc import Mapping from copy import deepcopy -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional from requests import Session from requests.adapters import HTTPAdapter @@ -18,14 +20,88 @@ from prefect.core.flow import Flow +class _SecretMapping(Mapping): + """ + Magic mapping. When `__getitem__()` is called + to look for the value of a `key`, this mapping will + try to resolve it from the environment and, if not found + there, from `prefect` secrets. + + Raises: + - RuntimeError if the key you're looking for isn't found + """ + + def __getitem__(self, key: str) -> str: + out = os.getenv(key, None) + if out is None: + try: + out = Secret(key).get() + except ValueError: + msg = ( + "Template value '{}' does not refer to an " + "environment variable or Prefect secret." + ) + raise RuntimeError(msg.format(key)) + return out + + def __iter__(self) -> Iterable: + return iter([]) + + def __len__(self) -> int: + return 0 + + +_mapping = _SecretMapping() + + +def _render_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Replace string elements in a dictionary with environment variables + or Prefect secrets. + + This method will look through all string values (not keys) of a dictionary, + recursively, for template strings. When it encounters a string like + `${abc}`, it will first try to replace it with the value of environment + variable `abc`. If environment variable `abc` is not defined, it will try + to replace it with `prefect` secret `abc`. If that is also not found, + raises a `RuntimeError`. 
+ + Args: + - input_dict (Dict[str, Any]): A dictionary that may contain + template strings. + + Returns: + - A dictionary with template strings replaced by the literal values of + environment variables or Prefect secrets + + Raises: + - RuntimeError if any template value cannot be found in the environment + or `prefect` secrets + """ + input_dict = deepcopy(input_dict) + output_dict = {} + + for key, value in input_dict.items(): + if isinstance(value, str): + new_value = string.Template(value).substitute(_mapping) + output_dict[key] = new_value + elif isinstance(value, dict): + output_dict[key] = _render_dict(value) + else: + output_dict[key] = value + + return output_dict + + class Webhook(Storage): """ Webhook storage class. This class represents the Storage interface for Flows stored and retrieved with HTTP requests. This storage class takes in keyword arguments which describe how to - create the requests, including information on how to fill in headers - from environment variable or Prefect Cloud secrets. + create the requests. These arguments' values can contain template + strings which will be filled in dynamically from environment variables + or Prefect secrets. **Note**: Flows registered with this Storage option will automatically be labeled with `webhook-flow-storage`. @@ -35,26 +111,14 @@ class Webhook(Storage): function from `requests` used to store the flow. Do not supply `"data"` to this argument, as it will be overwritten with the flow's content when `.build()` is run. - - build_request_http_method (str): HTTP method identifying the type of request - to execute when storing the flow. For example, `"POST"` for + - build_request_http_method (str): HTTP method identifying the type of + request to execute when storing the flow. For example, `"POST"` for `requests.post()`. - - build_secret_config (dict): A dictionary describing how to set - request headers from environment variables or Prefect Cloud - secrets. See example for details on specifying this. 
This config - applies to the request issued by `.build()`, and will also be - used for `.get_flow()` unless you explicitly set - `get_flow_secret_config`. - - get_flow_request_kwargs (dict): Dictionary of keyword arguments to the - function from `requests` used to retrieve the flow. - - get_flow_request_http_method (str): HTTP method identifying the type of - request to execute when storing the flow. For example, `"GET"` + - get_flow_request_kwargs (dict): Dictionary of keyword arguments to + the function from `requests` used to retrieve the flow. + - get_flow_request_http_method (str): HTTP method identifying the type + of request to execute when storing the flow. For example, `"GET"` for `requests.post()`. - - get_flow_secret_config (dict): Similar to `build_secret_config`, but - used for the request in `.get_flow()`. By default, the config - passed to `build_secret_config` will be used for `.get_flow()` - as well. Pass a value to this argument only if you want to use a - different config for `.get_flow()` than the one used for - `.build()`. - stored_as_script (bool, optional): boolean for specifying if the flow has been stored as a `.py` file. Defaults to `False`. - flow_script_path (str, optional): path to a local `.py` file that @@ -68,17 +132,24 @@ class Webhook(Storage): - **kwargs (Any, optional): any additional `Storage` initialization options - Passing sensitive data in headers - --------------------------------- + Including Sensitive Data + ------------------------ + + It is common for requests used with this storage to need access to + sensitive information. - For services which require authentication, use `secret_config` to pass - sensitive data like API keys without storing their values in this Storage - object. + For example: - This should be a dictionary whose keys are headers, and whose - values indicate whether to retrieve real values from environment - variables (`"type": "environment"`) or Prefect Cloud secrets - (`"type": "secret"`). 
+ - auth tokens passed in headers like `X-Api-Key` or `Authorization` + - auth information passed in to URL as query parameters + + `Webhook` storage supports the inclusion of such sensitive information + with templating. Any of the string values passed to + `build_flow_request_kwargs` or `get_flow_request_kwargs` can include + template strings like `${SOME_VARIABLE}`. When `.build()` or `.get_flow()` + is run, such values will be replaced with the value of environment + variables or, when no matching environment variable is found, Prefect + Secrets. So, for example, to get an API key from an environment variable you can do the following @@ -86,20 +157,29 @@ class Webhook(Storage): ```python storage = Webhook( build_request_kwargs={ - "url": "some-service", + "url": "some-service/upload", "headers" = { - "Content-Type" = "application/octet-stream" + "Content-Type" = "application/octet-stream", + "X-Api-Key": "${MY_COOL_ENV_VARIABLE}" } }, build_request_http_method="POST", - ... - ... - build_secret_config={ - "X-Api-Key": { - "name": "MY_COOL_ENV_VARIABLE", - "type": "environment" + ) + ``` + + You can also take advantage of this templating when only part + of a string needs to be replaced. 
+ + ```python + storage = Webhook( + get_flow_request_kwargs={ + "url": "some-service/download", + "headers": { + "Accept": "application/octet-stream", + "Authorization": "Bearer ${MY_COOL_ENV_VARIABLE}" } - } + }, + get_flow_request_http_method="POST", ) ``` """ @@ -110,8 +190,6 @@ def __init__( build_request_http_method: str, get_flow_request_kwargs: Dict[str, Any], get_flow_request_http_method: str, - build_secret_config: Optional[Dict[str, Any]] = None, - get_flow_secret_config: Optional[Dict[str, Any]] = None, stored_as_script: bool = False, flow_script_path: Optional[str] = None, **kwargs: Any, @@ -147,11 +225,9 @@ def __init__( self.build_request_kwargs = build_request_kwargs self.build_request_http_method = build_request_http_method - self.build_secret_config = build_secret_config or {} self.get_flow_request_kwargs = get_flow_request_kwargs self.get_flow_request_http_method = get_flow_request_http_method - self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config self._build_responses: Optional[Dict[str, Response]] = None @@ -180,11 +256,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow": self.logger.info("Retrieving flow") req_function = self._method_to_function[self.get_flow_request_http_method] - get_flow_request_kwargs = deepcopy(self.get_flow_request_kwargs) - get_flow_request_kwargs["headers"] = self._render_headers( - headers=get_flow_request_kwargs.get("headers", {}), - secret_config=self.get_flow_secret_config, - ) + get_flow_request_kwargs = _render_dict(self.get_flow_request_kwargs) response = req_function(**get_flow_request_kwargs) # type: ignore response.raise_for_status() @@ -284,7 +356,7 @@ def random_number(): if self.stored_as_script: # these checks are here in build() instead of the constructor - # so that serialization and deserialization of flows does not fail + # so that serialization and deserialization of flows does not fail if not self.flow_script_path: msg = "flow_script_path must be provided 
if stored_as_script=True" self.logger.fatal(msg) @@ -302,16 +374,13 @@ def random_number(): req_function = self._method_to_function[self.build_request_http_method] - build_request_kwargs = deepcopy(self.build_request_kwargs) - build_request_kwargs["headers"] = self._render_headers( - headers=build_request_kwargs.get("headers", {}), - secret_config=self.build_secret_config, - ) + build_request_kwargs = _render_dict(self.build_request_kwargs) if "data" in build_request_kwargs.keys(): msg = ( - "'data' found in build_request_kwargs. This value is overwritten " - "with the flow content and should not be set directly" + "'data' found in build_request_kwargs. This value is " + "overwritten with the flow content and should not " + "be set directly" ) self.logger.warning(msg) warnings.warn(msg, RuntimeWarning) @@ -333,28 +402,3 @@ def __contains__(self, obj: Any) -> bool: if not isinstance(obj, str): return False return obj in self.flows - - @staticmethod - def _render_headers( - headers: Dict[str, Any], secret_config: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Given a dictionary of headers, add additional headers with values - resolved from environment variables or Prefect Cloud secrets. - - Args: - - headers (dict): A dictionary of headers. - - secret_config (dict): A secret config. See `help(Webhook)` for - details on how this should be constructed. 
- Raises: - - KeyError if referencing an environment variable that does not exist - - ValueError if referencing a Secret that does not exist - """ - out_headers = deepcopy(headers) - for header, details in secret_config.items(): - name = details["name"] - if details["type"] == "environment": - out_headers[header] = os.environ[name] - elif details["type"] == "secret": - out_headers[header] = Secret(name).get() - return out_headers diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 1c1d812bf376..9990a6b5975a 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -143,8 +143,6 @@ class Meta: build_request_http_method = fields.String(allow_none=False) get_flow_request_kwargs = fields.Dict(key=fields.Str, allow_none=False) get_flow_request_http_method = fields.String(allow_none=False) - build_secret_config = fields.Dict(key=fields.Str, allow_none=False) - get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False) stored_as_script = fields.Bool(allow_none=True) flows = fields.Dict(key=fields.Str(), values=fields.Str()) secrets = fields.List(fields.Str(), allow_none=True) diff --git a/tests/environments/storage/test_webhook_storage.py b/tests/environments/storage/test_webhook_storage.py index 051444fc35aa..b44b1cb90a46 100644 --- a/tests/environments/storage/test_webhook_storage.py +++ b/tests/environments/storage/test_webhook_storage.py @@ -10,6 +10,7 @@ from prefect import context from prefect import task, Flow from prefect.environments.storage import Webhook +from prefect.environments.storage.webhook import _render_dict @pytest.fixture @@ -61,10 +62,8 @@ def test_create_webhook_storage(): assert storage.logger assert storage.build_request_kwargs == build_request_kwargs assert storage.build_request_http_method == "PATCH" - assert storage.build_secret_config == {} assert storage.get_flow_request_kwargs == get_flow_request_kwargs assert storage.get_flow_request_http_method == 
"GET" - assert storage.get_flow_secret_config == {} assert storage.secrets == [] assert storage.default_labels == ["webhook-flow-storage"] assert storage.stored_as_script is False @@ -227,17 +226,19 @@ def _mock_successful_post(*args, **kwargs): webhook.get_flow() -def test_render_headers_gets_env_variables(monkeypatch): +def test_render_dict_gets_env_variables(monkeypatch): some_cred = str(uuid.uuid4()) another_secret = str(uuid.uuid4()) monkeypatch.setenv("SOME_CRED", some_cred) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, - "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": { + "X-Api-Key": "${SOME_CRED}", + "X-Custom-Key": "${ANOTHER_SECRET}", + }, }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) @@ -246,55 +247,79 @@ def test_render_headers_gets_env_variables(monkeypatch): context.setdefault("secrets", {}) context.secrets["ANOTHER_SECRET"] = another_secret - initial_headers = {"X-Api-Key": "abc"} - new_headers = webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + new_kwargs = _render_dict(webhook.build_request_kwargs) - # _render_headers should not have side effects - assert initial_headers == {"X-Api-Key": "abc"} + # _render_dict should not have side effects + assert webhook.build_request_kwargs == { + "url": "https://content.dropboxapi.com/2/files", + "headers": {"X-Api-Key": "${SOME_CRED}", "X-Custom-Key": "${ANOTHER_SECRET}"}, + } # env variables and secrets should have been filled in - assert new_headers["X-Api-Key"] == some_cred - assert new_headers["X-Custom-Key"] == another_secret + assert new_kwargs["headers"]["X-Api-Key"] == some_cred + 
assert new_kwargs["headers"]["X-Custom-Key"] == another_secret -def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch): +def test_render_dict_raises_expected_exception_on_missing_env_var(monkeypatch): monkeypatch.delenv("SOME_CRED", raising=False) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Api-Key": {"name": "SOME_CRED", "type": "environment"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files", + "headers": {"X-Api-Key": "Token ${SOME_CRED}"}, }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) - with pytest.raises(KeyError, match="SOME_CRED"): - initial_headers = {"X-Api-Key": "abc"} - webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + with pytest.raises(RuntimeError, match="SOME_CRED"): + _render_dict(webhook.build_request_kwargs) -def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch): +def test_render_dict_raises_expected_exception_on_missing_secret(monkeypatch): monkeypatch.delenv("ANOTHER_SECRET", raising=False) webhook = Webhook( - build_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, - build_request_http_method="POST", - build_secret_config={ - "X-Custom-Key": {"name": "ANOTHER_SECRET", "type": "secret"}, + build_request_kwargs={ + "url": "https://content.dropboxapi.com/2/files?key=${ANOTHER_SECRET}" }, + build_request_http_method="POST", get_flow_request_kwargs={"url": "https://content.dropboxapi.com/2/files"}, get_flow_request_http_method="GET", ) - with pytest.raises(ValueError, match='Local Secret "ANOTHER_SECRET" was not found'): - initial_headers = {"X-Api-Key": "abc"} - webhook._render_headers( - headers=initial_headers, secret_config=webhook.build_secret_config - ) + with pytest.raises( + 
RuntimeError, match="does not refer to an environment variable or" + ): + _render_dict(webhook.build_request_kwargs) + + +def test_templating_works_with_env_variable_top_level(monkeypatch): + monkeypatch.setenv("JAVA_HOME", "abc") + x = {"a": "coffee_place: ${JAVA_HOME}"} + out = _render_dict(x) + assert out == {"a": "coffee_place: abc"} + assert x == {"a": "coffee_place: ${JAVA_HOME}"} + + +def test_templating_works_with_env_variable_recursively(monkeypatch): + monkeypatch.setenv("USER", "leia") + x = {"a": {"b": {"c": "Bearer: ${USER}"}}} + out = _render_dict(x) + assert out == {"a": {"b": {"c": "Bearer: leia"}}} + assert x == {"a": {"b": {"c": "Bearer: ${USER}"}}} + + +def test_templating_works_if_nothing_to_template(): + x = {"thing": 4, "stuff": [5, 6, 7], "big": {"if": True}} + out = _render_dict(x) + assert out == x + assert x == {"thing": 4, "stuff": [5, 6, 7], "big": {"if": True}} + + +def test_templating_works_with_embedded_json_strings(): + x = {"headers": {"dropbox-args": '{"USER"}'}} + out = _render_dict(x) + assert out == x def test_webhook_works_with_file_storage(sample_flow, tmpdir): diff --git a/tests/serialization/test_storage.py b/tests/serialization/test_storage.py index 9d98a7cbd7b6..c472238f4792 100644 --- a/tests/serialization/test_storage.py +++ b/tests/serialization/test_storage.py @@ -326,9 +326,6 @@ def test_webhook_full_serialize(): }, }, get_flow_request_http_method="POST", - build_secret_config={ - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - }, secrets=["CREDS"], ) f = prefect.Flow("test") @@ -355,30 +352,4 @@ def test_webhook_full_serialize(): }, } assert serialized["get_flow_request_http_method"] == "POST" - assert serialized["build_secret_config"] == { - "Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"} - } - assert serialized["build_secret_config"] == serialized["get_flow_secret_config"] assert serialized["stored_as_script"] is False - - -def test_webhook_different_secret_configs(): - 
build_config = { - "Authorization": {"value": "WRITE_ONLY_TOKEN", "type": "environment"} - } - get_flow_config = {"Authorization": {"value": "READ_ONLY_TOKEN", "type": "secret"}} - webhook = storage.Webhook( - build_request_kwargs={}, - build_request_http_method="POST", - get_flow_request_kwargs={}, - get_flow_request_http_method="POST", - build_secret_config=build_config, - get_flow_secret_config=get_flow_config, - ) - f = prefect.Flow("test") - webhook.flows["test"] = "key" - - serialized = WebhookSchema().dump(webhook) - assert serialized["build_secret_config"] == build_config - assert serialized["get_flow_secret_config"] == get_flow_config - assert serialized["build_secret_config"] != serialized["get_flow_secret_config"]