Skip to content

Commit

Permalink
Merge pull request #1187 from PrefectHQ/robust-hits
Browse files Browse the repository at this point in the history
Robust Client calls
  • Loading branch information
cicdw authored Jul 1, 2019
2 parents 7e6ba5b + 46bff54 commit 9f86a58
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Validate configuration objects on initial load - [#1136](https://github.com/PrefectHQ/prefect/pull/1136)
- Add `auto_generated` property to Tasks for convenient filtering - [#1135](https://github.com/PrefectHQ/prefect/pull/1135)
- Disable dask work-stealing in kubernetes via scheduler config - [#1166](https://github.com/PrefectHQ/prefect/pull/1166)
- Implement backoff retry settings on Client calls - [#1187](https://github.com/PrefectHQ/prefect/pull/1187)

### Task Library

Expand Down
17 changes: 14 additions & 3 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import json
import logging
import os

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union

import pendulum
Expand Down Expand Up @@ -209,12 +212,20 @@ def _request(
# write this as a function to allow reuse in next try/except block
def request_fn() -> "requests.models.Response":
headers = {"Authorization": "Bearer {}".format(self.token)}
session = requests.Session()
retries = Retry(
total=6,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
method_whitelist=["DELETE", "GET", "POST"],
)
session.mount("https://", HTTPAdapter(max_retries=retries))
if method == "GET":
response = requests.get(url, headers=headers, params=params)
response = session.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=params)
response = session.post(url, headers=headers, json=params)
elif method == "DELETE":
response = requests.delete(url, headers=headers)
response = session.delete(url, headers=headers)
else:
raise ValueError("Invalid method: {}".format(method))

Expand Down
10 changes: 7 additions & 3 deletions tests/cli/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def test_auth_add(monkeypatch):
json=MagicMock(return_value=dict(data=dict(hello="hi")))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand All @@ -60,7 +62,7 @@ def test_auth_add(monkeypatch):
assert "Auth token added to Prefect config" in result.output


def test_auth_add_failes_query(monkeypatch):
def test_auth_add_failed_query(monkeypatch):
with tempfile.TemporaryDirectory() as temp_dir:

file = "{}/temp_config.toml".format(temp_dir)
Expand All @@ -73,7 +75,9 @@ def test_auth_add_failes_query(monkeypatch):
json=MagicMock(return_value=dict(data=dict(hello=None)))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down
36 changes: 27 additions & 9 deletions tests/cli/test_describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def test_describe_flows(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow=[{"name": "flow"}])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -76,7 +78,9 @@ def test_describe_flows_not_found(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(flow=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand All @@ -96,7 +100,9 @@ def test_describe_flows_populated(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow=[{"name": "flow"}])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -141,7 +147,9 @@ def test_describe_tasks(monkeypatch):
)
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -176,7 +184,9 @@ def test_describe_tasks_flow_not_found(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(flow=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand All @@ -193,7 +203,9 @@ def test_describe_tasks_not_found(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow=[{"tasks": []}])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand All @@ -215,7 +227,9 @@ def test_describe_flow_runs(monkeypatch):
)
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -255,7 +269,9 @@ def test_describe_flow_runs_not_found(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow_run=[])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand All @@ -277,7 +293,9 @@ def test_describe_flow_runs_populated(monkeypatch):
)
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down
4 changes: 3 additions & 1 deletion tests/cli/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def test_execute_cloud_flow_not_found(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow_run=[])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down
44 changes: 33 additions & 11 deletions tests/cli/test_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def test_get_flows(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(flow=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -75,7 +77,9 @@ def test_get_flows_populated(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(flow=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -122,7 +126,9 @@ def test_get_projects(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(project=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -163,7 +169,9 @@ def test_get_projects_populated(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(project=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -200,7 +208,9 @@ def test_get_flow_runs(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow_run=[])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -245,7 +255,9 @@ def test_get_flow_runs_populated(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow_run=[])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -292,7 +304,9 @@ def test_get_tasks(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(task=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -335,7 +349,9 @@ def test_get_tasks_populated(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(task=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -403,7 +419,9 @@ def test_get_logs(monkeypatch):
)
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -446,7 +464,9 @@ def test_get_logs_info(monkeypatch):
)
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down Expand Up @@ -478,7 +498,9 @@ def test_get_logs_fails(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow_run=[])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down
8 changes: 6 additions & 2 deletions tests/cli/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def test_run_cloud(monkeypatch):
json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}])))
)
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

create_flow_run = MagicMock(resurn_value="id")
monkeypatch.setattr(
Expand Down Expand Up @@ -67,7 +69,9 @@ def test_run_cloud_fails(monkeypatch):
post = MagicMock(
return_value=MagicMock(json=MagicMock(return_value=dict(data=dict(flow=[]))))
)
monkeypatch.setattr("requests.post", post)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
Expand Down
Loading

0 comments on commit 9f86a58

Please sign in to comment.