Skip to content

Remove race condition - Fixes 2143 #2203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion qiita_db/private.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,27 @@ def build_analysis_files(job):
j.submit()
sleep(1)

# The validator jobs no longer finish the job automatically so we need
# to release the validators here
job.release_validators()

TASK_DICT = {'build_analysis_files': build_analysis_files}

def release_validators(job):
    """Releases the validators of the parent job referenced by `job`

    Parameters
    ----------
    job : qiita_db.processing_job.ProcessingJob
        The processing job whose 'job' parameter holds the id of the
        parent job

    Notes
    -----
    Everything runs inside a single `qdb.sql_connection.TRN` block: the
    parent job's `release_validators` method is called first and, once it
    returns, the status of `job` is set to 'success'.
    """
    with qdb.sql_connection.TRN:
        job_cls = qdb.processing_job.ProcessingJob
        parent = job_cls(job.parameters.values['job'])
        parent.release_validators()
        # Only reached when releasing the parent's validators did not raise
        job._set_status('success')


# Lookup table from private task name to the function implementing it
TASK_DICT = {
    'build_analysis_files': build_analysis_files,
    'release_validators': release_validators,
}


def private_task(job_id):
Expand Down
74 changes: 55 additions & 19 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from itertools import chain
from collections import defaultdict
from json import dumps, loads
from time import sleep

from future.utils import viewitems, viewvalues
import networkx as nx
Expand Down Expand Up @@ -420,21 +421,59 @@ def release_validators(self):
"Only artifact transformation and private jobs can "
"release validators")

# Check if all the validators are ready by checking that there is
# no validator processing job whose status is not waiting
# Check if all the validators are completed. Validator jobs can be
# in two states when completed: 'waiting' in case of success
# or 'error' otherwise
sql = """SELECT COUNT(1)
FROM qiita.processing_job_validator pjv
JOIN qiita.processing_job pj ON
pjv.validator_id = pj.processing_job_id
JOIN qiita.processing_job_status USING
(processing_job_status_id)
WHERE pjv.processing_job_id = %s
AND processing_job_status != %s"""
qdb.sql_connection.TRN.add(sql, [self.id, 'waiting'])
AND processing_job_status NOT IN %s"""
sql_args = [self.id, ('waiting', 'error')]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the description, it sounds like this should only check for 'waiting', right? If that is incorrect, what happens with errored jobs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a comment explaining this, but basically we want to wait here until all validators are completed. A completed validator job can be in one of two states: 'waiting' (the validator succeeded) or 'error' (the validator failed).

qdb.sql_connection.TRN.add(sql, sql_args)
remaining = qdb.sql_connection.TRN.execute_fetchlast()

if remaining == 0:
# All validators have completed
# Active polling - wait until all validator jobs are completed
while remaining != 0:
self.step = "Validating outputs (%d remaining)" % remaining
sleep(10)
qdb.sql_connection.TRN.add(sql, sql_args)
remaining = qdb.sql_connection.TRN.execute_fetchlast()

# Check if any of the validators errored
sql = """SELECT validator_id
FROM qiita.processing_job_validator pjv
JOIN qiita.processing_job pj
ON pjv.validator_id = pj.processing_job_id
JOIN qiita.processing_job_status USING
(processing_job_status_id)
WHERE pjv.processing_job_id = %s AND
processing_job_status = %s"""
qdb.sql_connection.TRN.add(sql, [self.id, 'error'])
errored = qdb.sql_connection.TRN.execute_fetchflatten()

if errored:
# At least one of the validators failed. Set the rest of the
# validators and the current job as failed
qdb.sql_connection.TRN.add(sql, [self.id, 'waiting'])
waiting = qdb.sql_connection.TRN.execute_fetchflatten()

common_error = "\n".join(
["Validator %s error message: %s"
% (j, ProcessingJob(j).log.msg) for j in errored])

val_error = "%d sister validator jobs failed: %s" % (
len(errored), common_error)
for j in waiting:
ProcessingJob(j)._set_error(val_error)

self._set_error('%d validator jobs failed: %s'
% (len(errored), common_error))
else:
# All validators have successfully completed
sql = """SELECT validator_id
FROM qiita.processing_job_validator
WHERE processing_job_id = %s"""
Expand All @@ -460,8 +499,6 @@ def release_validators(self):

self._update_and_launch_children(mapping)
self._set_status('success')
else:
self.step = "Validating outputs (%d remaining)" % remaining

def _complete_artifact_definition(self, artifact_data):
""""Performs the needed steps to complete an artifact definition job
Expand All @@ -487,7 +524,6 @@ def _complete_artifact_definition(self, artifact_data):
if job_params['provenance'] is not None:
# The artifact is a result from a previous job
provenance = loads(job_params['provenance'])
job = ProcessingJob(provenance['job'])
if provenance.get('data_type') is not None:
artifact_data = {'data_type': provenance['data_type'],
'artifact_data': artifact_data}
Expand All @@ -500,7 +536,6 @@ def _complete_artifact_definition(self, artifact_data):
qdb.sql_connection.TRN.execute()
# Can't create the artifact until all validators are completed
self._set_status('waiting')
job.release_validators()
else:
# The artifact is uploaded by the user or is the initial
# artifact of an analysis
Expand Down Expand Up @@ -619,6 +654,16 @@ def _complete_artifact_transformation(self, artifacts_data):
for j in validator_jobs:
j.submit()

# Submit the job that will release all the validators
plugin = qdb.software.Software.from_name_and_version(
'Qiita', 'alpha')
cmd = plugin.get_command('release_validators')
params = qdb.software.Parameters.load(
cmd, values_dict={'job': self.id})
job = ProcessingJob.create(self.user, params)
# Doing the submission outside of the transaction
job.submit()

def _set_validator_jobs(self, validator_jobs):
"""Sets the validator jobs for the current job

Expand Down Expand Up @@ -673,15 +718,6 @@ def complete(self, success, artifacts_data=None, error=None):
else:
self._set_status('success')
else:
if self.command.software.type == 'artifact definition':
job_params = self.parameters.values
if job_params.get('provenance') is not None:
# This artifact definition job is a result of a command
# run, if it fails, set up the status of the "parent"
# job also as failed, and assign the same error message
provenance = loads(job_params['provenance'])
job = ProcessingJob(provenance['job'])
job._set_error(error)
self._set_error(error)

@property
Expand Down
19 changes: 19 additions & 0 deletions qiita_db/support_files/patches/57.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Aug 8, 2017
-- Register the internal Qiita command 'release_validators' together with
-- its single required parameter, 'job'

DO $do$
DECLARE
    v_software_id bigint;
    v_command_id  bigint;
BEGIN
    -- Look up the 'Qiita' software entry (version 'alpha') that will own
    -- the new command
    SELECT software_id INTO v_software_id
      FROM qiita.software
     WHERE name = 'Qiita'
       AND version = 'alpha';

    -- Create the command and keep its generated id for the parameter row
    INSERT INTO qiita.software_command
                (software_id, name, description)
         VALUES (v_software_id, 'release_validators',
                 'Releases the job validators')
      RETURNING command_id INTO v_command_id;

    -- The command takes one required string parameter, 'job', with no
    -- default value
    INSERT INTO qiita.command_parameter
                (command_id, parameter_name, parameter_type, required,
                 default_value)
         VALUES (v_command_id, 'job', 'string', TRUE, NULL);
END $do$;
1 change: 1 addition & 0 deletions qiita_db/test/test_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ def test_delete_as_output_job(self):
job.complete(True, artifacts_data=data)
job = qdb.processing_job.ProcessingJob(
"bcc7ebcd-39c1-43e4-af2d-822e3589f14d")
job.release_validators()
artifact = job.outputs['OTU table']
self._clean_up_files.extend([afp for _, afp, _ in artifact.filepaths])

Expand Down
21 changes: 18 additions & 3 deletions qiita_db/test/test_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ def test_complete_multiple_outputs(self):
artifact_data_2 = {'filepaths': [(fp2, 'biom')],
'artifact_type': 'BIOM'}
obs2._complete_artifact_definition(artifact_data_2)
self.assertEqual(obs1.status, 'waiting')
self.assertEqual(obs2.status, 'waiting')
self.assertEqual(job.status, 'running')

job.release_validators()

self.assertEqual(obs1.status, 'success')
self.assertEqual(obs2.status, 'success')
self.assertEqual(job.status, 'success')
Expand Down Expand Up @@ -386,7 +392,8 @@ def test_complete_artifact_definition(self):
qdb.user.User('test@foo.bar'), params)
job._set_validator_jobs([obs])
obs._complete_artifact_definition(artifact_data)
self.assertEqual(job.status, 'success')
self.assertEqual(obs.status, 'waiting')
self.assertEqual(job.status, 'running')
# Upload case implicitly tested by "test_complete_type"

def test_complete_artifact_transformation(self):
Expand Down Expand Up @@ -476,7 +483,9 @@ def test_complete_success(self):

obsjobs = set(self._get_all_job_ids())

self.assertEqual(len(obsjobs), len(alljobs) + 1)
# The complete call above submits 2 new jobs: the validator job and
# the release validators job. Hence the +2
self.assertEqual(len(obsjobs), len(alljobs) + 2)
self._wait_for_job(job)

def test_complete_failure(self):
Expand All @@ -501,12 +510,17 @@ def test_complete_failure(self):
)
obs = qdb.processing_job.ProcessingJob.create(
qdb.user.User('test@foo.bar'), params)
job._set_validator_jobs([obs])
obs.complete(False, error="Validation failure")
self.assertEqual(obs.status, 'error')
self.assertEqual(obs.log.msg, 'Validation failure')

self.assertEqual(job.status, 'running')
job.release_validators()
self.assertEqual(job.status, 'error')
self.assertEqual(job.log.msg, 'Validation failure')
self.assertEqual(
job.log.msg, '1 validator jobs failed: Validator %s '
'error message: Validation failure' % obs.id)

def test_complete_error(self):
with self.assertRaises(
Expand Down Expand Up @@ -628,6 +642,7 @@ def test_outputs(self):
job._set_validator_jobs([obs])
exp_artifact_count = qdb.util.get_count('qiita.artifact') + 1
obs._complete_artifact_definition(artifact_data)
job.release_validators()
self.assertEqual(job.status, 'success')

obs = job.outputs
Expand Down