Skip to content

Commit

Permalink
Merge pull request #630 from PrefectHQ/integration-finesse
Browse files Browse the repository at this point in the history
Finesse a few things for smoother integration with Cloud
  • Loading branch information
cicdw authored Feb 8, 2019
2 parents 163b78e + f3f58e6 commit 47146f4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Removed the `BokehRunner` and associated webapp - [#609](https://github.com/PrefectHQ/prefect/issues/609)
- Renamed `ResultHandler` methods from `serialize` / `deserialize` to `write` / `read` - [#612](https://github.com/PrefectHQ/prefect/pull/612)
- Refactor all `State` objects to store fully hydrated `Result` objects which track information about how results should be handled - [#612](https://github.com/PrefectHQ/prefect/pull/612), [#616](https://github.com/PrefectHQ/prefect/pull/616)
- `Client.create_flow_run` now returns a string instead of a `GraphQLResult` object to match the API of `deploy` - [#630](https://github.com/PrefectHQ/prefect/pull/630)

## 0.4.1 <Badge text="beta" type="success"/>

Expand Down
8 changes: 4 additions & 4 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _request(
def request_fn() -> "requests.models.Response":
headers = {"Authorization": "Bearer {}".format(self.token)}
if method == "GET":
response = requests.get(url, headers=headers, json=params)
response = requests.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=params)
elif method == "DELETE":
Expand Down Expand Up @@ -353,7 +353,7 @@ def create_flow_run(
flow_id: str,
parameters: dict = None,
scheduled_start_time: datetime.datetime = None,
) -> GraphQLResult:
) -> str:
"""
Create a new flow run for the given flow id. If `start_time` is not provided, the flow run will be scheduled to start immediately.
Expand All @@ -363,7 +363,7 @@ def create_flow_run(
- scheduled_start_time (datetime, optional): the time to schedule the execution for; if not provided, defaults to now
Returns:
- GraphQLResult: a `DotDict` with an `"id"` key representing the id of the newly created flow run
- str: the ID of the newly-created flow run
Raises:
- ClientError: if the GraphQL query is bad for any reason
Expand All @@ -381,7 +381,7 @@ def create_flow_run(
scheduledStartTime=scheduled_start_time.isoformat()
) # type: ignore
res = self.graphql(create_mutation, input=inputs)
return res.createFlowRun.flow_run # type: ignore
return res.createFlowRun.flow_run.id # type: ignore

def get_flow_run_info(self, flow_run_id: str) -> FlowRunInfoResult:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ debug = false
# the Prefect Server address
graphql = "http://localhost:4200"
log = "http://localhost:4202/log"
result_handler = "http://localhost:4204"
result_handler = "http://localhost:4204/result-handler"
use_local_secrets = true
heartbeat_interval = 30.0

Expand Down
7 changes: 4 additions & 3 deletions src/prefect/engine/cloud/result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class CloudResultHandler(ResultHandler):

def __init__(self, result_handler_service: str = None) -> None:
self._client = None
self.result_handler_service = result_handler_service
if result_handler_service is None:
self.result_handler_service = config.cloud.result_handler
else:
self.result_handler_service = result_handler_service
super().__init__()

def _initialize_client(self) -> None:
Expand All @@ -40,8 +43,6 @@ def _initialize_client(self) -> None:
"""
if self._client is None:
self._client = Client() # type: ignore
if self.result_handler_service is None:
self.result_handler_service = config.cloud.result_handler

def read(self, uri: str) -> Any:
"""
Expand Down
18 changes: 11 additions & 7 deletions tests/engine/cloud/test_cloud_result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

from prefect import config
from prefect.client import Client
from prefect.engine.cloud import CloudResultHandler
from prefect.utilities.configuration import set_temporary_config
Expand All @@ -15,22 +16,25 @@ def requests_post(*args, result=None, **kwargs):


class TestCloudHandler:
def test_cloud_handler_initializes_with_no_args(self):
handler = CloudResultHandler()
def test_cloud_handler_initializes_with_no_args_and_reads_from_config(self):
with set_temporary_config({"cloud.result_handler": "http://foo:bar"}):
handler = CloudResultHandler()
assert handler._client is None
assert handler.result_handler_service is None
assert handler.result_handler_service == "http://foo:bar"

def test_cloud_handler_init_args_override_config(self):
handler = CloudResultHandler("ftp://old-school")
assert handler.result_handler_service == "ftp://old-school"

def test_cloud_handler_pulls_settings_from_config_after_first_method_call(
self, monkeypatch
):
def test_cloud_handler_creates_client_after_first_method_call(self, monkeypatch):
client = MagicMock(post=requests_post)
monkeypatch.setattr(
"prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client)
)
with set_temporary_config({"cloud.result_handler": "http://foo.bar:4204"}):
handler = CloudResultHandler()
handler.write("random string")
assert handler.result_handler_service == "http://foo.bar:4204"
assert handler._client == client

@pytest.mark.parametrize("data", [None, "my_string", 42])
def test_cloud_handler_sends_jsonable_packages(self, data, monkeypatch):
Expand Down
27 changes: 21 additions & 6 deletions tests/serialization/test_result_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from prefect.engine.cloud.result_handler import CloudResultHandler
from prefect.engine.result_handlers import ResultHandler, LocalResultHandler
from prefect.serialization.result_handlers import ResultHandlerSchema
from prefect.utilities.configuration import set_temporary_config


class TestLocalResultHandler:
Expand Down Expand Up @@ -31,10 +32,11 @@ def test_deserialize_local_result_handler(self, dir):

class TestCloudResultHandler:
def test_serialize_with_no_attributes(self):
serialized = ResultHandlerSchema().dump(CloudResultHandler())
with set_temporary_config({"cloud.result_handler": "website"}):
serialized = ResultHandlerSchema().dump(CloudResultHandler())
assert isinstance(serialized, dict)
assert serialized["type"] == "CloudResultHandler"
assert serialized["result_handler_service"] is None
assert serialized["result_handler_service"] == "website"
assert "client" not in serialized

def test_serialize_with_attributes(self):
Expand All @@ -46,14 +48,27 @@ def test_serialize_with_attributes(self):
assert serialized["result_handler_service"] == "http://foo.bar"
assert "client" not in serialized

@pytest.mark.parametrize("result_handler_service", [None, "http://foo.bar"])
def test_deserialize_cloud_result_handler(self, result_handler_service):
def test_deserialize_cloud_result_handler(self):
schema = ResultHandlerSchema()
handler = CloudResultHandler(result_handler_service=result_handler_service)
handler = CloudResultHandler(result_handler_service="http://foo.bar")
handler._client = Client()
obj = schema.load(schema.dump(handler))
assert isinstance(obj, CloudResultHandler)
assert hasattr(obj, "logger")
assert obj.logger.name == "prefect.CloudResultHandler"
assert obj.result_handler_service == result_handler_service
assert obj.result_handler_service == "http://foo.bar"
assert obj._client is None

def test_deserialize_cloud_result_handler_with_None_populates_from_config(self):
schema = ResultHandlerSchema()
handler = CloudResultHandler()
handler.result_handler_service = None
handler._client = Client()
serialized = schema.dump(handler)
with set_temporary_config({"cloud.result_handler": "new-service"}):
obj = schema.load(serialized)
assert isinstance(obj, CloudResultHandler)
assert hasattr(obj, "logger")
assert obj.logger.name == "prefect.CloudResultHandler"
assert obj.result_handler_service == "new-service"
assert obj._client is None

0 comments on commit 47146f4

Please sign in to comment.