Skip to content

Commit b733ad3

Browse files
authored
initial fixes after qiita-rc 09.2023 (#3320)
* initial fixes after qiita-rc 09.2023 * use _update_and_launch_children * use atype * add _helper_update_children * rm extra line * mv children check out of TRN * main_starting_job * loop_starting_job
1 parent 119ae7c commit b733ad3

File tree

3 files changed

+51
-38
lines changed

3 files changed

+51
-38
lines changed

qiita_db/handlers/artifact.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,9 @@ def post(self):
315315
values['template'] = prep_id
316316
cmd = qdb.software.Command.get_validator(atype)
317317
params = qdb.software.Parameters.load(cmd, values_dict=values)
318-
if add_default_workflow:
318+
if add_default_workflow or add_default_workflow == 'True':
319319
pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(
320-
user, params, name=f'ProcessingWorkflow for {job_id}')
320+
user, params, name=f'ProcessingWorkflow for {prep_id}')
321321
# the new job is the first job in the workflow
322322
new_job = list(pwk.graph.nodes())[0]
323323
# adding default pipeline to the preparation

qiita_db/metadata_template/prep_template.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -921,10 +921,11 @@ def _get_predecessors(workflow, node):
921921

922922
cmds_to_create.reverse()
923923
current_job = None
924+
loop_starting_job = starting_job
924925
for i, (cmd, params, rp) in enumerate(cmds_to_create):
925-
if starting_job is not None:
926-
previous_job = starting_job
927-
starting_job = None
926+
if loop_starting_job is not None:
927+
previous_job = loop_starting_job
928+
loop_starting_job = None
928929
else:
929930
previous_job = current_job
930931
if previous_job is None:

qiita_db/processing_job.py

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ def get_resource_allocation_info(self):
513513
part = f'{days}-{str(td)}'
514514
else:
515515
part = str(td)
516+
part = part.split('.')[0]
516517
else:
517518
part = naturalsize(
518519
value, gnu=True, format='%.0f')
@@ -1305,11 +1306,17 @@ def _complete_artifact_definition(self, artifact_data):
13051306
an = None
13061307
data_type = 'Job Output Folder'
13071308

1308-
qdb.artifact.Artifact.create(
1309+
artifact = qdb.artifact.Artifact.create(
13091310
filepaths, atype, prep_template=pt, analysis=an,
13101311
data_type=data_type, name=job_params['name'])
13111312
self._set_status('success')
13121313

1314+
# we need to update the children jobs to replace the input
1315+
# for the newly created artifact via the validator
1316+
for c in self.children:
1317+
self._helper_update_children({atype: artifact.id})
1318+
c.submit()
1319+
13131320
def _complete_artifact_transformation(self, artifacts_data):
13141321
"""Performs the needed steps to complete an artifact transformation job
13151322
@@ -1678,6 +1685,42 @@ def validator_jobs(self):
16781685
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
16791686
yield ProcessingJob(jid)
16801687

1688+
def _helper_update_children(self, new_map):
1689+
ready = []
1690+
sql = """SELECT command_parameters, pending
1691+
FROM qiita.processing_job
1692+
WHERE processing_job_id = %s"""
1693+
sql_update = """UPDATE qiita.processing_job
1694+
SET command_parameters = %s,
1695+
pending = %s
1696+
WHERE processing_job_id = %s"""
1697+
sql_link = """INSERT INTO qiita.artifact_processing_job
1698+
(artifact_id, processing_job_id)
1699+
VALUES (%s, %s)"""
1700+
1701+
for c in self.children:
1702+
qdb.sql_connection.TRN.add(sql, [c.id])
1703+
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
1704+
for pname, out_name in pending[self.id].items():
1705+
a_id = new_map[out_name]
1706+
params[pname] = str(a_id)
1707+
del pending[self.id]
1708+
# Link the input artifact with the child job
1709+
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])
1710+
1711+
# Force to insert a NULL in the DB if pending is empty
1712+
pending = pending if pending else None
1713+
qdb.sql_connection.TRN.add(sql_update,
1714+
[dumps(params), pending, c.id])
1715+
qdb.sql_connection.TRN.execute()
1716+
1717+
if pending is None:
1718+
# The child already has all the parameters
1719+
# Add it to the ready list
1720+
ready.append(c)
1721+
1722+
return ready
1723+
16811724
def _update_children(self, mapping):
16821725
"""Updates the children of the current job to populate the input params
16831726
@@ -1691,7 +1734,6 @@ def _update_children(self, mapping):
16911734
list of qiita_db.processing_job.ProcessingJob
16921735
The list of childrens that are ready to be submitted
16931736
"""
1694-
ready = []
16951737
with qdb.sql_connection.TRN:
16961738
sql = """SELECT command_output_id, name
16971739
FROM qiita.command_output
@@ -1701,37 +1743,7 @@ def _update_children(self, mapping):
17011743
res = qdb.sql_connection.TRN.execute_fetchindex()
17021744
new_map = {name: mapping[oid] for oid, name in res}
17031745

1704-
sql = """SELECT command_parameters, pending
1705-
FROM qiita.processing_job
1706-
WHERE processing_job_id = %s"""
1707-
sql_update = """UPDATE qiita.processing_job
1708-
SET command_parameters = %s,
1709-
pending = %s
1710-
WHERE processing_job_id = %s"""
1711-
sql_link = """INSERT INTO qiita.artifact_processing_job
1712-
(artifact_id, processing_job_id)
1713-
VALUES (%s, %s)"""
1714-
for c in self.children:
1715-
qdb.sql_connection.TRN.add(sql, [c.id])
1716-
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
1717-
for pname, out_name in pending[self.id].items():
1718-
a_id = new_map[out_name]
1719-
params[pname] = str(a_id)
1720-
del pending[self.id]
1721-
# Link the input artifact with the child job
1722-
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])
1723-
1724-
# Force to insert a NULL in the DB if pending is empty
1725-
pending = pending if pending else None
1726-
qdb.sql_connection.TRN.add(sql_update,
1727-
[dumps(params), pending, c.id])
1728-
qdb.sql_connection.TRN.execute()
1729-
1730-
if pending is None:
1731-
# The child already has all the parameters
1732-
# Add it to the ready list
1733-
ready.append(c)
1734-
return ready
1746+
return self._helper_update_children(new_map)
17351747

17361748
def _update_and_launch_children(self, mapping):
17371749
"""Updates the children of the current job to populate the input params

0 commit comments

Comments
 (0)