-
Notifications
You must be signed in to change notification settings - Fork 79
Workflow object #1641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Workflow object #1641
Changes from 11 commits
261e4f2
b516a49
6efbfdb
2f1d29e
ac860d2
77ac8eb
320f681
1e0e381
08bd0b1
9022c1d
e863c01
b647e2a
a9fa2ec
f582051
de4bceb
0d116b0
d732e24
f24dd63
3ee5a69
7aa1dda
4e7d884
08e4e2c
e911e61
6804cbf
719dd8e
3af70a4
d6a9db0
b59bf52
c58d3ab
12b3d7a
6ac0bbd
d78be98
b3134f8
99f2b71
2a5ff50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,6 @@ | |
| # The full license is in the file LICENSE, distributed with this software. | ||
| # ----------------------------------------------------------------------------- | ||
|
|
||
| from datetime import datetime | ||
| from json import loads | ||
|
|
||
| import qiita_db as qdb | ||
|
|
@@ -104,15 +103,11 @@ def post(self, job_id): | |
| with qdb.sql_connection.TRN: | ||
| job, success, error_msg = _get_job(job_id) | ||
| if success: | ||
| job_status = job.status | ||
| if job_status == 'queued': | ||
| job.status = 'running' | ||
| job.heartbeat = datetime.now() | ||
| elif job_status == 'running': | ||
| job.heartbeat = datetime.now() | ||
| else: | ||
| try: | ||
| job.execute_heartbeat() | ||
| except qdb.exceptions.QiitaDBOperationNotPermittedError as e: | ||
| success = False | ||
| error_msg = 'Job already finished. Status: %s' % job_status | ||
| error_msg = str(e) | ||
|
|
||
| response = {'success': success, 'error': error_msg} | ||
| self.write(response) | ||
|
|
@@ -175,26 +170,19 @@ def post(self, job_id): | |
| with qdb.sql_connection.TRN: | ||
| job, success, error_msg = _get_job(job_id) | ||
| if success: | ||
| if job.status != 'running': | ||
| success = False | ||
| error_msg = 'Job in a non-running state.' | ||
| else: | ||
| payload = loads(self.request.body) | ||
| if payload['success']: | ||
| for artifact_data in payload['artifacts']: | ||
| filepaths = artifact_data['filepaths'] | ||
| atype = artifact_data['artifact_type'] | ||
| parents = job.input_artifacts | ||
| params = job.parameters | ||
| qdb.artifact.Artifact.create( | ||
| filepaths, atype, parents=parents, | ||
| processing_parameters=params) | ||
| job.status = 'success' | ||
| payload = loads(self.request.body) | ||
| try: | ||
| payload_success = payload['success'] | ||
| if payload_success: | ||
| artifacts = payload['artifacts'] | ||
| error = None | ||
|
||
| else: | ||
| log = qdb.logger.LogEntry.create( | ||
| 'Runtime', payload['error']) | ||
| job.status = 'error' | ||
| job.log = log | ||
| artifacts = None | ||
| error = payload['error'] | ||
| job.complete(payload_success, artifacts, error) | ||
| except qdb.exceptions.QiitaDBOperationNotPermittedError as e: | ||
| success = False | ||
| error_msg = str(e) | ||
|
|
||
| response = {'success': success, 'error': error_msg} | ||
| self.write(response) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,9 @@ | |
| # ----------------------------------------------------------------------------- | ||
|
|
||
| from uuid import UUID | ||
| from datetime import datetime | ||
|
|
||
| from future.utils import viewitems | ||
|
|
||
| import qiita_db as qdb | ||
|
|
||
|
|
@@ -186,8 +189,7 @@ def status(self): | |
| qdb.sql_connection.TRN.add(sql, [self.id]) | ||
| return qdb.sql_connection.TRN.execute_fetchlast() | ||
|
|
||
| @status.setter | ||
| def status(self, value): | ||
| def _set_status(self, value): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that I changed the this from being a property setter to be an internal function - so the tests for the property setter still applies (I renamed them) |
||
| """Sets the status of the job | ||
|
|
||
| Parameters | ||
|
|
@@ -219,6 +221,60 @@ def status(self, value): | |
| qdb.sql_connection.TRN.add(sql, [new_status, self.id]) | ||
| qdb.sql_connection.TRN.execute() | ||
|
|
||
| def complete(self, success, artifacts_data=None, error=None): | ||
| """Completes the job, either with a success or error status | ||
|
|
||
| Parameters | ||
| ---------- | ||
| success : bool | ||
| Whether the job has completed successfully or not | ||
| artifacts_data : dict of dicts | ||
|
||
| The generated artifact information, keyed by output name. | ||
| The format of each of the internal dictionaries must be | ||
| {'filepaths': list of (str, str), 'artifact_type': str} | ||
| where `filepaths` contains the list of filepaths and filepath types | ||
| for the artifact and `artifact_type` the type of the artifact | ||
| error : str, optional | ||
| If the job was not successful, the error message | ||
|
|
||
| Raises | ||
| ------ | ||
| qiita_db.exceptions.QiitaDBOperationNotPermittedError | ||
| If the job is not in running state | ||
| """ | ||
| with qdb.sql_connection.TRN: | ||
| if self.status != 'running': | ||
|
||
| raise qdb.exceptions.QiitaDBOperationNotPermittedError( | ||
| "Can't complete job: not in a running state") | ||
| if success: | ||
| if artifacts_data: | ||
| artifact_ids = [] | ||
| for out_name, a_data in viewitems(artifacts_data): | ||
| filepaths = a_data['filepaths'] | ||
| atype = a_data['artifact_type'] | ||
| parents = self.input_artifacts | ||
| params = self.parameters | ||
| a = qdb.artifact.Artifact.create( | ||
| filepaths, atype, parents=parents, | ||
| processing_parameters=params) | ||
| cmd_out_id = qdb.util.convert_to_id( | ||
| out_name, "command_output", "name") | ||
| artifact_ids.append((cmd_out_id, a.id)) | ||
| if artifact_ids: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which cases will artifact_ids be empty and what should happen? I don't think there is test for the case where artifact_ids doesn't exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently there is no command that doesn't generate an artifact. I'm preparing the system for such cases. One example may be EBI submission - they don't generate new artifacts. Since EBI submission is not a plugin command, I can't test. |
||
| sql = """INSERT INTO | ||
| qiita.artifact_output_processing_job | ||
| (artifact_id, processing_job_id, | ||
| command_output_id) | ||
| VALUES (%s, %s, %s)""" | ||
| sql_params = [[aid, self.id, out_id] | ||
| for out_id, aid in artifact_ids] | ||
| qdb.sql_connection.TRN.add(sql, sql_params, many=True) | ||
| self._set_status('success') | ||
| else: | ||
| log = qdb.logger.LogEntry.create('Runtime', error) | ||
| self._set_status('error') | ||
| self._set_log(log) | ||
|
|
||
| @property | ||
| def log(self): | ||
| """The log entry attached to the job if it failed | ||
|
|
@@ -240,8 +296,7 @@ def log(self): | |
| res = qdb.logger.LogEntry(log_id) | ||
| return res | ||
|
|
||
| @log.setter | ||
| def log(self, value): | ||
| def _set_log(self, value): | ||
| """Attaches a log entry to the job | ||
|
|
||
| Parameters | ||
|
|
@@ -281,20 +336,28 @@ def heartbeat(self): | |
| qdb.sql_connection.TRN.add(sql, [self.id]) | ||
| return qdb.sql_connection.TRN.execute_fetchlast() | ||
|
|
||
| @heartbeat.setter | ||
| def heartbeat(self, value): | ||
| """Sets the timestamp for the last received heartbeat | ||
| def execute_heartbeat(self): | ||
|
||
| """Updates the heartbeat of the job | ||
|
|
||
| Parameters | ||
| ---------- | ||
| value : datetime | ||
| The new timestamp | ||
| In case that the job is in `queued` status, it changes the status to | ||
| `running`. | ||
|
|
||
| Raises | ||
| ------ | ||
| QiitaDBOperationNotPermittedError | ||
| If the job is already completed | ||
| """ | ||
| with qdb.sql_connection.TRN: | ||
| status = self.status | ||
| if status == 'queued': | ||
| self._set_status('running') | ||
| elif status != 'running': | ||
| raise qdb.exceptions.QiitaDBOperationNotPermittedError( | ||
| "Can't execute heartbeat on job: already completed") | ||
| sql = """UPDATE qiita.processing_job | ||
| SET heartbeat = %s | ||
| WHERE processing_job_id = %s""" | ||
| qdb.sql_connection.TRN.add(sql, [value, self.id]) | ||
| qdb.sql_connection.TRN.add(sql, [datetime.now(), self.id]) | ||
| qdb.sql_connection.TRN.execute() | ||
|
|
||
| @property | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind restricting the scope of this try block to just the line that might raise the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done