Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Version 062018
* Fixed error in the sample info category summary (https://github.com/biocore/qiita/issues/2610).
* Qiimp has been added to the Qiita GUI.
* We added the qt-shogun plugin.
* Adding qiita_db.processing_job.ProcessingJob.get_validator_jobs to remove duplicated code.

Version 0.2.0-dev
-----------------
Expand Down
67 changes: 32 additions & 35 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,48 +467,30 @@ def release_validators(self):
# Check if all the validators are completed. Validator jobs can be
# in two states when completed: 'waiting' in case of success
# or 'error' otherwise
sql = """SELECT pjv.validator_id
FROM qiita.processing_job_validator pjv
JOIN qiita.processing_job pj ON
pjv.validator_id = pj.processing_job_id
JOIN qiita.processing_job_status USING
(processing_job_status_id)
WHERE pjv.processing_job_id = %s
AND processing_job_status NOT IN %s"""
sql_args = [self.id, ('waiting', 'error')]
qdb.sql_connection.TRN.add(sql, sql_args)
validator_ids = qdb.sql_connection.TRN.execute_fetchindex()
validator_ids = [j.id for j in self.get_validator_jobs
if j.status not in ['waiting', 'error']]

# Active polling - wait until all validator jobs are completed
while validator_ids:
jids = ', '.join([j[0] for j in validator_ids])
self.step = ("Validating outputs (%d remaining) via "
"job(s) %s" % (len(validator_ids), jids))
sleep(10)
qdb.sql_connection.TRN.add(sql, sql_args)
validator_ids = qdb.sql_connection.TRN.execute_fetchindex()
validator_ids = [j.id for j in self.get_validator_jobs
if j.status not in ['waiting', 'error']]

# Check if any of the validators errored
sql = """SELECT validator_id
FROM qiita.processing_job_validator pjv
JOIN qiita.processing_job pj
ON pjv.validator_id = pj.processing_job_id
JOIN qiita.processing_job_status USING
(processing_job_status_id)
WHERE pjv.processing_job_id = %s AND
processing_job_status = %s"""
qdb.sql_connection.TRN.add(sql, [self.id, 'error'])
errored = qdb.sql_connection.TRN.execute_fetchflatten()

errored = [j for j in self.get_validator_jobs
if j.status == 'error']
if errored:
# At least one of the validators failed, Set the rest of the
# validators and the current job as failed
qdb.sql_connection.TRN.add(sql, [self.id, 'waiting'])
waiting = qdb.sql_connection.TRN.execute_fetchflatten()
waiting = [j.id for j in self.get_validator_jobs
if j.status == 'waiting']

common_error = "\n".join(
["Validator %s error message: %s"
% (j, ProcessingJob(j).log.msg) for j in errored])
["Validator %s error message: %s" % (j.id, j.log.msg)
for j in errored])

val_error = "%d sister validator jobs failed: %s" % (
len(errored), common_error)
Expand All @@ -518,18 +500,12 @@ def release_validators(self):
self._set_error('%d validator jobs failed: %s'
% (len(errored), common_error))
else:
# All validators have successfully completed
sql = """SELECT validator_id
FROM qiita.processing_job_validator
WHERE processing_job_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
mapping = {}
# Loop through all validator jobs and release them, allowing
# to create the artifacts. Note that if any artifact creation
# fails, the rollback operation will make sure that the
# previously created artifacts are not in there
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
vjob = ProcessingJob(jid)
for vjob in self.get_validator_jobs:
mapping.update(vjob.release())

if mapping:
Expand Down Expand Up @@ -931,6 +907,27 @@ def children(self):
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
yield ProcessingJob(jid)

@property
def get_validator_jobs(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pythonic thing would be to name this property validator_jobs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K, will change.

"""The validators of this job

Returns
-------
generator of qiita_db.processing_job.ProcessingJob
The validators of this job
"""
with qdb.sql_connection.TRN:
sql = """SELECT validator_id
FROM qiita.processing_job_validator pjv
JOIN qiita.processing_job pj
ON pjv.validator_id = pj.processing_job_id
JOIN qiita.processing_job_status USING (
processing_job_status_id)
WHERE pjv.processing_job_id = %s"""
qdb.sql_connection.TRN.add(sql, [self.id])
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
yield ProcessingJob(jid)

def _update_children(self, mapping):
"""Updates the children of the current job to populate the input params

Expand Down
4 changes: 2 additions & 2 deletions qiita_db/test/test_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def test_complete_success(self):
job.complete(True, artifacts_data=artifacts_data)
self._wait_for_job(job)
# Retrieve the job that is performing the validation:
val_job = qdb.processing_job.ProcessingJob(job.step.rsplit(" ", 1)[-1])
val_job = list(job.get_validator_jobs)[-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this checking only the last job?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, that's the test it was before ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha ok.

# Test the output artifact is going to be named based on the
# input parameters
self.assertEqual(
Expand Down Expand Up @@ -544,7 +544,7 @@ def test_complete_success(self):
self._wait_for_job(job)

# Retrieve the job that is performing the validation:
val_job = qdb.processing_job.ProcessingJob(job.step.rsplit(" ", 1)[-1])
val_job = list(job.get_validator_jobs)[-1]
# Test the output artifact is going to be named based on the
# input parameters
self.assertEqual(
Expand Down
45 changes: 28 additions & 17 deletions scripts/qiita-recover-jobs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def _retrieve_queue_jobs():
# just retriving 'qiita' and ignoring [] (ipython workers)
if 'qiita' in line and '[]' not in line and
# and private jobs
'private' not in line]
'private' not in line and
'STDIN' not in line]
qiita_jids = []
for qj in qiita_jobs:
# to retrieve info about the jobs we need to use the fullname, so
Expand All @@ -53,11 +54,6 @@ def qiita_recover_jobs():
USING (processing_job_status_id)
WHERE processing_job_status = %s"""

sql_validators = """SELECT processing_job_id, array_agg(validator_id)
FROM qiita.processing_job_validator
WHERE processing_job_id in %s
GROUP BY processing_job_id"""

# Step 1: recover jobs that are in queue status
with TRN:
recover_type = 'queued'
Expand All @@ -77,20 +73,35 @@ def qiita_recover_jobs():
qiita_jids = _retrieve_queue_jobs()
jids_to_recover = jids - qiita_jids

# 3.1, and 3.2: checking which jobs have validators, and recover them
# first start validators that are not running
validate = [j for j in jids_to_recover
if ProcessingJob(j).command.name == 'Validate']
_submit_jobs(validate, recover_type + '/validate')

# then the release validator
release_validators = [
j for j in jids_to_recover
if ProcessingJob(j).command.name == 'release_validators']
_submit_jobs(release_validators, recover_type + '/release_validators')

jids_to_recover = (jids_to_recover - set(validate) -
set(release_validators))

TRN.add(sql_validators, [tuple(jids_to_recover)])
jids_validator = TRN.execute_fetchindex()
# 3.1, and 3.2: checking which jobs have validators, and recover them
jobs_with_validators = []
for j, validators in jids_validator:
validators = validators[1:-1].split(',')
# we append the main job as it shouldn't be touch
# and all their validators
jobs_with_validators.append(j)
jobs_with_validators.extend(validators)
for j in jids_to_recover:
job = ProcessingJob(j)
status = set([ProcessingJob(v).status
for v in validators if v not in qiita_jids])
validators = job.get_validator_jobs
if not validators:
jobs_with_validators.append(j)
continue
else:
# adding validators to jobs_with_validators to ignore them
# in the next code of block
for vj in validators:
jobs_with_validators.append(vj.id)
status = set([v.status for v in validators
if v.id not in qiita_jids])
# if there are no status, that means that the validators weren't
# created and we should rerun from scratch (Step 4)
if not bool(status):
Expand Down