Multiple inputs add default workflow #3371

Merged
66 changes: 51 additions & 15 deletions qiita_db/metadata_template/prep_template.py
@@ -794,16 +794,25 @@ def _get_predecessors(workflow, node):
# recursive method to get predecessors of a given node
pred = []

for pnode in workflow.graph.predecessors(node):
parents = list(workflow.graph.predecessors(node))
for pnode in parents:
pred = _get_predecessors(workflow, pnode)
cxns = {x[0]: x[2]
for x in workflow.graph.get_edge_data(
pnode, node)['connections'].connections}
data = [pnode, node, cxns]
if pred is None:
pred = [data]
else:
pred.append(data)
pred = []

# making sure that if the node has extra parents they are
# generated first
parents.remove(pnode)
if parents:
for pnode in parents:
# [-1] just adding the parent and not its ancestors
pred.extend([_get_predecessors(workflow, pnode)[-1]])

pred.append(data)
return pred

# Note: we are going to use the final BIOMs to figure out which
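
A minimal standalone sketch of the recursion above, using plain networkx nodes and invented labels rather than Qiita's workflow objects: ancestors are collected root-first, and when a node has extra parents only their last edge (the `[-1]` element) is appended, so their full ancestry is not duplicated.

```python
# Sketch only: mirrors the traversal above without the edge-connection
# metadata; node names are hypothetical.
import networkx as nx


def get_predecessors(graph, node):
    # collect [parent, child] pairs from the roots down to `node`
    pred = []
    parents = list(graph.predecessors(node))
    for pnode in parents:
        pred = get_predecessors(graph, pnode)
        data = [pnode, node]
        # extra parents are generated first, but only their final edge
        # ([-1]) is kept, not their whole ancestry
        parents.remove(pnode)
        for extra in parents:
            pred.append(get_predecessors(graph, extra)[-1])
        pred.append(data)
        return pred
    return pred


g = nx.DiGraph([('sl-1', 'otu-1'), ('sl-2', 'otu-2'),
                ('otu-1', 'merge'), ('otu-2', 'merge')])
print(get_predecessors(g, 'merge'))
# [['sl-1', 'otu-1'], ['sl-2', 'otu-2'], ['otu-1', 'merge']]
```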
@@ -894,8 +903,9 @@ def _get_predecessors(workflow, node):
# let's just keep one, let's give it preference to the one with the
# most total_conditions_satisfied
_, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
GH = wk.graph
missing_artifacts = dict()
for node, degree in wk.graph.out_degree():
for node, degree in GH.out_degree():
if degree != 0:
continue
mscheme = _get_node_info(wk, node)
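
Two idioms in this hunk are worth spelling out with a tiny, hedged example (the scores and node names below are invented): `sorted(..., reverse=True)[0]` keeps only the highest-scoring candidate workflow, and an out-degree of zero identifies the terminal nodes whose artifacts still need to be generated.

```python
# Sketch with made-up (score, workflow) tuples and a toy graph.
import networkx as nx

workflows = [(2, 'wk_a'), (3, 'wk_b'), (1, 'wk_c')]
# keep the workflow satisfying the most conditions
_, wk = sorted(workflows, key=lambda x: x[0], reverse=True)[0]
assert wk == 'wk_b'

GH = nx.DiGraph([('split-libraries', 'pick-otus'), ('pick-otus', 'merge')])
# terminal (leaf) nodes have no outgoing edges
leaves = [node for node, degree in GH.out_degree() if degree == 0]
assert leaves == ['merge']
```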
@@ -920,7 +930,7 @@ def _get_predecessors(workflow, node):
icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])
cmds_to_create.append([cdp, cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
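
The double dictionary inversion here is easier to see with concrete values: the edge connections map an output name to what it delivers, and inverting that map lets each required parameter be resolved from the artifact type declared in its definition. The shapes below are loose, hypothetical stand-ins, not Qiita's actual objects.

```python
# connections: parent output name -> artifact type it delivers (assumed shape)
cxns = {'demultiplexed': 'Demultiplexed'}
icxns = {y: x for x, y in cxns.items()}  # artifact type -> output name

# required_parameters: input name -> (parameter type, [artifact types])
required_parameters = {'input_data': ('artifact', ['Demultiplexed'])}
reqp = {x: icxns[y[1][0]] for x, y in required_parameters.items()}
assert reqp == {'input_data': 'demultiplexed'}
```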
@@ -942,7 +952,7 @@ def _get_predecessors(workflow, node):
'be applied')
reqp[x] = wkartifact_type

cmds_to_create.append([pdp_cmd, params, reqp])
cmds_to_create.append([pdp, pdp_cmd, params, reqp])

if starting_job is not None:
init_artifacts = {
@@ -953,14 +963,16 @@ def _get_predecessors(workflow, node):
cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
previous_dps = dict()
for i, (dp, cmd, params, rp) in enumerate(cmds_to_create):
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job

req_params = dict()
if previous_job is None:
req_params = dict()
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
@@ -970,12 +982,35 @@ def _get_predecessors(workflow, node):
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
if len(dp.command.required_parameters) > 1:
for pn in GH.predecessors(node):
info = _get_node_info(wk, pn)
n, cnx, _ = GH.get_edge_data(
pn, node)['connections'].connections[0]
if info not in merging_schemes or \
n not in merging_schemes[info]:
msg = ('This workflow contains a step with '
'multiple inputs so it cannot be '
'completed automatically, please add '
'the commands by hand.')
raise ValueError(msg)
req_params[cnx] = merging_schemes[info][n]
else:
req_params = dict()
connections = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
connections[dname] = iname
if len(dp.command.required_parameters) == 1:
cxns = dict()
for iname, dname in rp.items():
req_params[iname] = f'{previous_job.id}{dname}'
cxns[dname] = iname
connections = {previous_job: cxns}
else:
connections = dict()
for pn in GH.predecessors(node):
pndp = pn.default_parameter
n, cnx, _ = GH.get_edge_data(
pn, node)['connections'].connections[0]
_job = previous_dps[pndp.id]
req_params[cnx] = f'{_job.id}{n}'
connections[_job] = {n: cnx}
params.update(req_params)
job_params = qdb.software.Parameters.load(
cmd, values_dict=params)
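
The branch added in this hunk is the heart of the PR, so here is a condensed, hedged sketch of it with every name invented: a step whose command declares more than one required input either resolves each input from the merging scheme of an already-existing job (failing with the explicit "add the commands by hand" error otherwise), or, deeper in the workflow, wires each input to the job previously created for that parent's default parameter set, using the same `f'{job.id}{output}'` convention the single-input branch uses.

```python
# Sketch only; Job, the ids and the scheme names are hypothetical stand-ins.
class Job:
    def __init__(self, id_):
        self.id = id_


merging_schemes = {'pick-otus-scheme': {'OTU table': 'artifact-95'}}
previous_dps = {7: Job('job-a'), 8: Job('job-b')}

# parents of the multi-input node:
# (merging scheme, output name, input name, default-parameter id)
parents = [('pick-otus-scheme', 'OTU table', 'req_artifact_1', 7),
           ('pick-otus-scheme', 'OTU table', 'req_artifact_2', 8)]

req_params, connections = {}, {}
previous_job = None  # set to a Job to exercise the second branch
for info, n, cnx, dp_id in parents:
    if previous_job is None:
        # first step of the workflow: reuse artifacts already registered
        # under a merging scheme, or give up with a clear error
        if info not in merging_schemes or n not in merging_schemes[info]:
            raise ValueError('This workflow contains a step with multiple '
                             'inputs so it cannot be completed automatically, '
                             'please add the commands by hand.')
        req_params[cnx] = merging_schemes[info][n]
    else:
        # later step: point the input at the job created earlier for the
        # parent's default parameter set
        _job = previous_dps[dp_id]
        req_params[cnx] = f'{_job.id}{n}'
        connections[_job] = {n: cnx}

print(req_params)
# {'req_artifact_1': 'artifact-95', 'req_artifact_2': 'artifact-95'}
```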
@@ -997,8 +1032,9 @@ def _get_predecessors(workflow, node):
else:
current_job = workflow.add(
job_params, req_params=req_params,
connections={previous_job: connections})
connections=connections)
previous_jobs[current_job] = params
previous_dps[dp.id] = current_job

return workflow
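
One consequence of the change is visible in the `workflow.add` call just above: `connections` used to be built for a single upstream job (`{previous_job: connections}`), whereas it is now assembled per input, so a multi-input job can name a different upstream job for each required artifact. A hedged sketch of the two shapes, with placeholder jobs and names:

```python
# Placeholder objects standing in for ProcessingJob instances.
job_a, job_b = object(), object()

# single-input step: one upstream job provides every connection
connections_single = {job_a: {'OTU table': 'input_data'}}

# multi-input step: each upstream job feeds a different required input
connections_multi = {job_a: {'OTU table': 'req_artifact_1'},
                     job_b: {'OTU table': 'req_artifact_2'}}

# the diff then passes the mapping straight through, conceptually:
# workflow.add(job_params, req_params=req_params, connections=connections_multi)
```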

40 changes: 40 additions & 0 deletions qiita_db/metadata_template/test/test_prep_template.py
@@ -1477,6 +1477,46 @@ def test_artifact_setter(self):
"the parameters are the same as jobs"):
pt.add_default_workflow(qdb.user.User('test@foo.bar'))

# Then, let's clean up again and add a new command/step with 2
# BIOM input artifacts
for pj in wk.graph.nodes:
pj._set_error('Killed')
cmd = qdb.software.Command.create(
qdb.software.Software(1), "Multiple BIOM as inputs", "", {
'req_artifact_1': ['artifact:["BIOM"]', None],
'req_artifact_2': ['artifact:["BIOM"]', None],
}, outputs={'MB-output': 'BIOM'})
cmd_dp = qdb.software.DefaultParameters.create("", cmd)
# creating the new node for the cmd and linking its two inputs to
# the outputs of the two parent nodes (8 and 9)
sql = f"""
INSERT INTO qiita.default_workflow_node (
default_workflow_id, default_parameter_set_id)
VALUES (1, {cmd_dp.id});
INSERT INTO qiita.default_workflow_edge (
parent_id, child_id)
VALUES (8, 10);
INSERT INTO qiita.default_workflow_edge (
parent_id, child_id)
VALUES (9, 10);
INSERT INTO qiita.default_workflow_edge_connections (
default_workflow_edge_id, parent_output_id, child_input_id)
VALUES (6, 3, 99);
INSERT INTO qiita.default_workflow_edge_connections (
default_workflow_edge_id, parent_output_id, child_input_id)
VALUES (7, 3, 100)
"""
qdb.sql_connection.perform_as_transaction(sql)
wk = pt.add_default_workflow(qdb.user.User('test@foo.bar'))
self.assertEqual(len(wk.graph.nodes), 6)
self.assertEqual(len(wk.graph.edges), 5)
self.assertCountEqual(
[x.command.name for x in wk.graph.nodes],
# we should have 2 split libraries, 3 closed-reference and 1 multiple BIOM
['Split libraries FASTQ', 'Split libraries FASTQ',
'Pick closed-reference OTUs', 'Pick closed-reference OTUs',
'Pick closed-reference OTUs', 'Multiple BIOM as inputs'])
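
For reference, one wiring consistent with the counts asserted above (the labels are illustrative, not the real node or job identifiers): two Split libraries jobs feeding three closed-reference OTU picks, two of which feed the new multi-input step, giving six nodes and five edges.

```python
# Hypothetical wiring matching the asserted 6 nodes / 5 edges.
import networkx as nx

g = nx.DiGraph([('SL-1', 'OTU-1'), ('SL-1', 'OTU-2'), ('SL-2', 'OTU-3'),
                ('OTU-1', 'MB'), ('OTU-2', 'MB')])
assert len(g.nodes) == 6 and len(g.edges) == 5
```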

# now let's test that an error is raised when there is no valid initial
# input data; this moves the data type from FASTQ to taxa_summary for
# the default_workflow_id = 1