Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
replace secret_config with templating
  • Loading branch information
jameslamb committed Aug 2, 2020
commit 937dd84700ba5799319da2637a593d4a61cbbf64
7 changes: 3 additions & 4 deletions docs/orchestration/execution/storage_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ flow = Flow(
"strict_conflict": True,
}
),
"Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
},
},
build_request_http_method="POST",
Expand All @@ -175,19 +176,17 @@ flow = Flow(
"Dropbox-API-Arg": json.dumps(
{"path": "/Apps/prefect-test-app/dropbox-flow.flow"}
),
"Authorization": "Bearer ${DBOX_OAUTH2_TOKEN}"
},
},
get_flow_request_http_method="POST",
build_secret_config={
"Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"}
},
)
)

flow.storage.build()
```

The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html).
Template strings in `${}` are used to reference sensitive information. Given `${SOME_TOKEN}`, this storage object will first look in environment variable `SOME_TOKEN` and then fall back to [Prefect secrets](/core/concepts/secrets.html) `SOME_TOKEN`. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud.

::: tip Sensible Defaults
Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage.
Expand Down
206 changes: 125 additions & 81 deletions src/prefect/environments/storage/webhook.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import cloudpickle
import os
import string
import warnings

from collections.abc import Mapping
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional

from requests import Session
from requests.adapters import HTTPAdapter
Expand All @@ -18,14 +20,88 @@
from prefect.core.flow import Flow


class _SecretMapping(Mapping):
"""
Magic mapping. When `__getitem__()` is called
to look for the value of a `key`, this mapping will
try to resolve it from the environment and, if not found
there, from `prefect` secrets.

Raises:
- RuntimeError if the key you're looking for isn't found
"""

def __getitem__(self, key: str) -> str:
out = os.getenv(key, None)
if out is None:
try:
out = Secret(key).get()
except ValueError:
msg = (
"Template value '{}' does not refer to an "
"environment variable or Prefect secret."
)
raise RuntimeError(msg.format(key))
return out

def __iter__(self) -> Iterable:
return iter([])

def __len__(self) -> int:
return 0


_mapping = _SecretMapping()


def _render_dict(input_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Replace string elements in a dictionary with environment variables
or Prefect secrets.

This method will look through all string values (not keys) of a dictionary,
recursively, for template string. When it encounters a string like
`${abc}`, it will first try to replace it with the value of environment
variable `abc`. If environment variable `abc` is not defined, it will try
to replace it with `prefect` secret `abc`. If that is also not found,
raises a `KeyError`.

Args:
- input_dict (Dict[str, Any]): A dictionary that may contain
template strings.

Returns:
- A dictionary with template strings replaced by the literal values of
environment variables or Prefect secrets

Raises:
- RuntimeError if any template value cannot be found in the environment
or `prefect` secrets
"""
input_dict = deepcopy(input_dict)
jcrist marked this conversation as resolved.
Show resolved Hide resolved
output_dict = {}

for key, value in input_dict.items():
if isinstance(value, str):
new_value = string.Template(value).substitute(_mapping)
output_dict[key] = new_value
elif isinstance(value, dict):
output_dict[key] = _render_dict(value)
else:
output_dict[key] = value

return output_dict


class Webhook(Storage):
"""
Webhook storage class. This class represents the Storage interface for
Flows stored and retrieved with HTTP requests.

This storage class takes in keyword arguments which describe how to
create the requests, including information on how to fill in headers
from environment variable or Prefect Cloud secrets.
create the requests. These arguments' values can contain template
strings which will be filled in dynamically from environment variables
or Prefect secrets.

**Note**: Flows registered with this Storage option will automatically be
labeled with `webhook-flow-storage`.
Expand All @@ -35,26 +111,14 @@ class Webhook(Storage):
function from `requests` used to store the flow. Do not supply
`"data"` to this argument, as it will be overwritten with the
flow's content when `.build()` is run.
- build_request_http_method (str): HTTP method identifying the type of request
to execute when storing the flow. For example, `"POST"` for
- build_request_http_method (str): HTTP method identifying the type of
request to execute when storing the flow. For example, `"POST"` for
`requests.post()`.
- build_secret_config (dict): A dictionary describing how to set
request headers from environment variables or Prefect Cloud
secrets. See example for details on specifying this. This config
applies to the request issued by `.build()`, and will also be
used for `.get_flow()` unless you explicitly set
`get_flow_secret_config`.
- get_flow_request_kwargs (dict): Dictionary of keyword arguments to the
function from `requests` used to retrieve the flow.
- get_flow_request_http_method (str): HTTP method identifying the type of
request to execute when storing the flow. For example, `"GET"`
- get_flow_request_kwargs (dict): Dictionary of keyword arguments to
the function from `requests` used to retrieve the flow.
- get_flow_request_http_method (str): HTTP method identifying the type
of request to execute when storing the flow. For example, `"GET"`
for `requests.post()`.
- get_flow_secret_config (dict): Similar to `build_secret_config`, but
used for the request in `.get_flow()`. By default, the config
passed to `build_secret_config` will be used for `.get_flow()`
as well. Pass a value to this argument only if you want to use a
different config for `.get_flow()` than the one used for
`.build()`.
- stored_as_script (bool, optional): boolean for specifying if the
flow has been stored as a `.py` file. Defaults to `False`.
- flow_script_path (str, optional): path to a local `.py` file that
Expand All @@ -68,38 +132,54 @@ class Webhook(Storage):
- **kwargs (Any, optional): any additional `Storage` initialization
options

Passing sensitive data in headers
---------------------------------
Including Sensitive Data
------------------------

It is common for requests used with this storage to need access to
sensitive information.

For services which require authentication, use `secret_config` to pass
sensitive data like API keys without storing their values in this Storage
object.
For example:

This should be a dictionary whose keys are headers, and whose
values indicate whether to retrieve real values from environment
variables (`"type": "environment"`) or Prefect Cloud secrets
(`"type": "secret"`).
- auth tokens passed in headers like `X-Api-Key` or `Authorization`
- auth information passed in to URL as query parameters

`Webhook` storage supports the inclusion of such sensitive information
with templating. Any of the string values passed to
`build_flow_request_kwargs` or `get_flow_request_kwargs` can include
template strings like `${SOME_VARIABLE}`. When `.build()` or `.get_flow()`
is run, such values will be replaced with the value of environment
variables or, when no matching environment variable is found, Prefect
Secrets.

So, for example, to get an API key from an environment variable you
can do the following

```python
storage = Webhook(
build_request_kwargs={
"url": "some-service",
"url": "some-service/upload",
"headers" = {
"Content-Type" = "application/octet-stream"
"Content-Type" = "application/octet-stream",
"X-Api-Key": "${MY_COOL_ENV_VARIABLE}"
}
},
build_request_http_method="POST",
...
...
build_secret_config={
"X-Api-Key": {
"name": "MY_COOL_ENV_VARIABLE",
"type": "environment"
)
```

You can also take advantage of this templating when only part
of a string needs to be replaced.

```python
storage = Webhook(
get_flow_request_kwargs={
"url": "some-service/download",
"headers" = {
"Accept" = "application/octet-stream",
"Authorization": "Bearer ${MY_COOL_ENV_VARIABLE}"
}
}
},
build_request_http_method="POST",
)
```
"""
Expand All @@ -110,8 +190,6 @@ def __init__(
build_request_http_method: str,
get_flow_request_kwargs: Dict[str, Any],
get_flow_request_http_method: str,
build_secret_config: Optional[Dict[str, Any]] = None,
get_flow_secret_config: Optional[Dict[str, Any]] = None,
stored_as_script: bool = False,
flow_script_path: Optional[str] = None,
**kwargs: Any,
Expand Down Expand Up @@ -147,11 +225,9 @@ def __init__(

self.build_request_kwargs = build_request_kwargs
self.build_request_http_method = build_request_http_method
self.build_secret_config = build_secret_config or {}

self.get_flow_request_kwargs = get_flow_request_kwargs
self.get_flow_request_http_method = get_flow_request_http_method
self.get_flow_secret_config = get_flow_secret_config or self.build_secret_config

self._build_responses: Optional[Dict[str, Response]] = None

Expand Down Expand Up @@ -180,11 +256,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow":
self.logger.info("Retrieving flow")
req_function = self._method_to_function[self.get_flow_request_http_method]

get_flow_request_kwargs = deepcopy(self.get_flow_request_kwargs)
get_flow_request_kwargs["headers"] = self._render_headers(
headers=get_flow_request_kwargs.get("headers", {}),
secret_config=self.get_flow_secret_config,
)
get_flow_request_kwargs = _render_dict(self.get_flow_request_kwargs)

response = req_function(**get_flow_request_kwargs) # type: ignore
response.raise_for_status()
Expand Down Expand Up @@ -284,7 +356,7 @@ def random_number():
if self.stored_as_script:

# these checks are here in build() instead of the constructor
# so that serialization and deserialization of flows does not fail
# so that serialization and deserialization of flows doesnot fail
if not self.flow_script_path:
msg = "flow_script_path must be provided if stored_as_script=True"
self.logger.fatal(msg)
Expand All @@ -302,16 +374,13 @@ def random_number():

req_function = self._method_to_function[self.build_request_http_method]

build_request_kwargs = deepcopy(self.build_request_kwargs)
build_request_kwargs["headers"] = self._render_headers(
headers=build_request_kwargs.get("headers", {}),
secret_config=self.build_secret_config,
)
build_request_kwargs = _render_dict(self.build_request_kwargs)

if "data" in build_request_kwargs.keys():
msg = (
"'data' found in build_request_kwargs. This value is overwritten "
"with the flow content and should not be set directly"
"'data' found in build_request_kwargs. This value is "
"overwritten with the flow content and should not "
"be set directly"
)
self.logger.warning(msg)
warnings.warn(msg, RuntimeWarning)
Expand All @@ -333,28 +402,3 @@ def __contains__(self, obj: Any) -> bool:
if not isinstance(obj, str):
return False
return obj in self.flows

@staticmethod
def _render_headers(
headers: Dict[str, Any], secret_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Given a dictionary of headers, add additional headers with values
resolved from environment variables or Prefect Cloud secrets.

Args:
- headers (dict): A dictionary of headers.
- secret_config (dict): A secret config. See `help(Webhook)` for
details on how this should be constructed.
Raises:
- KeyError if referencing an environment variable that does not exist
- ValueError if referencing a Secret that does not exist
"""
out_headers = deepcopy(headers)
for header, details in secret_config.items():
name = details["name"]
if details["type"] == "environment":
out_headers[header] = os.environ[name]
elif details["type"] == "secret":
out_headers[header] = Secret(name).get()
return out_headers
2 changes: 0 additions & 2 deletions src/prefect/serialization/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ class Meta:
build_request_http_method = fields.String(allow_none=False)
get_flow_request_kwargs = fields.Dict(key=fields.Str, allow_none=False)
get_flow_request_http_method = fields.String(allow_none=False)
build_secret_config = fields.Dict(key=fields.Str, allow_none=False)
get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False)
stored_as_script = fields.Bool(allow_none=True)
flows = fields.Dict(key=fields.Str(), values=fields.Str())
secrets = fields.List(fields.Str(), allow_none=True)
Expand Down
Loading