```
# Conflicts:
#	executorlib/standalone/scheduler.py
#	executorlib/task_scheduler/file/queue_spawner.py
```

```
# Conflicts:
#	executorlib/standalone/command.py
#	executorlib/task_scheduler/file/shared.py
#	executorlib/task_scheduler/interactive/slurmspawner.py
#	tests/test_standalone_command.py
```
Walkthrough

Adds a PySQA-backed block-allocation pathway. Flux and Slurm executors now conditionally create a block-allocation scheduler via `create_pysqa_block_allocation_scheduler` when `block_allocation=True`; otherwise they use the file-based executor. Introduces `PysqaSpawner` and a factory to build `BlockAllocationTaskScheduler`. Tests added for Flux block allocation, `PysqaSpawner` lifecycle/commands, and Slurm init gating.
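The conditional pathway described above can be sketched as follows. The factory names come from the walkthrough, but the function body is a simplified stand-in for illustration, not the real implementation:

```python
def build_scheduler(block_allocation: bool, backend: str) -> str:
    """Toy stand-in for the executor __init__ dispatch described above."""
    if block_allocation:
        # the real code calls create_pysqa_block_allocation_scheduler(...)
        return f"BlockAllocationTaskScheduler[{backend}]"
    # the real code calls create_file_executor(...)
    return f"FileTaskScheduler[{backend}]"
```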
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant U as User code
    participant Exec as Flux/Slurm Executor
    participant FFactory as create_file_executor
    participant BFactory as create_pysqa_block_allocation_scheduler
    participant Sched as BlockAllocationTaskScheduler
    participant Sp as PysqaSpawner
    U->>Exec: __init__(block_allocation, resource_dict, ...)
    alt block_allocation == True
        Exec->>BFactory: build scheduler (backend="flux" or "slurm", config, resources)
        BFactory->>Sp: construct PysqaSpawner(config/resources)
        BFactory->>Sched: BlockAllocationTaskScheduler(spawner=Sp, max_workers)
        Exec-->>U: executor with block-allocation scheduler
    else
        Exec->>FFactory: create_file_executor(...)
        Exec-->>U: executor with file-based scheduler
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant Sched as BlockAllocationTaskScheduler
    participant Sp as PysqaSpawner
    participant QA as QueueAdapter
    participant Q as Queue (Slurm/Flux)
    participant W as Worker process
    Sched->>Sp: bootup(command_lst, stop_function?)
    Sp->>QA: submit(cores, cwd, cmd)
    QA->>Q: enqueue job
    Sp->>Sp: _check_process_helper()
    Q-->>QA: status (PENDING/RUNNING/ERROR)
    QA-->>Sp: status
    alt status == RUNNING
        Sp-->>Sched: True (started)
    else status == ERROR
        Sp->>QA: terminate()
        Sp-->>Sched: False
    end
    loop polling
        Sched->>Sp: poll()
        Sp->>QA: status()
        QA-->>Sp: RUNNING/PENDING/DONE
        Sp-->>Sched: bool
    end
    Sched->>Sp: shutdown(wait=True)
    Sp->>QA: terminate()
    Sp-->>Sched: complete
```
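The bootup/poll/shutdown protocol in the diagram can be sketched with a stub queue adapter. The status values and method names mirror the diagram; the stub classes below are simplified assumptions, not the project's real `PysqaSpawner` or PySQA `QueueAdapter`:

```python
class StubQueueAdapter:
    """Fake adapter that reports PENDING twice, then RUNNING."""

    def __init__(self):
        self._calls = 0
        self.terminated = False

    def submit(self):
        return 4711  # fake queue id

    def status(self):
        self._calls += 1
        return "PENDING" if self._calls < 3 else "RUNNING"

    def terminate(self):
        self.terminated = True


class SketchSpawner:
    """Minimal lifecycle sketch matching the sequence diagram."""

    def __init__(self, adapter):
        self._adapter = adapter
        self._job_id = None

    def bootup(self):
        self._job_id = self._adapter.submit()
        while True:  # the real spawner also supports an optional stop_function here
            state = self._adapter.status()
            if state == "RUNNING":
                return True
            if state == "ERROR":
                self._adapter.terminate()
                return False

    def poll(self):
        return self._job_id is not None and self._adapter.status() in ("PENDING", "RUNNING")

    def shutdown(self):
        self._adapter.terminate()
        self._job_id = None


adapter = StubQueueAdapter()
spawner = SketchSpawner(adapter)
started = spawner.bootup()  # loops until the stub reports RUNNING
spawner.shutdown()
```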
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
for more information, see https://pre-commit.ci
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@           Coverage Diff            @@
##           main     #763      +/-   ##
========================================
+ Coverage   97.84%   98.12%   +0.27%
========================================
  Files          32       33       +1
  Lines        1530     1649     +119
========================================
+ Hits         1497     1618     +121
+ Misses         33       31       -2
```

☔ View full report in Codecov by Sentry.
```
# Conflicts:
#	executorlib/standalone/interactive/communication.py
#	executorlib/standalone/interactive/spawner.py
#	executorlib/task_scheduler/interactive/blockallocation.py
#	executorlib/task_scheduler/interactive/shared.py
#	executorlib/task_scheduler/interactive/spawner_flux.py
```
Actionable comments posted: 4
🧹 Nitpick comments (11)
executorlib/executor/slurm.py (1)
189-211: Non-block path: init_function will raise in create_file_executor; consider an early, clearer error.

Passing `init_function` to `create_file_executor` triggers a ValueError ("not available with the pysqa based backend"). A short pre-check here with a tailored message would be more direct for users.
```diff
 super().__init__(
     executor=create_file_executor(
         max_workers=max_workers,
         backend="slurm",
         max_cores=max_cores,
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pmi_mode=pmi_mode,
         flux_executor=None,
         flux_executor_nesting=False,
         flux_log_files=False,
         pysqa_config_directory=pysqa_config_directory,
         hostname_localhost=hostname_localhost,
-        block_allocation=block_allocation,
-        init_function=init_function,
+        block_allocation=block_allocation,
+        init_function=init_function,  # will raise downstream
         disable_dependencies=disable_dependencies,
     )
 )
```

Alternatively (preferred), add an explicit guard just before the call:
```python
if init_function is not None:
    raise ValueError(
        "init_function is unsupported with the pysqa file backend; use block_allocation=True."
    )
```

executorlib/executor/flux.py (1)
381-401: Non-block path mirrors existing behavior; minor polish only.

Looks fine. If you adopt the earlier pre-check for `init_function` in Slurm, consider mirroring it here for consistency.
tests/test_fluxclusterexecutor.py (4)
64-78: Avoid brittle assertions on cache directory size.
`len(os.listdir("executorlib_cache"))` is environment- and implementation-dependent. Assert minimum expectations or patterns instead.

Apply this diff:
```diff
-        self.assertEqual(len(os.listdir("executorlib_cache")), 2)
+        # block-allocation creates at least 2 artifacts (task + input)
+        self.assertGreaterEqual(len(os.listdir("executorlib_cache")), 2)
```
79-97: Same here: relax directory-size assertion for robustness.

Use a lower bound rather than an exact count.
```diff
-        self.assertEqual(len(os.listdir("executorlib_cache")), 4)
+        # two tasks -> expect at least four artifacts
+        self.assertGreaterEqual(len(os.listdir("executorlib_cache")), 4)
```
28-31: Speed up test without reducing signal.

A full second sleep slows CI. 100-200 ms is typically enough to exercise scheduling here.
```diff
-        sleep(1)
+        sleep(0.2)
```
171-190: Prefer public APIs over private members in tests.

The test reaches into `_process` and `_check_process_helper`, which is fragile. Assert via `poll()` and observable behavior instead.

```diff
-        process_id = interface_flux._process
-        interface_flux.shutdown(wait=True)
-        interface_flux._process = process_id
-        self.assertFalse(interface_flux.poll())
-        self.assertFalse(interface_flux._check_process_helper(command_lst=["sleep", "1"]))
+        interface_flux.shutdown(wait=True)
+        self.assertFalse(interface_flux.poll())
```

tests/test_standalone_interactive_backend.py (1)
138-140: Make the failure assertion backend-agnostic.

On some systems SLURM binaries exist but submission fails, raising `subprocess.CalledProcessError` instead of `FileNotFoundError`.

```diff
-        with self.assertRaises(FileNotFoundError):
+        import subprocess
+        with self.assertRaises((FileNotFoundError, subprocess.CalledProcessError)):
             interface_slurm.bootup(command_lst=["sleep", "1"])
```

executorlib/task_scheduler/interactive/spawner_pysqa.py (4)
129-138: Add actionable error messages for Flux constraints.

Bare `ValueError()` hinders debugging.

```diff
-        if self._num_nodes is not None:
-            raise ValueError()
-        if self._threads_per_core > 1:
-            raise ValueError()
-        if self._gpus_per_core > 0:
-            raise ValueError()
-        if self._exclusive:
-            raise ValueError()
-        if self._openmpi_oversubscribe:
-            raise ValueError()
+        if self._num_nodes is not None:
+            raise ValueError("Flux backend: num_nodes is not supported when cores>1")
+        if self._threads_per_core > 1:
+            raise ValueError("Flux backend: threads_per_core>1 is not supported")
+        if self._gpus_per_core > 0:
+            raise ValueError("Flux backend: gpus_per_core>0 is not supported")
+        if self._exclusive:
+            raise ValueError("Flux backend: exclusive allocation is not supported")
+        if self._openmpi_oversubscribe:
+            raise ValueError("Flux backend: oversubscribe is not supported")
```
140-142: Fix misleading error message for multi-core without backend.

Backend cannot be None when `cores > 1`.

```diff
-            raise ValueError(
-                f"backend should be None, slurm or flux, not {self._backend}"
-            )
+            raise ValueError(
+                f"When cores>1, backend must be 'slurm' or 'flux'; got {self._backend!r}"
+            )
```
147-160: Silence unused-arg warning for `wait` or implement waiting.

Currently unused; keep interface but declare intent.

```diff
     def shutdown(self, wait: bool = True):
         """
         Method to shutdown the subprocess interface.
@@
-        if self._process is not None:
+        # 'wait' is intentionally unused for queue-backed jobs
+        if self._process is not None:
```

Alternatively:

```diff
-    def shutdown(self, wait: bool = True):
+    def shutdown(self, wait: bool = True):  # noqa: ARG002
```
90-98: Consider a bootup timeout.

An endless loop can hang indefinitely without a `stop_function`. Expose a `startup_timeout` or retry/backoff.
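One way to implement the suggested `startup_timeout` is a deadline-based loop. This is a sketch with a stub status probe; the function name and signature are assumptions, not the project's API:

```python
import time


def bootup_with_timeout(check_running, startup_timeout=30.0, poll_interval=0.01):
    """Poll check_running() until it returns True or the deadline passes."""
    deadline = time.monotonic() + startup_timeout
    while time.monotonic() < deadline:
        if check_running():
            return True
        time.sleep(poll_interval)
    raise TimeoutError(f"worker did not start within {startup_timeout} seconds")


# stub probe: reports running on the third attempt
attempts = {"n": 0}


def probe():
    attempts["n"] += 1
    return attempts["n"] >= 3


started = bootup_with_timeout(probe, startup_timeout=5.0)
```

A retry/backoff variant would grow `poll_interval` between attempts instead of keeping it fixed.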
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- executorlib/executor/flux.py (1 hunks)
- executorlib/executor/slurm.py (1 hunks)
- executorlib/task_scheduler/interactive/spawner_pysqa.py (1 hunks)
- tests/test_fluxclusterexecutor.py (5 hunks)
- tests/test_slurmclusterexecutor.py (2 hunks)
- tests/test_standalone_interactive_backend.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
tests/test_slurmclusterexecutor.py (1)
- executorlib/executor/slurm.py (1)
  - SlurmClusterExecutor (22-229)

executorlib/task_scheduler/interactive/spawner_pysqa.py (4)
- executorlib/standalone/inputcheck.py (1)
  - validate_number_of_cores (176-200)
- executorlib/standalone/interactive/spawner.py (1)
  - BaseSpawner (9-64)
- executorlib/standalone/scheduler.py (2)
  - pysqa_execute_command (32-65)
  - terminate_with_pysqa (8-29)
- executorlib/task_scheduler/interactive/blockallocation.py (1)
  - BlockAllocationTaskScheduler (29-197)

tests/test_fluxclusterexecutor.py (1)
- executorlib/task_scheduler/interactive/spawner_pysqa.py (2)
  - PysqaSpawner (16-208)
  - _check_process_helper (190-205)

executorlib/executor/slurm.py (2)
- executorlib/task_scheduler/interactive/spawner_pysqa.py (1)
  - create_pysqa_block_allocation_scheduler (211-248)
- executorlib/task_scheduler/file/task_scheduler.py (1)
  - create_file_executor (85-131)

tests/test_standalone_interactive_backend.py (1)
- executorlib/task_scheduler/interactive/spawner_pysqa.py (3)
  - PysqaSpawner (16-208)
  - create_pysqa_block_allocation_scheduler (211-248)
  - _check_process_helper (190-205)

executorlib/executor/flux.py (6)
- executorlib/task_scheduler/interactive/spawner_pysqa.py (1)
  - create_pysqa_block_allocation_scheduler (211-248)
- executorlib/task_scheduler/interactive/blockallocation.py (2)
  - max_workers (95-96)
  - max_workers (99-121)
- executorlib/task_scheduler/interactive/dependency.py (2)
  - max_workers (87-95)
  - max_workers (98-111)
- executorlib/executor/base.py (2)
  - max_workers (26-27)
  - max_workers (30-31)
- executorlib/task_scheduler/base.py (2)
  - max_workers (36-37)
  - max_workers (40-41)
- executorlib/task_scheduler/file/task_scheduler.py (1)
  - create_file_executor (85-131)
🪛 Ruff (0.12.2)
executorlib/task_scheduler/interactive/spawner_pysqa.py
140-142: Avoid specifying long messages outside the exception class
(TRY003)
147-147: Unused method argument: wait
(ARG002)
178-178: Probable use of insecure hash functions in hashlib: md5
(S324)
198-200: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: notebooks_integration
🔇 Additional comments (2)
executorlib/executor/slurm.py (1)
169-187: Block-allocation via PySQA path looks correct; verify default cache_directory semantics.

`create_pysqa_block_allocation_scheduler` forces `cache_directory` to `os.path.abspath(".")` when None, which may diverge from the documented default "executorlib_cache". Please confirm intended behavior across Slurm/Flux so users don't see different defaults depending on `block_allocation`.
executorlib/executor/flux.py (1)
361-379: Flux block-allocation via PySQA path is consistent with Slurm; re-check default cache_directory.

Same note as for Slurm: `create_pysqa_block_allocation_scheduler` defaults to "." when `cache_directory` is None. Ensure this matches documented defaults and user expectations.
```python
class PysqaSpawner(BaseSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
        pmi_mode: Optional[str] = None,
        config_directory: Optional[str] = None,
        backend: Optional[str] = None,
        **kwargs,
    ):
```
🛠️ Refactor suggestion
Plumb cache_directory and worker_id into the spawner; don’t forward them to PySQA.
Ensures the worker dir logic can use these values and avoids leaking non-PySQA keys to submit_job.
```diff
 class PysqaSpawner(BaseSpawner):
     def __init__(
         self,
         cwd: Optional[str] = None,
         cores: int = 1,
         threads_per_core: int = 1,
         gpus_per_core: int = 0,
         num_nodes: Optional[int] = None,
         exclusive: bool = False,
         openmpi_oversubscribe: bool = False,
         slurm_cmd_args: Optional[list[str]] = None,
         pmi_mode: Optional[str] = None,
         config_directory: Optional[str] = None,
         backend: Optional[str] = None,
+        cache_directory: Optional[str] = None,
         **kwargs,
     ):
@@
-        self._pysqa_submission_kwargs = kwargs
+        # Internal/non-PySQA fields
+        self._worker_id = kwargs.pop("worker_id", None)
+        # Accept via explicit arg or kwargs for BC
+        self._cache_directory = cache_directory or kwargs.pop("cache_directory", None)
+        # Remaining kwargs are forwarded to PySQA submit_job
+        self._pysqa_submission_kwargs = kwargs
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
class PysqaSpawner(BaseSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
        pmi_mode: Optional[str] = None,
        config_directory: Optional[str] = None,
        backend: Optional[str] = None,
        cache_directory: Optional[str] = None,
        **kwargs,
    ):
        # … (other initialization, e.g. self.cwd = cwd, etc.)
        # Internal/non-PySQA fields
        self._worker_id = kwargs.pop("worker_id", None)
        # Accept via explicit arg or kwargs for BC
        self._cache_directory = cache_directory or kwargs.pop("cache_directory", None)
        # Remaining kwargs are forwarded to PySQA submit_job
        self._pysqa_submission_kwargs = kwargs
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/spawner_pysqa.py around lines 16 to
31, the PysqaSpawner __init__ must accept and store cache_directory and
worker_id for use by the worker directory logic and must not forward these
non-PySQA keys to PySQA submit_job; add optional parameters cache_directory:
Optional[str] = None and worker_id: Optional[str] = None to the constructor
signature, assign them to self.cache_directory and self.worker_id, and ensure
any call site that forwards kwargs to PySQA strips these keys from the kwargs
(or builds a new dict without them) before calling submit_job so only valid
PySQA arguments are passed.
```python
    def _start_process_helper(
        self, command_lst: list[str], queue_adapter: QueueAdapter
    ) -> int:
        hash = hashlib.md5(str(self).encode()).hexdigest()
        if self._cwd is not None:
            working_directory = os.path.join(self._cwd, hash)
        else:
            working_directory = os.path.abspath(hash)
        return queue_adapter.submit_job(
            command=" ".join(self.generate_command(command_lst=command_lst)),
            working_directory=working_directory,
            cores=int(self._cores * self._threads_per_core),
            **self._pysqa_submission_kwargs,
        )
```
🛠️ Refactor suggestion
Stop hashing into cwd; derive working_directory from cache_directory + worker_id (addresses open TODO).

- Avoids insecure md5 and shadowing the built-in `hash`.
- Produces stable per-worker directories under the cache dir as requested in the PR checklist.
```diff
-    def _start_process_helper(
-        self, command_lst: list[str], queue_adapter: QueueAdapter
-    ) -> int:
-        hash = hashlib.md5(str(self).encode()).hexdigest()
-        if self._cwd is not None:
-            working_directory = os.path.join(self._cwd, hash)
-        else:
-            working_directory = os.path.abspath(hash)
+    def _start_process_helper(
+        self, command_lst: list[str], queue_adapter: QueueAdapter
+    ) -> int:
+        # Prefer cache_directory + worker_id; fall back to a short blake2 hash
+        base_dir = (
+            self._cache_directory
+            or (self._cwd if self._cwd is not None else os.path.abspath("."))
+        )
+        if getattr(self, "_worker_id", None) is not None:
+            worker_dir = f"worker_{self._worker_id}"
+        else:
+            # stable-ish fallback without using insecure md5
+            worker_dir = "worker_" + hashlib.blake2s(
+                repr(self).encode(), digest_size=8
+            ).hexdigest()
+        working_directory = os.path.join(base_dir, worker_dir)
+        os.makedirs(working_directory, exist_ok=True)
         return queue_adapter.submit_job(
             command=" ".join(self.generate_command(command_lst=command_lst)),
             working_directory=working_directory,
             cores=int(self._cores * self._threads_per_core),
             **self._pysqa_submission_kwargs,
         )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    def _start_process_helper(
        self, command_lst: list[str], queue_adapter: QueueAdapter
    ) -> int:
        # Prefer cache_directory + worker_id; fall back to a short blake2 hash
        base_dir = (
            self._cache_directory
            or (self._cwd if self._cwd is not None else os.path.abspath("."))
        )
        if getattr(self, "_worker_id", None) is not None:
            worker_dir = f"worker_{self._worker_id}"
        else:
            # stable-ish fallback without using insecure md5
            worker_dir = "worker_" + hashlib.blake2s(
                repr(self).encode(), digest_size=8
            ).hexdigest()
        working_directory = os.path.join(base_dir, worker_dir)
        os.makedirs(working_directory, exist_ok=True)
        return queue_adapter.submit_job(
            command=" ".join(self.generate_command(command_lst=command_lst)),
            working_directory=working_directory,
            cores=int(self._cores * self._threads_per_core),
            **self._pysqa_submission_kwargs,
        )
```
🧰 Tools
🪛 Ruff (0.12.2)
178-178: Probable use of insecure hash functions in hashlib: md5
(S324)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/spawner_pysqa.py around lines 175-188,
replace the current MD5-based working_directory logic (which shadows the
built-in name `hash` and hashes self) with a stable per-worker path derived from
the configured cache directory and the worker id: build working_directory =
os.path.join(self._cache_directory, str(self._worker_id)) (or use the existing
cache dir attribute name if different), ensure the cache directory exists
(os.makedirs(..., exist_ok=True)), avoid using hashlib/md5 and do not shadow
built-ins, and keep the rest of the queue_adapter.submit_job call unchanged so
each worker uses a persistent, predictable subdirectory under the cache dir.
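The directory scheme proposed in this comment can be isolated into a small helper. The attribute names (`cache_directory`, `worker_id`) follow the review suggestion, the blake2s fallback replaces md5, and the whole function is an illustrative sketch rather than the project's code:

```python
import hashlib
import os


def worker_directory(cache_directory, worker_id=None, fallback_token=""):
    """Return a stable per-worker path under the cache directory.

    If no worker_id is available, fall back to a short blake2s
    fingerprint of a caller-supplied token (e.g. repr(self)).
    """
    if worker_id is not None:
        name = f"worker_{worker_id}"
    else:
        # non-cryptographic fingerprint; avoids flagged md5 usage
        name = "worker_" + hashlib.blake2s(
            fallback_token.encode(), digest_size=8
        ).hexdigest()
    return os.path.join(cache_directory, name)
```

With a `worker_id`, the result is fully predictable, e.g. `worker_directory("/tmp/cache", worker_id=3)` yields `/tmp/cache/worker_3`; the fallback is deterministic for a fixed token.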
```python
try:
    import pysqa

    skip_pysqa_test = False
except ImportError:
    skip_pysqa_test = True
```
Good: explicit PySQA gating flag. Also gate earlier tests that rely on PySQA.
The TestCacheExecutorPysqa class (Lines 50-54) relies on the file backend, which needs PySQA. Add skip_pysqa_test to that skipIf to avoid ImportError when PySQA isn’t installed.
Proposed change outside this hunk (around Line 50):
```python
@unittest.skipIf(
    skip_slurm_test or skip_mpi4py_test or skip_h5py_test or skip_pysqa_test,
    "h5py or mpi4py or SLURM or pysqa are not installed; tests skipped.",
)
```

🤖 Prompt for AI Agents
In tests/test_slurmclusterexecutor.py around lines 50 to 54, the
TestCacheExecutorPysqa class skip decorator needs to include the earlier defined
skip_pysqa_test flag so tests that depend on PySQA are skipped when it’s not
installed; update the @unittest.skipIf condition to add skip_pysqa_test to the
OR list and adjust the skip message to mention pysqa (e.g., "h5py or mpi4py or
SLURM or pysqa are not installed; tests skipped.").
```python
@unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.")
class TestSlurmClusterInit(unittest.TestCase):
    def test_slurm_cluster_block_allocation(self):
        with self.assertRaises(ValueError):
            SlurmClusterExecutor(block_allocation=True)

    def test_slurm_cluster_file(self):
        self.assertTrue(SlurmClusterExecutor(block_allocation=False))
```
💡 Verification agent
🧩 Analysis chain
Fix test: block_allocation no longer raises ValueError when PySQA is available.
The code now instantiates a PySQA-backed BlockAllocationTaskScheduler; expecting ValueError is incorrect and will fail when PySQA is installed. Assert successful construction instead. Also, make the “file” test assert type rather than truthiness.
```diff
 @unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.")
 class TestSlurmClusterInit(unittest.TestCase):
     def test_slurm_cluster_block_allocation(self):
-        with self.assertRaises(ValueError):
-            SlurmClusterExecutor(block_allocation=True)
+        exe = SlurmClusterExecutor(block_allocation=True)
+        self.assertIsInstance(exe, SlurmClusterExecutor)

     def test_slurm_cluster_file(self):
-        self.assertTrue(SlurmClusterExecutor(block_allocation=False))
+        exe = SlurmClusterExecutor(block_allocation=False)
+        self.assertIsInstance(exe, SlurmClusterExecutor)
```

If you specifically want to test behavior when PySQA is missing, add a separate test guarded by `skipUnless(skip_pysqa_test)` and assert ImportError on construction (since the import happens before any ValueError).
🏁 Script executed:
```shell
#!/bin/bash
set -e
# Locate the SlurmClusterExecutor class definition
rg -nP --type=py 'class\s+SlurmClusterExecutor' -C5
# Inspect its __init__ method for block_allocation handling
rg -nP --type=py 'def\s+__init__\s*\(.*block_allocation' -C5
# Search for all occurrences of block_allocation within this class
rg -nP --type=py --context 5 'block_allocation' --no-heading
```

Length of output: 925
🏁 Script executed:
```shell
#!/bin/bash
set -e
# Show the beginning of the SlurmClusterExecutor implementation
sed -n '1,200p' executorlib/executor/slurm.py
# Find any __init__ definitions in slurm.py
rg -nP --type=py 'def\s+__init__' -n executorlib/executor/slurm.py -C3
# Search for block_allocation occurrences across the repository
rg -nP --type=py 'block_allocation' -n -C3
```

Length of output: 93539
Update tests in tests/test_slurmclusterexecutor.py to reflect PySQA-backed behavior.
- In `test_slurm_cluster_block_allocation`, replace the ValueError assertion with:

  ```python
  exe = SlurmClusterExecutor(block_allocation=True)
  self.assertIsInstance(exe, SlurmClusterExecutor)
  ```

- In `test_slurm_cluster_file`, replace `assertTrue(...)` with:

  ```python
  exe = SlurmClusterExecutor(block_allocation=False)
  self.assertIsInstance(exe, SlurmClusterExecutor)
  ```

- Add a new test guarded by `@unittest.skipUnless(skip_pysqa_test, ...)` that verifies `SlurmClusterExecutor(block_allocation=True)` raises `ImportError` when PySQA is unavailable.
🤖 Prompt for AI Agents
tests/test_slurmclusterexecutor.py lines 120-127: update the tests to reflect
PySQA-backed behavior by replacing the ValueError expectation with creating the
executor and asserting its type (exe =
SlurmClusterExecutor(block_allocation=True); self.assertIsInstance(exe,
SlurmClusterExecutor)), replace the assertTrue call with the same pattern for
block_allocation=False, and add a new test function decorated with
@unittest.skipUnless(skip_pysqa_test, "pysqa is not installed, so the pysqa
tests are skipped.") that asserts SlurmClusterExecutor(block_allocation=True)
raises ImportError when PySQA is unavailable.
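The extra ImportError test suggested above might look like this; `FakeExecutor` is a hypothetical stand-in for `SlurmClusterExecutor` so the sketch is self-contained and runnable without the package installed:

```python
import unittest

pysqa_missing = True  # stand-in for the module-level skip_pysqa_test flag


class FakeExecutor:
    """Minimal stand-in for SlurmClusterExecutor's import-time behavior."""

    def __init__(self, block_allocation=False):
        if block_allocation and pysqa_missing:
            # mirrors the ImportError raised when pysqa cannot be imported
            raise ImportError("pysqa is required for block_allocation=True")


class TestBlockAllocationGating(unittest.TestCase):
    @unittest.skipUnless(pysqa_missing, "only meaningful when pysqa is absent")
    def test_block_allocation_requires_pysqa(self):
        with self.assertRaises(ImportError):
            FakeExecutor(block_allocation=True)


result = unittest.TextTestRunner(verbosity=0).run(
    unittest.defaultTestLoader.loadTestsFromTestCase(TestBlockAllocationGating)
)
```

Swapping `FakeExecutor` for the real `SlurmClusterExecutor` and `pysqa_missing` for `skip_pysqa_test` gives the test the review asks for.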
A different formulation would be `block_allocation` for the `SlurmClusterExecutor` and the `FluxClusterExecutor`.