Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions qiita_db/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,3 +1177,47 @@ def _add_file(self, filename, filetype, data_type=None):
VALUES (%s, %s{1})""".format(col, dtid)
qdb.sql_connection.TRN.add(sql, [self._id, fpid])
qdb.sql_connection.TRN.execute()

def _slurm_reservation(self):
"""Helper method for the slurm_reservation property"""
with qdb.sql_connection.TRN:
sql = """SELECT slurm_reservation
FROM qiita.{0}
WHERE analysis_id = %s""".format(self._table)
qdb.sql_connection.TRN.add(sql, [self._id])
return qdb.sql_connection.TRN.execute_fetchflatten()

@property
def slurm_reservation(self):
"""Returns a valid reservation if it exists

Returns
-------
str or None
returns the slurm reservation or None
"""
slurm_reservation = self._slurm_reservation()

if slurm_reservation:
slurm_reservation = slurm_reservation[0]
cmd = f"scontrol show reservations {slurm_reservation}"
p_out, p_err, rv = qdb.processing_job._system_call(cmd)
if rv == 0:
return slurm_reservation

return None

@slurm_reservation.setter
def slurm_reservation(self, slurm_reservation):
"""Changes the slurm reservation of the analysis

Parameters
----------
slurm_reservation : str
New slurm_reservation for the analysis
"""
sql = """UPDATE qiita.{0}
SET slurm_reservation = %s
WHERE analysis_id = %s""".format(self._table)
qdb.sql_connection.perform_as_transaction(
sql, [slurm_reservation, self._id])
6 changes: 4 additions & 2 deletions qiita_db/handlers/tests/test_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ def test_post_job_success(self):
self.assertIsNotNone(cj)
# additionally we can test that job.print_trace is correct
self.assertEqual(job.trace, [
f'{job.id} [Not Available] - Validate',
f' {cj.id} [{cj.external_id}]'])
f'{job.id} [Not Available]: Validate | '
'-p qiita -N 1 -n 1 --mem 90gb --time 150:00:00 --nice 10000',
f' {cj.id} [{cj.external_id}] | '
'-p qiita -N 1 -n 1 --mem 16gb --time 10:00:00 --nice 10000'])

def test_post_job_success_with_archive(self):
pt = npt.assert_warns(
Expand Down
43 changes: 34 additions & 9 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ def by_external_id(cls, external_id):
qdb.sql_connection.TRN.add(sql, [external_id])
return cls(qdb.sql_connection.TRN.execute_fetchlast())

def get_resource_allocation_info(self):
@property
def resource_allocation_info(self):
"""Return resource allocation defined for this job. For
external computational resources only.

Expand All @@ -408,28 +409,40 @@ def get_resource_allocation_info(self):
A resource allocation string useful to the external resource
"""
with qdb.sql_connection.TRN:
analysis = None
if self.command.name == 'complete_job':
jtype = 'COMPLETE_JOBS_RESOURCE_PARAM'
v = loads(self.parameters.values['payload'])
params = self.parameters.values
v = loads(params['payload'])
# assume an empty string for name is preferable to None
name = ''
if v['artifacts'] is not None:
an_element = list(v['artifacts'].keys())[0]
name = v['artifacts'][an_element]['artifact_type']
ia = ProcessingJob(params['job_id']).input_artifacts
if ia:
analysis = ia[0].analysis
elif self.command.name == 'release_validators':
jtype = 'RELEASE_VALIDATORS_RESOURCE_PARAM'
tmp = ProcessingJob(self.parameters.values['job'])
name = tmp.parameters.command.name
analysis = tmp.input_artifacts[0].analysis
elif self.command.name == 'Validate':
jtype = 'VALIDATOR'
name = self.parameters.values['artifact_type']
vals = self.parameters.values
name = vals['artifact_type']
if vals['analysis'] is not None:
analysis = qdb.analysis.Analysis(vals['analysis'])
elif self.id == 'register':
jtype = 'REGISTER'
name = 'REGISTER'
else:
# assume anything else is a command
jtype = 'RESOURCE_PARAMS_COMMAND'
name = self.command.name
ia = self.input_artifacts
if ia:
analysis = ia[0].analysis

# first, query for resources matching name and type
sql = """SELECT allocation FROM
Expand All @@ -454,6 +467,13 @@ def get_resource_allocation_info(self):
"Could not match %s to a resource allocation!" % name)

allocation = result[0]
# adding user_level extra parameters
allocation = f'{allocation} {self.user.slurm_parameters}'.strip()
# adding analysis reservation
if analysis is not None:
sr = analysis.slurm_reservation
if sr is not None:
allocation = f'{allocation} --reservation {sr}'

if ('{samples}' in allocation or '{columns}' in allocation or
'{input_size}' in allocation):
Expand Down Expand Up @@ -1005,7 +1025,7 @@ def submit(self, parent_job_id=None, dependent_jobs_list=None):
# before returning immediately, usually with a job ID that can
# be used to monitor the job's progress.

resource_params = self.get_resource_allocation_info()
resource_params = self.resource_allocation_info

# note that parent_job_id is being passed transparently from
# submit declaration to the launcher.
Expand Down Expand Up @@ -2009,18 +2029,23 @@ def complete_processing_job(self):
def trace(self):
""" Returns as a text array the full trace of the job, from itself
to validators and complete jobs"""
lines = [f'{self.id} [{self.external_id}] - {self.command.name}']
lines = [f'{self.id} [{self.external_id}]: '
f'{self.command.name} | {self.resource_allocation_info}']
cjob = self.complete_processing_job
if cjob is not None:
lines.append(f' {cjob.id} [{cjob.external_id}]')
lines.append(f' {cjob.id} [{cjob.external_id}] | '
f'{cjob.resource_allocation_info}')
vjob = self.release_validator_job
if vjob is not None:
lines.append(f' {vjob.id} [{vjob.external_id}]')
lines.append(f' {vjob.id} [{vjob.external_id}] '
f'| {vjob.resource_allocation_info}')
for v in self.validator_jobs:
lines.append(f' {v.id} [{v.external_id}] - {v.command.name}')
lines.append(f' {v.id} [{v.external_id}]: '
f'{v.command.name} | {v.resource_allocation_info}')
cjob = v.complete_processing_job
if cjob is not None:
lines.append(f' {cjob.id} [{cjob.external_id}]')
lines.append(f' {cjob.id} [{cjob.external_id}] '
f'| {cjob.resource_allocation_info}')
return lines


Expand Down
8 changes: 8 additions & 0 deletions qiita_db/support_files/patches/89.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Nov 1, 2023
-- add creation_job_id to qiita.prep_template
ALTER TABLE qiita.analysis ADD slurm_reservation VARCHAR DEFAULT '' NOT NULL;
ALTER TABLE qiita.user_level ADD slurm_parameters VARCHAR DEFAULT '--nice 10000' NOT NULL;

UPDATE qiita.user_level SET slurm_parameters = '--nice 5000' WHERE name = 'admin';

UPDATE qiita.user_level SET slurm_parameters = '' WHERE name = 'wet-lab admin';
9 changes: 8 additions & 1 deletion qiita_db/support_files/qiita-db.dbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<project name="PostgreSQL" id="Project_fb1" database="PostgreSQL" >
<schema name="qiita" >
<table name="analysis" >
<comment><![CDATA[hHolds analysis information]]></comment>
<comment><![CDATA[Holds analysis information]]></comment>
<column name="analysis_id" type="bigint" length="19" decimal="0" jt="-5" mandatory="y" >
<defo><![CDATA[nextval('qiita.analysis_analysis_id_seq'::regclass)]]></defo>
<comment><![CDATA[Unique identifier for analysis]]></comment>
Expand All @@ -24,6 +24,9 @@
<defo><![CDATA[false]]></defo>
</column>
<column name="logging_id" type="bigint" length="19" decimal="0" jt="-5" />
<column name="slurm_reservation" type="varchar" jt="12" mandatory="y" >
<defo><![CDATA['']]></defo>
</column>
<index name="pk_analysis" unique="PRIMARY_KEY" >
<column name="analysis_id" />
</index>
Expand Down Expand Up @@ -1722,6 +1725,10 @@ Controlled Vocabulary]]></comment>
<comment><![CDATA[One of the user levels (admin, user, guest, etc)]]></comment>
</column>
<column name="description" type="text" decimal="0" jt="12" mandatory="y" />
<column name="slurm_parameters" type="varchar" jt="12" mandatory="y" >
<defo><![CDATA['']]></defo>
<comment><![CDATA[Specific per_user_level slurm parameters]]></comment>
</column>
<index name="pk_user_level" unique="PRIMARY_KEY" >
<column name="user_level_id" />
</index>
Expand Down
Loading