feature request: Storage object backed by a REST API #2835

Closed
jameslamb opened this issue Jun 22, 2020 · 2 comments
Labels
feature A new feature

Comments

@jameslamb

Use Case

created from a discussion in Prefect Community Slack

I'd like to propose a new type of storage. For the sake of this conversation, I'll refer to it as Webhook storage.

With Webhook storage, flows are stored and retrieved by HTTP requests. The storage object contains the details needed to construct those requests. I think this could be a lightweight but powerful way to allow users to integrate Prefect with their existing stack.

benefit 1: custom storage with external services

This could be a route to using any type of external service that exposes writing and reading binary files over HTTP.

It would allow users to write their own storage classes for things like:

and would allow the use of other cloud providers' object stores that prefect doesn't have first-class support for (like Alibaba Cloud Object Storage Service or IBM Cloud object store)

benefit 2: integration with internal services

In companies I've worked at or with before, I've seen a pattern where public cloud services can only be used directly by infrastructure teams, while data scientists and other application developers are restricted to the company's own microservices.

Adding Webhook storage would allow users in such a situation to integrate with prefect core server (Cloud or run themselves) without needing to have any credentials that allow direct access to cloud providers (which is necessary to use S3, GCS, or Azure storage).

Solution

This might look something like this:

rough sketch implementation
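import io
import os
from typing import TYPE_CHECKING, Any, Dict, List

import cloudpickle
import requests

# import paths as they were in Prefect at the time of this issue
from prefect.client import Secret
from prefect.environments.storage import Storage

if TYPE_CHECKING:
    from prefect.core.flow import Flow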


class Webhook(Storage):
    """
    Args:
        - build_kwargs (dict): Dictionary of keyword arguments to the
            function from ``requests`` used to store the flow. 
        - build_http_method (str): HTTP method identifying the type of request
            to execute when storing the flow. For example, ``"POST"`` for
            ``requests.post()``.
        - get_flow_kwargs (dict): Dictionary of keyword arguments to the
            function from ``requests`` used to retrieve the flow.
        - get_flow_http_method (str): HTTP method identifying the type of
            request to execute when retrieving the flow. For example, ``"GET"``
            for ``requests.get()``.
        - secret_config (dict, optional): A dictionary describing how to set
            request headers from environment variables or Prefect Cloud
            secrets. See the example below for details on specifying this.

    Passing sensitive data in headers
    ---------------------------------

    For services which require authentication, use `secret_config` to pass
    sensitive data like API keys without storing their values in this Storage object.

    This should be a dictionary whose keys are headers, and whose
    values indicate whether to retrieve real values from environment
    variables (``"type": "environment"``) or
    Prefect secrets (``"type": "secret"``).

    So, for example, to get an API key from an environment variable you
    can do the following

    .. code-block:: python

        storage = Webhook(
            build_kwargs={
                "url": "some-random-service.place.thing",
                "headers": {
                    "Content-Type": "application/octet-stream"
                }
            },
            build_http_method="POST",
            ...
            ...
            secret_config={
                "X-Api-Key": {
                    "value": "MY_COOL_ENV_VARIABLE",
                    "type": "environment"
                }
            }
        )
    """

    def __init__(
        self,
        build_kwargs: dict,
        build_http_method: str,
        get_flow_kwargs: dict,
        get_flow_http_method: str,
        secret_config: dict = None,
        **kwargs: Any
    ) -> None:
        self.flows = dict()  # type: Dict[str, str]
        self._flows = dict()  # type: Dict[str, "Flow"]

        self.build_kwargs = build_kwargs
        self.build_http_method = build_http_method

        self.get_flow_kwargs = get_flow_kwargs
        self.get_flow_http_method = get_flow_http_method

        # None instead of {} avoids sharing one mutable default across instances
        self.secret_config = secret_config or {}

        self._method_to_function = {
            "GET": requests.get,
            "POST": requests.post,
            "PUT": requests.put,
        }

        super().__init__(**kwargs)

    @property
    def default_labels(self) -> List[str]:
        return ["webhook-flow-storage"]

    def _render_headers(self, headers: dict) -> dict:
        out_headers = headers.copy()
        # each secret_config entry looks like {"value": <name>, "type": <source>};
        # iterate over it so that secret headers absent from ``headers`` get added
        for header, details in self.secret_config.items():
            name = details["value"]
            if details["type"] == "environment":
                out_headers[header] = os.environ[name]
            elif details["type"] == "secret":
                out_headers[header] = Secret(name).get()
        return out_headers

    def get_flow(self, flow_name: str) -> "Flow":
        req_function = self._method_to_function[self.get_flow_http_method]

        get_flow_kwargs = self.get_flow_kwargs.copy()
        get_flow_kwargs["headers"] = self._render_headers(
            get_flow_kwargs.get("headers", {})
        )

        response = req_function(**get_flow_kwargs)
        # fail loudly instead of trying to unpickle an error response
        response.raise_for_status()

        return cloudpickle.loads(response.content)

    def build(self) -> "Storage":
        self.run_basic_healthchecks()

        for flow_name, flow in self._flows.items():
            # Pickle Flow
            data = cloudpickle.dumps(flow)

            # Write pickled Flow to stream
            try:
                stream = io.BytesIO(data)
            except TypeError:
                stream = io.BytesIO(data.encode())

            # write flow to the service
            req_function = self._method_to_function[self.build_http_method]

            build_kwargs = self.build_kwargs.copy()
            build_kwargs["headers"] = self._render_headers(
                build_kwargs.get("headers", {})
            )
            # send the pickled flow as the request body
            build_kwargs["data"] = stream
            response = req_function(**build_kwargs)
            response.raise_for_status()

        return self
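
To make the `secret_config` idea from the docstring concrete, here is a standalone sketch of the header-rendering step. It uses only the standard library; `render_headers` and the `get_secret` hook are hypothetical names for illustration (in the class above, the `"secret"` branch would call `Secret(name).get()`), and the key value is made up.

```python
import os
from typing import Callable, Dict, Optional


def render_headers(
    headers: Dict[str, str],
    secret_config: Dict[str, Dict[str, str]],
    get_secret: Optional[Callable[[str], str]] = None,
) -> Dict[str, str]:
    """Return a copy of ``headers`` with secret-backed values filled in."""
    rendered = dict(headers)
    for header, details in secret_config.items():
        name = details["value"]
        if details["type"] == "environment":
            rendered[header] = os.environ[name]
        elif details["type"] == "secret":
            if get_secret is None:
                raise ValueError("a secret lookup function is required")
            # hypothetical hook standing in for a Prefect secret lookup
            rendered[header] = get_secret(name)
        else:
            raise ValueError(f"unknown secret type: {details['type']!r}")
    return rendered


# made-up value, set here only so the example runs on its own
os.environ["MY_COOL_ENV_VARIABLE"] = "not-a-real-key"

static_headers = {"Content-Type": "application/octet-stream"}
headers = render_headers(
    static_headers,
    {"X-Api-Key": {"value": "MY_COOL_ENV_VARIABLE", "type": "environment"}},
)
```

The serialized storage object would only ever contain the name `MY_COOL_ENV_VARIABLE`, never the key itself; the real value is looked up when the request is about to be sent.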

My basic proposal is that build() executes one HTTP request and get_flow() executes another.
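
To illustrate that round trip end to end, here is a minimal sketch that runs entirely on localhost. It makes several assumptions that are not part of the proposal: the standard library's `pickle` and `urllib` stand in for `cloudpickle` and `requests`, a plain dict stands in for a `Flow`, and `BlobHandler` is a toy service that stores whatever is `PUT` to a path and returns it on `GET`.

```python
import pickle
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

STORE = {}  # path -> bytes; stands in for the external service's storage


class BlobHandler(BaseHTTPRequestHandler):
    """Toy service: PUT stores the request body, GET returns it."""

    def do_PUT(self):
        length = int(self.headers["Content-Length"])
        STORE[self.path] = self.rfile.read(length)
        self.send_response(201)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_GET(self):
        body = STORE.get(self.path)
        if body is None:
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


# ignore any proxy environment variables, since the service is local
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def build(url: str, flow: object) -> None:
    """One HTTP request that stores the pickled flow."""
    request = urllib.request.Request(
        url,
        data=pickle.dumps(flow),
        method="PUT",
        headers={"Content-Type": "application/octet-stream"},
    )
    with opener.open(request) as response:
        response.read()


def get_flow(url: str) -> object:
    """Another HTTP request that retrieves and unpickles it."""
    with opener.open(url) as response:
        return pickle.loads(response.read())


server = HTTPServer(("127.0.0.1", 0), BlobHandler)  # port 0: pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_port}/flows/my-flow"

original = {"name": "my-flow", "tasks": ["extract", "load"]}
build(url, original)
restored = get_flow(url)

server.shutdown()
server.server_close()
```

The service never needs to know what a flow is. It only has to accept and return bytes, which is what would let this kind of storage work with arbitrary internal or third-party services.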

Open Questions

How could this work with multiple flows?

  • I personally don't understand the use case for multiple flows in one Storage object, and I'm not certain how that could work with Webhook storage (but I'm sure it could be figured out)

How could this support services where you have to write a file before you know enough to read it?

  • for example, services that generate a unique ID when you upload a file and then return that ID in the response. get_flow() needs that ID to work.
  • there is probably a workaround where, for such a service, you can build(), update details of the storage based on the response, then use flow.register(build=False)
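
A rough sketch of that workaround, under stated assumptions: `FakeUploadService` is an in-memory stand-in for a service that assigns an ID on upload, `SketchStorage` is a hypothetical simplification of the storage class, and the `{id}` placeholder in the URL is an invented convention, not something the proposal defines.

```python
import copy
import uuid
from typing import Dict


class FakeUploadService:
    """In-memory stand-in for a service that assigns an ID on upload."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def upload(self, data: bytes) -> dict:
        blob_id = uuid.uuid4().hex
        self._blobs[blob_id] = data
        # roughly what the JSON body of the upload response would contain
        return {"id": blob_id}

    def download(self, url: str) -> bytes:
        # the ID is the last path segment of the URL
        return self._blobs[url.rsplit("/", 1)[-1]]


class SketchStorage:
    def __init__(self, service: FakeUploadService, get_flow_kwargs: dict) -> None:
        self.service = service
        self.get_flow_kwargs = copy.deepcopy(get_flow_kwargs)

    def build(self, data: bytes) -> "SketchStorage":
        response = self.service.upload(data)
        # fill in the detail that was unknown before the upload, so the
        # storage object now carries everything needed to read the flow
        self.get_flow_kwargs["url"] = self.get_flow_kwargs["url"].format(
            id=response["id"]
        )
        return self

    def get_flow(self) -> bytes:
        return self.service.download(self.get_flow_kwargs["url"])


service = FakeUploadService()
storage = SketchStorage(service, {"url": "https://example.invalid/files/{id}"})
storage.build(b"pickled-flow-bytes")
fetched = storage.get_flow()
```

After `build()` has updated the storage object this way, registering with `flow.register(build=False)` would avoid uploading the flow a second time.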

Do any details of the HTTP client need to be customizable?

  • in my proposal, I'm only proposing customizing requests. I think that's a good starting point.
    • Do client details like retry logic need to be customizable, or could they be hardcoded to reasonable defaults?
    • are flow objects ever so large that you need to use things like multipart upload?
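
As one possible shape for "hardcoded reasonable defaults", here is a small retry helper with exponential backoff. This is only a sketch using the standard library: `call_with_retries`, its default values, and the injectable `sleep` argument are all assumptions for illustration, not part of Prefect or `requests`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    backoff_seconds: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call ``func`` until it succeeds or ``attempts`` calls have failed."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # exponential backoff: 0.5s, 1s, 2s, ... with the defaults
            sleep(backoff_seconds * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


calls = {"n": 0}


def flaky() -> bytes:
    """Fails twice, then succeeds, to exercise the retry path."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("service not ready")
    return b"flow-bytes"


delays = []
# record the delays instead of actually sleeping, to keep the demo fast
result = call_with_retries(flaky, sleep=delays.append)
```

With `requests`, its own exception types (for example `requests.exceptions.ConnectionError` and `requests.exceptions.Timeout`) would need to be passed through `retry_on`, since they do not inherit from the built-in `ConnectionError` and `TimeoutError`.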

Alternatives

Prefect Cloud Flow Storage

Some of the use cases mentioned above might be solved by introducing a Prefect Cloud storage service, where you just authenticate with Prefect Cloud and it acts as the cloud storage service.

  • pros

    • purpose-built for storing flows, so some things like Content-Type header and expectations about how the object is named can be hard-coded and hidden from users
  • cons

    • violates the design principle of all user code staying in users' infrastructure

Doing Nothing

Maybe this isn't a big enough concern to warrant growing the prefect codebase. All new code comes with maintenance costs, and maybe the added maintenance cost of this feature outweighs the benefit to users of an extension like this.

Closing Thoughts

If the maintainers here agree that this feature is worth pursuing, I want to note that I'd be happy to attempt a pull request. You all have been so careful and thoughtful in the design of the boundaries between different components, I feel confident that I could come up with a reasonable implementation.

Thanks for your time and consideration!

@jameslamb jameslamb added the feature A new feature label Jun 22, 2020
@jameslamb
Author

I know there is a lot of activity happening in prefect right now (it's exciting to see!), and I don't want this to get lost.

Are the maintainers here open to this idea, and would you consider a pull request for it if I prepared one?

Totally understand if the answer is "this deserves some careful thought and we have higher priorities right now".

@joshmeek

joshmeek commented Jul 1, 2020

@jameslamb Apologies, this got a bit lost. I'm all for this idea if you want to prepare a PR!

How could this work with multiple flows?

I don't think it needs to. Multiple flows tbh are only useful for Docker storage and (for example) the most recent file-based GitHub storage is a one-to-one relationship.

How could this support services where you have to write a file before you know enough to read it?

Yeah you would do something like this in build. We do something similar in S3 storage where we generate a key and then set it on the storage object so that way when the storage object is serialized it has all of the information needed to retrieve the flow.

Do any details of the HTTP client need to be customizable?

I think having some reasonable defaults for requests at the start would be a great way to go and then we can go from there.

@jcrist jcrist closed this as completed in 541b1c1 Aug 4, 2020
jcrist added a commit that referenced this issue Aug 4, 2020