Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def __init__(
init_function=init_function,
disable_dependencies=disable_dependencies,
wait=wait,
refresh_rate=refresh_rate,
)
)
else:
Expand Down
1 change: 1 addition & 0 deletions src/executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ def __init__(
disable_dependencies=disable_dependencies,
execute_function=execute_in_subprocess,
wait=wait,
refresh_rate=refresh_rate,
)
)
else:
Expand Down
1 change: 1 addition & 0 deletions src/executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def __init__(
init_function=init_function,
disable_dependencies=disable_dependencies,
wait=wait,
refresh_rate=refresh_rate,
)
)
else:
Expand Down
14 changes: 13 additions & 1 deletion src/executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import queue
from concurrent.futures import Future
from time import sleep
from typing import Any, Callable, Optional

from executorlib.standalone.command import get_cache_execute_command
Expand All @@ -17,6 +18,7 @@ def __init__(self, file_name: str, selector: Optional[int | str] = None):

Args:
file_name (str): The name of the file.
selector (int | str, optional): The selector to select a specific part of the result. Defaults to None.

"""
self._file_name = file_name
Expand Down Expand Up @@ -62,6 +64,7 @@ def execute_tasks_h5(
disable_dependencies: bool = False,
pmi_mode: Optional[str] = None,
wait: bool = True,
refresh_rate: float = 0.01,
) -> None:
"""
Execute tasks stored in a queue using HDF5 files.
Expand All @@ -78,6 +81,7 @@ def execute_tasks_h5(
disable_dependencies (boolean): Disable resolving future objects during the submission.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.

Returns:
None
Expand All @@ -101,6 +105,7 @@ def execute_tasks_h5(
terminate_function=terminate_function,
pysqa_config_directory=pysqa_config_directory,
backend=backend,
refresh_rate=refresh_rate,
)
if not task_dict["cancel_futures"] and wait:
_cancel_processes(
Expand All @@ -117,6 +122,7 @@ def execute_tasks_h5(
terminate_function=terminate_function,
pysqa_config_directory=pysqa_config_directory,
backend=backend,
refresh_rate=refresh_rate,
)
for value in memory_dict.values():
if not value.done():
Expand Down Expand Up @@ -198,6 +204,7 @@ def execute_tasks_h5(
terminate_function=terminate_function,
pysqa_config_directory=pysqa_config_directory,
backend=backend,
refresh_rate=refresh_rate,
)


Expand Down Expand Up @@ -297,6 +304,7 @@ def _refresh_memory_dict(
terminate_function: Optional[Callable] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
refresh_rate: float = 0.01,
) -> dict:
"""
Refresh memory dictionary
Expand All @@ -308,6 +316,7 @@ def _refresh_memory_dict(
terminate_function (callable): The function to terminate the tasks.
pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend).
backend (str): name of the backend used to spawn tasks.
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.

Returns:
dict: Updated memory dictionary
Expand All @@ -321,7 +330,7 @@ def _refresh_memory_dict(
pysqa_config_directory=pysqa_config_directory,
backend=backend,
)
return {
memory_updated_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
Expand All @@ -330,6 +339,9 @@ def _refresh_memory_dict(
for key, value in memory_dict.items()
if not value.done()
}
if len(memory_updated_dict) == len(memory_dict):
sleep(refresh_rate)
return memory_updated_dict


def _cancel_processes(
Expand Down
5 changes: 5 additions & 0 deletions src/executorlib/task_scheduler/file/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
disable_dependencies: bool = False,
pmi_mode: Optional[str] = None,
wait: bool = True,
refresh_rate: float = 0.01,
):
"""
Initialize the FileExecutor.
Expand All @@ -52,6 +53,7 @@ def __init__(
disable_dependencies (boolean): Disable resolving future objects during the submission.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
"""
super().__init__(max_cores=None)
default_resource_dict = {
Expand All @@ -75,6 +77,7 @@ def __init__(
"backend": backend,
"disable_dependencies": disable_dependencies,
"pmi_mode": pmi_mode,
"refresh_rate": refresh_rate,
"wait": wait,
}
self._set_process(
Expand Down Expand Up @@ -102,6 +105,7 @@ def create_file_executor(
disable_dependencies: bool = False,
execute_function: Callable = execute_with_pysqa,
wait: bool = True,
refresh_rate: float = 0.01,
):
if block_allocation:
raise ValueError(
Expand Down Expand Up @@ -133,4 +137,5 @@ def create_file_executor(
terminate_function=terminate_function,
pmi_mode=pmi_mode,
wait=wait,
refresh_rate=refresh_rate,
)
Loading