Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finesse a few things for smoother integration with Cloud #630

Merged
merged 3 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Removed the `BokehRunner` and associated webapp - [#609](https://github.com/PrefectHQ/prefect/issues/609)
- Renamed `ResultHandler` methods from `serialize` / `deserialize` to `write` / `read` - [#612](https://github.com/PrefectHQ/prefect/pull/612)
- Refactor all `State` objects to store fully hydrated `Result` objects which track information about how results should be handled - [#612](https://github.com/PrefectHQ/prefect/pull/612), [#616](https://github.com/PrefectHQ/prefect/pull/616)
- `Client.create_flow_run` now returns a string instead of a `GraphQLResult` object to match the API of `deploy` - [#630](https://github.com/PrefectHQ/prefect/pull/630)

## 0.4.1 <Badge text="beta" type="success"/>

Expand Down
8 changes: 4 additions & 4 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _request(
def request_fn() -> "requests.models.Response":
headers = {"Authorization": "Bearer {}".format(self.token)}
if method == "GET":
response = requests.get(url, headers=headers, json=params)
response = requests.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=params)
elif method == "DELETE":
Expand Down Expand Up @@ -353,7 +353,7 @@ def create_flow_run(
flow_id: str,
parameters: dict = None,
scheduled_start_time: datetime.datetime = None,
) -> GraphQLResult:
) -> str:
"""
Create a new flow run for the given flow id. If `start_time` is not provided, the flow run will be scheduled to start immediately.

Expand All @@ -363,7 +363,7 @@ def create_flow_run(
- scheduled_start_time (datetime, optional): the time to schedule the execution for; if not provided, defaults to now

Returns:
- GraphQLResult: a `DotDict` with an `"id"` key representing the id of the newly created flow run
- str: the ID of the newly-created flow run

Raises:
- ClientError: if the GraphQL query is bad for any reason
Expand All @@ -381,7 +381,7 @@ def create_flow_run(
scheduledStartTime=scheduled_start_time.isoformat()
) # type: ignore
res = self.graphql(create_mutation, input=inputs)
return res.createFlowRun.flow_run # type: ignore
return res.createFlowRun.flow_run.id # type: ignore

def get_flow_run_info(self, flow_run_id: str) -> FlowRunInfoResult:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ debug = false
# the Prefect Server address
graphql = "http://localhost:4200"
log = "http://localhost:4202/log"
result_handler = "http://localhost:4204"
result_handler = "http://localhost:4204/result-handler"
use_local_secrets = true
heartbeat_interval = 30.0

Expand Down
7 changes: 4 additions & 3 deletions src/prefect/engine/cloud/result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class CloudResultHandler(ResultHandler):

def __init__(self, result_handler_service: str = None) -> None:
self._client = None
self.result_handler_service = result_handler_service
if result_handler_service is None:
self.result_handler_service = config.cloud.result_handler
else:
self.result_handler_service = result_handler_service
super().__init__()

def _initialize_client(self) -> None:
Expand All @@ -40,8 +43,6 @@ def _initialize_client(self) -> None:
"""
if self._client is None:
self._client = Client() # type: ignore
if self.result_handler_service is None:
self.result_handler_service = config.cloud.result_handler

def read(self, uri: str) -> Any:
"""
Expand Down
18 changes: 11 additions & 7 deletions tests/engine/cloud/test_cloud_result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

from prefect import config
from prefect.client import Client
from prefect.engine.cloud import CloudResultHandler
from prefect.utilities.configuration import set_temporary_config
Expand All @@ -15,22 +16,25 @@ def requests_post(*args, result=None, **kwargs):


class TestCloudHandler:
def test_cloud_handler_initializes_with_no_args(self):
handler = CloudResultHandler()
def test_cloud_handler_initializes_with_no_args_and_reads_from_config(self):
with set_temporary_config({"cloud.result_handler": "http://foo:bar"}):
handler = CloudResultHandler()
assert handler._client is None
assert handler.result_handler_service is None
assert handler.result_handler_service == "http://foo:bar"

def test_cloud_handler_init_args_override_config(self):
handler = CloudResultHandler("ftp://old-school")
assert handler.result_handler_service == "ftp://old-school"

def test_cloud_handler_pulls_settings_from_config_after_first_method_call(
self, monkeypatch
):
def test_cloud_handler_creates_client_after_first_method_call(self, monkeypatch):
client = MagicMock(post=requests_post)
monkeypatch.setattr(
"prefect.engine.cloud.result_handler.Client", MagicMock(return_value=client)
)
with set_temporary_config({"cloud.result_handler": "http://foo.bar:4204"}):
handler = CloudResultHandler()
handler.write("random string")
assert handler.result_handler_service == "http://foo.bar:4204"
assert handler._client == client

@pytest.mark.parametrize("data", [None, "my_string", 42])
def test_cloud_handler_sends_jsonable_packages(self, data, monkeypatch):
Expand Down
27 changes: 21 additions & 6 deletions tests/serialization/test_result_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from prefect.engine.cloud.result_handler import CloudResultHandler
from prefect.engine.result_handlers import ResultHandler, LocalResultHandler
from prefect.serialization.result_handlers import ResultHandlerSchema
from prefect.utilities.configuration import set_temporary_config


class TestLocalResultHandler:
Expand Down Expand Up @@ -31,10 +32,11 @@ def test_deserialize_local_result_handler(self, dir):

class TestCloudResultHandler:
def test_serialize_with_no_attributes(self):
serialized = ResultHandlerSchema().dump(CloudResultHandler())
with set_temporary_config({"cloud.result_handler": "website"}):
serialized = ResultHandlerSchema().dump(CloudResultHandler())
assert isinstance(serialized, dict)
assert serialized["type"] == "CloudResultHandler"
assert serialized["result_handler_service"] is None
assert serialized["result_handler_service"] == "website"
assert "client" not in serialized

def test_serialize_with_attributes(self):
Expand All @@ -46,14 +48,27 @@ def test_serialize_with_attributes(self):
assert serialized["result_handler_service"] == "http://foo.bar"
assert "client" not in serialized

@pytest.mark.parametrize("result_handler_service", [None, "http://foo.bar"])
def test_deserialize_cloud_result_handler(self, result_handler_service):
def test_deserialize_cloud_result_handler(self):
schema = ResultHandlerSchema()
handler = CloudResultHandler(result_handler_service=result_handler_service)
handler = CloudResultHandler(result_handler_service="http://foo.bar")
handler._client = Client()
obj = schema.load(schema.dump(handler))
assert isinstance(obj, CloudResultHandler)
assert hasattr(obj, "logger")
assert obj.logger.name == "prefect.CloudResultHandler"
assert obj.result_handler_service == result_handler_service
assert obj.result_handler_service == "http://foo.bar"
assert obj._client is None

def test_deserialize_cloud_result_handler_with_None_populates_from_config(self):
schema = ResultHandlerSchema()
handler = CloudResultHandler()
handler.result_handler_service = None
handler._client = Client()
serialized = schema.dump(handler)
with set_temporary_config({"cloud.result_handler": "new-service"}):
obj = schema.load(serialized)
assert isinstance(obj, CloudResultHandler)
assert hasattr(obj, "logger")
assert obj.logger.name == "prefect.CloudResultHandler"
assert obj.result_handler_service == "new-service"
assert obj._client is None