Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,39 @@ jobs:
with:
token: ${{ secrets.CODECOV_TOKEN }}

unittest_slurm_mpich:
needs: [black]
runs-on: ubuntu-latest
services:
mysql:
image: mysql:8.0
env:
MYSQL_ROOT_PASSWORD: root
ports:
- "8888:3306"
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
steps:
- uses: actions/checkout@v4
- uses: koesterlab/setup-slurm-action@v1
timeout-minutes: 5
- name: Conda config
shell: bash -l {0}
run: echo -e "channels:\n - conda-forge\n" > .condarc
- uses: conda-incubator/setup-miniconda@v3
with:
python-version: '3.13'
miniforge-version: latest
condarc-file: .condarc
environment-file: .ci_support/environment-mpich.yml
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: |
pip install . --no-deps --no-build-isolation
cd tests
python -m unittest test_slurmclusterexecutor.py
python -m unittest test_slurmjobexecutor.py

unittest_mpich:
needs: [black]
runs-on: ${{ matrix.operating-system }}
Expand Down Expand Up @@ -400,7 +433,7 @@ jobs:
GH_TOKEN: ${{secrets.GITHUB_TOKEN}}

uml:
needs: [unittest_old, unittest_win, unittest_openmpi, unittest_mpich, unittest_flux_openmpi, unittest_flux_mpich, notebooks, benchmark, minimal, pip_check, mypy]
needs: [unittest_slurm_mpich, unittest_old, unittest_win, unittest_openmpi, unittest_mpich, unittest_flux_openmpi, unittest_flux_mpich, notebooks, benchmark, minimal, pip_check, mypy]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
108 changes: 108 additions & 0 deletions tests/test_slurmclusterexecutor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import importlib
import unittest
import shutil

from executorlib import SlurmClusterExecutor
from executorlib.standalone.serialize import cloudpickle_register

# Skip the SLURM-dependent tests when the "srun" launcher is not on PATH.
skip_slurm_test = shutil.which("srun") is None

# Skip the MPI tests when mpi4py is not importable.
# NOTE: uses importlib.util.find_spec, so importlib.util must be imported
# explicitly — a bare `import importlib` does not guarantee the submodule.
skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None

try:
    from executorlib.task_scheduler.file.hdf import dump

    skip_h5py_test = False
except ImportError:
    # h5py-backed cache support is optional; skip the related tests.
    skip_h5py_test = True

# SLURM submission-script template passed to SlurmClusterExecutor via
# resource_dict; the {{...}} placeholders (job_name, working_directory,
# cores, command) are filled in per task at submission time.
submission_template = """\
#!/bin/bash
#SBATCH --output=time.out
#SBATCH --job-name={{job_name}}
#SBATCH --chdir={{working_directory}}
#SBATCH --get-user-env=L
#SBATCH --cpus-per-task={{cores}}

{{command}}
"""


def mpi_funct(i):
    """Return (i, communicator size, rank); runs once per MPI rank."""
    from mpi4py import MPI

    world = MPI.COMM_WORLD
    return i, world.Get_size(), world.Get_rank()


@unittest.skipIf(
    skip_slurm_test or skip_mpi4py_test or skip_h5py_test,
    "h5py or mpi4py or SLURM are not installed, so the h5py, slurm and mpi4py tests are skipped.",
)
class TestCacheExecutorPysqa(unittest.TestCase):
    """Integration tests for SlurmClusterExecutor with file-based caching.

    Submits a 2-core MPI task through a real SLURM installation and checks
    the gathered per-rank results plus the files left in the cache directory.
    Requires srun on PATH, mpi4py, and the optional h5py-backed cache support.
    """

    def test_executor(self):
        """Explicit cwd, no termination on shutdown: expect 3 cache files."""
        with SlurmClusterExecutor(
            resource_dict={
                "cores": 2,
                "cwd": "executorlib_cache",
                "submission_template": submission_template,
            },
            block_allocation=False,
            cache_directory="executorlib_cache",
            terminate_tasks_on_shutdown=False,
        ) as exe:
            cloudpickle_register(ind=1)
            fs1 = exe.submit(mpi_funct, 1)
            # The task goes through the SLURM queue, so it cannot be done yet.
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)
            self.assertTrue(fs1.done())

    def test_executor_no_cwd(self):
        """No explicit cwd: expect only 2 files in the cache directory."""
        with SlurmClusterExecutor(
            resource_dict={"cores": 2, "submission_template": submission_template},
            block_allocation=False,
            cache_directory="executorlib_cache",
            terminate_tasks_on_shutdown=True,
        ) as exe:
            cloudpickle_register(ind=1)
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertEqual(len(os.listdir("executorlib_cache")), 2)
            self.assertTrue(fs1.done())

    def test_executor_existing_files(self):
        """Re-running against pre-existing (stale) cache input files works."""
        with SlurmClusterExecutor(
            resource_dict={
                "cores": 2,
                "cwd": "executorlib_cache",
                "submission_template": submission_template,
            },
            block_allocation=False,
            cache_directory="executorlib_cache",
        ) as exe:
            cloudpickle_register(ind=1)
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertTrue(fs1.done())
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)

        # Wipe the cache, but recreate a dummy "_i.h5" input file for every
        # HDF5 entry so the second run finds pre-existing cache inputs.
        for file_name in os.listdir("executorlib_cache"):
            file_path = os.path.join("executorlib_cache", file_name)
            os.remove(file_path)
            if ".h5" in file_path:
                task_key = file_path[:-5] + "_i.h5"
                dump(file_name=task_key, data_dict={"a": 1})

        with SlurmClusterExecutor(
            resource_dict={
                "cores": 2,
                "cwd": "executorlib_cache",
                "submission_template": submission_template,
            },
            block_allocation=False,
            cache_directory="executorlib_cache",
        ) as exe:
            cloudpickle_register(ind=1)
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertTrue(fs1.done())
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)

    def tearDown(self):
        # Remove the cache directory after every test; ignore_errors covers
        # the skipped-test case where it was never created.
        shutil.rmtree("executorlib_cache", ignore_errors=True)
28 changes: 28 additions & 0 deletions tests/test_slurmjobexecutor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import shutil
import unittest

from executorlib import SlurmJobExecutor


# Skip the SLURM tests when the "srun" launcher is not available on PATH.
skip_slurm_test = shutil.which("srun") is None


def calc(i):
    """Return the input unchanged; minimal payload for executor smoke tests."""
    result = i
    return result


@unittest.skipIf(
    skip_slurm_test, "Slurm is not installed, so the Slurm tests are skipped."
)
class TestSlurmBackend(unittest.TestCase):
    """Smoke test for SlurmJobExecutor with serial (single-core) tasks."""

    def test_slurm_executor_serial(self):
        """Two trivial submissions must round-trip their inputs."""
        with SlurmJobExecutor() as exe:
            futures = [exe.submit(calc, value) for value in (1, 2)]
            self.assertEqual([future.result() for future in futures], [1, 2])
            for future in futures:
                self.assertTrue(future.done())
Loading