diff --git a/CHANGELOG.md b/CHANGELOG.md index 1244035ffdc0..82e2464546a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - The CLI command `prefect execute-flow` and `prefect execute-cloud-flow` no longer exist - [#1059](https://github.com/PrefectHQ/prefect/pull/1059) - The `slack_notifier` state handler now uses a `webhook_secret` kwarg to pull the URL from a Secret - [#1075](https://github.com/PrefectHQ/prefect/issues/1075) +- Remove the `CloudResultHandler` default result handler - [#1198](https://github.com/PrefectHQ/prefect/pull/1198) ### Contributors diff --git a/docs/outline.toml b/docs/outline.toml index 9847ecbf5122..e7f3bad7340b 100644 --- a/docs/outline.toml +++ b/docs/outline.toml @@ -126,7 +126,7 @@ classes = ["JSONResultHandler", "GCSResultHandler", "LocalResultHandler", "S3Res [pages.engine.cloud] title = "Cloud" module = "prefect.engine.cloud" -classes = ["CloudFlowRunner", "CloudTaskRunner", "CloudResultHandler"] +classes = ["CloudFlowRunner", "CloudTaskRunner"] [pages.environments.storage] title = "Storage" diff --git a/src/prefect/config.toml b/src/prefect/config.toml index e2fda4b76a22..eefd0a0cd392 100644 --- a/src/prefect/config.toml +++ b/src/prefect/config.toml @@ -7,7 +7,6 @@ debug = false api = "https://api.prefect.io" graphql = "${cloud.api}/graphql/alpha" log = "${cloud.api}/log" -result_handler = "${cloud.api}/result-handler" use_local_secrets = true heartbeat_interval = 30.0 @@ -69,8 +68,8 @@ run_on_schedule = true default_class = "prefect.engine.flow_runner.FlowRunner" [engine.result_handler] - # the default task runner, specified using a full path - default_class = "prefect.engine.cloud.CloudResultHandler" + # the default result handler, specified using a full path + default_class = "" [engine.task_runner] # the default task runner, specified using a full path diff --git a/src/prefect/engine/__init__.py b/src/prefect/engine/__init__.py index b90f8cf278f6..4af08ca91471 100644 --- a/src/prefect/engine/__init__.py +++ b/src/prefect/engine/__init__.py @@ -86,18 +86,17 @@ def get_default_result_handler_class() -> type: value is a string, it will attempt to load the already-imported object. Otherwise, the value is returned. - Defaults to `CloudResultHandler` if the string config value can not be loaded + Defaults to `None` if the string config value can not be loaded """ config_value = config.get_nested("engine.result_handler.default_class") if isinstance(config_value, str): + if not config_value: + return lambda *args, **kwargs: None # type: ignore try: return prefect.utilities.serialization.from_qualified_name(config_value) except ValueError: - warn( - "Could not import {}; using " - "prefect.engine.cloud.CloudResultHandler instead.".format(config_value) - ) - return prefect.engine.cloud.CloudResultHandler + warn("Could not import {}; using " "None instead.".format(config_value)) + return lambda *args, **kwargs: None # type: ignore else: return config_value diff --git a/src/prefect/engine/cloud/__init__.py b/src/prefect/engine/cloud/__init__.py index fa729aecf60c..33d633727ef1 100644 --- a/src/prefect/engine/cloud/__init__.py +++ b/src/prefect/engine/cloud/__init__.py @@ -1,3 +1,2 @@ -from prefect.engine.cloud.result_handler import CloudResultHandler from prefect.engine.cloud.task_runner import CloudTaskRunner from prefect.engine.cloud.flow_runner import CloudFlowRunner diff --git a/src/prefect/engine/cloud/result_handler.py b/src/prefect/engine/cloud/result_handler.py deleted file mode 100644 index 7aa3bf8def8a..000000000000 --- a/src/prefect/engine/cloud/result_handler.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Result Handlers provide the hooks that Prefect uses to store task results in production; a `ResultHandler` can be provided to a `Flow` at creation. - -Anytime a task needs its output or inputs stored, a result handler is used to determine where this data should be stored (and how it can be retrieved). -""" -import base64 -import tempfile -from typing import Any, Optional - -import cloudpickle - -from prefect import config -from prefect.client.client import Client -from prefect.engine.result_handlers import ResultHandler - - -class CloudResultHandler(ResultHandler): - """ - Hook for storing and retrieving task results from Prefect cloud storage. - - Args: - - result_handler_service (str, optional): the location of the service - which will further process and store the results; if not provided, will default to - the value of `cloud.result_handler` in your config file - """ - - def __init__(self, result_handler_service: str = None) -> None: - self._client = None # type: Optional[Client] - if result_handler_service is None: - self.result_handler_service = config.cloud.result_handler - else: - self.result_handler_service = result_handler_service - super().__init__() - - def _initialize_client(self) -> None: - """ - Helper method for ensuring that CloudHandlers which are initialized locally - do not attempt to start a Client. This is important because CloudHandlers are - currently attached to `Flow` objects which need to be serialized / deserialized - independently of cloud settings. - - This will instantiate a Client upon the first call to (de)serialize. - """ - if self._client is None: - self._client = Client() - - def read(self, uri: str) -> Any: - """ - Read a result from the given URI location. - - Args: - - uri (str): the path to the location of a result - - Returns: - - the deserialized result from the provided URI - """ - self._initialize_client() - - self.logger.debug("Starting to read result from {}...".format(uri)) - res = self._client.get( # type: ignore - "/", server=self.result_handler_service, **{"uri": uri} - ) - - try: - return_val = cloudpickle.loads(base64.b64decode(res.get("result", ""))) - except EOFError: - return_val = None - self.logger.debug("Finished reading result from {}...".format(uri)) - - return return_val - - def write(self, result: Any) -> str: - """ - Write the provided result to Prefect Cloud. - - Args: - - result (Any): the result to store - - Returns: - - str: the URI path to the result in Cloud storage - """ - self._initialize_client() - - binary_data = base64.b64encode(cloudpickle.dumps(result)).decode() - self.logger.debug( - "Starting to upload result to {}...".format(self.result_handler_service) - ) - res = self._client.post( # type: ignore - "/", server=self.result_handler_service, **{"result": binary_data} - ) - self.logger.debug( - "Finished uploading result to {}...".format(self.result_handler_service) - ) - return res.get("uri", "") diff --git a/src/prefect/environments/execution/cloud/environment.py b/src/prefect/environments/execution/cloud/environment.py index 45dcf3f91b5f..9f2129b18b29 100644 --- a/src/prefect/environments/execution/cloud/environment.py +++ b/src/prefect/environments/execution/cloud/environment.py @@ -259,12 +259,11 @@ def _populate_job_yaml( env[0]["value"] = prefect.config.cloud.graphql env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.result_handler - env[3]["value"] = prefect.config.cloud.auth_token - env[4]["value"] = flow_run_id - env[5]["value"] = prefect.context.get("namespace", "") - env[6]["value"] = docker_name - env[7]["value"] = flow_file_path + env[2]["value"] = prefect.config.cloud.auth_token + env[3]["value"] = flow_run_id + env[4]["value"] = prefect.context.get("namespace", "") + env[5]["value"] = docker_name + env[6]["value"] = flow_file_path # set image yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] = docker_name @@ -292,9 +291,8 @@ def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict: env[0]["value"] = prefect.config.cloud.graphql env[1]["value"] = prefect.config.cloud.log - env[2]["value"] = prefect.config.cloud.result_handler - env[3]["value"] = prefect.config.cloud.auth_token - env[4]["value"] = prefect.context.get("flow_run_id", "") + env[2]["value"] = prefect.config.cloud.auth_token + env[3]["value"] = prefect.context.get("flow_run_id", "") if self.private_registry: namespace = prefect.context.get("namespace", "") diff --git a/src/prefect/serialization/result_handlers.py b/src/prefect/serialization/result_handlers.py index 1c4f15ea4f48..3a8f24a40848 100644 --- a/src/prefect/serialization/result_handlers.py +++ b/src/prefect/serialization/result_handlers.py @@ -3,7 +3,6 @@ from marshmallow import ValidationError, fields, post_load -from prefect.engine.cloud.result_handler import CloudResultHandler from prefect.engine.result_handlers import ( GCSResultHandler, JSONResultHandler, @@ -39,13 +38,6 @@ def create_object(self, data: dict, **kwargs: Any) -> None: return None -class CloudResultHandlerSchema(BaseResultHandlerSchema): - class Meta: - object_class = CloudResultHandler - - result_handler_service = fields.String(allow_none=True) - - class GCSResultHandlerSchema(BaseResultHandlerSchema): class Meta: object_class = GCSResultHandler @@ -84,7 +76,6 @@ class ResultHandlerSchema(OneOfSchema): "ResultHandler": BaseResultHandlerSchema, "GCSResultHandler": GCSResultHandlerSchema, "S3ResultHandler": S3ResultHandlerSchema, - "CloudResultHandler": CloudResultHandlerSchema, "JSONResultHandler": JSONResultHandlerSchema, "LocalResultHandler": LocalResultHandlerSchema, } diff --git a/tests/engine/cloud/test_cloud_result_handler.py b/tests/engine/cloud/test_cloud_result_handler.py deleted file mode 100644 index 7047c2f62ee4..000000000000 --- a/tests/engine/cloud/test_cloud_result_handler.py +++ /dev/null @@ -1,64 +0,0 @@ -import json -import os -import tempfile -from unittest.mock import MagicMock - -import pytest - -from prefect import config -from prefect.client import Client -from prefect.engine.cloud import CloudResultHandler -from prefect.utilities.configuration import set_temporary_config - - -def requests_post(*args, result=None, **kwargs): - return dict(uri=json.dumps(result)) - - -class TestCloudHandler: - def test_cloud_handler_initializes_with_no_args_and_reads_from_config(self): - with set_temporary_config({"cloud.result_handler": "http://foo:bar"}): - handler = CloudResultHandler() - assert handler._client is None - assert handler.result_handler_service == "http://foo:bar" - - def test_cloud_handler_init_args_override_config(self): - handler = CloudResultHandler("ftp://old-school") - assert handler.result_handler_service == "ftp://old-school" - - def test_cloud_handler_creates_client_after_first_method_call(self, monkeypatch): - client = MagicMock(post=requests_post) - monkeypatch.setattr( - "prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client) - ) - with set_temporary_config({"cloud.result_handler": "http://foo.bar:4204"}): - handler = CloudResultHandler() - handler.write("random string") - assert handler._client == client - - @pytest.mark.parametrize("data", [None, "my_string", 42]) - def test_cloud_handler_sends_jsonable_packages(self, data, monkeypatch): - client = MagicMock(post=requests_post) - monkeypatch.setattr( - "prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client) - ) - handler = CloudResultHandler() - assert isinstance(handler.write(data), str) - - def test_cloud_handler_can_interpret_contents_of_standard_uri(self, monkeypatch): - binary_data = "gASVDQAAAAAAAACMCW15IHNlY3JldJQu" - client = MagicMock(get=lambda *args, **kwargs: dict(result=binary_data)) - monkeypatch.setattr( - "prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client) - ) - handler = CloudResultHandler() - assert handler.read(uri="http://look-here") == "my secret" - - def test_cloud_handler_handles_empty_buckets(self, monkeypatch): - binary_data = "" - client = MagicMock(get=lambda *args, **kwargs: dict(result=binary_data)) - monkeypatch.setattr( - "prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client) - ) - handler = CloudResultHandler() - assert handler.read(uri="http://look-here") is None diff --git a/tests/engine/cloud/test_cloud_task_runner.py b/tests/engine/cloud/test_cloud_task_runner.py index ddab3ec00245..9141cac9291a 100644 --- a/tests/engine/cloud/test_cloud_task_runner.py +++ b/tests/engine/cloud/test_cloud_task_runner.py @@ -12,7 +12,7 @@ from prefect.client import Client from prefect.core import Edge, Task from prefect.engine.cache_validators import all_inputs -from prefect.engine.cloud import CloudResultHandler, CloudTaskRunner +from prefect.engine.cloud import CloudTaskRunner from prefect.engine.result import NoResult, Result, SafeResult from prefect.engine.result_handlers import ( JSONResultHandler, diff --git a/tests/engine/test_defaults.py b/tests/engine/test_defaults.py index b96b786617e0..a071edf5eecb 100644 --- a/tests/engine/test_defaults.py +++ b/tests/engine/test_defaults.py @@ -89,7 +89,7 @@ def test_default_task_runner_with_bad_config(): def test_default_result_handler(): - assert engine.get_default_result_handler_class() is engine.cloud.CloudResultHandler + assert engine.get_default_result_handler_class()() is None def test_default_result_handler_responds_to_config(): @@ -121,7 +121,4 @@ def test_default_result_handler_with_bad_config(): {"engine.result_handler.default_class": "prefect.engine. bad import path"} ): with pytest.warns(UserWarning): - assert ( - engine.get_default_result_handler_class() - is engine.cloud.CloudResultHandler - ) + assert engine.get_default_result_handler_class()() is None diff --git a/tests/environments/execution/test_cloud_environment.py b/tests/environments/execution/test_cloud_environment.py index 7c403e7b3346..246210c54a52 100644 --- a/tests/environments/execution/test_cloud_environment.py +++ b/tests/environments/execution/test_cloud_environment.py @@ -182,7 +182,6 @@ def test_populate_job_yaml(): { "cloud.graphql": "gql_test", "cloud.log": "log_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } ): @@ -205,12 +204,11 @@ def test_populate_job_yaml(): assert env[0]["value"] == "gql_test" assert env[1]["value"] == "log_test" - assert env[2]["value"] == "rh_test" - assert env[3]["value"] == "auth_test" - assert env[4]["value"] == "id_test" - assert env[5]["value"] == "namespace_test" - assert env[6]["value"] == "test1/test2:test3" - assert env[7]["value"] == "test4" + assert env[2]["value"] == "auth_test" + assert env[3]["value"] == "id_test" + assert env[4]["value"] == "namespace_test" + assert env[5]["value"] == "test1/test2:test3" + assert env[6]["value"] == "test4" assert ( yaml_obj["spec"]["template"]["spec"]["containers"][0]["image"] @@ -232,7 +230,6 @@ def test_populate_worker_pod_yaml(): { "cloud.graphql": "gql_test", "cloud.log": "log_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } ): @@ -246,9 +243,8 @@ def test_populate_worker_pod_yaml(): assert env[0]["value"] == "gql_test" assert env[1]["value"] == "log_test" - assert env[2]["value"] == "rh_test" - assert env[3]["value"] == "auth_test" - assert env[4]["value"] == "id_test" + assert env[2]["value"] == "auth_test" + assert env[3]["value"] == "id_test" assert yaml_obj["spec"]["containers"][0]["image"] == "my_image" @@ -267,7 +263,6 @@ def test_populate_worker_pod_yaml_with_private_registry(): { "cloud.graphql": "gql_test", "cloud.log": "log_test", - "cloud.result_handler": "rh_test", "cloud.auth_token": "auth_test", } ): diff --git a/tests/serialization/test_result_handlers.py b/tests/serialization/test_result_handlers.py index 07af83c28678..f9568ae78314 100644 --- a/tests/serialization/test_result_handlers.py +++ b/tests/serialization/test_result_handlers.py @@ -4,7 +4,6 @@ import prefect from prefect.client import Client -from prefect.engine.cloud.result_handler import CloudResultHandler from prefect.engine.result_handlers import ( GCSResultHandler, JSONResultHandler, @@ -122,50 +121,6 @@ def test_deserialize_local_result_handler(self, dir): assert obj.dir == dir -class TestCloudResultHandler: - def test_serialize_with_no_attributes(self): - with set_temporary_config({"cloud.result_handler": "website"}): - serialized = ResultHandlerSchema().dump(CloudResultHandler()) - assert isinstance(serialized, dict) - assert serialized["type"] == "CloudResultHandler" - assert serialized["result_handler_service"] == "website" - assert "client" not in serialized - - def test_serialize_with_attributes(self): - handler = CloudResultHandler(result_handler_service="http://foo.bar") - handler.client = Client() - serialized = ResultHandlerSchema().dump(handler) - assert isinstance(serialized, dict) - assert serialized["type"] == "CloudResultHandler" - assert serialized["result_handler_service"] == "http://foo.bar" - assert "client" not in serialized - - def test_deserialize_cloud_result_handler(self): - schema = ResultHandlerSchema() - handler = CloudResultHandler(result_handler_service="http://foo.bar") - handler._client = Client() - obj = schema.load(schema.dump(handler)) - assert isinstance(obj, CloudResultHandler) - assert hasattr(obj, "logger") - assert obj.logger.name == "prefect.CloudResultHandler" - assert obj.result_handler_service == "http://foo.bar" - assert obj._client is None - - def test_deserialize_cloud_result_handler_with_None_populates_from_config(self): - schema = ResultHandlerSchema() - handler = CloudResultHandler() - handler.result_handler_service = None - handler._client = Client() - serialized = schema.dump(handler) - with set_temporary_config({"cloud.result_handler": "new-service"}): - obj = schema.load(serialized) - assert isinstance(obj, CloudResultHandler) - assert hasattr(obj, "logger") - assert obj.logger.name == "prefect.CloudResultHandler" - assert obj.result_handler_service == "new-service" - assert obj._client is None - - @pytest.mark.xfail(raises=ImportError, reason="google extras not installed.") class TestGCSResultHandler: def test_serialize(self):