Skip to content

Add option to write flux log files #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Executor:
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Expand Down Expand Up @@ -95,6 +96,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand All @@ -117,6 +119,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand Down Expand Up @@ -153,6 +156,7 @@ def __new__(
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Expand Down Expand Up @@ -198,6 +202,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
Expand All @@ -215,6 +220,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand All @@ -235,6 +241,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand Down
3 changes: 3 additions & 0 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from executorlib.standalone.inputcheck import (
check_executor,
check_flux_executor_pmi_mode,
check_flux_log_files,
check_hostname_localhost,
check_max_workers_and_cores,
check_nested_flux_executor,
Expand Down Expand Up @@ -88,6 +89,7 @@ def create_file_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand All @@ -109,6 +111,7 @@ def create_file_executor(
check_hostname_localhost(hostname_localhost=hostname_localhost)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
Expand Down
6 changes: 6 additions & 0 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_executor,
check_flux_log_files,
check_gpus_per_worker,
check_init_function,
check_nested_flux_executor,
Expand Down Expand Up @@ -163,6 +164,7 @@ def create_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
Expand Down Expand Up @@ -193,6 +195,7 @@ def create_executor(
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an Executor
running on a different compute node within the same allocation. And in principle
Expand Down Expand Up @@ -222,6 +225,7 @@ def create_executor(
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
resource_dict["init_function"] = init_function
max_workers = validate_number_of_cores(
Expand Down Expand Up @@ -250,6 +254,7 @@ def create_executor(
elif backend == "slurm_allocation":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
Expand All @@ -272,6 +277,7 @@ def create_executor(
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
Expand Down
9 changes: 9 additions & 0 deletions executorlib/interactive/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class FluxPythonSpawner(BaseSpawner):
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
"""

def __init__(
Expand All @@ -45,6 +46,7 @@ def __init__(
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
):
super().__init__(
cwd=cwd,
Expand All @@ -56,6 +58,7 @@ def __init__(
self._flux_executor = flux_executor
self._flux_executor_pmi_mode = flux_executor_pmi_mode
self._flux_executor_nesting = flux_executor_nesting
self._flux_log_files = flux_log_files
self._future = None

def bootup(
Expand Down Expand Up @@ -99,6 +102,12 @@ def bootup(
jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
if self._cwd is not None:
jobspec.cwd = self._cwd
if self._flux_log_files and self._cwd is not None:
jobspec.stderr = os.path.join(self._cwd, "flux.err")
jobspec.stdout = os.path.join(self._cwd, "flux.out")
elif self._flux_log_files:
jobspec.stderr = os.path.abspath("flux.err")
jobspec.stdout = os.path.abspath("flux.out")
self._future = self._flux_executor.submit(jobspec)

def shutdown(self, wait: bool = True):
Expand Down
10 changes: 10 additions & 0 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None:
)


def check_flux_log_files(flux_log_files: Optional[bool]) -> None:
"""
Check if flux_log_files is True and raise a ValueError if it is.
"""
if flux_log_files:
raise ValueError(
"The flux_log_files parameter is only supported for the flux framework backend."
)


def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:
"""
Check if pysqa_config_directory is None and raise a ValueError if it is not.
Expand Down
44 changes: 44 additions & 0 deletions tests/test_executor_backend_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,50 @@ def test_single_task(self):
[[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]],
)

def test_output_files_cwd(self):
dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.makedirs(dirname, exist_ok=True)
file_stdout = os.path.join(dirname, "flux.out")
file_stderr = os.path.join(dirname, "flux.err")
with Executor(
max_cores=1,
resource_dict={"cores": 1, "cwd": dirname},
flux_executor=self.executor,
backend="flux_allocation",
block_allocation=True,
flux_log_files=True,
) as p:
output = p.map(calc, [1, 2, 3])
self.assertEqual(
list(output),
[1, 2, 3],
)
self.assertTrue(os.path.exists(file_stdout))
self.assertTrue(os.path.exists(file_stderr))
os.remove(file_stdout)
os.remove(file_stderr)

def test_output_files_abs(self):
file_stdout = os.path.abspath("flux.out")
file_stderr = os.path.abspath("flux.err")
with Executor(
max_cores=1,
resource_dict={"cores": 1},
flux_executor=self.executor,
backend="flux_allocation",
block_allocation=True,
flux_log_files=True,
) as p:
output = p.map(calc, [1, 2, 3])
self.assertEqual(
list(output),
[1, 2, 3],
)
self.assertTrue(os.path.exists(file_stdout))
self.assertTrue(os.path.exists(file_stderr))
os.remove(file_stdout)
os.remove(file_stderr)

def test_internal_memory(self):
with Executor(
max_cores=1,
Expand Down
5 changes: 5 additions & 0 deletions tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
check_executor,
check_init_function,
check_nested_flux_executor,
check_flux_log_files,
check_pmi,
check_plot_dependency_graph,
check_refresh_rate,
Expand Down Expand Up @@ -67,6 +68,10 @@ def test_check_nested_flux_executor(self):
with self.assertRaises(ValueError):
check_nested_flux_executor(nested_flux_executor=True)

def test_check_flux_log_files(self):
with self.assertRaises(ValueError):
check_flux_log_files(flux_log_files=True)
Comment on lines +71 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance test coverage with additional test cases.

The current test only verifies that flux_log_files=True raises a ValueError. Consider adding test cases for:

  • flux_log_files=False (valid case)
  • Non-boolean values (invalid case)
 def test_check_flux_log_files(self):
     with self.assertRaises(ValueError):
         check_flux_log_files(flux_log_files=True)
+    # Valid case should not raise
+    check_flux_log_files(flux_log_files=False)
+    # Non-boolean values should raise
+    with self.assertRaises(TypeError):
+        check_flux_log_files(flux_log_files="True")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def test_check_flux_log_files(self):
with self.assertRaises(ValueError):
check_flux_log_files(flux_log_files=True)
def test_check_flux_log_files(self):
with self.assertRaises(ValueError):
check_flux_log_files(flux_log_files=True)
# Valid case should not raise
check_flux_log_files(flux_log_files=False)
# Non-boolean values should raise
with self.assertRaises(TypeError):
check_flux_log_files(flux_log_files="True")


def test_check_plot_dependency_graph(self):
with self.assertRaises(ValueError):
check_plot_dependency_graph(plot_dependency_graph=True)
Expand Down
Loading