Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions qiita_plugins/qiita_client/qiita_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from .qiita_client import QiitaClient, format_payload
from .exceptions import (QiitaClientError, NotFoundError, BadRequestError,
ForbiddenError)
from .qiita_client import QiitaClient

__all__ = ["QiitaClient", "format_payload"]
__all__ = ["QiitaClient", "QiitaClientError", "NotFoundError",
"BadRequestError", "ForbiddenError"]
23 changes: 23 additions & 0 deletions qiita_plugins/qiita_client/qiita_client/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------


class QiitaClientError(Exception):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the plan for these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're used in the Qiita client and I think there are 3 different errors that the developer using the QiitaClient can decide what to do (either propagate the error or try something else). For example, this allows me to code a different behavior in the _heartbeat function below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K, thanks.

pass


class NotFoundError(QiitaClientError):
pass


class BadRequestError(QiitaClientError):
pass


class ForbiddenError(QiitaClientError):
pass
92 changes: 71 additions & 21 deletions qiita_plugins/qiita_client/qiita_client/qiita_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import threading
from json import dumps

from .exceptions import (QiitaClientError, NotFoundError, BadRequestError,
ForbiddenError)

JOB_COMPLETED = False


def heartbeat(qclient, url):
def _heartbeat(qclient, url):
"""Send the heartbeat calls to the server

Parameters
Expand All @@ -24,16 +27,30 @@ def heartbeat(qclient, url):
url : str
The url to issue the heartbeat
"""
while not JOB_COMPLETED:
json_reply = qclient.post(url, data='')
if not json_reply or not json_reply['success']:
# The server did not accept our heartbeat - stop doing it
break
# Perform the heartbeat every 5 seconds
time.sleep(5)
retries = 2
while not JOB_COMPLETED and retries > 0:
try:
qclient.post(url, data='')
retries = 2
except requests.ConnectionError:
# This error occurs when the Qiita server is not reachable. This
# may occur when we are updating the server, and we don't want
# the job to fail. In this case, we wait for 5 min and try again
time.sleep(300)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth mentioning this in the docstring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

retries -= 1
except QiitaClientError:
# If we raised the error, we propagate it since it is a problem
# with the request that we are executing
raise
except Exception as e:
# If it is any other exception, raise a RuntimeError
raise RuntimeError("Error executing heartbeat: %s" % str(e))

# Perform the heartbeat every 30 seconds
time.sleep(30)

def format_payload(success, error_msg=None, artifacts_info=None):

def _format_payload(success, error_msg=None, artifacts_info=None):
"""Generates the payload dictionary for the job

Parameters
Expand Down Expand Up @@ -181,8 +198,19 @@ def _request_retry(self, req, url, **kwargs):

Returns
-------
dict
The JSON information in the request response
dict or None
The JSON information in the request response, if any

Raises
------
NotFoundError
If the request returned a 404 error
BadRequestError
If the request returned a 400 error
ForbiddenError
If the request returned a 403 error
RuntimeError
If the request did not succeed due to unknown causes

Notes
-----
Expand All @@ -202,15 +230,27 @@ def _request_retry(self, req, url, **kwargs):
"""
url = self._server_url + url
retries = 2
json_reply = None
while retries > 0:
retries -= 1
r = self._request_oauth2(req, url, verify=self._verify, **kwargs)
r.close()
if r.status_code == 200:
json_reply = r.json()
break
return json_reply
# There are some error codes that the specification says that they
# shouldn't be retried
if r.status_code == 404:
raise NotFoundError(r.text)
elif r.status_code == 403:
raise ForbiddenError(r.text)
elif r.status_code == 400:
raise BadRequestError(r.text)
elif r.status_code == 200:
try:
return r.json()
except ValueError:
return None

raise RuntimeError(
"Request '%s %s' did not succeed. Status code: %d. Message: %s"
% (req.__name__, url, r.status_code, r.text))

def get(self, url, **kwargs):
"""Execute a get request against the Qiita server
Expand Down Expand Up @@ -311,7 +351,8 @@ def start_heartbeat(self, job_id):
The job id
"""
url = "/qiita_db/jobs/%s/heartbeat/" % job_id
heartbeat_thread = threading.Thread(target=heartbeat, args=(self, url))
heartbeat_thread = threading.Thread(target=_heartbeat,
args=(self, url))
heartbeat_thread.daemon = True
heartbeat_thread.start()

Expand Down Expand Up @@ -343,15 +384,23 @@ def update_job_step(self, job_id, new_step):
json_payload = dumps({'step': new_step})
self.post("/qiita_db/jobs/%s/step/" % job_id, data=json_payload)

def complete_job(self, job_id, payload):
def complete_job(self, job_id, success, error_msg=None,
artifacts_info=None):
"""Stops the heartbeat thread and send the job results to the server

Parameters
----------
job_id : str
The job id
payload : dict
The job's results
success : bool
Whether if the job completed successfully or not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove the if from this sentence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

error_msg : str, optional
If `success` is False, ther error message to include.
If `success` is True, it is ignored
artifacts_info : list of (str, str, list of (str, str))
For each artifact that needs to be created, the command output
name, the artifact type and the list of files attached to the
artifact.

See Also
--------
Expand All @@ -360,6 +409,7 @@ def complete_job(self, job_id, payload):
# Stop the heartbeat thread
global JOB_COMPLETED
JOB_COMPLETED = True
json_payload = dumps(_format_payload(success, error_msg=error_msg,
artifacts_info=artifacts_info))
# Create the URL where we have to post the results
json_payload = dumps(payload)
self.post("/qiita_db/jobs/%s/complete/" % job_id, data=json_payload)
40 changes: 18 additions & 22 deletions qiita_plugins/qiita_client/qiita_client/tests/test_qiita_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@

import httpretty

from qiita_client.qiita_client import QiitaClient, format_payload
from qiita_client.qiita_client import QiitaClient, _format_payload


class UtilTests(TestCase):
def test_format_payload(self):
ainfo = [
("demultiplexed", "Demultiplexed",
[("fp1", "preprocessed_fasta"), ("fp2", "preprocessed_fastq")])]
obs = format_payload(True, artifacts_info=ainfo, error_msg="Ignored")
obs = _format_payload(True, artifacts_info=ainfo, error_msg="Ignored")
exp = {'success': True, 'error': '',
'artifacts':
{'demultiplexed':
Expand All @@ -31,8 +31,8 @@ def test_format_payload(self):
self.assertEqual(obs, exp)

def test_format_payload_error(self):
obs = format_payload(False, error_msg="Some error",
artifacts_info=['ignored'])
obs = _format_payload(False, error_msg="Some error",
artifacts_info=['ignored'])
exp = {'success': False, 'error': 'Some error', 'artifacts': None}
self.assertEqual(obs, exp)

Expand Down Expand Up @@ -107,8 +107,8 @@ def test_get_error(self):
httpretty.GET,
"https://test_server.com/qiita_db/artifacts/1/type/",
status=500)
obs = self.tester.get("/qiita_db/artifacts/1/type/")
self.assertIsNone(obs)
with self.assertRaises(RuntimeError):
self.tester.get("/qiita_db/artifacts/1/type/")

@httpretty.activate
def test_post(self):
Expand All @@ -126,8 +126,8 @@ def test_post_error(self):
httpretty.POST,
"https://test_server.com/qiita_db/artifacts/1/type/",
status=500)
obs = self.tester.post("/qiita_db/artifacts/1/type/")
self.assertIsNone(obs)
with self.assertRaises(RuntimeError):
self.tester.post("/qiita_db/artifacts/1/type/")

@httpretty.activate
def test_patch(self):
Expand All @@ -149,10 +149,10 @@ def test_patch_error(self):
"https://test_server.com/qiita_db/artifacts/1/filepaths/",
status=500
)
obs = self.tester.patch(
'/qiita_db/artifacts/1/filepaths/', 'test',
'/html_summary/', value='/path/to/html_summary')
self.assertIsNone(obs)
with self.assertRaises(RuntimeError):
self.tester.patch(
'/qiita_db/artifacts/1/filepaths/', 'test',
'/html_summary/', value='/path/to/html_summary')

def test_patch_value_error(self):
# Add, replace or test
Expand Down Expand Up @@ -203,17 +203,13 @@ def test_complete_job(self):
httpretty.register_uri(
httpretty.POST,
"https://test_server.com/qiita_db/jobs/example-job/complete/",
body='{"success": true, "error": ""}'
)
body="")
job_id = "example-job"
payload = {
'success': True, 'error': '',
'artifacts': [
{'artifact_type': "Demultiplexed",
'filepaths': [("fp1", "preprocessed_fasta"),
("fp2", "preprocessed_fastq")]}]}

self.tester.complete_job(job_id, payload)
ainfo = [
("demultiplexed", "Demultiplexed",
[("fp1", "preprocessed_fasta"), ("fp2", "preprocessed_fastq")])]

self.tester.complete_job(job_id, True, artifacts_info=ainfo)


CERT_FP = """-----BEGIN CERTIFICATE-----
Expand Down