
Fix 1812 #2371

Merged: 12 commits, Oct 24, 2017
2 changes: 1 addition & 1 deletion qiita_db/analysis.py
@@ -178,7 +178,7 @@ def create(cls, owner, name, description, from_default=False,
'analysis': a_id,
'merge_dup_sample_ids': merge_duplicated_sample_ids})
job = qdb.processing_job.ProcessingJob.create(
- owner, params)
+ owner, params, True)
sql = """INSERT INTO qiita.analysis_processing_job
(analysis_id, processing_job_id)
VALUES (%s, %s)"""
2 changes: 1 addition & 1 deletion qiita_db/handlers/processing_job.py
@@ -163,7 +163,7 @@ def post(self):
params = qdb.software.Parameters.load(cmd, json_str=params_dict)

job = qdb.processing_job.ProcessingJob.create(
- qdb.user.User(user), params)
+ qdb.user.User(user), params, True)

if status:
job._set_status(status)
71 changes: 60 additions & 11 deletions qiita_db/processing_job.py
@@ -12,7 +12,7 @@
from multiprocessing import Process
from os.path import join
from itertools import chain
- from collections import defaultdict
+ from collections import defaultdict, Iterable
from json import dumps, loads
from time import sleep

@@ -119,7 +119,7 @@ def exists(cls, job_id):
return qdb.sql_connection.TRN.execute_fetchlast()

@classmethod
- def create(cls, user, parameters):
+ def create(cls, user, parameters, force=False):
"""Creates a new job in the system

Parameters
@@ -128,14 +128,56 @@ def create(cls, user, parameters):
The user executing the job
parameters : qiita_db.software.Parameters
The parameters of the job being executed
force : bool
Force creation on duplicated parameters

Returns
-------
qiita_db.processing_job.ProcessingJob
The newly created job

Notes
-----
If force is True, the job will be created even if another job
exists with the same parameters
"""
with qdb.sql_connection.TRN:
command = parameters.command

# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email, processing_job_status
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE command_id = %s AND processing_job_status IN (
'success', 'waiting', 'running') """

# we need to use ILIKE because booleans can be stored as
# 'false' or 'False', and the comparison must be case-insensitive
_format = "command_parameters->>'%s' ILIKE '%s'"
params = []
for k, v in viewitems(parameters.values):
# this is necessary when the value is an Iterable that is
# not a string or unicode
if isinstance(v, Iterable) and not isinstance(v, (str,
unicode)):
for vv in v:
params.append(_format % (k, vv))
else:
params.append(_format % (k, v))
params = " AND ".join(params)

sql = sql + ' AND ' if params else sql
qdb.sql_connection.TRN.add(sql + params, [command.id])
existing_jobs = qdb.sql_connection.TRN.execute_fetchindex()
if existing_jobs and not force:
raise ValueError(
'Cannot create job because the parameters are the same as '
'jobs that are queued, running or already have '
'succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status in existing_jobs]))

sql = """INSERT INTO qiita.processing_job
(email, command_id, command_parameters,
processing_job_status_id)
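
For readers skimming the diff, here is a minimal, self-contained sketch of how the duplicate-check predicate above is assembled (the real code runs it inside a qiita_db transaction; the helper name and the sample parameters are illustrative, not part of the PR):

BASE_SQL = """SELECT processing_job_id, email, processing_job_status
              FROM qiita.processing_job
              LEFT JOIN qiita.processing_job_status
                  USING (processing_job_status_id)
              WHERE command_id = %s AND processing_job_status IN (
                  'success', 'waiting', 'running')"""


def duplicate_job_predicate(param_values):
    # command_parameters is stored as JSON, so ->> extracts each value as
    # text; ILIKE makes the comparison case-insensitive, which is what lets
    # the 'false'/'False' renderings of booleans compare equal
    fmt = "command_parameters->>'%s' ILIKE '%s'"
    clauses = []
    for key, value in param_values.items():
        # iterable values (but not strings) expand to one clause per element
        if hasattr(value, '__iter__') and not isinstance(value, str):
            clauses.extend(fmt % (key, item) for item in value)
        else:
            clauses.append(fmt % (key, value))
    return " AND ".join(clauses)


predicate = duplicate_job_predicate({'input_data': 1, 'rev_comp': False})
print(BASE_SQL + (' AND ' + predicate if predicate else ''))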
@@ -630,7 +672,7 @@ def _complete_artifact_transformation(self, artifacts_data):
validate_params = qdb.software.Parameters.load(
cmd, values_dict=values_dict)
validator_jobs.append(
- ProcessingJob.create(self.user, validate_params))
+ ProcessingJob.create(self.user, validate_params, True))

# Change the current step of the job

@@ -1014,7 +1056,8 @@ def _common_creation_steps(cls, user, root_jobs, name=None):
return cls(w_id)

@classmethod
- def from_default_workflow(cls, user, dflt_wf, req_params, name=None):
+ def from_default_workflow(cls, user, dflt_wf, req_params, name=None,
+                           force=False):
"""Creates a new processing workflow from a default workflow

Parameters
@@ -1029,6 +1072,8 @@ def from_default_workflow(cls, user, dflt_wf, req_params, name=None):
parameter name.
name : str, optional
Name of the workflow. Default: generated from user's name
force : bool
Force creation on duplicated parameters

Returns
-------
@@ -1072,7 +1117,7 @@ def from_default_workflow(cls, user, dflt_wf, req_params, name=None):
n: ProcessingJob.create(
user,
qdb.software.Parameters.from_default_params(
- p, req_params[c]))
+ p, req_params[c]), force)
for n, (c, p) in viewitems(roots)}
root_jobs = node_to_job.values()

@@ -1113,7 +1158,7 @@ def from_default_workflow(cls, user, dflt_wf, req_params, name=None):
# the current job, so create it
new_job = ProcessingJob.create(
user, qdb.software.Parameters.from_default_params(
- dflt_params, job_req_params))
+ dflt_params, job_req_params), force)
node_to_job[n] = new_job

# Create the parent-child links in the DB
@@ -1123,7 +1168,7 @@ def from_default_workflow(cls, user, dflt_wf, req_params, name=None):
return cls._common_creation_steps(user, root_jobs, name)

@classmethod
- def from_scratch(cls, user, parameters, name=None):
+ def from_scratch(cls, user, parameters, name=None, force=False):
"""Creates a new processing workflow from scratch

Parameters
@@ -1134,13 +1179,15 @@ def from_scratch(cls, user, parameters, name=None):
The parameters of the first job in the workflow
name : str, optional
Name of the workflow. Default: generated from user's name
force : bool
Force creation on duplicated parameters

Returns
-------
qiita_db.processing_job.ProcessingWorkflow
The newly created workflow
"""
- job = ProcessingJob.create(user, parameters)
+ job = ProcessingJob.create(user, parameters, force)
return cls._common_creation_steps(user, [job], name)

@property
@@ -1249,7 +1296,7 @@ def _raise_if_not_in_construction(self):
"Workflow not in construction")

def add(self, dflt_params, connections=None, req_params=None,
- opt_params=None):
+ opt_params=None, force=False):
"""Adds a new job to the workflow

Parameters
@@ -1268,6 +1315,8 @@ def add(self, dflt_params, connections=None, req_params=None,
opt_params : dict of {str: object}, optional
The optional parameters to change from the default set, keyed by
parameter name. Default: None, use the values in `dflt_params`
force : bool
Force creation on duplicated parameters

Raises
------
@@ -1289,7 +1338,7 @@ def add(self, dflt_params, connections=None, req_params=None,

new_job = ProcessingJob.create(
self.user, qdb.software.Parameters.from_default_params(
- dflt_params, req_params, opt_params=opt_params))
+ dflt_params, req_params, opt_params=opt_params), force)

# SQL used to create the edges between jobs
sql = """INSERT INTO qiita.parent_processing_job
@@ -1303,7 +1352,7 @@ def add(self, dflt_params, connections=None, req_params=None,
# workflow, so it is a new root job
new_job = ProcessingJob.create(
self.user, qdb.software.Parameters.from_default_params(
- dflt_params, req_params, opt_params=opt_params))
+ dflt_params, req_params, opt_params=opt_params), force)
sql = """INSERT INTO qiita.processing_job_workflow_root
(processing_job_workflow_id, processing_job_id)
VALUES (%s, %s)"""
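
As a usage sketch (not part of the diff): it mirrors the updated tests below and assumes the exp_user/exp_params fixtures from Qiita's test database; the new keyword simply threads from the workflow entry points down to ProcessingJob.create.

# usage sketch; exp_user and exp_params are assumed to be built as in the
# tests below
wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
    exp_user, exp_params, name='Test processing workflow', force=True)

parent = wf.graph.nodes()[0]
wf.add(qdb.software.DefaultParameters(10),
       connections={parent: {'demultiplexed': 'input_data'}},
       force=True)

# without force=True, ProcessingJob.create raises ValueError whenever an
# equivalent job is already queued, running, or succeeded

Threading the flag through instead of flipping the default keeps existing callers duplicate-safe.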
2 changes: 1 addition & 1 deletion qiita_db/support_files/patches/python_patches/43.py
@@ -61,7 +61,7 @@
cmd_out, "command_output", "name")

# the owner of the study will create the job
- job = PJ.create(c.study.owner, c.processing_parameters)
+ job = PJ.create(c.study.owner, c.processing_parameters, True)
with qdb.sql_connection.TRN:
sql = """INSERT INTO
qiita.artifact_output_processing_job
48 changes: 30 additions & 18 deletions qiita_db/test/test_processing_job.py
@@ -22,7 +22,7 @@
from qiita_core.qiita_settings import qiita_config


- def _create_job():
+ def _create_job(force=True):
Contributor: If we default to True, are there situations in the codebase where this is False?

Contributor: Or rather, this API has changed, I believe, since the existing functionality would be force=False if I understand the docs above. Will this change be a problem elsewhere?

Contributor: ...or was the default behavior previously to force the execution?

Member (author): This is a local function to avoid duplicating code; by default it always forces creation, ignoring the duplicated-job check. There is a single test (below) that doesn't force this behavior: _create_job(False)

job = qdb.processing_job.ProcessingJob.create(
qdb.user.User('test@foo.bar'),
qdb.software.Parameters.load(
@@ -37,7 +37,8 @@ def _create_job():
"qual_score_window": 0, "disable_primers": False,
"reverse_primers": "disable",
"reverse_primer_mismatches": 0,
"truncate_ambi_bases": False, "input_data": 1}))
"truncate_ambi_bases": False, "input_data": 1}),
force)
return job


@@ -246,7 +247,8 @@ def test_create(self):
exp_params = qdb.software.Parameters.load(exp_command,
json_str=json_str)
exp_user = qdb.user.User('test@foo.bar')
- obs = qdb.processing_job.ProcessingJob.create(exp_user, exp_params)
+ obs = qdb.processing_job.ProcessingJob.create(
+     exp_user, exp_params, True)
self.assertEqual(obs.user, exp_user)
self.assertEqual(obs.command, exp_command)
self.assertEqual(obs.parameters, exp_params)
@@ -256,6 +258,15 @@ def test_create(self):
self.assertEqual(obs.step, None)
self.assertTrue(obs in qdb.artifact.Artifact(1).jobs())

def test_create_duplicated(self):
job = _create_job()
job._set_status('running')
with self.assertRaisesRegexp(ValueError, 'Cannot create job because '
'the parameters are the same as jobs '
'that are queued, running or already '
'have succeeded:'):
Contributor: How is this passing if this is not a regex, and there's no pattern to match here? Shouldn't there be a job id, or a regex indicating that this should match a job id?

Member (author): Because it behaves as an IN: the pattern only has to appear somewhere in the exception message, so no job id is needed.

Contributor: Got it!

_create_job(False)
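
A minimal sketch of why the truncated pattern above passes (the message text is copied from the diff; the job id is illustrative):

import re

# assertRaisesRegexp (assertRaisesRegex in Python 3) applies re.search to
# str(exception), so the pattern only needs to occur somewhere in the
# message: a substring/IN-style test, not a full match
msg = ('Cannot create job because the parameters are the same as jobs '
       'that are queued, running or already have succeeded:\n'
       'some-job-id: running')
assert re.search('Cannot create job because the parameters are the same', msg)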

def test_set_status(self):
job = _create_job()
self.assertEqual(job.status, 'in_construction')
@@ -328,7 +339,7 @@ def test_complete_multiple_outputs(self):
'out1', "command_output", "name"),
'name': 'out1'})})
user = qdb.user.User('test@foo.bar')
- obs1 = qdb.processing_job.ProcessingJob.create(user, params)
+ obs1 = qdb.processing_job.ProcessingJob.create(user, params, True)
obs1._set_status('running')
params = qdb.software.Parameters.load(
qdb.software.Command(4),
Expand All @@ -339,7 +350,7 @@ def test_complete_multiple_outputs(self):
'cmd_out_id': qdb.util.convert_to_id(
'out1', "command_output", "name"),
'name': 'out1'})})
- obs2 = qdb.processing_job.ProcessingJob.create(user, params)
+ obs2 = qdb.processing_job.ProcessingJob.create(user, params, True)
obs2._set_status('running')
# Make sure that we link the original job with its validator jobs
job._set_validator_jobs([obs1, obs2])
@@ -416,7 +427,8 @@ def test_complete_no_artifact_data(self):
qdb.user.User('test@foo.bar'),
qdb.software.Parameters.load(
qdb.software.Command(5),
- values_dict={"input_data": 1}))
+ values_dict={"input_data": 1}),
+ True)
job._set_status('running')
job.complete(False, error='Some Error')
self.assertEqual(job.status, 'error')
@@ -450,7 +462,7 @@ def test_complete_type(self):
values_dict={'template': pt.id, 'files': fp,
'artifact_type': 'BIOM'})
obs = qdb.processing_job.ProcessingJob.create(
- qdb.user.User('test@foo.bar'), params)
+ qdb.user.User('test@foo.bar'), params, True)
obs._set_status('running')
obs.complete(True, artifacts_data=artifacts_data)
self.assertEqual(obs.status, 'success')
@@ -513,7 +525,7 @@ def test_complete_failure(self):
'cmd_out_id': 3})}
)
obs = qdb.processing_job.ProcessingJob.create(
- qdb.user.User('test@foo.bar'), params)
+ qdb.user.User('test@foo.bar'), params, True)
job._set_validator_jobs([obs])
obs.complete(False, error="Validation failure")
self.assertEqual(obs.status, 'error')
@@ -601,7 +613,7 @@ def test_update_children(self):
name = "Test processing workflow"

tester = qdb.processing_job.ProcessingWorkflow.from_scratch(
- exp_user, exp_params, name=name)
+ exp_user, exp_params, name=name, force=True)

parent = tester.graph.nodes()[0]
connections = {parent: {'demultiplexed': 'input_data'}}
@@ -643,7 +655,7 @@ def test_outputs(self):
'name': 'outArtifact'})}
)
obs = qdb.processing_job.ProcessingJob.create(
- qdb.user.User('test@foo.bar'), params)
+ qdb.user.User('test@foo.bar'), params, True)
job._set_validator_jobs([obs])
exp_artifact_count = qdb.util.get_count('qiita.artifact') + 1
obs._complete_artifact_definition(artifact_data)
@@ -722,7 +734,7 @@ def test_from_default_workflow(self):
name = "Test processing workflow"

obs = qdb.processing_job.ProcessingWorkflow.from_default_workflow(
- exp_user, dflt_wf, req_params, name=name)
+ exp_user, dflt_wf, req_params, name=name, force=True)
self.assertEqual(obs.name, name)
self.assertEqual(obs.user, exp_user)
obs_graph = obs.graph
@@ -786,7 +798,7 @@ def test_from_scratch(self):
name = "Test processing workflow"

obs = qdb.processing_job.ProcessingWorkflow.from_scratch(
- exp_user, exp_params, name=name)
+ exp_user, exp_params, name=name, force=True)
self.assertEqual(obs.name, name)
self.assertEqual(obs.user, exp_user)
obs_graph = obs.graph
@@ -811,12 +823,12 @@ def test_add(self):
name = "Test processing workflow"

obs = qdb.processing_job.ProcessingWorkflow.from_scratch(
- exp_user, exp_params, name=name)
+ exp_user, exp_params, name=name, force=True)

parent = obs.graph.nodes()[0]
connections = {parent: {'demultiplexed': 'input_data'}}
dflt_params = qdb.software.DefaultParameters(10)
- obs.add(dflt_params, connections=connections)
+ obs.add(dflt_params, connections=connections, force=True)

obs_graph = obs.graph
self.assertTrue(isinstance(obs_graph, nx.DiGraph))
@@ -843,7 +855,7 @@ def test_add(self):
# This also tests that the `graph` property returns the graph correctly
# when there are root nodes that don't have any children
dflt_params = qdb.software.DefaultParameters(1)
- obs.add(dflt_params, req_params={'input_data': 1})
+ obs.add(dflt_params, req_params={'input_data': 1}, force=True)

obs_graph = obs.graph
self.assertTrue(isinstance(obs_graph, nx.DiGraph))
@@ -887,7 +899,7 @@ def test_remove(self):
name = "Test processing workflow"

tester = qdb.processing_job.ProcessingWorkflow.from_scratch(
- exp_user, exp_params, name=name)
+ exp_user, exp_params, name=name, force=True)

parent = tester.graph.nodes()[0]
connections = {parent: {'demultiplexed': 'input_data'}}
@@ -910,7 +922,7 @@ def test_remove(self):
name = "Test processing workflow"

tester = qdb.processing_job.ProcessingWorkflow.from_default_workflow(
- exp_user, dflt_wf, req_params, name=name)
+ exp_user, dflt_wf, req_params, name=name, force=True)

tester.remove(tester.graph.edges()[0][0], cascade=True)

Expand All @@ -929,7 +941,7 @@ def test_remove_error(self):
name = "Test processing workflow"

tester = qdb.processing_job.ProcessingWorkflow.from_default_workflow(
- exp_user, dflt_wf, req_params, name=name)
+ exp_user, dflt_wf, req_params, name=name, force=True)

with self.assertRaises(
qdb.exceptions.QiitaDBOperationNotPermittedError):