Skip to content

Commit 24b7d8e

Browse files
josenavasantgonza
authored andcommitted
Remove race condition - Fixes 2143 (#2203)
* Removing race condition * Submit the validator release as another job * Solving some tests * Fixing test * Addressing @antgonza's comments
1 parent 6b59f37 commit 24b7d8e

File tree

5 files changed

+113
-23
lines changed

5 files changed

+113
-23
lines changed

qiita_db/private.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,27 @@ def build_analysis_files(job):
4747
j.submit()
4848
sleep(1)
4949

50+
# The validator jobs no longer finish the job automatically so we need
51+
# to release the validators here
52+
job.release_validators()
5053

51-
TASK_DICT = {'build_analysis_files': build_analysis_files}
54+
55+
def release_validators(job):
56+
"""Waits until all the validators of a job are completed
57+
58+
Parameters
59+
----------
60+
job : qiita_db.processing_job.ProcessingJob
61+
The processing job with the information of the parent job
62+
"""
63+
with qdb.sql_connection.TRN:
64+
qdb.processing_job.ProcessingJob(
65+
job.parameters.values['job']).release_validators()
66+
job._set_status('success')
67+
68+
69+
TASK_DICT = {'build_analysis_files': build_analysis_files,
70+
'release_validators': release_validators}
5271

5372

5473
def private_task(job_id):

qiita_db/processing_job.py

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from itertools import chain
1515
from collections import defaultdict
1616
from json import dumps, loads
17+
from time import sleep
1718

1819
from future.utils import viewitems, viewvalues
1920
import networkx as nx
@@ -420,21 +421,59 @@ def release_validators(self):
420421
"Only artifact transformation and private jobs can "
421422
"release validators")
422423

423-
# Check if all the validators are ready by checking that there is
424-
# no validator processing job whose status is not waiting
424+
# Check if all the validators are completed. Validator jobs can be
425+
# in two states when completed: 'waiting' in case of success
426+
# or 'error' otherwise
425427
sql = """SELECT COUNT(1)
426428
FROM qiita.processing_job_validator pjv
427429
JOIN qiita.processing_job pj ON
428430
pjv.validator_id = pj.processing_job_id
429431
JOIN qiita.processing_job_status USING
430432
(processing_job_status_id)
431433
WHERE pjv.processing_job_id = %s
432-
AND processing_job_status != %s"""
433-
qdb.sql_connection.TRN.add(sql, [self.id, 'waiting'])
434+
AND processing_job_status NOT IN %s"""
435+
sql_args = [self.id, ('waiting', 'error')]
436+
qdb.sql_connection.TRN.add(sql, sql_args)
434437
remaining = qdb.sql_connection.TRN.execute_fetchlast()
435438

436-
if remaining == 0:
437-
# All validators have completed
439+
# Active polling - wait until all validator jobs are completed
440+
while remaining != 0:
441+
self.step = "Validating outputs (%d remaining)" % remaining
442+
sleep(10)
443+
qdb.sql_connection.TRN.add(sql, sql_args)
444+
remaining = qdb.sql_connection.TRN.execute_fetchlast()
445+
446+
# Check if any of the validators errored
447+
sql = """SELECT validator_id
448+
FROM qiita.processing_job_validator pjv
449+
JOIN qiita.processing_job pj
450+
ON pjv.validator_id = pj.processing_job_id
451+
JOIN qiita.processing_job_status USING
452+
(processing_job_status_id)
453+
WHERE pjv.processing_job_id = %s AND
454+
processing_job_status = %s"""
455+
qdb.sql_connection.TRN.add(sql, [self.id, 'error'])
456+
errored = qdb.sql_connection.TRN.execute_fetchflatten()
457+
458+
if errored:
459+
# At least one of the validators failed, Set the rest of the
460+
# validators and the current job as failed
461+
qdb.sql_connection.TRN.add(sql, [self.id, 'waiting'])
462+
waiting = qdb.sql_connection.TRN.execute_fetchflatten()
463+
464+
common_error = "\n".join(
465+
["Validator %s error message: %s"
466+
% (j, ProcessingJob(j).log.msg) for j in errored])
467+
468+
val_error = "%d sister validator jobs failed: %s" % (
469+
len(errored), common_error)
470+
for j in waiting:
471+
ProcessingJob(j)._set_error(val_error)
472+
473+
self._set_error('%d validator jobs failed: %s'
474+
% (len(errored), common_error))
475+
else:
476+
# All validators have successfully completed
438477
sql = """SELECT validator_id
439478
FROM qiita.processing_job_validator
440479
WHERE processing_job_id = %s"""
@@ -460,8 +499,6 @@ def release_validators(self):
460499

461500
self._update_and_launch_children(mapping)
462501
self._set_status('success')
463-
else:
464-
self.step = "Validating outputs (%d remaining)" % remaining
465502

466503
def _complete_artifact_definition(self, artifact_data):
467504
""""Performs the needed steps to complete an artifact definition job
@@ -487,7 +524,6 @@ def _complete_artifact_definition(self, artifact_data):
487524
if job_params['provenance'] is not None:
488525
# The artifact is a result from a previous job
489526
provenance = loads(job_params['provenance'])
490-
job = ProcessingJob(provenance['job'])
491527
if provenance.get('data_type') is not None:
492528
artifact_data = {'data_type': provenance['data_type'],
493529
'artifact_data': artifact_data}
@@ -500,7 +536,6 @@ def _complete_artifact_definition(self, artifact_data):
500536
qdb.sql_connection.TRN.execute()
501537
# Can't create the artifact until all validators are completed
502538
self._set_status('waiting')
503-
job.release_validators()
504539
else:
505540
# The artifact is uploaded by the user or is the initial
506541
# artifact of an analysis
@@ -619,6 +654,16 @@ def _complete_artifact_transformation(self, artifacts_data):
619654
for j in validator_jobs:
620655
j.submit()
621656

657+
# Submit the job that will release all the validators
658+
plugin = qdb.software.Software.from_name_and_version(
659+
'Qiita', 'alpha')
660+
cmd = plugin.get_command('release_validators')
661+
params = qdb.software.Parameters.load(
662+
cmd, values_dict={'job': self.id})
663+
job = ProcessingJob.create(self.user, params)
664+
# Doing the submission outside of the transaction
665+
job.submit()
666+
622667
def _set_validator_jobs(self, validator_jobs):
623668
"""Sets the validator jobs for the current job
624669
@@ -673,15 +718,6 @@ def complete(self, success, artifacts_data=None, error=None):
673718
else:
674719
self._set_status('success')
675720
else:
676-
if self.command.software.type == 'artifact definition':
677-
job_params = self.parameters.values
678-
if job_params.get('provenance') is not None:
679-
# This artifact definition job is a result of a command
680-
# run, if it fails, set up the status of the "parent"
681-
# job also as failed, and assign the sem error message
682-
provenance = loads(job_params['provenance'])
683-
job = ProcessingJob(provenance['job'])
684-
job._set_error(error)
685721
self._set_error(error)
686722

687723
@property

qiita_db/support_files/patches/57.sql

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- Aug 8, 2017
2+
-- Add release validators internal Qiita command
3+
4+
DO $do$
5+
DECLARE
6+
qiita_sw_id bigint;
7+
rv_cmd_id bigint;
8+
BEGIN
9+
SELECT software_id INTO qiita_sw_id
10+
FROM qiita.software
11+
WHERE name = 'Qiita' AND version = 'alpha';
12+
13+
INSERT INTO qiita.software_command (software_id, name, description)
14+
VALUES (qiita_sw_id, 'release_validators', 'Releases the job validators')
15+
RETURNING command_id INTO rv_cmd_id;
16+
17+
INSERT INTO qiita.command_parameter (command_id, parameter_name, parameter_type, required, default_value)
18+
VALUES (rv_cmd_id, 'job', 'string', True, NULL);
19+
END $do$;

qiita_db/test/test_artifact.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,7 @@ def test_delete_as_output_job(self):
10111011
job.complete(True, artifacts_data=data)
10121012
job = qdb.processing_job.ProcessingJob(
10131013
"bcc7ebcd-39c1-43e4-af2d-822e3589f14d")
1014+
job.release_validators()
10141015
artifact = job.outputs['OTU table']
10151016
self._clean_up_files.extend([afp for _, afp, _ in artifact.filepaths])
10161017

qiita_db/test/test_processing_job.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,12 @@ def test_complete_multiple_outputs(self):
359359
artifact_data_2 = {'filepaths': [(fp2, 'biom')],
360360
'artifact_type': 'BIOM'}
361361
obs2._complete_artifact_definition(artifact_data_2)
362+
self.assertEqual(obs1.status, 'waiting')
363+
self.assertEqual(obs2.status, 'waiting')
364+
self.assertEqual(job.status, 'running')
365+
366+
job.release_validators()
367+
362368
self.assertEqual(obs1.status, 'success')
363369
self.assertEqual(obs2.status, 'success')
364370
self.assertEqual(job.status, 'success')
@@ -386,7 +392,8 @@ def test_complete_artifact_definition(self):
386392
qdb.user.User('test@foo.bar'), params)
387393
job._set_validator_jobs([obs])
388394
obs._complete_artifact_definition(artifact_data)
389-
self.assertEqual(job.status, 'success')
395+
self.assertEqual(obs.status, 'waiting')
396+
self.assertEqual(job.status, 'running')
390397
# Upload case implicitly tested by "test_complete_type"
391398

392399
def test_complete_artifact_transformation(self):
@@ -476,7 +483,9 @@ def test_complete_success(self):
476483

477484
obsjobs = set(self._get_all_job_ids())
478485

479-
self.assertEqual(len(obsjobs), len(alljobs) + 1)
486+
# The complete call above submits 2 new jobs: the validator job and
487+
# the release validators job. Hence the +2
488+
self.assertEqual(len(obsjobs), len(alljobs) + 2)
480489
self._wait_for_job(job)
481490

482491
def test_complete_failure(self):
@@ -501,12 +510,17 @@ def test_complete_failure(self):
501510
)
502511
obs = qdb.processing_job.ProcessingJob.create(
503512
qdb.user.User('test@foo.bar'), params)
513+
job._set_validator_jobs([obs])
504514
obs.complete(False, error="Validation failure")
505515
self.assertEqual(obs.status, 'error')
506516
self.assertEqual(obs.log.msg, 'Validation failure')
507517

518+
self.assertEqual(job.status, 'running')
519+
job.release_validators()
508520
self.assertEqual(job.status, 'error')
509-
self.assertEqual(job.log.msg, 'Validation failure')
521+
self.assertEqual(
522+
job.log.msg, '1 validator jobs failed: Validator %s '
523+
'error message: Validation failure' % obs.id)
510524

511525
def test_complete_error(self):
512526
with self.assertRaises(
@@ -628,6 +642,7 @@ def test_outputs(self):
628642
job._set_validator_jobs([obs])
629643
exp_artifact_count = qdb.util.get_count('qiita.artifact') + 1
630644
obs._complete_artifact_definition(artifact_data)
645+
job.release_validators()
631646
self.assertEqual(job.status, 'success')
632647

633648
obs = job.outputs

0 commit comments

Comments
 (0)