
Commit 45e345c

add ebi updates and MaxRSS_helper

1 parent af2ce16

5 files changed: +318 -39 lines changed
Lines changed: 239 additions & 0 deletions
@@ -0,0 +1,239 @@
from qiita_core.util import MaxRSS_helper
from qiita_db.software import Software
import datetime
from io import StringIO
from subprocess import check_output
import pandas as pd
from os.path import join

# This is an example script to collect the data we need from SLURM; the plan
# is that in the near future we will clean these up, add them to Qiita's main
# code base, and then have cronjobs run them.

# At the time of writing we have:
# qp-spades spades
# (*) qp-woltka Woltka v0.1.4
# qp-woltka SynDNA Woltka
# qp-woltka Calculate Cell Counts
# (*) qp-meta Sortmerna v2.1b
# (*) qp-fastp-minimap2 Adapter and host filtering v2023.12
# ... and the admin plugin
# (*) qp-klp
# Here we are only going to create summaries for (*)


sacct = ['sacct', '-p',
         '--format=JobName,JobID,ElapsedRaw,MaxRSS,ReqMem', '-j']
# for the non-admin jobs we will use jobs from (roughly) the last six months
six_months = datetime.date.today() - datetime.timedelta(weeks=6*4)

print('The current "software - commands" that use job-arrays are:')
for s in Software.iter():
    if 'ENVIRONMENT="' in s.environment_script:
        for c in s.commands:
            print(f"{s.name} - {c.name}")
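
# A note on the sacct output parsed below (illustrative values, not from a
# real run): with `-p`, sacct prints pipe-delimited rows for the requested
# columns, e.g.
#   JobName|JobID|ElapsedRaw|MaxRSS|ReqMem|
#   some-job|1234567|1830|2511K|16G|
# so pd.read_csv(..., sep='|') loads it directly, and MaxRSS_helper turns the
# K/M/G suffixed MaxRSS strings into plain numbers so that .max() can compare
# them.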

# 1. Command: woltka

fn = join('/panfs', 'qiita', 'jobs_woltka.tsv.gz')
print(f"Generating the summary for the woltka jobs: {fn}.")

cmds = [c for s in Software.iter(False)
        if 'woltka' in s.name for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

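# each of these jobs stores two SLURM ids in external_id: the main (array)
# job and the merge job that follows it; we query sacct for both and keep
# them as the 'main' and 'merge' rows of the summary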
data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main',
        'db': j.parameters.values['Database'].split('/')[-1]})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge',
         'db': j.parameters.values['Database'].split('/')[-1]})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 2. qp-meta Sortmerna

fn = join('/panfs', 'qiita', 'jobs_sortmerna.tsv.gz')
print(f"Generating the summary for the sortmerna jobs: {fn}.")

# as above, we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
        if 'meta' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds if 'sortmerna' in c.name.lower()
        for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 3. Adapter and host filtering. Note that there is a new version deployed in
# Jan 2024, so the current results will not be the most accurate

fn = join('/panfs', 'qiita', 'jobs_adapter_host.tsv.gz')
print(f"Generating the summary for the adapter and host filtering jobs: "
      f"{fn}.")

# as above, we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
        if 'minimap2' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 4. The SPP!

fn = join('/panfs', 'qiita', 'jobs_spp.tsv.gz')
print(f"Generating the summary for the SPP jobs: {fn}.")

# for the SPP we will look at jobs from the last year
year = datetime.date.today() - datetime.timedelta(days=365)
cmds = [c for s in Software.iter(False)
        if s.name == 'qp-klp' for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > year]

# for the SPP we need to find the jobs that were actually run, which means
# looping through the existing slurm jobs and finding them
max_inter = 2000

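# the follow-up SLURM jobs the SPP submits are not stored in external_id, so
# we scan forward from the initial SLURM id (up to max_inter ids ahead) and
# match jobs whose JobName starts with the Qiita job id: first the convert
# job, then qc/nuqc or fastqc, and finally (for non-amplicon runs) the last
# fastqc step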
data = []
for job in jobs:
    jei = int(job.external_id)
    rvals = StringIO(
        check_output(sacct + [str(jei)]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mem = _d.MaxRSS.apply(
        lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
    wt = _d.ElapsedRaw.max()
    # the current "easy" way to determine if amplicon or other is to check
    # the file extension of the sample sheet filename
    stype = 'other'
    if job.parameters.values['sample_sheet']['filename'].endswith('.txt'):
        stype = 'amplicon'
    rid = job.parameters.values['run_identifier']
    data.append(
        {'jid': job.id, 'sjid': jei, 'mem': mem, 'stype': stype, 'wt': wt,
         'type': 'main', 'rid': rid, 'name': _d.JobName[0]})

    # let's look for the convert job
    for jid in range(jei + 1, jei + max_inter):
        rvals = StringIO(check_output(sacct + [str(jid)]).decode('ascii'))
        _d = pd.read_csv(rvals, sep='|')
        if [1 for x in _d.JobName.values if x.startswith(job.id)]:
            cjid = int(_d.JobID[0])
            mem = _d.MaxRSS.apply(
                lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
            wt = _d.ElapsedRaw.max()

            data.append(
                {'jid': job.id, 'sjid': cjid, 'mem': mem, 'stype': stype,
                 'wt': wt, 'type': 'convert', 'rid': rid,
                 'name': _d.JobName[0]})

            # now let's look for the next step: if amplicon that's fastqc, but
            # if other that's qc/nuqc
            for jid in range(cjid + 1, cjid + max_inter):
                rvals = StringIO(
                    check_output(sacct + [str(jid)]).decode('ascii'))
                _d = pd.read_csv(rvals, sep='|')
                if [1 for x in _d.JobName.values if x.startswith(job.id)]:
                    qc_jid = _d.JobIDRaw.apply(
                        lambda x: int(x.split('.')[0])).max()
                    qcmem = _d.MaxRSS.apply(
                        lambda x: x if type(x) is not str
                        else MaxRSS_helper(x)).max()
                    qcwt = _d.ElapsedRaw.max()

                    if stype == 'amplicon':
                        data.append(
                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
                             'stype': stype, 'wt': qcwt, 'type': 'fastqc',
                             'rid': rid, 'name': _d.JobName[0]})
                    else:
                        data.append(
                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
                             'stype': stype, 'wt': qcwt, 'type': 'qc',
                             'rid': rid, 'name': _d.JobName[0]})
                        for jid in range(qc_jid + 1, qc_jid + max_inter):
                            rvals = StringIO(check_output(
                                sacct + [str(jid)]).decode('ascii'))
                            _d = pd.read_csv(rvals, sep='|')
                            if [1 for x in _d.JobName.values if x.startswith(
                                    job.id)]:
                                fqc_jid = _d.JobIDRaw.apply(
                                    lambda x: int(x.split('.')[0])).max()
                                fqcmem = _d.MaxRSS.apply(
                                    lambda x: x if type(x) is not str
                                    else MaxRSS_helper(x)).max()
                                fqcwt = _d.ElapsedRaw.max()
                                data.append(
                                    {'jid': job.id, 'sjid': fqc_jid,
                                     'mem': fqcmem, 'stype': stype,
                                     'wt': fqcwt, 'type': 'fastqc',
                                     'rid': rid, 'name': _d.JobName[0]})
                                break
                    break
            break

df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)
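
All the summaries above share the jid/sjid/mem/wt columns, so a quick way to eyeball the collected numbers afterwards is a groupby on the job type; a minimal sketch, assuming the jobs_woltka.tsv.gz path written by the script:

import pandas as pd

df = pd.read_csv('/panfs/qiita/jobs_woltka.tsv.gz', sep='\t')
# peak memory and wall time per job type ('main' vs 'merge')
print(df.groupby('type')[['mem', 'wt']].describe())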

notebooks/resource-allocation/generate-allocation-summary.py

Lines changed: 2 additions & 12 deletions
@@ -5,6 +5,7 @@
 from json import loads
 from os.path import join

+from qiita_core.util import MaxRSS_helper
 from qiita_db.exceptions import QiitaDBUnknownIDError
 from qiita_db.processing_job import ProcessingJob
 from qiita_db.software import Software
@@ -117,19 +118,8 @@
 print('Make sure that only 0/K/M exist', set(
     df.MaxRSS.apply(lambda x: str(x)[-1])))

-
-def _helper(x):
-    if x[-1] == 'K':
-        y = float(x[:-1]) * 1000
-    elif x[-1] == 'M':
-        y = float(x[:-1]) * 1000000
-    else:
-        y = float(x)
-    return y
-
-
 # Generating new columns
-df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: _helper(str(x)))
+df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
 df['ElapsedRawTime'] = df.ElapsedRaw.apply(
     lambda x: timedelta(seconds=float(x)))

qiita_core/tests/test_util.py

Lines changed: 15 additions & 1 deletion
@@ -10,7 +10,7 @@

 from qiita_core.util import (
     qiita_test_checker, execute_as_transaction, get_qiita_version,
-    is_test_environment, get_release_info)
+    is_test_environment, get_release_info, MaxRSS_helper)
 from qiita_db.meta_util import (
     generate_biom_and_metadata_release, generate_plugin_releases)
 import qiita_db as qdb
@@ -82,6 +82,20 @@ def test_get_release_info(self):
         self.assertEqual(biom_metadata_release, ('', '', ''))
         self.assertNotEqual(archive_release, ('', '', ''))

+    def test_MaxRSS_helper(self):
+        tests = [
+            ('6', 6.0),
+            ('6K', 6000),
+            ('6M', 6000000),
+            ('6G', 6000000000),
+            ('6.9', 6.9),
+            ('6.9K', 6900),
+            ('6.9M', 6900000),
+            ('6.9G', 6900000000),
+        ]
+        for x, y in tests:
+            self.assertEqual(MaxRSS_helper(x), y)
+

 if __name__ == '__main__':
     main()


qiita_core/util.py

Lines changed: 12 additions & 0 deletions
@@ -151,3 +151,15 @@ def get_release_info(study_status='public'):
     archive_release = ((md5sum, filepath, timestamp))

     return (biom_metadata_release, archive_release)
+
+
+def MaxRSS_helper(x):
+    if x[-1] == 'K':
+        y = float(x[:-1]) * 1000
+    elif x[-1] == 'M':
+        y = float(x[:-1]) * 1000000
+    elif x[-1] == 'G':
+        y = float(x[:-1]) * 1000000000
+    else:
+        y = float(x)
+    return y
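
For a quick standalone check of the new helper, a minimal sketch (the MaxRSS values below are made up, mirroring how sacct reports them):

from qiita_core.util import MaxRSS_helper
import pandas as pd

# illustrative MaxRSS strings as sacct prints them
maxrss = pd.Series(['2511K', '1.2G', '0'])
# K/M/G suffixed strings become plain numbers, so they compare cleanly
print(maxrss.apply(MaxRSS_helper).max())  # 1200000000.0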
