Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,47 @@ def get_future_from_cache(

def terminate_tasks_in_cache(
cache_directory: str,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete all jobs stored in the cache directory from the queuing system

Args:
cache_directory (str): The directory to store cache files.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
from executorlib.task_scheduler.file.spawner_pysqa import terminate_tasks_in_cache

return terminate_tasks_in_cache(
cache_directory=cache_directory,
config_directory=config_directory,
pysqa_config_directory=pysqa_config_directory,
backend=backend,
)


def terminate_task_in_cache(
cache_directory: str,
cache_key: str,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete a specific job stored in the cache directory from the queuing system

Args:
cache_directory (str): The directory to store cache files.
cache_key (str): The key of the cache file to be deleted.
pysqa_config_directory (str, optional): path to the pysqa config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
from executorlib.task_scheduler.file.spawner_pysqa import terminate_task_in_cache

return terminate_task_in_cache(
cache_directory=cache_directory,
cache_key=cache_key,
pysqa_config_directory=pysqa_config_directory,
backend=backend,
)

Expand Down
33 changes: 30 additions & 3 deletions src/executorlib/task_scheduler/file/spawner_pysqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ def execute_with_pysqa(

def terminate_tasks_in_cache(
cache_directory: str,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete all jobs stored in the cache directory from the queuing system

Args:
cache_directory (str): The directory to store cache files.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
hdf5_file_lst = []
Expand All @@ -116,6 +116,33 @@ def terminate_tasks_in_cache(
if queue_id is not None:
terminate_with_pysqa(
queue_id=queue_id,
config_directory=config_directory,
config_directory=pysqa_config_directory,
backend=backend,
)
os.remove(f)


def terminate_task_in_cache(
cache_directory: str,
cache_key: str,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Delete a specific job stored in the cache directory from the queuing system

Args:
cache_directory (str): The directory to store cache files.
cache_key (str): The key of the cache file to be deleted.
pysqa_config_directory (str, optional): path to the pysqa config directory.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
"""
file_name = os.path.join(cache_directory, cache_key + "_i.h5")
queue_id = get_queue_id(file_name=file_name)
if queue_id is not None:
terminate_with_pysqa(
queue_id=queue_id,
config_directory=pysqa_config_directory,
backend=backend,
)
os.remove(file_name)
13 changes: 12 additions & 1 deletion tests/unit/executor/test_flux_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

try:
import flux.job
from executorlib import terminate_tasks_in_cache
from executorlib import terminate_tasks_in_cache, terminate_task_in_cache
from executorlib.standalone.hdf import dump
from executorlib.task_scheduler.file.spawner_pysqa import execute_with_pysqa
from executorlib.standalone.scheduler import terminate_with_pysqa
Expand Down Expand Up @@ -195,6 +195,17 @@ def test_terminate_tasks_in_cache(self):
backend="flux",
))

def test_terminate_task_in_cache(self):
file = os.path.join("executorlib_cache", "test_i.h5")
dump(file_name=file, data_dict={"queue_id": 1})
self.assertTrue(os.path.exists(file))
self.assertIsNone(terminate_task_in_cache(
cache_directory="executorlib_cache",
cache_key="test",
backend="flux"
))
self.assertFalse(os.path.exists(file))

def tearDown(self):
shutil.rmtree("executorlib_cache", ignore_errors=True)

Expand Down