Skip to content

add ebi updates and MaxRSS_helper #3349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions notebooks/resource-allocation/generate-allocation-summary-arrays.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
from qiita_core.util import MaxRSS_helper
from qiita_db.software import Software
import datetime
from io import StringIO
from subprocess import check_output
import pandas as pd
from os.path import join

# This is an example script to collect the data we need from SLURM, the plan
# is that in the near future we will clean up and add these to the Qiita's main
# code and then have cronjobs to run them.

# at time of writting we have:
# qp-spades spades
# (*) qp-woltka Woltka v0.1.4
# qp-woltka SynDNA Woltka
# qp-woltka Calculate Cell Counts
# (*) qp-meta Sortmerna v2.1b
# (*) qp-fastp-minimap2 Adapter and host filtering v2023.12
# ... and the admin plugin
# (*) qp-klp
# Here we are only going to create summaries for (*)


sacct = ['sacct', '-p',
'--format=JobName,JobID,ElapsedRaw,MaxRSS,ReqMem', '-j']
# for the non admin jobs, we will use jobs from the last six months
six_months = datetime.date.today() - datetime.timedelta(weeks=6*4)

print('The current "sofware - commands" that use job-arrays are:')
for s in Software.iter():
if 'ENVIRONMENT="' in s.environment_script:
for c in s.commands:
print(f"{s.name} - {c.name}")

# 1. Command: woltka

fn = join('/panfs', 'qiita', 'jobs_woltka.tsv.gz')
print(f"Generating the summary for the woltka jobs: {fn}.")

cmds = [c for s in Software.iter(False)
if 'woltka' in s.name for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
jid, mjid = j.external_id.strip().split()
rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
jwt = _d.ElapsedRaw.max()

rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
mwt = _d.ElapsedRaw.max()

data.append({
'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main',
'db': j.parameters.values['Database'].split('/')[-1]})
data.append(
{'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge',
'db': j.parameters.values['Database'].split('/')[-1]})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)

# 2. qp-meta Sortmerna

fn = join('/panfs', 'qiita', 'jobs_sortmerna.tsv.gz')
print(f"Generating the summary for the woltka jobs: {fn}.")

# for woltka we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
if 'minimap2' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
jid, mjid = j.external_id.strip().split()
rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
jwt = _d.ElapsedRaw.max()

rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
mwt = _d.ElapsedRaw.max()

data.append({
'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
data.append(
{'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 3. Adapter and host filtering. Note that there is a new version deployed on
# Jan 2024 so the current results will not be the most accurate

fn = join('/panfs', 'qiita', 'jobs_adapter_host.tsv.gz')
print(f"Generating the summary for the woltka jobs: {fn}.")

# for woltka we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
if 'meta' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds if 'sortmerna' in c.name.lower()
for j in c.processing_jobs if j.status == 'success' and
j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop appears to be repeated several times. In the future we can generalize it into a function.

jid, mjid = j.external_id.strip().split()
rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
jwt = _d.ElapsedRaw.max()

rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
mwt = _d.ElapsedRaw.max()

data.append({
'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
data.append(
{'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 4. The SPP!

fn = join('/panfs', 'qiita', 'jobs_spp.tsv.gz')
print(f"Generating the summary for the SPP jobs: {fn}.")

# for the SPP we will look at jobs from the last year
year = datetime.date.today() - datetime.timedelta(days=365)
cmds = [c for s in Software.iter(False)
if s.name == 'qp-klp' for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
j.heartbeat.date() > year]

# for the SPP we need to find the jobs that were actually run, this means
# looping throught the existing slurm jobs and finding them
max_inter = 2000

data = []
for job in jobs:
jei = int(job.external_id)
rvals = StringIO(
check_output(sacct + [str(jei)]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
mem = _d.MaxRSS.apply(
lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
wt = _d.ElapsedRaw.max()
# the current "easy" way to determine if amplicon or other is to check
# the file extension of the filename
stype = 'other'
if job.parameters.values['sample_sheet']['filename'].endswith('.txt'):
stype = 'amplicon'
rid = job.parameters.values['run_identifier']
data.append(
{'jid': job.id, 'sjid': jei, 'mem': mem, 'stype': stype, 'wt': wt,
'type': 'main', 'rid': rid, 'name': _d.JobName[0]})

# let's look for the convert job
for jid in range(jei + 1, jei + max_inter):
rvals = StringIO(check_output(sacct + [str(jid)]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
if [1 for x in _d.JobName.values if x.startswith(job.id)]:
cjid = int(_d.JobID[0])
mem = _d.MaxRSS.apply(
lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
wt = _d.ElapsedRaw.max()

data.append(
{'jid': job.id, 'sjid': cjid, 'mem': mem, 'stype': stype,
'wt': wt, 'type': 'convert', 'rid': rid,
'name': _d.JobName[0]})

# now let's look for the next step, if amplicon that's fastqc but
# if other that's qc/nuqc
for jid in range(cjid + 1, cjid + max_inter):
rvals = StringIO(
check_output(sacct + [str(jid)]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
if [1 for x in _d.JobName.values if x.startswith(job.id)]:
qc_jid = _d.JobIDRaw.apply(
lambda x: int(x.split('.')[0])).max()
qcmem = _d.MaxRSS.apply(
lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
qcwt = _d.ElapsedRaw.max()

if stype == 'amplicon':
data.append(
{'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
'stype': stype, 'wt': qcwt, 'type': 'fastqc',
'rid': rid, 'name': _d.JobName[0]})
else:
data.append(
{'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
'stype': stype, 'wt': qcwt, 'type': 'qc',
'rid': rid, 'name': _d.JobName[0]})
for jid in range(qc_jid + 1, qc_jid + max_inter):
rvals = StringIO(check_output(
sacct + [str(jid)]).decode('ascii'))
_d = pd.read_csv(rvals, sep='|')
if [1 for x in _d.JobName.values if x.startswith(
job.id)]:
fqc_jid = _d.JobIDRaw.apply(
lambda x: int(x.split('.')[0])).max()
fqcmem = _d.MaxRSS.apply(
lambda x: x if type(x) is not str
else MaxRSS_helper(x)).max()
fqcwt = _d.ElapsedRaw.max()
data.append(
{'jid': job.id, 'sjid': fqc_jid,
'mem': fqcmem, 'stype': stype,
'wt': fqcwt, 'type': 'fastqc',
'rid': rid, 'name': _d.JobName[0]})
break
break
break

df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)
14 changes: 2 additions & 12 deletions notebooks/resource-allocation/generate-allocation-summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from json import loads
from os.path import join

from qiita_core.util import MaxRSS_helper
from qiita_db.exceptions import QiitaDBUnknownIDError
from qiita_db.processing_job import ProcessingJob
from qiita_db.software import Software
Expand Down Expand Up @@ -117,19 +118,8 @@
print('Make sure that only 0/K/M exist', set(
df.MaxRSS.apply(lambda x: str(x)[-1])))


def _helper(x):
if x[-1] == 'K':
y = float(x[:-1]) * 1000
elif x[-1] == 'M':
y = float(x[:-1]) * 1000000
else:
y = float(x)
return y


# Generating new columns
df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: _helper(str(x)))
df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
df['ElapsedRawTime'] = df.ElapsedRaw.apply(
lambda x: timedelta(seconds=float(x)))

Expand Down
16 changes: 15 additions & 1 deletion qiita_core/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from qiita_core.util import (
qiita_test_checker, execute_as_transaction, get_qiita_version,
is_test_environment, get_release_info)
is_test_environment, get_release_info, MaxRSS_helper)
from qiita_db.meta_util import (
generate_biom_and_metadata_release, generate_plugin_releases)
import qiita_db as qdb
Expand Down Expand Up @@ -82,6 +82,20 @@ def test_get_release_info(self):
self.assertEqual(biom_metadata_release, ('', '', ''))
self.assertNotEqual(archive_release, ('', '', ''))

def test_MaxRSS_helper(self):
tests = [
('6', 6.0),
('6K', 6000),
('6M', 6000000),
('6G', 6000000000),
('6.9', 6.9),
('6.9K', 6900),
('6.9M', 6900000),
('6.9G', 6900000000),
]
for x, y in tests:
self.assertEqual(MaxRSS_helper(x), y)


if __name__ == '__main__':
main()
12 changes: 12 additions & 0 deletions qiita_core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,15 @@ def get_release_info(study_status='public'):
archive_release = ((md5sum, filepath, timestamp))

return (biom_metadata_release, archive_release)


def MaxRSS_helper(x):
if x[-1] == 'K':
y = float(x[:-1]) * 1000
elif x[-1] == 'M':
y = float(x[:-1]) * 1000000
elif x[-1] == 'G':
y = float(x[:-1]) * 1000000000
else:
y = float(x)
return y
Loading