Cache: Add working directory parameter #446


Merged · 4 commits · Oct 27, 2024
4 changes: 4 additions & 0 deletions executorlib/cache/executor.py
@@ -1,4 +1,5 @@
import os
from typing import Optional

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_in_subprocess, execute_tasks_h5
@@ -11,6 +12,7 @@ def __init__(
cache_directory: str = "cache",
execute_function: callable = execute_in_subprocess,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
):
"""
Initialize the FileExecutor.
@@ -19,6 +21,7 @@ def __init__(
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
cwd (str/None): current working directory where the parallel python task is executed
"""
super().__init__()
cache_directory_path = os.path.abspath(cache_directory)
@@ -31,6 +34,7 @@ def __init__(
"execute_function": execute_function,
"cache_directory": cache_directory_path,
"cores_per_worker": cores_per_worker,
"cwd": cwd,
Review comment (Contributor):
🛠️ Refactor suggestion

Consider adding working directory validation

While the implementation is correct, it might be helpful to validate that the working directory exists before passing it to the execution context, so initialization fails fast with a clear error message.

Consider adding validation in __init__:

if cwd is not None and not os.path.isdir(cwd):
    raise ValueError(f"Working directory does not exist: {cwd}")
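With a check like this in place, a misconfigured executor fails at construction time rather than when the first task runs. A purely illustrative call (the path below is hypothetical):

FileExecutor(cwd="/nonexistent/path")  # would raise ValueError immediately under the suggested check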

},
)
)
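For orientation, a minimal usage sketch of the new parameter, assembled from this diff and the test added below. The import path follows the file location shown in this diff, and the directory is an illustrative placeholder, not something prescribed by the PR:

import os

from executorlib.cache.executor import FileExecutor  # path taken from the diff above


def list_files_in_working_directory():
    # Runs inside the worker subprocess, so os.getcwd() reflects the cwd passed to FileExecutor.
    return os.listdir(os.getcwd())


if __name__ == "__main__":
    with FileExecutor(cwd="/tmp") as exe:  # "/tmp" is an illustrative placeholder
        fs = exe.submit(list_files_in_working_directory)
        print(fs.result())  # listing of "/tmp", not of the caller's working directory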
14 changes: 10 additions & 4 deletions executorlib/cache/shared.py
@@ -4,7 +4,7 @@
import subprocess
import sys
from concurrent.futures import Future
from typing import Tuple
from typing import Optional, Tuple

from executorlib.standalone.command import get_command_path
from executorlib.standalone.hdf import dump, get_output
@@ -48,14 +48,17 @@ def done(self) -> bool:


def execute_in_subprocess(
command: list, task_dependent_lst: list = []
command: list,
task_dependent_lst: list = [],
cwd: Optional[str] = None,
Review comment (Contributor) on lines +51 to +53:
⚠️ Potential issue

Fix mutable default argument

Using a mutable default argument in Python can lead to unexpected behavior, because the default list is created once when the function is defined, not each time it is called.

Replace with:

def execute_in_subprocess(
    command: list,
-   task_dependent_lst: list = [],
+   task_dependent_lst: list = None,
    cwd: Optional[str] = None,
) -> subprocess.Popen:

And add at the beginning of the function:

    if task_dependent_lst is None:
        task_dependent_lst = []
🧰 Tools
🪛 Ruff
52-52: Do not use mutable data structures for argument defaults. Replace with None; initialize within function. (B006)
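For readers unfamiliar with the pitfall, a short self-contained illustration (not part of the PR) of why the shared default list is surprising, alongside the None-default pattern the reviewer suggests:

# The default list is created once, at function definition time, and shared across calls.
def append_item(item, bucket=[]):
    bucket.append(item)
    return bucket

print(append_item(1))  # [1]
print(append_item(2))  # [1, 2]  <- the same list object is reused between calls

# The None-default pattern creates a fresh list on every call that omits the argument.
def append_item_fixed(item, bucket=None):
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(append_item_fixed(1))  # [1]
print(append_item_fixed(2))  # [2]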

) -> subprocess.Popen:
"""
Execute a command in a subprocess.

Args:
command (list): The command to be executed.
task_dependent_lst (list, optional): A list of subprocesses that the current subprocess depends on. Defaults to [].
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
cwd (str/None): current working directory where the parallel python task is executed

Returns:
subprocess.Popen: The subprocess object.
@@ -65,14 +68,15 @@ def execute_in_subprocess(
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
]
return subprocess.Popen(command, universal_newlines=True)
return subprocess.Popen(command, universal_newlines=True, cwd=cwd)


def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
cores_per_worker: int,
execute_function: callable,
cwd: Optional[str],
) -> None:
"""
Execute tasks stored in a queue using HDF5 files.
@@ -82,6 +86,7 @@ def execute_tasks_h5(
cache_directory (str): The directory to store the HDF5 files.
cores_per_worker (int): The number of cores per worker.
execute_function (callable): The function to execute the tasks.
cwd (str/None): current working directory where the parallel python task is executed

Returns:
None
@@ -123,6 +128,7 @@ def execute_tasks_h5(
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
cwd=cwd,
)
file_name_dict[task_key] = os.path.join(
cache_directory, task_key + ".h5out"
16 changes: 16 additions & 0 deletions tests/test_cache_executor_serial.py
@@ -19,6 +19,10 @@ def my_funct(a, b):
return a + b


def list_files_in_working_directory():
return os.listdir(os.getcwd())


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
)
@@ -38,6 +42,12 @@ def test_executor_dependence_mixed(self):
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(cwd=cwd) as exe:
fs1 = exe.submit(list_files_in_working_directory)
self.assertEqual(fs1.result(), os.listdir(cwd))

Review comment (Contributor) on lines +45 to +50:
💡 Codebase verification

🛠️ Refactor suggestion

Add test cases for error scenarios with cwd parameter

The review comment is valid. The implementation accepts a cwd parameter without any validation, and there's only one positive test case. Adding test cases for error scenarios is important since:

  1. The parameter directly affects subprocess execution environment
  2. No validation exists in the implementation
  3. Current test coverage only includes the happy path
  4. Working directory issues can lead to runtime failures

The suggested test cases in the review comment appropriately cover:

  • Non-existent directories
  • Invalid paths
  • Permission issues
🔗 Analysis chain

Add test cases for error scenarios.

The current test only covers the happy path. Consider adding test cases for:

  1. Invalid directory paths
  2. Non-existent directories
  3. Directories without read permissions

Here's a suggested additional test:

def test_executor_working_directory_errors(self):
    # Test non-existent directory
    with self.assertRaises(ValueError):
        with FileExecutor(cwd="/nonexistent/path"):
            pass

    # Test invalid directory path
    with self.assertRaises(ValueError):
        with FileExecutor(cwd=""):
            pass

    # Test directory without permissions (Unix-like systems only)
    if os.name != 'nt':
        no_access_dir = os.path.join(self.test_dir, "no_access")
        os.makedirs(no_access_dir, mode=0o000)
        try:
            with self.assertRaises(PermissionError):
                with FileExecutor(cwd=no_access_dir):
                    pass
        finally:
            os.chmod(no_access_dir, 0o755)  # Restore permissions for cleanup
            os.rmdir(no_access_dir)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are any existing error handling tests for cwd parameter
rg -l "test.*working.*directory.*error" tests/
rg -l "test.*invalid.*directory" tests/

Length of output: 90


Script:

#!/bin/bash
# Let's check for any existing error handling tests for FileExecutor initialization
rg -A 10 "FileExecutor.*cwd" tests/

Length of output: 891


Script:

#!/bin/bash
# Let's check the FileExecutor implementation to understand error handling
rg -A 10 "class FileExecutor" .

Length of output: 678


Script:

#!/bin/bash
# Let's check the implementation details of FileExecutor to see if there's any validation
rg -B 5 -A 15 "def __init__" ./executorlib/cache/executor.py

Length of output: 929

def test_executor_function(self):
fs1 = Future()
q = Queue()
@@ -51,13 +61,15 @@ def test_executor_function(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
},
)
process.start()
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), 3)
self.assertTrue(fs1.done())
q.put({"shutdown": True, "wait": True})
process.join()

def test_executor_function_dependence_kwargs(self):
fs1 = Future()
@@ -74,13 +86,15 @@ def test_executor_function_dependence_kwargs(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
},
)
process.start()
self.assertFalse(fs2.done())
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())
q.put({"shutdown": True, "wait": True})
process.join()

def test_executor_function_dependence_args(self):
fs1 = Future()
@@ -97,13 +111,15 @@ def test_executor_function_dependence_args(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
},
)
process.start()
self.assertFalse(fs2.done())
self.assertEqual(fs2.result(), 5)
self.assertTrue(fs2.done())
q.put({"shutdown": True, "wait": True})
process.join()

def tearDown(self):
if os.path.exists("cache"):