-
Notifications
You must be signed in to change notification settings - Fork 0
/
slurm.py
134 lines (126 loc) · 5.67 KB
/
slurm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import json
import shlex
from datetime import datetime
from typing import Optional

from flask import current_app as app, Flask
from requests import Session
from slurm_rest import ApiException
from slurm_rest.apis import SlurmApi
from slurm_rest.models import (
    V0037JobSubmission,
    V0037JobProperties,
    V0037JobSubmissionResponse,
)

from .models import db, Analysis, AnalysisState
# Slurm notes:
# environment and cwd are required job properties at minimum
# by default, stdout is captured and written to {cwd}/slurm-{id}.out as the executing user
# "~" in job script expands to /var/run/slurmrest
# job script itself is written to /var/spool/slurm/d/job{id}/slurm_script
# job script runs in a login shell
def run_crg2_on_family(analysis: Analysis) -> Optional[V0037JobSubmissionResponse]:
    """
    Submit a CRG2 pipeline job for this analysis to the Slurm scheduler.

    Precondition: this analysis is a valid trio-ish analysis and has relevant
    relationships loaded.

    Returns the scheduler's submission response, or None if submission failed
    (the failure is logged, not raised).
    """
    # Move to a job queue if it blocks requests for too long
    datasets = analysis.datasets
    family_codename = datasets[0].tissue_sample.participant.family.family_codename
    # participant codename -> list of linked file paths, one entry per dataset
    files = {
        dataset.tissue_sample.participant.participant_codename: [
            file.path for file in dataset.linked_files
        ]
        for dataset in datasets
    }
    # Will only be used for capturing stdout/stderr instead of explicitly for each
    cwd = app.config["SLURM_PWD"]
    api_instance: SlurmApi = app.extensions["slurm"]
    try:
        # shlex.quote safely single-quotes arbitrary values for a POSIX shell.
        # The previous replace("'", "\\'") escaping was incorrect: a backslash
        # does not escape a quote inside a single-quoted shell string, so a
        # codename containing "'" could break out of the quoting; the JSON
        # file-map (which may contain quotes in paths) was not escaped at all.
        submitted_job = api_instance.slurmctld_submit_job(
            V0037JobSubmission(
                script=f"""#!/bin/bash
exec {shlex.quote(app.config["CRG2_ENTRYPOINT"])} {analysis.analysis_id} {shlex.quote(family_codename)} {shlex.quote(json.dumps(files))}
""",
                job=V0037JobProperties(
                    environment={"STAGER": True},
                    current_working_directory=cwd,
                    name=f"Stager-CRG2 (analysis {analysis.analysis_id}, family {family_codename})",
                    standard_output=f"stager-crg2-{analysis.analysis_id}.out",
                    memory_per_node=4096,  # MB, equivalent to --mem and SBATCH_MEM_PER_NODE
                    time_limit=3000,  # minutes, 50 hours, equivalent to --time and SBATCH_TIMELIMIT
                    # partition, nodes, and CPUs are left implied
                ),
            )
        )
        app.logger.info(
            f"Submitted analysis {analysis.analysis_id} to scheduler: {submitted_job}"
        )
        return submitted_job
    except ApiException as e:
        # logger.warn is a deprecated alias of logger.warning
        app.logger.warning(
            f"Exception when calling slurmctld_submit_job for analysis {analysis.analysis_id}",
            exc_info=e,
        )
        return None
def poll_slurm(app: Flask) -> None:
    """
    Scheduled background task: reconcile Running analyses with Slurm job state.

    Because it runs in a separate thread, it pushes an app context for itself.
    Marks analyses Done/Error based on the scheduler's reported job states and
    commits the changes in one transaction.
    """
    # Terminal states meaning the job ended without completing successfully.
    # https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
    failure_states = frozenset(
        [
            "BOOT_FAIL",
            "CANCELLED",
            "DEADLINE",
            "FAILED",
            "NODE_FAIL",
            "OUT_OF_MEMORY",
            "PREEMPTED",
            "TIMEOUT",
        ]
    )
    with app.app_context():
        # "!= None" is the SQLAlchemy idiom that compiles to IS NOT NULL
        running_analyses = Analysis.query.filter(
            Analysis.analysis_state == AnalysisState.Running,
            Analysis.scheduler_id != None,
        ).all()
        app.logger.info(f"Found {len(running_analyses)} running analyses to poll.")
        if not running_analyses:
            return
        by_scheduler_id = {
            analysis.scheduler_id: analysis for analysis in running_analyses
        }
        # api_instance.slurmctld_get_job and api_instance.slurmctld_get_jobs
        # are broken due to Slurm not respecting its own OpenAPI schema on the
        # typing of .array_job_id (returns an int 0 for a string type)
        # The autogenerated V0037JobsResponse will reject this, so hit the
        # REST endpoint directly with a plain requests Session instead.
        # api_instance: SlurmApi = app.extensions["slurm"]
        session: Session = app.extensions["slurm-requests"]
        response = session.get(
            app.config["SLURM_ENDPOINT"] + "/slurm/v0.0.37/jobs",
            headers={
                "X-SLURM-USER-NAME": app.config["SLURM_USER"],
                "X-SLURM-USER-TOKEN": app.config["SLURM_JWT"],
            },
        )
        response.raise_for_status()
        result = response.json()
        if result["errors"]:
            app.logger.warning(result["errors"])
        for job in result["jobs"]:
            # V0037JobResponseProperties
            job_id = job["job_id"]
            analysis = by_scheduler_id.get(job_id)
            if analysis is None:
                continue  # a Slurm job we did not submit
            job_state = job["job_state"]
            # NOTE(review): timestamp is interpreted in server-local time —
            # confirm the scheduler and this service agree on timezone
            end_time = datetime.fromtimestamp(job["end_time"])
            if job_state in failure_states:
                analysis.analysis_state = AnalysisState.Error
                analysis.finished = end_time
                app.logger.warning(
                    f"Slurm {job_id} (analysis {analysis.analysis_id}): {job_state}"
                )
            elif job_state == "COMPLETED":
                analysis.analysis_state = AnalysisState.Done
                analysis.finished = end_time
                app.logger.info(
                    f"Slurm {job_id} (analysis {analysis.analysis_id}): {job_state}"
                )
        db.session.commit()