Merged
35 commits · Changes from 11 commits
261e4f2
Adding DB structures to support (dflt/user) workflows
josenavas Feb 4, 2016
b516a49
Adding default workflow
josenavas Feb 8, 2016
6efbfdb
Fixing small issues with the patch
josenavas Feb 8, 2016
2f1d29e
Adding default workflow object
josenavas Feb 9, 2016
ac860d2
Adding more workflows and tests
josenavas Feb 9, 2016
77ac8eb
Adding artifact output table
josenavas Feb 10, 2016
320f681
Fixing job id type in patch
josenavas Feb 11, 2016
1e0e381
Modifying job object so it is managed internally to the object
josenavas Feb 11, 2016
08bd0b1
Modifying processing job handler and tests to adopt the new processin…
josenavas Feb 11, 2016
9022c1d
Fixing artifact removal and adding specific test
josenavas Feb 11, 2016
e863c01
Fixing the TG plugin to be compliant with the new API
josenavas Feb 11, 2016
b647e2a
Merge branch 'process-study-ui' into workflow-object
josenavas Feb 15, 2016
a9fa2ec
Fixing complete function and changing set_log to set_error
josenavas Feb 15, 2016
f582051
Moving job submission to the object itself
josenavas Feb 16, 2016
de4bceb
Merge branch 'process-study-ui' into workflow-object
josenavas Feb 16, 2016
0d116b0
Merge branch 'artifact-study-pages' of https://github.com/biocore/qii…
josenavas Feb 16, 2016
d732e24
Adding processing job workflow tables
josenavas Feb 18, 2016
f24dd63
Fixing typo in table
josenavas Feb 18, 2016
3ee5a69
Fixing names that DBSchema can't figure out
josenavas Feb 18, 2016
7aa1dda
Sometimes I just hate DBSChema
josenavas Feb 18, 2016
4e7d884
Adding from_default_workflow creation, name and user properties and t…
josenavas Feb 18, 2016
08e4e2c
Fixing another typo - not my best day
josenavas Feb 18, 2016
e911e61
Adding graph property to the workflow object
josenavas Feb 18, 2016
6804cbf
Adding form_scratch constructor, and fixing graph
josenavas Feb 18, 2016
719dd8e
Adding add method
josenavas Feb 18, 2016
3af70a4
Adding remove functionality and a bunch of tests
josenavas Feb 18, 2016
d6a9db0
Add submit function to workflow
josenavas Feb 18, 2016
b59bf52
Adding pending
josenavas Feb 18, 2016
c58d3ab
Implementing execution cascade
josenavas Feb 18, 2016
12b3d7a
Addressing comments
josenavas Feb 18, 2016
6ac0bbd
Addressing @antgonza's comments
josenavas Feb 18, 2016
d78be98
Fixing typo on populate_test_db.sql
josenavas Feb 19, 2016
b3134f8
Merge branch 'artifact-study-pages' of https://github.com/biocore/qii…
josenavas Feb 19, 2016
99f2b71
Merge branch 'artifact-study-pages' of https://github.com/biocore/qii…
josenavas Feb 19, 2016
2a5ff50
Addressing @ElDeveloper's comments
josenavas Feb 23, 2016
8 changes: 7 additions & 1 deletion qiita_db/artifact.py
@@ -359,7 +359,8 @@ def delete(cls, artifact_id):
raise qdb.exceptions.QiitaDBArtifactDeletionError(
artifact_id, "it has been submitted to VAMPS")

# Check if there is a job queued that will use the artifact
# Check if there is a job queued or running that will use/is using
# the artifact
sql = """SELECT EXISTS(
SELECT *
FROM qiita.artifact_processing_job
@@ -395,6 +396,11 @@ def delete(cls, artifact_id):
WHERE processing_job_id IN %s"""
qdb.sql_connection.TRN.add(sql, [job_ids])

# Delete the entry from the artifact_output_processing_job table
sql = """DELETE FROM qiita.artifact_output_processing_job
WHERE artifact_id = %s"""
qdb.sql_connection.TRN.add(sql, [artifact_id])

# Detach the artifact from its filepaths
sql = """DELETE FROM qiita.artifact_filepath
WHERE artifact_id = %s"""
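As a hedged sketch of the new linking table this deletion has to clean up, the lookup below traces an output artifact back to the job and command output that produced it. The helper name and artifact id are illustrative assumptions (as is the use of the usual TRN fetch helper); the table and column names come from the INSERT added later in this pull request.

import qiita_db as qdb

def _producing_job_links(artifact_id):
    # Illustrative only: rows in qiita.artifact_output_processing_job link an
    # output artifact to the processing job and command output that created it.
    with qdb.sql_connection.TRN:
        sql = """SELECT processing_job_id, command_output_id
                 FROM qiita.artifact_output_processing_job
                 WHERE artifact_id = %s"""
        qdb.sql_connection.TRN.add(sql, [artifact_id])
        return qdb.sql_connection.TRN.execute_fetchindex()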
44 changes: 16 additions & 28 deletions qiita_db/handlers/processing_job.py
@@ -6,7 +6,6 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from datetime import datetime
from json import loads

import qiita_db as qdb
@@ -104,15 +103,11 @@ def post(self, job_id):
with qdb.sql_connection.TRN:
job, success, error_msg = _get_job(job_id)
if success:
job_status = job.status
if job_status == 'queued':
job.status = 'running'
job.heartbeat = datetime.now()
elif job_status == 'running':
job.heartbeat = datetime.now()
else:
try:
job.execute_heartbeat()
except qdb.exceptions.QiitaDBOperationNotPermittedError as e:
success = False
error_msg = 'Job already finished. Status: %s' % job_status
error_msg = str(e)

response = {'success': success, 'error': error_msg}
self.write(response)
@@ -175,26 +170,19 @@ def post(self, job_id):
with qdb.sql_connection.TRN:
job, success, error_msg = _get_job(job_id)
if success:
if job.status != 'running':
success = False
error_msg = 'Job in a non-running state.'
else:
payload = loads(self.request.body)
if payload['success']:
for artifact_data in payload['artifacts']:
filepaths = artifact_data['filepaths']
atype = artifact_data['artifact_type']
parents = job.input_artifacts
params = job.parameters
qdb.artifact.Artifact.create(
filepaths, atype, parents=parents,
processing_parameters=params)
job.status = 'success'
payload = loads(self.request.body)
try:
Contributor: Would you mind restricting the scope of this try block to just the line that might raise the exception?
Contributor Author: Done

payload_success = payload['success']
if payload_success:
artifacts = payload['artifacts']
error = None
Member: Should error be '' so we always return a string?
Member: Ignore this, got confused with error_msg. However, error_msg will not be defined, right?
Contributor Author: error_msg is defined in line 171.

else:
log = qdb.logger.LogEntry.create(
'Runtime', payload['error'])
job.status = 'error'
job.log = log
artifacts = None
error = payload['error']
job.complete(payload_success, artifacts, error)
except qdb.exceptions.QiitaDBOperationNotPermittedError as e:
success = False
error_msg = str(e)

response = {'success': success, 'error': error_msg}
self.write(response)
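For context, here is a hedged sketch of the JSON body a plugin could now POST to the complete endpoint: artifacts are keyed by command output name instead of being sent as a list. The output name 'OTU table' mirrors the updated test below; the filepath is illustrative.

from json import dumps

# Illustrative payload for POST /qiita_db/jobs/<job-id>/complete/
payload = dumps({
    'success': True,
    'error': '',
    'artifacts': {
        # output name -> {'filepaths': [(path, filepath_type)], 'artifact_type': type}
        'OTU table': {'filepaths': [('/path/to/otu_table.biom', 'biom')],
                      'artifact_type': 'BIOM'}}})

# On a job that is not running, the handler now answers success=False with the
# QiitaDBOperationNotPermittedError message instead of setting the status itself.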
13 changes: 6 additions & 7 deletions qiita_db/handlers/tests/test_processing_job.py
@@ -78,7 +78,7 @@ def test_post_job_already_finished(self):
'', headers=self.header)
self.assertEqual(obs.code, 200)
exp = {'success': False,
'error': 'Job already finished. Status: success'}
'error': "Can't execute heartbeat on job: already completed"}
self.assertEqual(loads(obs.body), exp)

def test_post(self):
@@ -177,12 +177,13 @@ def test_post_job_does_not_exists(self):
self.assertEqual(loads(obs.body), exp)

def test_post_job_not_running(self):
payload = dumps({'sucess': False, 'error': 'Job failure'})
payload = dumps({'success': False, 'error': 'Job failure'})
obs = self.post(
'/qiita_db/jobs/063e553b-327c-4818-ab4a-adfe58e49860/complete/',
payload, headers=self.header)
self.assertEqual(obs.code, 200)
exp = {'success': False, 'error': "Job in a non-running state."}
exp = {'success': False,
'error': "Can't complete job: not in a running state"}
self.assertEqual(loads(obs.body), exp)

def test_post_job_failure(self):
Expand All @@ -209,10 +210,8 @@ def test_post_job_success(self):
exp_artifact_count = qdb.util.get_count('qiita.artifact') + 1
payload = dumps(
{'success': True, 'error': '',
'artifacts': [
{'filepaths': [(fp, 'biom')],
'artifact_type': 'BIOM'}
]})
'artifacts': {'OTU table': {'filepaths': [(fp, 'biom')],
'artifact_type': 'BIOM'}}})
obs = self.post(
'/qiita_db/jobs/bcc7ebcd-39c1-43e4-af2d-822e3589f14d/complete/',
payload, headers=self.header)
87 changes: 75 additions & 12 deletions qiita_db/processing_job.py
@@ -7,6 +7,9 @@
# -----------------------------------------------------------------------------

from uuid import UUID
from datetime import datetime

from future.utils import viewitems

import qiita_db as qdb

@@ -186,8 +189,7 @@ def status(self):
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()

@status.setter
def status(self, value):
def _set_status(self, value):
Member: Test?
Contributor Author: Note that I changed this from being a property setter to an internal function, so the tests for the property setter still apply (I renamed them).

"""Sets the status of the job

Parameters
@@ -219,6 +221,60 @@ def status(self, value):
qdb.sql_connection.TRN.add(sql, [new_status, self.id])
qdb.sql_connection.TRN.execute()

def complete(self, success, artifacts_data=None, error=None):
"""Completes the job, either with a success or error status

Parameters
----------
success : bool
Whether the job has completed successfully or not
artifacts_data : dict of dicts
Member: This one is also optional, right?
Contributor Author: Good catch

The generated artifact information, keyed by output name.
The format of each of the internal dictionaries must be
{'filepaths': list of (str, str), 'artifact_type': str}
where `filepaths` contains the list of filepaths and filepath types
for the artifact and `artifact_type` the type of the artifact
error : str, optional
If the job was not successful, the error message

Raises
------
qiita_db.exceptions.QiitaDBOperationNotPermittedError
If the job is not in running state
"""
with qdb.sql_connection.TRN:
if self.status != 'running':
Member: missing test
Contributor Author: Added

raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Can't complete job: not in a running state")
if success:
if artifacts_data:
artifact_ids = []
for out_name, a_data in viewitems(artifacts_data):
filepaths = a_data['filepaths']
atype = a_data['artifact_type']
parents = self.input_artifacts
params = self.parameters
a = qdb.artifact.Artifact.create(
filepaths, atype, parents=parents,
processing_parameters=params)
cmd_out_id = qdb.util.convert_to_id(
out_name, "command_output", "name")
artifact_ids.append((cmd_out_id, a.id))
if artifact_ids:
Member: In which cases will artifact_ids be empty and what should happen? I don't think there is a test for the case where artifact_ids doesn't exist.
Contributor Author: Currently there is no command that doesn't generate an artifact. I'm preparing the system for such cases. One example may be EBI submission - it doesn't generate new artifacts. Since EBI submission is not a plugin command, I can't test it.

sql = """INSERT INTO
qiita.artifact_output_processing_job
(artifact_id, processing_job_id,
command_output_id)
VALUES (%s, %s, %s)"""
sql_params = [[aid, self.id, out_id]
for out_id, aid in artifact_ids]
qdb.sql_connection.TRN.add(sql, sql_params, many=True)
self._set_status('success')
else:
log = qdb.logger.LogEntry.create('Runtime', error)
self._set_status('error')
self._set_log(log)

@property
def log(self):
"""The log entry attached to the job if it failed
@@ -240,8 +296,7 @@ def log(self):
res = qdb.logger.LogEntry(log_id)
return res

@log.setter
def log(self, value):
def _set_log(self, value):
"""Attaches a log entry to the job

Parameters
@@ -281,20 +336,28 @@ def heartbeat(self):
qdb.sql_connection.TRN.add(sql, [self.id])
return qdb.sql_connection.TRN.execute_fetchlast()

@heartbeat.setter
def heartbeat(self, value):
"""Sets the timestamp for the last received heartbeat
def execute_heartbeat(self):
Member: This only changes the heartbeat from one state to another, right? If so, this name is confusing and the first line of the docstring is misleading.
Contributor Author: Can you provide a better name? I didn't like it either, but I couldn't come up with a good name.
Member: update_heartbeat_state?
Contributor Author: Done

"""Updates the heartbeat of the job

Parameters
----------
value : datetime
The new timestamp
In case that the job is in `queued` status, it changes the status to
`running`.

Raises
------
QiitaDBOperationNotPermittedError
If the job is already completed
"""
with qdb.sql_connection.TRN:
status = self.status
if status == 'queued':
self._set_status('running')
elif status != 'running':
raise qdb.exceptions.QiitaDBOperationNotPermittedError(
"Can't execute heartbeat on job: already completed")
sql = """UPDATE qiita.processing_job
SET heartbeat = %s
WHERE processing_job_id = %s"""
qdb.sql_connection.TRN.add(sql, [value, self.id])
qdb.sql_connection.TRN.add(sql, [datetime.now(), self.id])
qdb.sql_connection.TRN.execute()

@property
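Pulling the new ProcessingJob pieces together, here is a hedged usage sketch of the API in this diff. The job id, output name and filepath are illustrative, and the heartbeat method was later renamed update_heartbeat_state as agreed in the review above.

import qiita_db as qdb

# Illustrative job id; status, log and heartbeat are now managed internally by
# the job object rather than set directly by callers.
job = qdb.processing_job.ProcessingJob('bcc7ebcd-39c1-43e4-af2d-822e3589f14d')

# Moves a queued job to 'running', refreshes the timestamp of a running job,
# and raises QiitaDBOperationNotPermittedError on an already completed job.
job.execute_heartbeat()

# On success, artifacts_data is keyed by command output name; complete() creates
# the artifacts, records them in qiita.artifact_output_processing_job and sets
# the job status to 'success'.
job.complete(
    True,
    artifacts_data={'OTU table': {'filepaths': [('/path/to/otu_table.biom',
                                                  'biom')],
                                  'artifact_type': 'BIOM'}})

# On failure, the error is logged and the job moves to 'error':
# job.complete(False, error='Processing step failed: ...')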