Skip to content

initial fixes after qiita-rc 09.2023 #3320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions qiita_db/handlers/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ def post(self):
values['template'] = prep_id
cmd = qdb.software.Command.get_validator(atype)
params = qdb.software.Parameters.load(cmd, values_dict=values)
if add_default_workflow:
if add_default_workflow or add_default_workflow == 'True':
pwk = qdb.processing_job.ProcessingWorkflow.from_scratch(
user, params, name=f'ProcessingWorkflow for {job_id}')
user, params, name=f'ProcessingWorkflow for {prep_id}')
# the new job is the first job in the workflow
new_job = list(pwk.graph.nodes())[0]
# adding default pipeline to the preparation
Expand Down
7 changes: 4 additions & 3 deletions qiita_db/metadata_template/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,11 @@ def _get_predecessors(workflow, node):

cmds_to_create.reverse()
current_job = None
loop_starting_job = starting_job
for i, (cmd, params, rp) in enumerate(cmds_to_create):
if starting_job is not None:
previous_job = starting_job
starting_job = None
if loop_starting_job is not None:
previous_job = loop_starting_job
loop_starting_job = None
else:
previous_job = current_job
if previous_job is None:
Expand Down
78 changes: 45 additions & 33 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def get_resource_allocation_info(self):
part = f'{days}-{str(td)}'
else:
part = str(td)
part = part.split('.')[0]
else:
part = naturalsize(
value, gnu=True, format='%.0f')
Expand Down Expand Up @@ -1305,11 +1306,17 @@ def _complete_artifact_definition(self, artifact_data):
an = None
data_type = 'Job Output Folder'

qdb.artifact.Artifact.create(
artifact = qdb.artifact.Artifact.create(
filepaths, atype, prep_template=pt, analysis=an,
data_type=data_type, name=job_params['name'])
self._set_status('success')

# we need to update the children jobs to replace the input
# for the newly created artifact via the validator
for c in self.children:
self._helper_update_children({atype: artifact.id})
c.submit()

def _complete_artifact_transformation(self, artifacts_data):
"""Performs the needed steps to complete an artifact transformation job

Expand Down Expand Up @@ -1678,6 +1685,42 @@ def validator_jobs(self):
for jid in qdb.sql_connection.TRN.execute_fetchflatten():
yield ProcessingJob(jid)

def _helper_update_children(self, new_map):
ready = []
sql = """SELECT command_parameters, pending
FROM qiita.processing_job
WHERE processing_job_id = %s"""
sql_update = """UPDATE qiita.processing_job
SET command_parameters = %s,
pending = %s
WHERE processing_job_id = %s"""
sql_link = """INSERT INTO qiita.artifact_processing_job
(artifact_id, processing_job_id)
VALUES (%s, %s)"""

for c in self.children:
qdb.sql_connection.TRN.add(sql, [c.id])
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
for pname, out_name in pending[self.id].items():
a_id = new_map[out_name]
params[pname] = str(a_id)
del pending[self.id]
# Link the input artifact with the child job
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])

# Force to insert a NULL in the DB if pending is empty
pending = pending if pending else None
qdb.sql_connection.TRN.add(sql_update,
[dumps(params), pending, c.id])
qdb.sql_connection.TRN.execute()

if pending is None:
# The child already has all the parameters
# Add it to the ready list
ready.append(c)

return ready

def _update_children(self, mapping):
"""Updates the children of the current job to populate the input params

Expand All @@ -1691,7 +1734,6 @@ def _update_children(self, mapping):
list of qiita_db.processing_job.ProcessingJob
The list of childrens that are ready to be submitted
"""
ready = []
with qdb.sql_connection.TRN:
sql = """SELECT command_output_id, name
FROM qiita.command_output
Expand All @@ -1701,37 +1743,7 @@ def _update_children(self, mapping):
res = qdb.sql_connection.TRN.execute_fetchindex()
new_map = {name: mapping[oid] for oid, name in res}

sql = """SELECT command_parameters, pending
FROM qiita.processing_job
WHERE processing_job_id = %s"""
sql_update = """UPDATE qiita.processing_job
SET command_parameters = %s,
pending = %s
WHERE processing_job_id = %s"""
sql_link = """INSERT INTO qiita.artifact_processing_job
(artifact_id, processing_job_id)
VALUES (%s, %s)"""
for c in self.children:
qdb.sql_connection.TRN.add(sql, [c.id])
params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
for pname, out_name in pending[self.id].items():
a_id = new_map[out_name]
params[pname] = str(a_id)
del pending[self.id]
# Link the input artifact with the child job
qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])

# Force to insert a NULL in the DB if pending is empty
pending = pending if pending else None
qdb.sql_connection.TRN.add(sql_update,
[dumps(params), pending, c.id])
qdb.sql_connection.TRN.execute()

if pending is None:
# The child already has all the parameters
# Add it to the ready list
ready.append(c)
return ready
return self._helper_update_children(new_map)

def _update_and_launch_children(self, mapping):
"""Updates the children of the current job to populate the input params
Expand Down