Skip to content

Commit

Permalink
add support for stored_as_script
Browse files Browse the repository at this point in the history
  • Loading branch information
jameslamb committed Jul 27, 2020
1 parent 849a53f commit 59bcc06
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 1 deletion.
46 changes: 45 additions & 1 deletion src/prefect/environments/storage/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from prefect.client import Secret
from prefect.environments.storage import Storage
from prefect.utilities.storage import extract_flow_from_file

if TYPE_CHECKING:
from prefect.core.flow import Flow
Expand Down Expand Up @@ -54,6 +55,18 @@ class WebHook(Storage):
as well. Pass a value to this argument only if you want to use a
different config for `.get_flow()` than the one used for
`.build()`.
- stored_as_script (bool, optional): boolean for specifying if the
flow has been stored as a `.py` file. Defaults to `False`.
- flow_script_path (str, optional): path to a local `.py` file that
defines the flow. You must pass a value to this argument if
`stored_as_script` is `True`. This script's content will be read
into a string and attached to the request in `build()` as UTF-8
encoded binary data. Similarly, `.get_flow()` expects that the
script's contents will be returned as binary data. This path will
not be sent to Prefect Cloud and is only needed when running
`.build()`.
- **kwargs (Any, optional): any additional `Storage` initialization
options
Passing sensitive data in headers
---------------------------------
Expand Down Expand Up @@ -99,6 +112,8 @@ def __init__(
get_flow_http_method: str,
build_secret_config: Optional[Dict[str, Any]] = None,
get_flow_secret_config: Optional[Dict[str, Any]] = None,
stored_as_script: bool = False,
flow_script_path: Optional[str] = None,
**kwargs: Any,
) -> None:
self.flows = dict() # type: Dict[str, str]
Expand Down Expand Up @@ -127,6 +142,9 @@ def __init__(
self.logger.fatal(msg)
raise RuntimeError(msg)

self.stored_as_script = stored_as_script
self.flow_script_path = flow_script_path

self.build_kwargs = build_kwargs
self.build_http_method = build_http_method
self.build_secret_config = build_secret_config or {}
Expand All @@ -137,7 +155,7 @@ def __init__(

self._build_responses: Optional[Dict[str, Response]] = None

super().__init__(**kwargs)
super().__init__(stored_as_script=stored_as_script, **kwargs)

@property
def default_labels(self) -> List[str]:
Expand Down Expand Up @@ -171,6 +189,10 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow":
response = req_function(**get_flow_kwargs) # type: ignore
response.raise_for_status()

if self.stored_as_script:
flow_script_content = response.content.decode("utf-8")
return extract_flow_from_file(file_contents=flow_script_content) # type: ignore

return cloudpickle.loads(response.content)

def add_flow(self, flow: "Flow") -> str:
Expand All @@ -197,6 +219,10 @@ def build(self) -> "WebHook":
Build the WebHook storage object by issuing an HTTP request
to store the flow.
If `self.stored_as_script` is `True`, this method
will read in the contents of `self.flow_script_path`, convert it to
byes, and attach it to the request as `data`.
The response from this request is stored in `._build_responses`,
a dictionary keyed by flow name. If you are using a service where
all the details necessary to fetch a flow cannot be known until you've
Expand Down Expand Up @@ -255,6 +281,24 @@ def random_number():
self.logger.info("Uploading flow '{}'".format(flow_name))

data = cloudpickle.dumps(flow)
if self.stored_as_script:

# these checks are here in build() instead of the constructor
# so that serialization and deserialization of flows does not fail
if not self.flow_script_path:
msg = "flow_script_path must be provided if stored_as_script=True"
self.logger.fatal(msg)
raise RuntimeError(msg)

if not os.path.isfile(self.flow_script_path):
msg = "file '{}' passed to flow_script_path does not exist".format(
self.flow_script_path
)
self.logger.fatal(msg)
raise RuntimeError(msg)

with open(self.flow_script_path, "r") as f:
data = f.read().encode("utf-8")

req_function = self._method_to_function[self.build_http_method]

Expand Down
1 change: 1 addition & 0 deletions src/prefect/serialization/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class Meta:
get_flow_http_method = fields.String(allow_none=False)
build_secret_config = fields.Dict(key=fields.Str, allow_none=False)
get_flow_secret_config = fields.Dict(key=fields.Str, allow_none=False)
stored_as_script = fields.Bool(allow_none=True)
flows = fields.Dict(key=fields.Str(), values=fields.Str())
secrets = fields.List(fields.Str(), allow_none=True)

Expand Down
99 changes: 99 additions & 0 deletions tests/environments/storage/test_webhook_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import cloudpickle
import os
import pytest
import random
import uuid
Expand Down Expand Up @@ -66,6 +67,7 @@ def test_create_webhook_storage():
assert storage.get_flow_secret_config == {}
assert storage.secrets == []
assert storage.default_labels == ["webhook-flow-storage"]
assert storage.stored_as_script is False


def test_all_valid_http_verb_combinations_work():
Expand Down Expand Up @@ -293,3 +295,100 @@ def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch)
webhook._render_headers(
headers=initial_headers, secret_config=webhook.build_secret_config
)


def test_webhook_works_with_file_storage(sample_flow, tmpdir):

script_file = os.path.join(tmpdir, "{}.py".format(str(uuid.uuid4())))
script_contents = """from prefect import Flow\nf=Flow('test-flow')"""
with open(script_file, "w") as f:
f.write(script_contents)

webhook = WebHook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
},
build_http_method="POST",
get_flow_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Accept": "application/octet-stream"},
},
get_flow_http_method="GET",
stored_as_script=True,
flow_script_path=script_file,
)

def _mock_successful_get(*args, **kwargs):
file_contents = """from prefect import Flow\nf=Flow('test-flow')"""
return _MockResponse(
status_code=200, json={}, content=file_contents.encode("utf-8")
)

def _mock_successful_post(*args, **kwargs):
return _MockResponse(status_code=200, json={"id": "abc"})

webhook._method_to_function = {
"GET": _mock_successful_get,
"POST": _mock_successful_post,
}
webhook.add_flow(sample_flow)

res = webhook.build()
assert isinstance(res, WebHook)

res = webhook.get_flow()
assert isinstance(res, Flow)
assert res.name == "test-flow"


def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flow):

webhook = WebHook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
},
build_http_method="POST",
get_flow_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Accept": "application/octet-stream"},
},
get_flow_http_method="GET",
stored_as_script=True,
)

webhook.add_flow(sample_flow)

error_msg = "flow_script_path must be provided if stored_as_script=True"
with pytest.raises(RuntimeError, match=error_msg):
res = webhook.build()
assert isinstance(res, WebHook)


def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist(
sample_flow,
):

nonexistent_file = "{}.py".format(str(uuid.uuid4()))
webhook = WebHook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
},
build_http_method="POST",
get_flow_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Accept": "application/octet-stream"},
},
get_flow_http_method="GET",
stored_as_script=True,
flow_script_path=nonexistent_file,
)

webhook.add_flow(sample_flow)

error_msg = "passed to flow_script_path does not exist"
with pytest.raises(RuntimeError, match=error_msg):
res = webhook.build()
assert isinstance(res, WebHook)
1 change: 1 addition & 0 deletions tests/serialization/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def test_webhook_full_serialize():
"Authorization": {"value": "DBOX_OAUTH2_TOKEN", "type": "environment"}
}
assert serialized["build_secret_config"] == serialized["get_flow_secret_config"]
assert serialized["stored_as_script"] is False


def test_webhook_different_secret_configs():
Expand Down

0 comments on commit 59bcc06

Please sign in to comment.