Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Webhook storage (fixes #2835) #3000

Merged
merged 21 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WebHook -> Webhook
  • Loading branch information
jameslamb committed Jul 27, 2020
commit 8c0e32dca59f926a2dfcbd977284f0e0d1c5590f
2 changes: 1 addition & 1 deletion changes/pr3000.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
enhancement:
- "Add WebHook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)"
- "Add Webhook storage - [#3000](https://github.com/PrefectHQ/prefect/pull/3000)"

contributor:
- "[James Lamb](https://github.com/jameslamb)"
10 changes: 5 additions & 5 deletions docs/orchestration/execution/storage_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,19 @@ If you do not specify a `registry_url` for your Docker Storage then the image wi
Docker Storage uses the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/index.html) to build the image and push to a registry. Make sure you have the Docker daemon running locally and you are configured to push to your desired container registry. Additionally make sure whichever platform Agent deploys the container also has permissions to pull from that same registry.
:::

## WebHook
## Webhook

[WebHook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services.
[Webhook Storage](/api/latest/environments/storage.html#webhook) is a storage option that stores and retrieves flows with HTTP requests. This type of storage can be used with any type of agent, and is intended to be a flexible way to integrate Prefect with your existing ecosystem, including your own file storage services.

For example, the following code could be used to store flows in DropBox.

```python
from prefect import Flow
from prefect.environments.storage import WebHook
from prefect.environments.storage import Webhook

flow = Flow(
"dropbox-flow",
storage=WebHook(
storage=Webhook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files/upload",
"headers": {
Expand Down Expand Up @@ -190,7 +190,7 @@ flow.storage.build()
The `build_secret_config` is used to resolve environment variables to fill in request headers with sensitive information. Because this resolution is at runtime, this storage option never has your sensitive information stored in it and that sensitive information is never sent to Prefect Cloud. That config supports environment variables and [Prefect secrets](/core/concepts/secrets.html).

::: tip Sensible Defaults
Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `WebHook` storage.
Flows registered with this storage option will automatically be labeled with `"webhook-flow-storage"`. Add that label to an agent to tell Prefect Cloud that that agent should run flows with `Webhook` storage.
:::

### Non-Docker Storage for Containerized Environments
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from prefect import config
from prefect.agent import Agent
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, WebHook
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub, Webhook
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult

Expand Down Expand Up @@ -127,7 +127,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:

if not isinstance(
StorageSchema().load(flow_run.flow.storage),
(Local, Azure, GCS, S3, GitHub, WebHook),
(Local, Azure, GCS, S3, GitHub, Webhook),
):
self.logger.error(
"Storage for flow run {} is not a supported type.".format(flow_run.id)
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/environments/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from prefect.environments.storage.gcs import GCS
from prefect.environments.storage.s3 import S3
from prefect.environments.storage.github import GitHub
from prefect.environments.storage.webhook import WebHook
from prefect.environments.storage.webhook import Webhook


def get_default_storage_class() -> type:
Expand Down
20 changes: 10 additions & 10 deletions src/prefect/environments/storage/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from prefect.core.flow import Flow


class WebHook(Storage):
class Webhook(Storage):
"""
WebHook storage class. This class represents the Storage interface for
Webhook storage class. This class represents the Storage interface for
Flows stored and retrieved with HTTP requests.

This storage class takes in keyword arguments which describe how to
Expand Down Expand Up @@ -171,7 +171,7 @@ def get_flow(self, flow_location: str = "placeholder") -> "Flow":
Args:
- flow_location (str): This argument is included to comply with the
interface used by other storage objects, but it has no meaning
for `WebHook` storage, since `WebHook` only corresponds to a
for `Webhook` storage, since `Webhook` only corresponds to a
single flow. Ignore it.

Raises:
Expand Down Expand Up @@ -200,7 +200,7 @@ def add_flow(self, flow: "Flow") -> str:
Method for adding a flow to a `Storage` object's in-memory
storage. `.build()` will look here for flows.

`WebHook` storage only supports a single flow per storage
`Webhook` storage only supports a single flow per storage
object, so this method will overwrite any existing flows
stored in an instance.

Expand All @@ -214,9 +214,9 @@ def add_flow(self, flow: "Flow") -> str:
self._flows = {flow.name: flow}
return flow.name

def build(self) -> "WebHook":
def build(self) -> "Webhook":
"""
Build the WebHook storage object by issuing an HTTP request
Build the Webhook storage object by issuing an HTTP request
to store the flow.

If `self.stored_as_script` is `True`, this method
Expand All @@ -236,7 +236,7 @@ def build(self) -> "WebHook":
import requests

from prefect import task, Task, Flow
from prefect.environments.storage import WebHook
from prefect.environments.storage import Webhook

@task
def random_number():
Expand All @@ -245,7 +245,7 @@ def random_number():
random_number()


flow.storage = WebHook(
flow.storage = Webhook(
build_kwargs={
"url": "some-service/upload",
"headers": {"Content-Type": "application/octet-stream"},
Expand All @@ -269,7 +269,7 @@ def random_number():
```

Returns:
- Storage: a WebHook storage object
- Storage: a Webhook storage object

Raises:
- requests.exceptions.HTTPError if pushing the flow fails
Expand Down Expand Up @@ -344,7 +344,7 @@ def _render_headers(

Args:
- headers (dict): A dictionary of headers.
- secret_config (dict): A secret config. See `help(WebHook)` for
- secret_config (dict): A secret config. See `help(Webhook)` for
details on how this should be constructed.
Raises:
- KeyError if referencing an environment variable that does not exist
Expand Down
10 changes: 5 additions & 5 deletions src/prefect/serialization/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
Local,
Storage,
GitHub,
WebHook,
Webhook,
)
from prefect.utilities.serialization import JSONCompatible, ObjectSchema, OneOfSchema

Expand Down Expand Up @@ -135,9 +135,9 @@ def create_object(self, data: dict, **kwargs: Any) -> GitHub:
return base_obj


class WebHookSchema(ObjectSchema):
class WebhookSchema(ObjectSchema):
class Meta:
object_class = WebHook
object_class = Webhook

build_kwargs = fields.Dict(key=fields.Str, allow_none=False)
build_http_method = fields.String(allow_none=False)
Expand All @@ -150,7 +150,7 @@ class Meta:
secrets = fields.List(fields.Str(), allow_none=True)

@post_load
def create_object(self, data: dict, **kwargs: Any) -> WebHook:
def create_object(self, data: dict, **kwargs: Any) -> Webhook:
flows = data.pop("flows", dict())
base_obj = super().create_object(data)
base_obj.flows = flows
Expand All @@ -171,5 +171,5 @@ class StorageSchema(OneOfSchema):
"Storage": BaseStorageSchema,
"S3": S3Schema,
"GitHub": GitHubSchema,
"WebHook": WebHookSchema,
"Webhook": WebhookSchema,
}
4 changes: 2 additions & 2 deletions tests/agent/test_local_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from testfixtures import compare, LogCapture

from prefect.agent.local import LocalAgent
from prefect.environments.storage import Docker, Local, Azure, GCS, S3, WebHook
from prefect.environments.storage import Docker, Local, Azure, GCS, S3, Webhook
from prefect.utilities.configuration import set_temporary_config
from prefect.utilities.graphql import GraphQLResult

Expand Down Expand Up @@ -289,7 +289,7 @@ def test_local_agent_deploy_processes_webhook_storage(monkeypatch, runner_token)
monkeypatch.setattr("prefect.agent.local.agent.Popen", popen)

agent = LocalAgent()
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "test-service/upload"},
build_http_method="POST",
get_flow_kwargs={"url": "test-service/download"},
Expand Down
42 changes: 21 additions & 21 deletions tests/environments/storage/test_webhook_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from prefect import context
from prefect import task, Flow
from prefect.environments.storage import WebHook
from prefect.environments.storage import Webhook


@pytest.fixture
Expand Down Expand Up @@ -51,7 +51,7 @@ def raise_for_status(self, *args, **kwargs) -> None:
def test_create_webhook_storage():
build_kwargs = {"url": "https://content.dropboxapi.com/2/files/upload"}
get_flow_kwargs = {"url": "https://content.dropboxapi.com/2/files/download"}
storage = WebHook(
storage = Webhook(
build_kwargs=build_kwargs,
build_http_method="PATCH",
get_flow_kwargs=get_flow_kwargs,
Expand All @@ -74,7 +74,7 @@ def test_all_valid_http_verb_combinations_work():
possible_verbs = ["GET", "PATCH", "POST", "PUT"]
for build_verb in possible_verbs:
for get_verb in possible_verbs:
storage = WebHook(
storage = Webhook(
build_kwargs={"url": "whatever"},
build_http_method=build_verb,
get_flow_kwargs={"url": "whatever"},
Expand All @@ -86,7 +86,7 @@ def test_all_valid_http_verb_combinations_work():

def test_webhook_fails_for_bad_build_http_method():
with pytest.raises(RuntimeError, match="HTTP method 'PASTA' not recognized"):
WebHook(
Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="PASTA",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand All @@ -96,7 +96,7 @@ def test_webhook_fails_for_bad_build_http_method():

def test_webhook_fails_for_bad_get_flow_http_method():
with pytest.raises(RuntimeError, match="HTTP method 'BET' not recognized"):
WebHook(
Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand All @@ -105,7 +105,7 @@ def test_webhook_fails_for_bad_get_flow_http_method():


def test_add_flow_and_contains_work_as_expected(sample_flow):
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand All @@ -122,7 +122,7 @@ def test_add_flow_and_contains_work_as_expected(sample_flow):


def test_webhook_build_works_with_no_arguments(sample_flow):
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand All @@ -144,14 +144,14 @@ def _mock_successful_post(*args, **kwargs):
webhook.add_flow(sample_flow)

res = webhook.build()
assert isinstance(res, WebHook)
assert isinstance(res, Webhook)

res = webhook.get_flow()
assert isinstance(res, Flow)


def test_webhook_raises_warning_if_data_in_build_kwargs(sample_flow):
webhook = WebHook(
webhook = Webhook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"data": cloudpickle.dumps(sample_flow),
Expand Down Expand Up @@ -179,11 +179,11 @@ def _mock_successful_post(*args, **kwargs):
RuntimeWarning, match="flow content and should not be set directly"
):
res = webhook.build()
assert isinstance(res, WebHook)
assert isinstance(res, Webhook)


def test_webhook_raises_error_on_build_failure(sample_flow):
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand All @@ -203,7 +203,7 @@ def _mock_failed_post(*args, **kwargs):


def test_webhook_raises_error_on_get_flow_failure(sample_flow):
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
get_flow_kwargs={"url": "https://content.dropboxapi.com/2/files"},
Expand Down Expand Up @@ -231,7 +231,7 @@ def test_render_headers_gets_env_variables(monkeypatch):
some_cred = str(uuid.uuid4())
another_secret = str(uuid.uuid4())
monkeypatch.setenv("SOME_CRED", some_cred)
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
Expand Down Expand Up @@ -261,7 +261,7 @@ def test_render_headers_gets_env_variables(monkeypatch):

def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch):
monkeypatch.delenv("SOME_CRED", raising=False)
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
Expand All @@ -280,7 +280,7 @@ def test_render_headers_raises_expected_exception_on_missing_env_var(monkeypatch

def test_render_headers_raises_expected_exception_on_missing_secret(monkeypatch):
monkeypatch.delenv("ANOTHER_SECRET", raising=False)
webhook = WebHook(
webhook = Webhook(
build_kwargs={"url": "https://content.dropboxapi.com/2/files"},
build_http_method="POST",
build_secret_config={
Expand All @@ -304,7 +304,7 @@ def test_webhook_works_with_file_storage(sample_flow, tmpdir):
with open(script_file, "w") as f:
f.write(script_contents)

webhook = WebHook(
webhook = Webhook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
Expand Down Expand Up @@ -335,7 +335,7 @@ def _mock_successful_post(*args, **kwargs):
webhook.add_flow(sample_flow)

res = webhook.build()
assert isinstance(res, WebHook)
assert isinstance(res, Webhook)

res = webhook.get_flow()
assert isinstance(res, Flow)
Expand All @@ -344,7 +344,7 @@ def _mock_successful_post(*args, **kwargs):

def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flow):

webhook = WebHook(
webhook = Webhook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
Expand All @@ -363,15 +363,15 @@ def test_webhook_throws_informative_error_if_flow_script_path_not_set(sample_flo
error_msg = "flow_script_path must be provided if stored_as_script=True"
with pytest.raises(RuntimeError, match=error_msg):
res = webhook.build()
assert isinstance(res, WebHook)
assert isinstance(res, Webhook)


def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist(
sample_flow,
):

nonexistent_file = "{}.py".format(str(uuid.uuid4()))
webhook = WebHook(
webhook = Webhook(
build_kwargs={
"url": "https://content.dropboxapi.com/2/files",
"headers": {"Content-Type": "application/octet-stream"},
Expand All @@ -391,4 +391,4 @@ def test_webhook_throws_informative_error_if_flow_script_file_does_not_exist(
error_msg = "passed to flow_script_path does not exist"
with pytest.raises(RuntimeError, match=error_msg):
res = webhook.build()
assert isinstance(res, WebHook)
assert isinstance(res, Webhook)
Loading