Skip to content

Commit

Permalink
Merge pull request #1757 from buildtesters/paths_for_sched_libs_in_co…
Browse files Browse the repository at this point in the history
…nfiguration

Add support for 'paths' in configuration to allow one to search for scheduler binary in non-standard location
  • Loading branch information
shahzebsiddiqui authored Apr 19, 2024
2 parents a1bf767 + 1976fa2 commit 50deac0
Show file tree
Hide file tree
Showing 31 changed files with 308 additions and 144 deletions.
2 changes: 1 addition & 1 deletion aws_oddc/sleep.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
buildspecs:
hostname_test:
type: script
executor: generic.torque.e4spro
executor: generic.torque.lbl
description: run sleep for 5 seconds
pbs: ["-l nodes=1"]
run: |
Expand Down
12 changes: 6 additions & 6 deletions buildtest/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
console,
)
from buildtest.exceptions import BuildTestError, ConfigurationError
from buildtest.scheduler.detection import LSF, PBS, Cobalt, Slurm, Torque
from buildtest.schemas.defaults import custom_validator
from buildtest.schemas.utils import load_recipe, load_schema
from buildtest.system import LSF, PBS, Cobalt, Slurm, Torque
from buildtest.utils.file import resolve_path
from buildtest.utils.shell import Shell
from buildtest.utils.tools import deep_get
Expand Down Expand Up @@ -259,7 +259,7 @@ def _validate_lsf_executors(self):

executor_type = "lsf"

lsf = LSF()
lsf = LSF(custom_dirs=deep_get(self.target_config, "paths", "lsf"))
if not lsf.active():
return

Expand Down Expand Up @@ -296,7 +296,7 @@ def _validate_slurm_executors(self):
return

executor_type = "slurm"
slurm = Slurm()
slurm = Slurm(custom_dirs=deep_get(self.target_config, "paths", "slurm"))

if not slurm.active():
return
Expand Down Expand Up @@ -349,7 +349,7 @@ def _validate_cobalt_executors(self):

executor_type = "cobalt"

cobalt = Cobalt()
cobalt = Cobalt(custom_dirs=deep_get(self.target_config, "paths", "cobalt"))
if not cobalt.active():
return

Expand Down Expand Up @@ -418,7 +418,7 @@ def _validate_pbs_executors(self):

executor_type = "pbs"

pbs = PBS()
pbs = PBS(custom_dirs=deep_get(self.target_config, "paths", "pbs"))
if not pbs.active():
return

Expand Down Expand Up @@ -453,7 +453,7 @@ def _validate_torque_executors(self):

executor_type = "torque"

torque = Torque()
torque = Torque(custom_dirs=deep_get(self.target_config, "paths", "torque"))
if not torque.active():
return

Expand Down
2 changes: 0 additions & 2 deletions buildtest/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,12 @@ def _cancel_job_if_elapsedtime_exceeds_timeout(self, builder):
def _cancel_job_if_pendtime_exceeds_maxpendtime(self, builder):
builder.job.pendtime = time.time() - builder.job.submittime
builder.job.pendtime = round(builder.job.pendtime, 2)

if builder.job.pendtime > self.maxpendtime:
builder.job.cancel()
builder.failed()
console.print(
f"[blue]{builder}[/]: [red]Cancelling Job {builder.job.get()} because job exceeds max pend time of {self.maxpendtime} sec with current pend time of {builder.job.pendtime} sec[/red] "
)
return

def __str__(self):
return self.name
Expand Down
9 changes: 7 additions & 2 deletions buildtest/executors/cobalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.cobalt import CobaltJob
from buildtest.utils.file import is_file, read_file
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)

Expand All @@ -36,9 +37,13 @@ def __init__(
super().__init__(name, settings, site_configs, timeout=timeout)

self.queue = self._settings.get("queue")
self.custom_dirs = deep_get(site_configs.target_config, "paths", "cobalt")

def launcher_command(self, numprocs, numnodes):
batch_cmd = ["qsub"]
self.cobalt_cmds = check_binaries(
["qsub", "qstat", "qdel"], custom_dirs=self.custom_dirs
)
batch_cmd = [self.cobalt_cmds["qsub"]]

if self.queue:
batch_cmd += [f"-q {self.queue}"]
Expand Down Expand Up @@ -86,7 +91,7 @@ def run(self, builder):
job_id = int(out)
builder.metadata["jobid"] = job_id

builder.job = CobaltJob(job_id)
builder.job = CobaltJob(job_id, self.cobalt_cmds)

msg = f"[blue]{builder}[/]: JobID: {builder.metadata['jobid']} dispatched to scheduler"
console.print(msg)
Expand Down
17 changes: 11 additions & 6 deletions buildtest/executors/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from buildtest.defaults import console
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.lsf import LSFJob
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,12 +41,16 @@ def __init__(
)

self.queue = self._settings.get("queue")
self.custom_dirs = deep_get(site_configs.target_config, "paths", "lsf")

def launcher_command(self, numprocs=None, numnodes=None):
"""This command returns the launcher command and any options specified in configuration file. This
is useful when generating the build script in the BuilderBase class
"""
cmd = ["bsub"]
self.lsf_cmds = check_binaries(
["bsub", "bjobs", "bkill"], custom_dirs=self.custom_dirs
)
cmd = [self.lsf_cmds["bsub"]]

if self.queue:
cmd += [f"-q {self.queue}"]
Expand Down Expand Up @@ -94,25 +99,25 @@ def run(self, builder):
out = " ".join(out)
pattern = r"(\d+)"
# output in the form: 'Job <58654> is submitted to queue <batch>' and applying regular expression to get job ID
m = re.search(pattern, out)
regex_match = re.search(pattern, out)
self.logger.debug(f"Applying regular expression '{pattern}' to output: '{out}'")

# if there is no match we raise error
if not m:
if not regex_match:
self.logger.debug(f"Unable to find LSF Job ID in output: '{out}'")
builder.failed()
return builder

try:
job_id = int(m.group(0))
job_id = int(regex_match.group(0))
except ValueError:
self.logger.debug(
f"Unable to convert '{m.group(0)}' to int to extract Job ID"
f"Unable to convert '{regex_match.group(0)}' to int to extract Job ID"
)
builder.failed()
return builder

builder.job = LSFJob(job_id)
builder.job = LSFJob(job_id, self.lsf_cmds)

builder.metadata["jobid"] = job_id

Expand Down
22 changes: 19 additions & 3 deletions buildtest/executors/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from buildtest.defaults import console
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.pbs import PBSJob
from buildtest.scheduler.pbs import PBSJob, TorqueJob
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,9 +38,21 @@ def __init__(
)

self.queue = self._settings.get("queue")
self.custom_dirs = None

if isinstance(self, PBSExecutor):
self.custom_dirs = deep_get(site_configs.target_config, "paths", "pbs")
elif isinstance(self, TorqueExecutor):
self.custom_dirs = deep_get(site_configs.target_config, "paths", "torque")

def launcher_command(self, numprocs=None, numnodes=None):
batch_cmd = ["qsub"]
batch_cmd = []

self.pbs_cmds = check_binaries(
["qsub", "qstat", "qdel"], custom_dirs=self.custom_dirs
)

batch_cmd += [self.pbs_cmds["qsub"]]

if self.queue:
batch_cmd += [f"-q {self.queue}"]
Expand Down Expand Up @@ -86,7 +99,10 @@ def run(self, builder):
out = command.get_output()
JobID = " ".join(out).strip()

builder.job = PBSJob(JobID)
if isinstance(self, TorqueExecutor):
builder.job = TorqueJob(JobID, self.pbs_cmds)
elif isinstance(self, PBSExecutor):
builder.job = PBSJob(JobID, self.pbs_cmds)

# store job id
builder.metadata["jobid"] = builder.job.get()
Expand Down
4 changes: 3 additions & 1 deletion buildtest/executors/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ def poll(self, pending_jobs):
jobs = [
builder
for builder in jobs
if builder.job.is_running() or builder.job.is_pending()
if builder.job.is_running()
or builder.job.is_pending()
or builder.job.is_suspended()
]

def _print_job_details(self, active_jobs):
Expand Down
15 changes: 12 additions & 3 deletions buildtest/executors/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from buildtest.defaults import console
from buildtest.executors.base import BaseExecutor
from buildtest.scheduler.slurm import SlurmJob
from buildtest.utils.tools import check_binaries, deep_get

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,10 +43,14 @@ def __init__(
self.cluster = self._settings.get("cluster")
self.partition = self._settings.get("partition")
self.qos = self._settings.get("qos")
self.custom_dirs = deep_get(site_configs.target_config, "paths", "slurm")

def launcher_command(self, numprocs=None, numnodes=None):
"""Return sbatch launcher command with options used to submit job"""
sbatch_cmd = ["sbatch", "--parsable"]
self.slurm_cmds = check_binaries(
["sbatch", "scontrol", "sacct", "scancel"], custom_dirs=self.custom_dirs
)
sbatch_cmd = [self.slurm_cmds["sbatch"], "--parsable"]

if self.partition:
sbatch_cmd += [f"-p {self.partition}"]
Expand Down Expand Up @@ -103,7 +108,11 @@ def run(self, builder):
else:
builder.metadata["jobid"] = int(parse_jobid)

builder.job = SlurmJob(builder.metadata["jobid"], self.cluster)
builder.job = SlurmJob(
jobID=builder.metadata["jobid"],
cluster=self.cluster,
slurm_cmds=self.slurm_cmds,
)

msg = f"[blue]{builder}[/blue]: JobID {builder.metadata['jobid']} dispatched to scheduler"
console.print(msg)
Expand All @@ -121,7 +130,7 @@ def gather(self, builder):
builder (buildtest.buildsystem.base.BuilderBase): An instance object of BuilderBase type
"""
builder.record_endtime()

builder.job.retrieve_jobdata()
builder.metadata["job"] = builder.job.jobdata()

builder.metadata["result"]["returncode"] = builder.job.exitcode()
Expand Down
9 changes: 5 additions & 4 deletions buildtest/scheduler/cobalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ class CobaltJob(Job):
is pending, running, complete, suspended.
"""

def __init__(self, jobID):
def __init__(self, jobID, cobalt_cmds):
super().__init__(jobID)
self._outfile = str(jobID) + ".output"
self._errfile = str(jobID) + ".error"
self._cobaltlog = str(jobID) + ".cobaltlog"
self.cobalt_cmds = cobalt_cmds

def is_pending(self):
"""Return ``True`` if job is pending otherwise returns ``False``. When cobalt recieves job it is
Expand Down Expand Up @@ -60,7 +61,7 @@ def poll(self):
"""Poll job by running ``qstat -l --header State <jobid>`` which retrieves job state."""

# get Job State by running 'qstat -l --header <jobid>'
query = f"qstat -l --header State {self.jobid}"
query = f"{self.cobalt_cmds['qstat']} -l --header State {self.jobid}"
logger.debug(f"Getting Job State for '{self.jobid}' by running: '{query}'")
cmd = BuildTestCommand(query)
cmd.execute()
Expand Down Expand Up @@ -98,7 +99,7 @@ def retrieve_jobdata(self):
"""

# 'qstat -lf <jobid>' will get all fields of Job.
qstat_cmd = f"qstat -lf {self.jobid}"
qstat_cmd = f"{self.cobalt_cmds['qstat']} -lf {self.jobid}"
logger.debug(f"Executing command: {qstat_cmd}")
cmd = BuildTestCommand(qstat_cmd)
cmd.execute()
Expand All @@ -119,7 +120,7 @@ def cancel(self):
``maxpendtime`` if job is pending.
"""

query = f"qdel {self.jobid}"
query = f"{self.cobalt_cmds['qdel']} {self.jobid}"
logger.debug(f"Cancelling job {self.jobid} by running: {query}")
cmd = BuildTestCommand(query)
cmd.execute()
Expand Down
Loading

0 comments on commit 50deac0

Please sign in to comment.